You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/05 01:40:59 UTC
kafka git commit: KAFKA-4916: test streams with brokers failing
Repository: kafka
Updated Branches:
refs/heads/trunk 916081007 -> 49f80b236
KAFKA-4916: test streams with brokers failing
Several fixes for handling broker failures:
- default replication value for internal topics is now 3 in the test itself (not in the streams code; that will require a KIP).
- streams producer waits for acks from all replicas in the test itself (not in the streams code; that will require a KIP).
- backoff time for the streams client before retrying after a failure to contact the controller.
- fix bug related to state store locks (this helps in multi-threaded scenarios)
- fix related to catching exceptions properly for network errors.
- system test for all the above
Author: Eno Thereska <en...@confluent.io>
Author: Eno Thereska <en...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2719 from enothereska/KAFKA-4916-broker-bounce-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49f80b23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49f80b23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49f80b23
Branch: refs/heads/trunk
Commit: 49f80b2360ab55274ad28405352596f9d560ec1c
Parents: 9160810
Author: Eno Thereska <en...@confluent.io>
Authored: Tue Apr 4 18:32:58 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Apr 4 18:32:58 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../internals/InternalTopicManager.java | 32 ++-
.../internals/StreamPartitionAssignor.java | 13 +-
.../processor/internals/StreamThread.java | 53 +++--
.../processor/internals/StreamsKafkaClient.java | 20 +-
.../internals/InternalTopicManagerTest.java | 15 +-
.../kafka/streams/tests/SmokeTestClient.java | 29 ++-
.../kafka/streams/tests/SmokeTestDriver.java | 20 +-
.../kafka/test/MockInternalTopicManager.java | 5 +-
.../tests/streams/streams_bounce_test.py | 4 +-
.../tests/streams/streams_broker_bounce_test.py | 213 +++++++++++++++++++
11 files changed, 354 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 94719c6..52721ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -241,7 +241,7 @@ public class StreamsConfig extends AbstractConfig {
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
1,
- Importance.MEDIUM,
+ Importance.HIGH,
REPLICATION_FACTOR_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index d8575e9..7dab99d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +36,18 @@ public class InternalTopicManager {
public static final String RETENTION_MS = "retention.ms";
public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
private static final int MAX_TOPIC_READY_TRY = 5;
-
+ private final Time time;
private final long windowChangeLogAdditionalRetention;
private final int replicationFactor;
private final StreamsKafkaClient streamsKafkaClient;
- public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+ public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
+ final long windowChangeLogAdditionalRetention, final Time time) {
this.streamsKafkaClient = streamsKafkaClient;
this.replicationFactor = replicationFactor;
this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
+ this.time = time;
}
/**
@@ -60,11 +63,19 @@ public class InternalTopicManager {
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
+ if (metadata.brokers().size() < replicationFactor) {
+ throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " +
+ " but replication factor is " + replicationFactor + "." +
+ " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" +
+ " or add more brokers to your cluster.");
+ }
streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
return;
} catch (StreamsException ex) {
log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
}
+ // backoff
+ time.sleep(100L);
}
throw new StreamsException("Could not create internal topics.");
}
@@ -73,11 +84,20 @@ public class InternalTopicManager {
* Get the number of partitions for the given topics
*/
public Map<String, Integer> getNumPartitions(final Set<String> topics) {
- final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
- final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
- existingTopicPartitions.keySet().retainAll(topics);
+ for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+ try {
+ final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+ final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
+ existingTopicPartitions.keySet().retainAll(topics);
- return existingTopicPartitions;
+ return existingTopicPartitions;
+ } catch (StreamsException ex) {
+ log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i);
+ }
+ // backoff
+ time.sleep(100L);
+ }
+ throw new StreamsException("Could not get number of partitions.");
}
public void close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 24e6709..004926f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -57,7 +58,7 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager.
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
-
+ private Time time = Time.SYSTEM;
private final static int UNKNOWN = -1;
public final static int NOT_AVAILABLE = -2;
@@ -159,6 +160,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private CopartitionedTopicsValidator copartitionedTopicsValidator;
/**
+ * Package-private method to set the time. Used for tests.
+ * @param time Time to be used.
+ */
+ void time(final Time time) {
+ this.time = time;
+ }
+
+ /**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible
* since the former needs later's cached metadata while sending subscriptions,
* and the latter needs former's returned assignment when adding tasks.
@@ -207,7 +216,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
(Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
- : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9791a0a..8bd6d1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -401,17 +401,21 @@ public class StreamThread extends Thread {
@SuppressWarnings("ThrowableNotThrown")
private void shutdownTasksAndState() {
- log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
- activeTasks.keySet(), standbyTasks.keySet());
+ log.debug("{} shutdownTasksAndState: shutting down" +
+ "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
+ logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
+ suspendedTasks.keySet(), suspendedStandbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all processors in topology order
- firstException.compareAndSet(null, closeAllTasks());
+ firstException.compareAndSet(null, closeTasks(activeAndStandbytasks()));
+ firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks()));
// flush state
firstException.compareAndSet(null, flushAllState());
// Close all task state managers. Don't need to set exception as all
// state would have been flushed above
- closeAllStateManagers(firstException.get() == null);
+ closeStateManagers(activeAndStandbytasks(), firstException.get() == null);
+ closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null);
// only commit under clean exit
if (cleanRun && firstException.get() == null) {
firstException.set(commitOffsets());
@@ -430,7 +434,7 @@ public class StreamThread extends Thread {
activeTasks.keySet(), standbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all topology nodes
- firstException.compareAndSet(null, closeAllTasksTopologies());
+ firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies());
// flush state
firstException.compareAndSet(null, flushAllState());
// only commit after all state has been flushed and there hasn't been an exception
@@ -453,12 +457,11 @@ public class StreamThread extends Thread {
void apply(final AbstractTask task);
}
- private RuntimeException performOnAllTasks(final AbstractTaskAction action,
- final String exceptionMessage) {
+ private RuntimeException performOnTasks(final List<AbstractTask> tasks,
+ final AbstractTaskAction action,
+ final String exceptionMessage) {
RuntimeException firstException = null;
- final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
- allTasks.addAll(standbyTasks.values());
- for (final AbstractTask task : allTasks) {
+ for (final AbstractTask task : tasks) {
try {
action.apply(task);
} catch (RuntimeException t) {
@@ -476,8 +479,20 @@ public class StreamThread extends Thread {
return firstException;
}
- private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
- return performOnAllTasks(new AbstractTaskAction() {
+ private List<AbstractTask> activeAndStandbytasks() {
+ final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
+ tasks.addAll(standbyTasks.values());
+ return tasks;
+ }
+
+ private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
+ final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values());
+ tasks.addAll(suspendedStandbyTasks.values());
+ return tasks;
+ }
+
+ private Throwable closeStateManagers(final List<AbstractTask> tasks, final boolean writeCheckpoint) {
+ return performOnTasks(tasks, new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id());
@@ -488,7 +503,7 @@ public class StreamThread extends Thread {
private RuntimeException commitOffsets() {
// Exceptions should not prevent this call from going through all shutdown steps
- return performOnAllTasks(new AbstractTaskAction() {
+ return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id());
@@ -498,7 +513,7 @@ public class StreamThread extends Thread {
}
private RuntimeException flushAllState() {
- return performOnAllTasks(new AbstractTaskAction() {
+ return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id());
@@ -579,6 +594,7 @@ public class StreamThread extends Thread {
}
}
+
/**
* Schedule the records processing by selecting which record is processed next. Commits may
* happen as records are processed.
@@ -1072,8 +1088,8 @@ public class StreamThread extends Thread {
standbyRecords.clear();
}
- private RuntimeException closeAllTasks() {
- return performOnAllTasks(new AbstractTaskAction() {
+ private RuntimeException closeTasks(final List<AbstractTask> tasks) {
+ return performOnTasks(tasks, new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
@@ -1083,8 +1099,9 @@ public class StreamThread extends Thread {
}, "close");
}
- private RuntimeException closeAllTasksTopologies() {
- return performOnAllTasks(new AbstractTaskAction() {
+
+ private RuntimeException closeActiveAndStandbyTasksTopologies() {
+ return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 3217e46..ca3ab1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -205,7 +205,11 @@ public class StreamsKafkaClient {
break;
}
}
- kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ try {
+ kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ } catch (final Exception e) {
+ throw new StreamsException("Could not poll.", e);
+ }
}
if (brokerId == null) {
throw new StreamsException("Could not find any available broker.");
@@ -237,11 +241,20 @@ public class StreamsKafkaClient {
}
private ClientResponse sendRequest(final ClientRequest clientRequest) {
- kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+ try {
+ kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+ } catch (final Exception e) {
+ throw new StreamsException("Could not send request.", e);
+ }
final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
// Poll for the response.
while (Time.SYSTEM.milliseconds() < responseTimeout) {
- List<ClientResponse> responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ final List<ClientResponse> responseList;
+ try {
+ responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+ } catch (final IllegalStateException e) {
+ throw new StreamsException("Could not poll.", e);
+ }
if (!responseList.isEmpty()) {
if (responseList.size() > 1) {
throw new StreamsException("Sent one request but received multiple or no responses.");
@@ -271,6 +284,7 @@ public class StreamsKafkaClient {
Time.SYSTEM.milliseconds(),
true);
final ClientResponse clientResponse = sendRequest(clientRequest);
+
if (!clientResponse.hasResponse()) {
throw new StreamsException("Empty response for client request.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 84f0e8a..031c732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -40,7 +42,7 @@ public class InternalTopicManagerTest {
private final String topic = "test_topic";
private final String userEndPoint = "localhost:2171";
private MockStreamKafkaClient streamsKafkaClient;
-
+ private final Time time = new MockTime();
@Before
public void init() {
final StreamsConfig config = new StreamsConfig(configProps());
@@ -54,19 +56,22 @@ public class InternalTopicManagerTest {
@Test
public void shouldReturnCorrectPartitionCounts() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
}
@Test
public void shouldCreateRequiredTopics() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
}
@Test
public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+ InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
boolean exceptionWasThrown = false;
try {
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
@@ -104,7 +109,7 @@ public class InternalTopicManagerTest {
Node node = new Node(1, "host1", 1001);
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
- MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
+ MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
Collections.singletonList(topicMetadata));
return response;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 7691948..263474c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
@@ -39,6 +40,7 @@ public class SmokeTestClient extends SmokeTestUtil {
private final File stateDir;
private KafkaStreams streams;
private Thread thread;
+ private boolean uncaughtException = false;
public SmokeTestClient(File stateDir, String kafka) {
super();
@@ -51,10 +53,19 @@ public class SmokeTestClient extends SmokeTestUtil {
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
+ System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+ uncaughtException = true;
e.printStackTrace();
}
});
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ close();
+ }
+ }));
+
thread = new Thread() {
public void run() {
streams.start();
@@ -64,32 +75,38 @@ public class SmokeTestClient extends SmokeTestUtil {
}
public void close() {
- streams.close();
+ streams.close(5, TimeUnit.SECONDS);
+ // do not remove these printouts since they are needed for health scripts
+ if (!uncaughtException) {
+ System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+ }
try {
thread.join();
} catch (Exception ex) {
+ // do not remove these printouts since they are needed for health scripts
+ System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
// ignore
}
}
private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
- props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+ props.put(ProducerConfig.ACKS_CONFIG, "all");
- KStreamBuilder builder = new KStreamBuilder();
+ KStreamBuilder builder = new KStreamBuilder();
KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
-
source.to(stringSerde, intSerde, "echo");
-
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index b3520fb..11e1ae8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -131,13 +131,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
- Properties props = new Properties();
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ // the next 4 config values make sure that all records are produced with no loss and
+ // no duplicates
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
int numRecordsProduced = 0;
@@ -232,7 +236,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
int retry = 0;
final long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(3)) {
+ while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
if (verifyMin(min, allData, false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 58c6c6d..d6ada4b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
@@ -29,13 +30,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+
+
public class MockInternalTopicManager extends InternalTopicManager {
public Map<String, Integer> readyTopics = new HashMap<>();
private MockConsumer<byte[], byte[]> restoreConsumer;
public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) {
- super(new StreamsKafkaClient(streamsConfig), 0, 0);
+ super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime());
this.restoreConsumer = restoreConsumer;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
index 169bbc1..7ac7939 100644
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -26,7 +26,7 @@ class StreamsBounceTest(KafkaTest):
"""
def __init__(self, test_context):
- super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+ super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'echo' : { 'partitions': 5, 'replication-factor': 2 },
'data' : { 'partitions': 5, 'replication-factor': 2 },
'min' : { 'partitions': 5, 'replication-factor': 2 },
@@ -42,7 +42,7 @@ class StreamsBounceTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
- @cluster(num_nodes=5)
+ @cluster(num_nodes=6)
def test_bounce(self):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
new file mode 100644
index 0000000..86c19f9
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.mark import matrix
+from ducktape.mark import parametrize, ignore
+from kafkatest.services.kafka import KafkaService
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+import time
+import signal
+from random import randint
+
+def broker_node(test, topic, broker_type):
+ """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+ """
+ if broker_type == "leader":
+ node = test.kafka.leader(topic, partition=0)
+ elif broker_type == "controller":
+ node = test.kafka.controller()
+ else:
+ raise Exception("Unexpected broker type %s." % (broker_type))
+
+ return node
+
+def signal_node(test, node, sig):
+ test.kafka.signal_node(node, sig)
+
+def clean_shutdown(test, topic, broker_type):
+ """Discover broker node of requested type and shut it down cleanly.
+ """
+ node = broker_node(test, topic, broker_type)
+ signal_node(test, node, signal.SIGTERM)
+
+def hard_shutdown(test, topic, broker_type):
+ """Discover broker node of requested type and shut it down with a hard kill."""
+ node = broker_node(test, topic, broker_type)
+ signal_node(test, node, signal.SIGKILL)
+
+
+failures = {
+ "clean_shutdown": clean_shutdown,
+ "hard_shutdown": hard_shutdown
+}
+
+class StreamsBrokerBounceTest(Test):
+ """
+ Simple test of Kafka Streams with brokers failing
+ """
+
+ def __init__(self, test_context):
+ super(StreamsBrokerBounceTest, self).__init__(test_context)
+ self.replication = 3
+ self.partitions = 3
+ self.topics = {
+ 'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2}},
+ 'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} },
+ 'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+ 'configs': {"min.insync.replicas": 2} }
+ }
+
+ def fail_broker_type(self, failure_mode, broker_type):
+ # Pick a random topic and bounce it's leader
+ topic_index = randint(0, len(self.topics.keys()) - 1)
+ topic = self.topics.keys()[topic_index]
+ failures[failure_mode](self, topic, broker_type)
+
+ def fail_many_brokers(self, failure_mode, num_failures):
+ sig = signal.SIGTERM
+ if (failure_mode == "clean_shutdown"):
+ sig = signal.SIGTERM
+ else:
+ sig = signal.SIGKILL
+
+ for num in range(0, num_failures - 1):
+ signal_node(self, self.kafka.nodes[num], sig)
+
+
+ def setup_system(self):
+ # Setup phase
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
+
+ self.kafka = KafkaService(self.test_context, num_nodes=self.replication,
+ zk=self.zk, topics=self.topics)
+ self.kafka.start()
+ # Start test harness
+ self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+ self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+
+ self.driver.start()
+ self.processor1.start()
+
+ def collect_results(self, sleep_time_secs):
+ data = {}
+ # End test
+ self.driver.wait()
+ self.driver.stop()
+
+ self.processor1.stop()
+
+ node = self.driver.node
+
+ # Success is declared if streams does not crash when sleep time > 0
+ # It should give an exception when sleep time is 0 since we kill the brokers immediately
+ # and the topic manager cannot create internal topics with the desired replication factor
+ if (sleep_time_secs == 0):
+ output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+ else:
+ output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+
+ for line in output_streams:
+ data["Client closed"] = line
+
+ # Currently it is hard to guarantee anything about Kafka since we don't have exactly once.
+ # With exactly once in place, success will be defined as ALL-RECORDS-DELIEVERD and SUCCESS
+ output = node.account.ssh_capture("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED|PROCESSED-LESS-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+ for line in output:
+ data["Records Delivered"] = line
+ output = node.account.ssh_capture("grep -E 'SUCCESS|FAILURE' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+ for line in output:
+ data["Logic Success/Failure"] = line
+
+
+ return data
+
+ @cluster(num_nodes=7)
+ @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+ broker_type=["leader", "controller"],
+ sleep_time_secs=[120])
+ def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
+ """
+ Start a smoke test client, then kill one particular broker and ensure data is still received
+ Record if records are delivered.
+ """
+ self.setup_system()
+
+ # Sleep to allow test to run for a bit
+ time.sleep(sleep_time_secs)
+
+ # Fail brokers
+ self.fail_broker_type(failure_mode, broker_type);
+
+ return self.collect_results(sleep_time_secs)
+
+ @cluster(num_nodes=7)
+ @matrix(failure_mode=["clean_shutdown"],
+ broker_type=["controller"],
+ sleep_time_secs=[0])
+ def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
+ """
+ Start a smoke test client, then kill one particular broker immediately before streams stats
+ Streams should throw an exception since it cannot create topics with the desired
+ replication factor of 3
+ """
+ self.setup_system()
+
+ # Sleep to allow test to run for a bit
+ time.sleep(sleep_time_secs)
+
+ # Fail brokers
+ self.fail_broker_type(failure_mode, broker_type);
+
+ return self.collect_results(sleep_time_secs)
+
+ @cluster(num_nodes=7)
+ @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+ num_failures=[2])
+ def test_many_brokers_bounce(self, failure_mode, num_failures):
+ """
+ Start a smoke test client, then kill a few brokers and ensure data is still received
+ Record if records are delivered
+ """
+ self.setup_system()
+
+ # Sleep to allow test to run for a bit
+ time.sleep(120)
+
+ # Fail brokers
+ self.fail_many_brokers(failure_mode, num_failures);
+
+ return self.collect_results(120)