You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:00 UTC

[3/7] samza git commit: job model generation cleanup

job model generation cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f15b9071
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f15b9071
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f15b9071

Branch: refs/heads/samza-standalone
Commit: f15b907127774bfb38f753a906b563560c6f657a
Parents: be6be8c
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:00:04 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:00:04 2016 -0800

----------------------------------------------------------------------
 .../grouper/task/BalancingTaskNameGrouper.java          |  9 ++++++++-
 .../main/java/org/apache/samza/zk/ZkJobCoordinator.java | 12 +++++++-----
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
index f8295c8..1946271 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.container.grouper.task;
 
+import java.util.Collections;
 import java.util.Set;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ContainerModel;
@@ -54,5 +55,11 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper {
    * @param localityManager provides a persisted task to container map to use as a baseline
    * @return                the grouped tasks in the form of ContainerModels
    */
-  Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
+  default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
+    return Collections.<ContainerModel>emptySet();
+  }
+
+  default Set<ContainerModel> balance(Set<Integer> containerIds, Set<TaskModel> tasks, LocalityManager localityManager) {
+    return Collections.<ContainerModel>emptySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8c36ff2..3014c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -8,6 +8,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
@@ -41,7 +43,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
   private Config config;
   private ZkKeyBuilder keyBuilder;
   private final ScheduleAfterDebounceTime debounceTimer;
-  //JobModelManager jobModelManager;
+  JobModelManager jobModelManager;
 
   public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
     this.zkUtils = zkUtils;
@@ -75,7 +77,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
         streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
         .instance());
 
-    //jobModelManager = //JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null);
+    jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null);
 
     ////////////////////////////////////////////////////////////////////////////////////////////
   }
@@ -132,7 +134,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     Map<String, String> configMap = new HashMap<>();
     Map<Integer, ContainerModel> containers = new HashMap<>();
     MapConfig config = new MapConfig(configMap);
-    JobModel jobModel = new JobModel(config, containers);
+    //JobModel jobModel = new JobModel(config, containers);
+    JobModel jobModel = jobModelManager.jobModel();
 
     log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
 
@@ -188,11 +191,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
     // ?????
     JobModel jobModel = getJobModel();
     log.info("pid=" + processorId + "got the new job model =" + jobModel);
-    /*
+
     containerController.startContainer(
         jobModel.getContainers().get(processorId),
         jobModel.getConfig(),
         jobModel.maxChangeLogStreamPartitions);
-   */
   }
 }