You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/04 18:49:36 UTC
samza git commit: Reenable LocalZkApplicationRunner tests.
Repository: samza
Updated Branches:
refs/heads/master 966730ee6 -> cf5efe761
Reenable LocalZkApplicationRunner tests.
Add back commented out ZkLocalApplicationRunner tests(Was dependent upon error propagation from SamzaContainer to LocalApplicationRunner).
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #250 from shanthoosh/fix_broken_tests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cf5efe76
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cf5efe76
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cf5efe76
Branch: refs/heads/master
Commit: cf5efe761d0d901e8a98d195fba9a6778879d0fc
Parents: 966730e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Aug 4 11:49:29 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri Aug 4 11:49:29 2017 -0700
----------------------------------------------------------------------
.../apache/samza/zk/TestZkJobCoordinator.java | 1 -
.../processor/TestZkLocalApplicationRunner.java | 145 +++++++++++--------
2 files changed, 84 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index fd6065a..117d458 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -44,7 +44,6 @@ public class TestZkJobCoordinator {
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
- Mockito.doNothing().when(zkJobCoordinator).stop();
Mockito.verify(zkJobCoordinator, Mockito.atMost(1)).stop();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/cf5efe76/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index cf0a242..9ca48ad 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -33,9 +33,11 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
+import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
@@ -64,6 +66,7 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
import static org.junit.Assert.*;
@@ -78,14 +81,15 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private static final Logger LOGGER = LoggerFactory.getLogger(TestZkLocalApplicationRunner.class);
private static final int NUM_KAFKA_EVENTS = 300;
- private static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
+ private static final int ZK_CONNECTION_TIMEOUT_MS = 100;
private static final String TEST_SYSTEM = "TestSystemName";
private static final String TEST_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
private static final String TEST_JOB_NAME = "test-job";
- private static final String TASK_SHUTDOWN_MS = "5000";
+ private static final String TASK_SHUTDOWN_MS = "3000";
+ private static final String JOB_DEBOUNCE_TIME_MS = "1000";
private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
private String inputKafkaTopic;
@@ -100,9 +104,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private String testStreamAppName;
private String testStreamAppId;
- // Set 90 seconds as max execution time for each test.
@Rule
- public Timeout testTimeOutInMillis = new Timeout(90000);
+ public Timeout testTimeOutInMillis = new Timeout(120000);
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@@ -138,12 +141,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
@Override
public void tearDown() {
- for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
- LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
- AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+ if (zookeeper().zookeeper().isRunning()) {
+ for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
+ LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
+ AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
+ }
+ zkUtils.close();
+ super.tearDown();
}
- zkUtils.close();
- super.tearDown();
}
private void publishKafkaEvents(String topic, int numEvents, String streamProcessorId) {
@@ -175,6 +180,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
.put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
.put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
+ .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
.build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
@@ -255,10 +261,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
@Test
public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
// Set up kafka topics.
- publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+ publishKafkaEvents(inputKafkaTopic, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
// Create stream applications.
- CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+ CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(2 * NUM_KAFKA_EVENTS);
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -278,11 +284,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
processedMessagesLatch3.await();
// Verifications before killing the leader.
- JobModel jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
- String prevJobModelVersion = zkUtils.getJobModelVersion();
+ String jobModelVersion = zkUtils.getJobModelVersion();
+ JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
assertEquals(3, jobModel.getContainers().size());
assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet());
- assertEquals("1", prevJobModelVersion);
+ assertEquals("1", jobModelVersion);
List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
@@ -291,6 +297,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Kill the leader. Since streamApp1 is the first to join the cluster, it's the leader.
applicationRunner1.kill(streamApp1);
+ applicationRunner1.waitForFinish();
kafkaEventsConsumedLatch.await();
// Verifications after killing the leader.
@@ -298,11 +305,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
processorIdsFromZK = zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]));
assertEquals(2, processorIdsFromZK.size());
assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
- jobModel = zkUtils.getJobModel(zkUtils.getJobModelVersion());
+ jobModelVersion = zkUtils.getJobModelVersion();
+ assertEquals("2", jobModelVersion);
+ jobModel = zkUtils.getJobModel(jobModelVersion);
assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
- String currentJobModelVersion = zkUtils.getJobModelVersion();
assertEquals(2, jobModel.getContainers().size());
- assertEquals("2", currentJobModelVersion);
}
@Test
@@ -337,34 +344,22 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
applicationRunner3.run(streamApp3);
}
- // Depends upon SAMZA-1302
- // @Test(expected = Exception.class)
- public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
- // Create StreamApplications.
- StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
- StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, null);
-
- // Run stream applications.
- applicationRunner1.run(streamApp1);
- applicationRunner2.run(streamApp2);
-
- applicationRunner1.kill(streamApp1);
- applicationRunner1.waitForFinish();
- assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp2));
+ @Test
+ public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() throws Exception {
+ // Set up kafka topics.
+ publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
- // Kill zookeeper server and expect job model regeneration and ZK fencing will fail with exception.
- zookeeper().shutdown();
+ /**
+ * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners
+ * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
+ */
+ Map<String, String> debounceTimeConfig = ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
- applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
- applicationRunner1.run(streamApp1);
- // This line should throw exception since Zookeeper is unavailable.
- applicationRunner1.waitForFinish();
- }
+ Config applicationConfig1 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), debounceTimeConfig));
+ Config applicationConfig2 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), debounceTimeConfig));
- // Depends upon SAMZA-1302
- // @Test
- public void testRollingUpgrade() throws Exception {
- publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+ LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+ LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
StreamApplicationCallback streamApplicationCallback = messagesProcessed::add;
@@ -393,37 +388,65 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
int lastProcessedMessageId = -1;
for (TestKafkaEvent message : messagesProcessed) {
- lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventId()));
+ lastProcessedMessageId = Math.max(lastProcessedMessageId, Integer.parseInt(message.getEventData()));
}
+ messagesProcessed.clear();
LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
+ processedMessagesLatch1 = new CountDownLatch(1);
+ publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+ streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch);
applicationRunner4.run(streamApp1);
- applicationRunner4.waitForFinish();
- // Kill both the stream applications.
- applicationRunner4.kill(streamApp1);
- applicationRunner4.waitForFinish();
- applicationRunner2.kill(streamApp2);
- applicationRunner2.waitForFinish();
+ processedMessagesLatch1.await();
// Read new job model after rolling upgrade.
String newJobModelVersion = zkUtils.getJobModelVersion();
JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
// This should be continuation of last processed message.
- int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventId());
- assertTrue(lastProcessedMessageId < nextSeenMessageId);
+ int nextSeenMessageId = Integer.parseInt(messagesProcessed.get(0).getEventData());
+ assertTrue(lastProcessedMessageId <= nextSeenMessageId);
+ assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
+ assertEquals(jobModel.getContainers(), newJobModel.getContainers());
+ }
- // Assertions on job model read from zookeeper.
- assertFalse(newJobModelVersion.equals(jobModelVersion));
- assertEquals(jobModel, newJobModel);
- assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
- String currentJobModelVersion = zkUtils.getJobModelVersion();
- List<String> processorIdsFromZK = zkUtils.getActiveProcessorsIDs(Arrays.asList(PROCESSOR_IDS));
- assertEquals(3, processorIdsFromZK.size());
- assertEquals(ImmutableList.of(PROCESSOR_IDS[1], PROCESSOR_IDS[2]), processorIdsFromZK);
- assertEquals(2, jobModel.getContainers().size());
- assertEquals("2", currentJobModelVersion);
+ @Test
+ public void shouldKillStreamAppWhenZooKeeperDiesBeforeLeaderReElection() throws InterruptedException {
+ // Set up kafka topics.
+ publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+ MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
+ MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
+ MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
+ LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1);
+ LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationRunnerConfig2);
+
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+ CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+ // Create StreamApplications.
+ StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, null);
+ StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, null);
+
+ // Run stream applications.
+ applicationRunner1.run(streamApp1);
+ applicationRunner2.run(streamApp2);
+
+ processedMessagesLatch1.await();
+ processedMessagesLatch2.await();
+
+ // Non daemon thread in brokers reconnect repeatedly to zookeeper on failures. Manually shutting them down.
+ List<KafkaServer> kafkaServers = JavaConverters.bufferAsJavaListConverter(this.servers()).asJava();
+ kafkaServers.forEach(KafkaServer::shutdown);
+
+ zookeeper().shutdown();
+
+ applicationRunner1.waitForFinish();
+ applicationRunner2.waitForFinish();
+
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner1.status(streamApp1));
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, applicationRunner2.status(streamApp2));
}
public interface StreamApplicationCallback {