You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/11/09 18:24:54 UTC
samza git commit: SAMZA-1487: Disable Flaky Zk Integration tests.
Repository: samza
Updated Branches:
refs/heads/master d806e9dab -> 33010a731
SAMZA-1487: Disable Flaky Zk Integration tests.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Prateek Maheshwari <pr...@apache.org>
Closes #353 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/33010a73
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/33010a73
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/33010a73
Branch: refs/heads/master
Commit: 33010a7314eca58ff6a92c2b1dbee1c331220637
Parents: d806e9d
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Nov 9 10:24:53 2017 -0800
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Thu Nov 9 10:24:53 2017 -0800
----------------------------------------------------------------------
.../samza/processor/TestZkStreamProcessor.java | 11 +++++------
.../samza/processor/TestZkStreamProcessorBase.java | 3 +--
.../processor/TestZkStreamProcessorFailures.java | 8 +++-----
.../processor/TestZkStreamProcessorSession.java | 5 ++---
.../test/processor/TestZkLocalApplicationRunner.java | 15 +++++++--------
5 files changed, 18 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
index 7253b29..d5e7221 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
@@ -21,7 +21,6 @@ package org.apache.samza.processor;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
-import org.junit.Test;
/**
@@ -35,17 +34,17 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
return "test_ZK_";
}
- @Test
+ //@Test
public void testSingleStreamProcessor() {
testStreamProcessor(new String[]{"1"});
}
- @Test
+ //@Test
public void testTwoStreamProcessors() {
testStreamProcessor(new String[]{"2", "3"});
}
- @Test
+ //@Test
public void testFiveStreamProcessors() {
testStreamProcessor(new String[]{"4", "5", "6", "7", "8"});
}
@@ -98,7 +97,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
verifyNumMessages(outputTopic, messageCount, messageCount);
}
- @Test
+ //@Test
/**
* Similar to the previous tests, but add another processor in the middle
*/ public void testStreamProcessorWithAdd() {
@@ -170,7 +169,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
verifyNumMessages(outputTopic, 2 * messageCount, totalEventsToGenerate);
}
- @Test
+ //@Test
/**
* same as other happy path messages, but with one processor removed in the middle
*/ public void testStreamProcessorWithRemove() {
http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index c848cde..5cde446 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -62,7 +62,6 @@ import org.apache.samza.zk.TestZkUtils;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
-import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +90,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
return "";
}
- @Before
+// @Before
public void setUp() {
super.setUp();
// for each tests - make the common parts unique
http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
index 374e77c..540c69b 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
@@ -26,8 +26,6 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.zk.TestZkUtils;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
/**
@@ -44,12 +42,12 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
return "test_ZK_failure_";
}
- @Before
+// @Before
public void setUp() {
super.setUp();
}
- @Test(expected = org.apache.samza.SamzaException.class)
+ //@Test(expected = org.apache.samza.SamzaException.class)
public void testZkUnavailable() {
map.put(ZkConfig.ZK_CONNECT, "localhost:2222"); // non-existing zk
map.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, "3000"); // shorter timeout
@@ -58,7 +56,7 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
Assert.fail("should've thrown an exception");
}
- @Test
+ //@Test
// Test with a single processor failing.
// One processor fails (to simulate the failure we inject a special message (id > 1000) which causes the processor to
// throw an exception.
http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
index 880d766..40eeaf0 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
@@ -24,7 +24,6 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.zk.ZkJobCoordinator;
import org.junit.Assert;
-import org.junit.Test;
/**
@@ -38,7 +37,7 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
return "test_ZKS_";
}
- @Test
+ //@Test
public void testSingleStreamProcessor() {
testStreamProcessorWithSessionRestart(new String[]{"1"});
}
@@ -49,7 +48,7 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
testStreamProcessorWithSessionRestart(new String[]{"2", "3"});
}
- @Test
+ //@Test
public void testFiveStreamProcessors() {
testStreamProcessorWithSessionRestart(new String[]{"4", "5", "6", "7", "8"});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/33010a73/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index c550a3b..eb087bb 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -55,7 +55,6 @@ import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
import org.junit.Rule;
-import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
@@ -114,7 +113,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
@Rule
public final ExpectedException expectedException = ExpectedException.none();
- @Override
+// @Override
public void setUp() {
super.setUp();
String uniqueTestId = UUID.randomUUID().toString();
@@ -150,7 +149,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
}
}
- @Override
+// @Override
public void tearDown() {
if (zookeeper().zookeeper().isRunning()) {
for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
@@ -198,7 +197,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
return applicationConfig;
}
- @Test
+ //@Test
public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException {
/**
* sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case(All ssp's from input kafka topic are mapped to a single task).
@@ -269,7 +268,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
}
- @Test
+ //@Test
public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -323,7 +322,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(2, jobModel.getContainers().size());
}
- @Test
+ //@Test
public void shouldFailWhenNewProcessorJoinsWithSameIdAsExistingProcessor() throws InterruptedException {
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -355,7 +354,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
applicationRunner3.run(streamApp3);
}
- @Test
+ //@Test
public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws Exception {
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -427,7 +426,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(jobModel.getContainers(), newJobModel.getContainers());
}
- @Test
+ //@Test
public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);