You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:50 UTC

[04/23] samza git commit: SAMZA-1336. Session disconnect propagation.

http://git-wip-us.apache.org/repos/asf/samza/blob/ce777164/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
deleted file mode 100644
index 58d92fb..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkStreamProcessorFailures.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.processor;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ZkConfig;
-import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.TestZkStreamProcessorBase;
-import org.apache.samza.zk.TestZkUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Failure tests:
- * ZK unavailable.
- * One processor fails in process.
- */
-public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
-
-  private final static int BAD_MESSAGE_KEY = 1000;
-
-  @Override
-  protected String prefix() {
-    return "test_ZK_failure_";
-  }
-
-  @Before
-  public void setUp() {
-    super.setUp();
-  }
-
-  @Test(expected = org.apache.samza.SamzaException.class)
-  public void testZkUnavailable() {
-    map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
-    map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
-    CountDownLatch startLatch = new CountDownLatch(1);
-    createStreamProcessor("1", map, startLatch, null); // this should fail with timeout exception
-    Assert.fail("should've thrown an exception");
-  }
-
-  @Test
-  // Test with a single processor failing.
-  // One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to
-  // throw an exception.
-  public void testFailStreamProcessor() {
-    final int numBadMessages = 4; // either of these bad messages will cause p1 to throw and exception
-    map.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "100");
-    map.put("processor.id.to.fail", "101");
-
-    // set number of events we expect to read by both processes in total:
-    // p1 will read messageCount/2 messages
-    // p2 will read messageCount/2 messages
-    // numBadMessages "bad" messages will be generated
-    // p2 will read 2 of the "bad" messages
-    // p1 will fail on the first of the "bad" messages
-    // a new job model will be generated
-    // and p2 will read all 2 * messageCount messages again, + numBadMessages (all of them this time)
-    // total 2 x messageCount / 2 + numBadMessages/2 + 2 * messageCount + numBadMessages
-    int totalEventsToBeConsumed = 3 * messageCount;
-
-    TestStreamTask.endLatch = new CountDownLatch(totalEventsToBeConsumed);
-    // create first processor
-    Object waitStart1 = new Object();
-    Object waitStop1 = new Object();
-    StreamProcessor sp1 = createStreamProcessor("101", map, waitStart1, waitStop1);
-    // start the first processor
-    Thread t1 = runInThread(sp1, TestStreamTask.endLatch);
-    t1.start();
-
-    // start the second processor
-    Object waitStart2 = new Object();
-    Object waitStop2 = new Object();
-    StreamProcessor sp2 = createStreamProcessor("102", map, waitStart2, waitStop2);
-    Thread t2 = runInThread(sp2, TestStreamTask.endLatch);
-    t2.start();
-
-    // wait until the 1st processor reports that it has started
-    waitForProcessorToStartStop(waitStart1);
-
-    // wait until the 2nd processor reports that it has started
-    waitForProcessorToStartStop(waitStart2);
-
-    // produce first batch of messages starting with 0
-    produceMessages(0, inputTopic, messageCount);
-
-    // make sure they consume all the messages
-    waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
-
-    // produce the bad messages
-    produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);
-
-    waitForProcessorToStartStop(waitStop1);
-
-    // wait until the 2nd processor reports that it has stopped
-    waitForProcessorToStartStop(waitStop2);
-
-    // give some extra time to let the system to publish and distribute the new job model
-    TestZkUtils.sleepMs(300);
-
-    // produce the second batch of the messages, starting with 'messageCount'
-    produceMessages(messageCount, inputTopic, messageCount);
-
-    // wait until p2 consumes all the message by itself
-    waitUntilMessagesLeftN(0);
-
-    // shutdown p2
-    try {
-      stopProcessor(t2);
-      t2.join(1000);
-    } catch (InterruptedException e) {
-      Assert.fail("Failed to join finished thread:" + e.getLocalizedMessage());
-    }
-
-    // number of unique values we gonna read is from 0 to (2*messageCount - 1)
-    Map<Integer, Boolean> expectedValues = new HashMap<>(2 * messageCount);
-    for (int i = 0; i < 2 * messageCount; i++) {
-      expectedValues.put(i, false);
-    }
-    for (int i = BAD_MESSAGE_KEY; i < numBadMessages + BAD_MESSAGE_KEY; i++) {
-      //expectedValues.put(i, false);
-    }
-
-    verifyNumMessages(outputTopic, expectedValues, totalEventsToBeConsumed);
-  }
-}