You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2017/11/22 19:54:18 UTC
samza git commit: SAMZA-1507; Create changelog streams in Leader(ZkJobCoordinator) for stateful operators.
Repository: samza
Updated Branches:
refs/heads/master 7a2e19250 -> a322972d0
SAMZA-1507; Create changelog streams in Leader(ZkJobCoordinator) for stateful operators.
Verified with a test standalone job. An integration test for this will be added as part of fixing and re-enabling the standalone integration tests.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Boris Shkolnik <bo...@apache.org>
Closes #362 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a322972d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a322972d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a322972d
Branch: refs/heads/master
Commit: a322972d00474082917e6700c450dc1e5e98d7dc
Parents: 7a2e192
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Nov 22 11:54:10 2017 -0800
Committer: Boris S <bo...@apache.org>
Committed: Wed Nov 22 11:54:10 2017 -0800
----------------------------------------------------------------------
.../org/apache/samza/zk/ZkJobCoordinator.java | 30 ++++++++++++++++++--
.../main/java/org/apache/samza/zk/ZkUtils.java | 2 +-
.../samza/coordinator/JobModelManager.scala | 2 +-
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 9d44ec1..0509474 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,12 +20,13 @@ package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.I0Itec.zkclient.IZkStateListener;
+import java.util.Objects;
import java.util.Set;
+import org.I0Itec.zkclient.IZkStateListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
@@ -33,11 +34,14 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.ZkConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsReporter;
@@ -88,6 +92,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
private int debounceTimeMs;
+ private boolean hasCreatedChangeLogStreams = false;
+ private String cachedJobModelVersion = null;
+ private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
this.config = config;
@@ -188,6 +195,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// Generate the JobModel
JobModel jobModel = generateNewJobModel(currentProcessorIds);
+ if (!hasCreatedChangeLogStreams) {
+ JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions);
+ hasCreatedChangeLogStreams = true;
+ }
// Assign the next version of JobModel
String currentJMVersion = zkUtils.getJobModelVersion();
String nextJMVersion;
@@ -279,7 +290,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<String> processors) {
- return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, processors);
+ String zkJobModelVersion = zkUtils.getJobModelVersion();
+ // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
+ if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
+ JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion);
+ for (ContainerModel containerModel : jobModel.getContainers().values()) {
+ containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId()));
+ }
+ cachedJobModelVersion = zkJobModelVersion;
+ }
+ /**
+ * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
+ * to host mapping) is passed in as null when building the jobModel.
+ */
+ return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
}
class LeaderElectorListenerImpl implements LeaderElectorListener {
http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 6ca9052..2f60d52 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -407,7 +407,7 @@ public class ZkUtils {
* @return jobmodel version as a string
*/
public String getJobModelVersion() {
- String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath());
+ String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), true);
metrics.reads.inc();
return jobModelVersion;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e915a8a..c2e0665 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -276,7 +276,7 @@ object JobModelManager extends Logging {
systemAdmins
}
- private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
+ def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
val changeLogSystemStreams = config
.getStoreNames
.filter(config.getChangelogStream(_).isDefined)