You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/11/22 19:54:18 UTC

samza git commit: SAMZA-1507; Create changelog streams in Leader(ZkJobCoordinator) for stateful operators.

Repository: samza
Updated Branches:
  refs/heads/master 7a2e19250 -> a322972d0


SAMZA-1507; Create changelog streams in the leader (ZkJobCoordinator) for stateful operators.

Verified with a standalone test job. An integration test for this will be added as part of fixing and re-enabling the standalone integration tests.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Boris Shkolnik <bo...@apache.org>

Closes #362 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a322972d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a322972d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a322972d

Branch: refs/heads/master
Commit: a322972d00474082917e6700c450dc1e5e98d7dc
Parents: 7a2e192
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Nov 22 11:54:10 2017 -0800
Committer: Boris S <bo...@apache.org>
Committed: Wed Nov 22 11:54:10 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 30 ++++++++++++++++++--
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  2 +-
 .../samza/coordinator/JobModelManager.scala     |  2 +-
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9d44ec1..0509474 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,12 +20,13 @@ package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.I0Itec.zkclient.IZkStateListener;
+import java.util.Objects;
 import java.util.Set;
+import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
@@ -33,11 +34,14 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsReporter;
@@ -88,6 +92,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
   private int debounceTimeMs;
+  private boolean hasCreatedChangeLogStreams = false;
+  private String cachedJobModelVersion = null;
+  private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
 
   ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
@@ -188,6 +195,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
+    if (!hasCreatedChangeLogStreams) {
+      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions);
+      hasCreatedChangeLogStreams = true;
+    }
     // Assign the next version of JobModel
     String currentJMVersion = zkUtils.getJobModelVersion();
     String nextJMVersion;
@@ -279,7 +290,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
   private JobModel generateNewJobModel(List<String> processors) {
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, processors);
+    String zkJobModelVersion = zkUtils.getJobModelVersion();
+    // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
+    if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
+      JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
+      for (ContainerModel containerModel : jobModel.getContainers().values()) {
+        containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
+      }
+      cachedJobModelVersion = zkJobModelVersion;
+    }
+    /**
+     * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
+     * to host mapping) is passed in as null when building the jobModel.
+     */
+    return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {

http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 6ca9052..2f60d52 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -407,7 +407,7 @@ public class ZkUtils {
    * @return jobmodel version as a string
    */
   public String getJobModelVersion() {
-    String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
+    String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), true);
     metrics.reads.inc();
     return jobModelVersion;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e915a8a..c2e0665 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -276,7 +276,7 @@ object JobModelManager extends Logging {
     systemAdmins
   }
 
-  private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
+  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)