You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/11/02 16:38:40 UTC

samza git commit: SAMZA-1952: StreamPartitionCountMonitor for standalone.

Repository: samza
Updated Branches:
  refs/heads/master a528cc680 -> 743903272


SAMZA-1952: StreamPartitionCountMonitor for standalone.

This patch adds the capability to detect a change in the partition count of the input streams of a stateless standalone job and to trigger a re-balancing phase (which will essentially account for the new partitions of the input streams and distribute them among the live processors of the group).

The existing partition count detection of input streams is broken in YARN for stateful jobs. This will be addressed for both YARN and standalone as part of #622.

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Boris Shkolnik <bo...@apache.org>

Closes #726 from shanthoosh/stream_partition_count_monitor_for_standalone


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74390327
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74390327
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74390327

Branch: refs/heads/master
Commit: 743903272ddd4639da32224b1ce544df69314aaa
Parents: a528cc6
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Fri Nov 2 09:38:30 2018 -0700
Committer: Boris S <bs...@linkedin.com>
Committed: Fri Nov 2 09:38:30 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 61 ++++++++++++----
 .../apache/samza/zk/TestZkJobCoordinator.java   | 76 +++++++++++++++++++-
 .../processor/TestZkLocalApplicationRunner.java | 65 ++++++++++++++++-
 3 files changed, 185 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74390327/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 6d85c66..81c0465 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.List;
@@ -38,12 +37,14 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -53,6 +54,7 @@ import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
@@ -105,9 +107,11 @@ public class ZkJobCoordinator implements JobCoordinator {
   @VisibleForTesting
   ScheduleAfterDebounceTime debounceTimer;
 
+  @VisibleForTesting
+  StreamPartitionCountMonitor streamPartitionCountMonitor = null;
+
   ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
-
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
 
     this.processorId = createProcessorId(config);
@@ -179,6 +183,10 @@ public class ZkJobCoordinator implements JobCoordinator {
         LOG.debug("Shutting down metrics.");
         shutdownMetrics();
 
+        if (streamPartitionCountMonitor != null) {
+          streamPartitionCountMonitor.stop();
+        }
+
         if (coordinatorListener != null) {
           coordinatorListener.onCoordinatorStop();
         }
@@ -233,13 +241,11 @@ public class ZkJobCoordinator implements JobCoordinator {
   public void onProcessorChange(List<String> processors) {
     if (leaderElector.amILeader()) {
       LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processors.size());
-      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors));
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, this::doOnProcessorChange);
     }
   }
 
-  void doOnProcessorChange(List<String> processors) {
-    // if list of processors is empty - it means we are called from 'onBecomeLeader'
-    // TODO: Handle empty currentProcessorIds.
+  void doOnProcessorChange() {
     List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
     Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
 
@@ -320,16 +326,39 @@ public class ZkJobCoordinator implements JobCoordinator {
     return new JobModel(new MapConfig(), model.getContainers());
   }
 
+  @VisibleForTesting
+  StreamPartitionCountMonitor getPartitionCountMonitor() {
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+
+    return new StreamPartitionCountMonitor(
+            inputStreamsToMonitor,
+            streamMetadata,
+            metrics.getMetricsRegistry(),
+            new JobConfig(config).getMonitorPartitionChangeFrequency(),
+            streamsChanged -> {
+        if (leaderElector.amILeader()) {
+          debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 0, this::doOnProcessorChange);
+        }
+      });
+  }
+
   class LeaderElectorListenerImpl implements LeaderElectorListener {
     @Override
     public void onBecomingLeader() {
       LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
       metrics.isLeader.set(true);
       zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
-      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
-          // actual actions to do are the same as onProcessorChange
-          doOnProcessorChange(new ArrayList<>());
-        });
+      if (!new StorageConfig(config).hasDurableStores()) {
+        // 1. Stop if there's a existing StreamPartitionCountMonitor running.
+        if (streamPartitionCountMonitor != null) {
+          streamPartitionCountMonitor.stop();
+        }
+        // 2. Start a new instance of StreamPartitionCountMonitor.
+        streamPartitionCountMonitor = getPartitionCountMonitor();
+        streamPartitionCountMonitor.start();
+      }
+      debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, ZkJobCoordinator.this::doOnProcessorChange);
     }
   }
 
@@ -371,10 +400,8 @@ public class ZkJobCoordinator implements JobCoordinator {
           LOG.warn("Barrier for version " + version + " timed out.");
           if (leaderElector.amILeader()) {
             LOG.info("Leader will schedule a new job model generation");
-            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
-                // actual actions to do are the same as onProcessorChange
-                doOnProcessorChange(new ArrayList<>());
-              });
+            // actual actions to do are the same as onProcessorChange
+            debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, ZkJobCoordinator.this::doOnProcessorChange);
           }
         }
       }
@@ -477,6 +504,11 @@ public class ZkJobCoordinator implements JobCoordinator {
           if (leaderElector.amILeader()) {
             leaderElector.resignLeadership();
           }
+
+          if (streamPartitionCountMonitor != null) {
+            streamPartitionCountMonitor.stop();
+          }
+
           /**
            * After this event, one amongst the following two things could potentially happen:
            * A. On successful reconnect to another zookeeper server in ensemble, this processor is going to
@@ -513,7 +545,6 @@ public class ZkJobCoordinator implements JobCoordinator {
         default:
           // received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. NoOp
           LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". Continue");
-          return;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74390327/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 50d6a42..083caad 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
@@ -36,7 +37,6 @@ import org.mockito.stubbing.Answer;
 import static junit.framework.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
-
 public class TestZkJobCoordinator {
   private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
   private static final String TEST_JOB_MODEL_VERSION = "1";
@@ -90,4 +90,78 @@ public class TestZkJobCoordinator {
     verify(mockDebounceTimer).cancelAllScheduledActions();
     verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
   }
+
+  @Test
+  public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+    ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    zkJobCoordinator.streamPartitionCountMonitor = monitor;
+
+    ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
+    zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
+    Mockito.verify(monitor).stop();
+  }
+
+  @Test
+  public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+    ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+
+    StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    zkJobCoordinator.streamPartitionCountMonitor = monitor;
+    when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
+
+    ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
+
+    listener.onBecomingLeader();
+
+    Mockito.verify(monitor).start();
+  }
+
+  @Test
+  public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+    ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+
+    StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    zkJobCoordinator.streamPartitionCountMonitor = monitor;
+
+    zkJobCoordinator.stop();
+
+    Mockito.verify(monitor).stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/74390327/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 2ee17c0..7411318 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -29,14 +29,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.Objects;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.utils.TestUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -51,6 +54,8 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
@@ -203,6 +208,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
         .put(TaskConfig.DROP_PRODUCER_ERROR(), "true")
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
+        .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
 
@@ -646,4 +652,61 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status());
   }
 
+  /**
+   * A. Create a kafka topic with partition count set to 5.
+   * B. Create and launch a samza application which consumes events from the kafka topic.
+   * C. Validate that the {@link JobModel} contains 5 {@link SystemStreamPartition}'s.
+   * D. Increase the partition count of the input kafka topic to 100.
+   * E. Validate that the new {@link JobModel} contains 100 {@link SystemStreamPartition}'s.
+   */
+  @Test
+  public void testShouldGenerateJobModelOnPartitionCountChange() throws Exception {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    // Create StreamApplication from configuration.
+    CountDownLatch kafkaEventsConsumedLatch1 = new CountDownLatch(NUM_KAFKA_EVENTS);
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+
+    ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+            TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch1,
+            applicationConfig1), applicationConfig1);
+
+    appRunner1.run();
+    processedMessagesLatch1.await();
+
+    String jobModelVersion = zkUtils.getJobModelVersion();
+    JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+    Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
+
+    // Validate that the input partition count is 5 in the JobModel.
+    Assert.assertEquals(5, ssps.size());
+
+    // Increase the partition count of input kafka topic to 100.
+    AdminUtils.addPartitions(zkUtils(), inputKafkaTopic, 100, "", true, RackAwareMode.Enforced$.MODULE$);
+
+    long jobModelWaitTimeInMillis = 10;
+    while (Objects.equals(zkUtils.getJobModelVersion(), jobModelVersion)) {
+      LOGGER.info("Waiting for new jobModel to be published");
+      Thread.sleep(jobModelWaitTimeInMillis);
+      jobModelWaitTimeInMillis = jobModelWaitTimeInMillis * 2;
+    }
+
+    String newJobModelVersion = zkUtils.getJobModelVersion();
+    JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
+    ssps = getSystemStreamPartitions(newJobModel);
+
+    // Validate that the input partition count is 100 in the new JobModel.
+    Assert.assertEquals(100, ssps.size());
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
+  }
+
+  private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    jobModel.getContainers().forEach((containerName, containerModel) -> {
+        containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
+      });
+    return ssps;
+  }
 }