You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/01 18:51:30 UTC
samza git commit: SAMZA-1305. SAMZA-1306. Unit test - zk unavailable.
Repository: samza
Updated Branches:
refs/heads/master 19f4d1295 -> a90ca11ef
SAMZA-1305. SAMZA-1306. Unit test - zk unavailable.
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #198 from sborya/unitTestZkUnavailable2
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a90ca11e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a90ca11e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a90ca11e
Branch: refs/heads/master
Commit: a90ca11efc7c4ef31b70814caf74233dfc5cea3e
Parents: 19f4d12
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Jun 1 11:51:22 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Thu Jun 1 11:51:22 2017 -0700
----------------------------------------------------------------------
.../apache/samza/processor/StreamProcessor.java | 29 +-
.../processor/TestZkStreamProcessorBase.java | 336 +++++++++++++++++++
.../test/processor/TestZkStreamProcessor.java | 151 +++------
.../processor/TestZkStreamProcessorBase.java | 270 ---------------
.../TestZkStreamProcessorFailures.java | 147 ++++++++
5 files changed, 545 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5c6a474..c0a2163 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -20,6 +20,11 @@ package org.apache.samza.processor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
@@ -40,12 +45,6 @@ import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
/**
* StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
* independent process.
@@ -93,7 +92,7 @@ public class StreamProcessor {
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
- this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener);
+ this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener, null);
}
/**
@@ -106,7 +105,7 @@ public class StreamProcessor {
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
- this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener);
+ this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
}
/* package private */
@@ -118,7 +117,6 @@ public class StreamProcessor {
.getJobCoordinator(config);
}
- @VisibleForTesting
StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
this.taskFactory = taskFactory;
@@ -126,22 +124,11 @@ public class StreamProcessor {
this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
this.customMetricsReporter = customMetricsReporters;
this.processorListener = processorListener;
- this.jobCoordinator = jobCoordinator;
+ this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator();
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
}
- private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
- Object taskFactory, StreamProcessorLifecycleListener processorListener) {
- this.taskFactory = taskFactory;
- this.config = config;
- this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
- this.customMetricsReporter = customMetricsReporters;
- this.processorListener = processorListener;
- this.jobCoordinator = getJobCoordinator();
- this.jobCoordinator.setListener(createJobCoordinatorListener());
- }
-
/**
* Asynchronously starts this {@link StreamProcessor}.
* <p>
http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
new file mode 100644
index 0000000..a315083
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.StandaloneIntegrationTestHarness;
+import org.apache.samza.test.StandaloneTestUtils;
+import org.apache.samza.util.Util;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
+  public static final Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
+  public static final int BAD_MESSAGE_KEY = 1000; // values >= this are not counted by the verifiers/latch
+  // wait-loops poll up to ATTEMPTS_NUMBER times with short sleeps rather than doing one long sleep
+  protected static final int ATTEMPTS_NUMBER = 5;
+  // static: JUnit 4 builds a new test instance per test method, so an instance counter would always start at 1
+  protected static AtomicInteger counter = new AtomicInteger(1);
+  protected String testSystem;
+  protected String inputTopic;
+  protected String outputTopic;
+  protected int messageCount = 40;
+  // per-test job config, built in setUp() by createConfigs()
+  protected Map<String, String> map;
+
+ protected String prefix() {
+ return "";
+ }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    // intended to give each test unique system/topic names; relies on 'counter' advancing across tests
+    int seqNum = counter.getAndAdd(1);
+    testSystem = prefix() + "test-system" + seqNum;
+    inputTopic = prefix() + "numbers" + seqNum;
+    outputTopic = prefix() + "output" + seqNum;
+
+    map = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
+
+    // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
+    // TopicExistsException since StreamProcessor auto-creates them.
+    createTopics(inputTopic, outputTopic);
+  }
+
+  protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map, final Object mutexStart,
+      final Object mutexStop) {
+    map.put(ApplicationConfig.PROCESSOR_ID, pId); // note: mutates the caller's map
+
+    Config config = new MapConfig(map);
+    JobCoordinator jobCoordinator =
+        Util.<JobCoordinatorFactory>getObj(new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName())
+            .getJobCoordinator(config);
+    // onStart wakes threads waiting on mutexStart, onShutdown wakes threads waiting on mutexStop; either may be null
+    StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() {
+      @Override
+      public void onStart() {
+        if (mutexStart != null) {
+          synchronized (mutexStart) {
+            mutexStart.notifyAll();
+          }
+        }
+        LOG.info("onStart is called for pid=" + pId);
+      }
+
+      @Override
+      public void onShutdown() {
+        if (mutexStop != null) {
+          synchronized (mutexStop) {
+            mutexStop.notifyAll();
+          }
+        }
+        LOG.info("onShutdown is called for pid=" + pId);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.error("onFailure is called for pid=" + pId, t);
+      }
+    };
+
+    StreamProcessor processor =
+        new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) TestStreamTask::new, listener, jobCoordinator);
+
+    return processor;
+  }
+
+ protected void createTopics(String inputTopic, String outputTopic) {
+ TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties());
+ TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties());
+ }
+
+  protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
+      int messageCount) {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(StandaloneTestUtils
+        .getStandaloneConfigs("test-job", TestStreamTask.class.getName())); // binary name of the nested task class
+    configs.putAll(StandaloneTestUtils
+        .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
+            true));
+    configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
+    configs.put("app.messageCount", String.valueOf(messageCount));
+    configs.put("app.outputTopic", outputTopic); // read by TestStreamTask.init()
+    configs.put("app.outputSystem", testSystem); // read by TestStreamTask.init()
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+
+    configs.put("job.systemstreampartition.grouper.factory",
+        "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
+    configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
+
+    return configs;
+  }
+
+  /**
+   * Produces message i for each i in [start, start + numMessages): partition i % 2, key and value = String.valueOf(i).
+   */
+  protected void produceMessages(final int start, String topic, int numMessages) {
+    KafkaProducer producer = getKafkaProducer();
+    for (int i = start; i < numMessages + start; i++) {
+      try {
+        LOG.info("producing " + i);
+        producer.send(new ProducerRecord(topic, i % 2, String.valueOf(i), String.valueOf(i).getBytes())).get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("failed to produce message " + i, e); // best effort: later verification will catch the gap
+      }
+    }
+  }
+
+  /**
+   * Creates (but does not start) a thread that starts the processor, waits on the thread's own monitor for up to
+   * 100 s until woken by {@link #stopProcessor(Thread)}, then stops the processor. The latch argument is unused.
+   */
+  protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
+    Thread t = new Thread() {
+
+      @Override
+      public void run() {
+        processor.start();
+        try {
+          // block until stopProcessor() notifies this thread, or the 100 s timeout elapses
+          synchronized (this) {
+            this.wait(100000);
+          }
+          LOG.info("notified. Abandon the wait.");
+        } catch (InterruptedException e) {
+          LOG.error("wait interrupted", e);
+        }
+        LOG.info("Stopping the processor");
+        processor.stop();
+      }
+    };
+    return t;
+  }
+
+ // for sequential values we can generate them automatically
+ protected void verifyNumMessages(String topic, int numberOfSequentialValues, int expectedNumMessages) {
+ // we should get each value one time
+ // create a map of all expected values to validate
+ Map<Integer, Boolean> expectedValues = new HashMap<>(numberOfSequentialValues);
+ for (int i = 0; i < numberOfSequentialValues; i++) {
+ expectedValues.put(i, false);
+ }
+ verifyNumMessages(topic, expectedValues, expectedNumMessages);
+ }
+
+ /**
+ * Consumes data from the topic until there are no new messages for a while
+ * and asserts that the number of consumed messages is as expected.
+ */
+ protected void verifyNumMessages(String topic, final Map<Integer, Boolean> expectedValues, int expectedNumMessages) {
+ KafkaConsumer consumer = getKafkaConsumer();
+ consumer.subscribe(Collections.singletonList(topic));
+
+ Map<Integer, Boolean> map = new HashMap<>(expectedValues);
+ int count = 0;
+ int emptyPollCount = 0;
+
+ while (count < expectedNumMessages && emptyPollCount < 5) {
+ ConsumerRecords records = consumer.poll(5000);
+ if (!records.isEmpty()) {
+ Iterator<ConsumerRecord> iterator = records.iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord record = iterator.next();
+ String val = new String((byte[]) record.value());
+ LOG.info("Got value " + val + "; count = " + count + "; out of " + expectedNumMessages);
+ Integer valI = Integer.valueOf(val);
+ if (valI < BAD_MESSAGE_KEY) {
+ map.put(valI, true);
+ count++;
+ }
+ }
+ } else {
+ emptyPollCount++;
+ LOG.warn("empty polls " + emptyPollCount);
+ }
+ }
+ // filter out numbers we did not get
+ long numFalse = map.values().stream().filter(v -> !v).count();
+ Assert.assertEquals("didn't get this number of events ", 0, numFalse);
+ Assert.assertEquals(expectedNumMessages, count);
+ }
+
+  protected void waitUntilMessagesLeftN(int untilLeft) {
+    int attempts = ATTEMPTS_NUMBER;
+    while (attempts > 0) {
+      long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount();
+      // exact match required: if the latch already dropped below untilLeft, this times out and fails
+      if (leftEventsCount == untilLeft) { // that much should be left
+        LOG.info("read all. current count = " + leftEventsCount);
+        break;
+      }
+      TestZkUtils.sleepMs(1000);
+      attempts--;
+    }
+    Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
+  }
+
+  protected void waitForProcessorToStartStop(Object waitObject) {
+    try {
+      synchronized (waitObject) {
+        waitObject.wait(1000); // returns on notify or after 1 s, whichever comes first
+      }
+    } catch (InterruptedException e) {
+      Assert.fail("got interrupted while waiting for the processor to start/stop.");
+    }
+  }
+
+  protected void stopProcessor(Thread threadName) { // wakes a runInThread() thread so it stops its processor
+    synchronized (threadName) {
+      threadName.notify();
+    }
+  }
+
+ // StreamTaskClass
+ public static class TestStreamTask implements StreamTask, InitableTask {
+ // static field since there's no other way to share state b/w a task instance and
+ // stream processor when constructed from "task.class".
+ public static CountDownLatch endLatch;
+ protected int processedMessageCount = 0;
+ protected String processorId;
+ protected String outputTopic;
+ protected String outputSystem;
+ protected String processorIdToFail;
+
+ @Override
+ public void init(Config config, TaskContext taskContext)
+ throws Exception {
+ this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
+ this.outputTopic = config.get("app.outputTopic", "output");
+ this.outputSystem = config.get("app.outputSystem", "test-system");
+ this.processorIdToFail = config.get("processor.id.to.fail", "1");
+ }
+
+ @Override
+ public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
+ TaskCoordinator taskCoordinator)
+ throws Exception {
+
+ Object message = incomingMessageEnvelope.getMessage();
+
+ String key = new String((byte[]) incomingMessageEnvelope.getKey());
+ Integer val = Integer.valueOf((String) message);
+
+ LOG.info("Stream processor " + processorId + ";key=" + key + ";offset=" + incomingMessageEnvelope.getOffset()
+ + "; totalRcvd=" + processedMessageCount + ";val=" + val + "; ssp=" + incomingMessageEnvelope
+ .getSystemStreamPartition());
+
+ // inject a failure
+ if (val >= BAD_MESSAGE_KEY && processorId.equals(processorIdToFail)) {
+ LOG.info("process method failing for msg=" + message);
+ throw new Exception("Processing in the processor " + processorId + " failed ");
+ }
+
+ messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message));
+ processedMessageCount++;
+
+ synchronized (endLatch) {
+ if (Integer.valueOf(key) < BAD_MESSAGE_KEY) {
+ endLatch.countDown();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
index b409532..05b6ebe 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessor.java
@@ -19,14 +19,12 @@
package org.apache.samza.test.processor;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.TestZkStreamProcessorBase;
import org.apache.samza.zk.TestZkUtils;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
@@ -36,28 +34,9 @@ import org.junit.Test;
*/
public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
- private AtomicInteger counter = new AtomicInteger(1);
- private String testSystem;
- private String inputTopic;
- private String outputTopic;
- private int messageCount = 40;
-
- private Map<String, String> map;
-
-
- @Before
- public void setupTest() {
- // for each tests - make the common parts unique
- int seqNum = counter.getAndAdd(1);
- testSystem = "test-system" + seqNum;
- inputTopic = "numbers" + seqNum;
- outputTopic = "output" + seqNum;
-
- map = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
-
- // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
- // TopicExistsException since StreamProcessor auto-creates them.
- createTopics(inputTopic, outputTopic);
+ @Override
+ protected String prefix() {
+ return "test_ZK_";
}
@Test
@@ -67,40 +46,42 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
@Test
public void testTwoStreamProcessors() {
- testStreamProcessor(new String[]{"1", "2"});
+ testStreamProcessor(new String[]{"2", "3"});
}
@Test
public void testFiveStreamProcessors() {
- testStreamProcessor(new String[]{"1", "2", "3", "4", "5"});
+ testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
}
// main test method for happy path with fixed number of processors
private void testStreamProcessor(String[] processorIds) {
- // create a latch of the size == number of messages
+ // create a latch of the size equals to the number of messages
TestZkStreamProcessorBase.TestStreamTask.endLatch = new CountDownLatch(messageCount);
- // initialize the the processors
- // we need startLatch to know when the processor has been completely initialized
+ // initialize the processors
StreamProcessor[] streamProcessors = new StreamProcessor[processorIds.length];
- CountDownLatch[] startCountDownLatches = new CountDownLatch[processorIds.length];
+ // we need to know when the processor has started
+ Object[] startWait = new Object[processorIds.length];
for (int i = 0; i < processorIds.length; i++) {
- startCountDownLatches[i] = new CountDownLatch(1);
- streamProcessors[i] = createStreamProcessor(processorIds[i], map, startCountDownLatches[i], null);
+ startWait[i] = new Object();
+ streamProcessors[i] = createStreamProcessor(processorIds[i], map, startWait[i], null);
}
- // produce messageCount messages, starting with key '0'
+ // produce messageCount messages, starting with key 0
produceMessages(0, inputTopic, messageCount);
- // run the processors in a separate threads
+ // run the processors in separate threads
Thread[] threads = new Thread[processorIds.length];
for (int i = 0; i < processorIds.length; i++) {
threads[i] = runInThread(streamProcessors[i], TestZkStreamProcessorBase.TestStreamTask.endLatch);
threads[i].start();
// wait until the processor reports that it has started
try {
- startCountDownLatches[i].await(1000, TimeUnit.MILLISECONDS);
+ synchronized (startWait[i]) {
+ startWait[i].wait(1000);
+ }
} catch (InterruptedException e) {
Assert.fail("got interrupted while waiting for the " + i + "th processor to start.");
}
@@ -116,9 +97,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
// collect all the threads
try {
for (Thread t : threads) {
- synchronized (t) {
- t.notify(); // to stop the thread
- }
+ stopProcessor(t);
t.join(1000);
}
} catch (InterruptedException e) {
@@ -133,7 +112,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
* Similar to the previous tests, but add another processor in the middle
*/ public void testStreamProcessorWithAdd() {
- // set number of events we expect wo read by both processes in total:
+ // set number of events we expect to read by both processes in total:
// p1 - reads 'messageCount' at first
// p1 and p2 read all messageCount together, since they start from the beginning.
// so we expect total 3 x messageCounts
@@ -141,8 +120,9 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
// create first processor
- CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
- StreamProcessor sp = createStreamProcessor("1", map, startCountDownLatch1, null);
+ Object startWait1 = new Object();
+ Object stopWait1 = new Object();
+ StreamProcessor sp = createStreamProcessor("20", map, startWait1, stopWait1);
// produce first batch of messages starting with 0
produceMessages(0, inputTopic, messageCount);
@@ -152,46 +132,36 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
t1.start();
// wait until the processor reports that it has started
- try {
- startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Assert.fail("got interrupted while waiting for the first processor to start.");
- }
+ waitForProcessorToStartStop(startWait1);
// make sure it consumes all the messages from the first batch
- waitUntilConsumedN(totalEventsToGenerate - messageCount);
+ waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
// start the second processor
- CountDownLatch countDownLatch2 = new CountDownLatch(1);
- StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+ Object startWait2 = new Object();
+ StreamProcessor sp2 = createStreamProcessor("21", map, startWait2, null);
Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
t2.start();
- // wait until the processor reports that it has started
- try {
- countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Assert.fail("got interrupted while waiting for the 2nd processor to start.");
- }
+ // wait until 2nd processor reports that it has started
+ waitForProcessorToStartStop(startWait2);
+
+ // wait until the 1st processor reports that it has stopped
+ waitForProcessorToStartStop(stopWait1);
- // wait for at least one full debounce time to let the system to publish and distribute the new job model
- TestZkUtils.sleepMs(3000);
+ // give the system time to publish and distribute the new job model
+ TestZkUtils.sleepMs(600);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
// wait until all the events are consumed
- // make sure it consumes all the messages from the first batch
- waitUntilConsumedN(0);
+ waitUntilMessagesLeftN(0);
// shutdown both
try {
- synchronized (t1) {
- t1.notify();
- }
- synchronized (t2) {
- t2.notify();
- }
+ stopProcessor(t1);
+ stopProcessor(t2);
t1.join(1000);
t2.join(1000);
} catch (InterruptedException e) {
@@ -216,68 +186,55 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
TestStreamTask.endLatch = new CountDownLatch(totalEventsToGenerate);
// create first processor
- CountDownLatch startCountDownLatch1 = new CountDownLatch(1);
- CountDownLatch stopCountDownLatch1 = new CountDownLatch(1);
- StreamProcessor sp1 = createStreamProcessor("1", map, startCountDownLatch1, stopCountDownLatch1);
+ Object waitStart1 = new Object();
+ Object waitStop1 = new Object();
+ StreamProcessor sp1 = createStreamProcessor("30", map, waitStart1, waitStop1);
// start the first processor
Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
t1.start();
// start the second processor
- CountDownLatch countDownLatch2 = new CountDownLatch(1);
- StreamProcessor sp2 = createStreamProcessor("2", map, countDownLatch2, null);
+ Object waitStart2 = new Object();
+ Object waitStop2 = new Object();
+ StreamProcessor sp2 = createStreamProcessor("31", map, waitStart2, waitStop2);
Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
t2.start();
// wait until the processor reports that it has started
- try {
- startCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Assert.fail("got interrupted while waiting for the first processor to start.");
- }
+ waitForProcessorToStartStop(waitStart1);
// wait until the processor reports that it has started
- try {
- countDownLatch2.await(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Assert.fail("got interrupted while waiting for the 2nd processor to start.");
- }
+ waitForProcessorToStartStop(waitStart2);
// produce first batch of messages starting with 0
produceMessages(0, inputTopic, messageCount);
// make sure they consume all the messages from the first batch
- waitUntilConsumedN(totalEventsToGenerate - messageCount);
+ waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
// stop the first processor
- synchronized (t1) {
- t1.notify(); // this should stop it
- }
+ stopProcessor(t1);
// wait until it's really down
- try {
- stopCountDownLatch1.await(1000, TimeUnit.MILLISECONDS);
- System.out.println("Processor 1 is down");
- } catch (InterruptedException e) {
- Assert.fail("got interrupted while waiting for the 1st processor to stop.");
- }
+ waitForProcessorToStartStop(waitStop1);
+
+ // processor2 will stop and start again. We wait for its stop to make sure we can count EXACTLY how many messages it reads.
+ waitForProcessorToStartStop(waitStop2);
- // wait for at least one full debounce time to let the system to publish and distribute the new job model
- TestZkUtils.sleepMs(3000);
+ // give the system time to publish and distribute the new job model
+ TestZkUtils.sleepMs(300);
// produce the second batch of the messages, starting with 'messageCount'
produceMessages(messageCount, inputTopic, messageCount);
// wait until p2 consumes all the message by itself;
- waitUntilConsumedN(0);
+ waitUntilMessagesLeftN(0);
// shutdown p2
try {
- synchronized (t2) {
- t2.notify();
- }
+ stopProcessor(t2);
t2.join(1000);
} catch (InterruptedException e) {
Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
deleted file mode 100644
index 9320feb..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorBase.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.processor;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import kafka.utils.TestUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.ZkConfig;
-import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifecycleListener;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.StandaloneIntegrationTestHarness;
-import org.apache.samza.test.StandaloneTestUtils;
-import org.apache.samza.zk.TestZkUtils;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Base class with shared helpers for the ZK-based StreamProcessor integration tests. */
-public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness {
- public final static Logger LOG = LoggerFactory.getLogger(TestZkStreamProcessorBase.class);
- public final static int BAD_MESSAGE_KEY = 1000; // TestStreamTask on processor "1" throws for message values >= this
- // number of 1-second polling attempts in waitUntilConsumedN (several short sleeps instead of one long one)
- private final static int ATTEMPTS_NUMBER = 5;
-
-
- // auxiliary methods
- /** Creates a processor with id {@code pId}; its listener counts down whichever start/stop latches are non-null. */
- protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map,
- final CountDownLatch startLatchCountDown, final CountDownLatch stopLatchCountDown) {
- map.put(ApplicationConfig.PROCESSOR_ID, pId); // NOTE: mutates the caller's map
-
- StreamProcessor processor = new StreamProcessor(new MapConfig(map), new HashMap<>(), TestStreamTask::new,
- new StreamProcessorLifecycleListener() {
-
- @Override
- public void onStart() {
- if (startLatchCountDown != null) {
- startLatchCountDown.countDown();
- }
- }
-
- @Override
- public void onShutdown() {
- if (stopLatchCountDown != null) {
- stopLatchCountDown.countDown();
- }
- LOG.info("onShutdown is called for pid=" + pId);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.info("onFailure is called for pid=" + pId); // NOTE(review): does not count down any latch
- }
- });
-
- return processor;
- }
-
- protected void createTopics(String inputTopic, String outputTopic) { // 5 partitions, replication factor 1 each
- TestUtils.createTopic(zkUtils(), inputTopic, 5, 1, servers(), new Properties());
- TestUtils.createTopic(zkUtils(), outputTopic, 5, 1, servers(), new Properties());
- }
-
- protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
- int messageCount) {
- Map<String, String> configs = new HashMap<>(); // standalone job + Kafka system + ZK job coordinator settings
- configs.putAll(StandaloneTestUtils
- .getStandaloneConfigs("test-job", "org.apache.samza.test.processor.TestZkStreamProcessor.TestStreamTask"));
- configs.putAll(StandaloneTestUtils
- .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
- true));
- configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
- configs.put("app.messageCount", String.valueOf(messageCount));
- configs.put("app.outputTopic", outputTopic);
- configs.put("app.outputSystem", testSystem);
- configs.put(ZkConfig.ZK_CONNECT, zkConnect());
-
- configs.put("job.systemstreampartition.grouper.factory",
- "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
- configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-
- configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
-
- return configs;
- }
-
- /**
- * Synchronously produces the values start .. start + numMessages - 1 (as string bytes) to the topic; send failures are only printed.
- */
- protected void produceMessages(final int start, String topic, int numMessages) {
- KafkaProducer producer = getKafkaProducer();
- for (int i = start; i < numMessages + start; i++) {
- try {
- LOG.info("producing " + i);
- producer.send(new ProducerRecord(topic, String.valueOf(i).getBytes())).get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * Creates (but does not start) a thread that starts the processor, waits on the thread's own monitor until
- * notified or 100 s elapse, and then stops the processor. NOTE: the {@code latch} argument is not used.
- */
- protected Thread runInThread(final StreamProcessor processor, CountDownLatch latch) {
- Thread t = new Thread() {
-
- @Override
- public void run() {
- processor.start();
- try {
- // block until the test calls notify() on this thread, or the 100 s timeout expires
- synchronized (this) {
- this.wait(100000);
- }
- LOG.info("notified. Abandon the wait.");
- } catch (InterruptedException e) {
- LOG.error("wait interrupted" + e);
- }
- LOG.info("Stopping the processor");
- processor.stop();
- }
- };
- return t;
- }
-
- // convenience overload: the expected values are the sequence 0 .. numberOfSequentialValues - 1
- protected void verifyNumMessages(String topic, int numberOfSequentialValues, int exectedNumMessages) {
- // each value must be seen at least once (duplicates are caught only by the total count check)
- // map of all expected values, initially marked as not yet seen
- Map<Integer, Boolean> expectedValues = new HashMap<>(numberOfSequentialValues);
- for (int i = 0; i < numberOfSequentialValues; i++) {
- expectedValues.put(i, false);
- }
- verifyNumMessages(topic, expectedValues, exectedNumMessages);
- }
-
- /**
- * Polls the topic until expectedNumMessages records have been read or a total of 5 polls (5 s timeout each) returned
- * nothing, then asserts that every key of expectedValues was seen and exactly expectedNumMessages records were read.
- */
- protected void verifyNumMessages(String topic, final Map<Integer, Boolean> expectedValues, int expectedNumMessages) {
- KafkaConsumer consumer = getKafkaConsumer(); // NOTE(review): the consumer is never closed
- consumer.subscribe(Collections.singletonList(topic));
-
- Map<Integer, Boolean> map = new HashMap<>(expectedValues);
- int count = 0;
- int emptyPollCount = 0;
-
- while (count < expectedNumMessages && emptyPollCount < 5) { // emptyPollCount is never reset: 5 empty polls in total
- ConsumerRecords records = consumer.poll(5000);
- if (!records.isEmpty()) {
- Iterator<ConsumerRecord> iterator = records.iterator();
- while (iterator.hasNext()) {
- ConsumerRecord record = iterator.next();
- String val = new String((byte[]) record.value());
- LOG.info("Got value " + val);
- map.put(Integer.valueOf(val), true);
- count++;
- }
- } else {
- emptyPollCount++;
- LOG.warn("empty polls " + emptyPollCount);
- }
- }
- // count expected values that were never consumed
- long numFalse = map.values().stream().filter(v -> !v).count();
- Assert.assertEquals("didn't get this number of events ", 0, numFalse);
- Assert.assertEquals(expectedNumMessages, count);
- }
-
- // Task used by the tests: forwards each message to the configured output system/topic and counts down endLatch.
- public static class TestStreamTask implements StreamTask, InitableTask {
- // static field since there's no other way to share state b/w a task instance and
- // stream processor when constructed from "task.class".
- static CountDownLatch endLatch;
- private int processedMessageCount = 0;
- private String processorId;
- private String outputTopic;
- private String outputSystem;
-
- @Override
- public void init(Config config, TaskContext taskContext)
- throws Exception {
- this.processorId = config.get(ApplicationConfig.PROCESSOR_ID);
- this.outputTopic = config.get("app.outputTopic", "output");
- this.outputSystem = config.get("app.outputSystem", "test-system");
- }
-
- @Override
- public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
- TaskCoordinator taskCoordinator)
- throws Exception {
-
- Object message = incomingMessageEnvelope.getMessage();
-
- // inject a failure: processor "1" throws on message values >= BAD_MESSAGE_KEY
- if (Integer.valueOf((String) message) >= BAD_MESSAGE_KEY && processorId.equals("1")) {
- LOG.info("process method will fail for msg=" + message);
- throw new Exception("Processing in the processor " + processorId + " failed ");
- }
-
- messageCollector.send(new OutgoingMessageEnvelope(new SystemStream(outputSystem, outputTopic), message));
- processedMessageCount++;
-
- LOG.info("Stream processor " + processorId + ";offset=" + incomingMessageEnvelope.getOffset() + "; totalRcvd="
- + processedMessageCount + ";received " + message + "; ssp=" + incomingMessageEnvelope
- .getSystemStreamPartition());
-
- synchronized (endLatch) { // NOTE(review): CountDownLatch is already thread-safe; this lock looks unnecessary
- endLatch.countDown();
- }
- }
- }
-
- protected void waitUntilConsumedN(int untilLeft) { // polls endLatch once per second (ATTEMPTS_NUMBER tries) until count == untilLeft
- int attempts = ATTEMPTS_NUMBER;
- while (attempts > 0) {
- long leftEventsCount = TestZkStreamProcessorBase.TestStreamTask.endLatch.getCount();
- // events still to be processed, shared across all task instances via the static endLatch
- if (leftEventsCount == untilLeft) { // should read all of them
- // target reached; attempts stays > 0 so the assertion below passes
- break;
- }
- TestZkUtils.sleepMs(1000);
- attempts--;
- }
- Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a90ca11e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
new file mode 100644
index 0000000..58d92fb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.TestZkStreamProcessorBase;
+import org.apache.samza.zk.TestZkUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Failure tests for ZK-coordinated stream processors:
+ * <ul>
+ * <li>ZK is unavailable when a processor is created.</li>
+ * <li>One of two processors fails while processing a message.</li>
+ * </ul>
+ */
+public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
+
+ // first value of the injected "bad" messages (see testFailStreamProcessor)
+ private final static int BAD_MESSAGE_KEY = 1000;
+
+ @Override
+ protected String prefix() {
+ return "test_ZK_failure_";
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ }
+
+ /**
+ * Creating a processor against an unreachable ZK server must throw a SamzaException
+ * once the (shortened) connection timeout expires.
+ */
+ @Test(expected = org.apache.samza.SamzaException.class)
+ public void testZkUnavailable() {
+ map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
+ map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
+ CountDownLatch startLatch = new CountDownLatch(1);
+ createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception
+ Assert.fail("should've thrown an exception");
+ }
+
+ /**
+ * Two processors share the input. Processor "101" is made to fail by injecting "bad" messages
+ * (values starting at BAD_MESSAGE_KEY) which make it throw an exception while processing;
+ * processor "102" must then consume the remaining input on its own.
+ */
+ @Test
+ public void testFailStreamProcessor() {
+ final int numBadMessages = 4; // any of these bad messages causes p1 to throw an exception
+ map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "100");
+ map.put("processor.id.to.fail", "101");
+
+ // Number of processed events the test waits for on endLatch: messageCount from the first batch
+ // (split between p1 and p2), then 2 * messageCount more after p1 fails and p2 takes over.
+ // NOTE(review): how the bad messages and any post-rebalance re-reads make up that second
+ // 2 * messageCount is not visible here; confirm against TestStreamTask in the base class.
+ int totalEventsToBeConsumed = 3 * messageCount;
+
+ TestStreamTask.endLatch = new CountDownLatch(totalEventsToBeConsumed);
+ // create and start the first processor
+ Object waitStart1 = new Object();
+ Object waitStop1 = new Object();
+ StreamProcessor sp1 = createStreamProcessor("101", map, waitStart1, waitStop1);
+ Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
+ t1.start();
+
+ // create and start the second processor
+ Object waitStart2 = new Object();
+ Object waitStop2 = new Object();
+ StreamProcessor sp2 = createStreamProcessor("102", map, waitStart2, waitStop2);
+ Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
+ t2.start();
+
+ // wait until both processors report that they have started
+ waitForProcessorToStartStop(waitStart1);
+ waitForProcessorToStartStop(waitStart2);
+
+ // produce first batch of messages starting with 0
+ produceMessages(0, inputTopic, messageCount);
+
+ // make sure they consume all the messages of the first batch
+ waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
+
+ // produce the bad messages (exactly numBadMessages of them)
+ produceMessages(BAD_MESSAGE_KEY, inputTopic, numBadMessages);
+
+ // wait until the 1st processor reports that it has stopped (it fails on a bad message)
+ waitForProcessorToStartStop(waitStop1);
+
+ // wait until the 2nd processor reports that it has stopped
+ waitForProcessorToStartStop(waitStop2);
+
+ // give the system some extra time to publish and distribute the new job model
+ TestZkUtils.sleepMs(300);
+
+ // produce the second batch of the messages, starting with 'messageCount'
+ produceMessages(messageCount, inputTopic, messageCount);
+
+ // wait until p2 consumes all the messages by itself
+ waitUntilMessagesLeftN(0);
+
+ // shutdown p2
+ try {
+ stopProcessor(t2);
+ t2.join(1000);
+ } catch (InterruptedException e) {
+ Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
+ }
+
+ // the unique values we expect to read are 0 .. (2 * messageCount - 1);
+ // the bad-message values are intentionally not required to appear in the output
+ Map<Integer, Boolean> expectedValues = new HashMap<>(2 * messageCount);
+ for (int i = 0; i < 2 * messageCount; i++) {
+ expectedValues.put(i, false);
+ }
+
+ verifyNumMessages(outputTopic, expectedValues, totalEventsToBeConsumed);
+ }
+}