You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/12/24 01:04:00 UTC
[3/7] samza git commit: job model generation cleanup
job model generation cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f15b9071
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f15b9071
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f15b9071
Branch: refs/heads/samza-standalone
Commit: f15b907127774bfb38f753a906b563560c6f657a
Parents: be6be8c
Author: navina <na...@apache.org>
Authored: Fri Dec 23 17:00:04 2016 -0800
Committer: navina <na...@apache.org>
Committed: Fri Dec 23 17:00:04 2016 -0800
----------------------------------------------------------------------
.../grouper/task/BalancingTaskNameGrouper.java | 9 ++++++++-
.../main/java/org/apache/samza/zk/ZkJobCoordinator.java | 12 +++++++-----
2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
index f8295c8..1946271 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.container.grouper.task;
+import java.util.Collections;
import java.util.Set;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ContainerModel;
@@ -54,5 +55,11 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper {
* @param localityManager provides a persisted task to container map to use as a baseline
* @return the grouped tasks in the form of ContainerModels
*/
- Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
+ default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
+ return Collections.<ContainerModel>emptySet();
+ }
+
+ default Set<ContainerModel> balance(Set<Integer> containerIds, Set<TaskModel> tasks, LocalityManager localityManager) {
+ return Collections.<ContainerModel>emptySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f15b9071/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8c36ff2..3014c1b 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -8,6 +8,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
@@ -41,7 +43,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
private Config config;
private ZkKeyBuilder keyBuilder;
private final ScheduleAfterDebounceTime debounceTimer;
- //JobModelManager jobModelManager;
+ JobModelManager jobModelManager;
public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
this.zkUtils = zkUtils;
@@ -75,7 +77,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
streamMetadataCache = new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock
.instance());
- //jobModelManager = //JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null);
+ jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null);
////////////////////////////////////////////////////////////////////////////////////////////
}
@@ -132,7 +134,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
Map<String, String> configMap = new HashMap<>();
Map<Integer, ContainerModel> containers = new HashMap<>();
MapConfig config = new MapConfig(configMap);
- JobModel jobModel = new JobModel(config, containers);
+ //JobModel jobModel = new JobModel(config, containers);
+ JobModel jobModel = jobModelManager.jobModel();
log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
@@ -188,11 +191,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkListener {
// ?????
JobModel jobModel = getJobModel();
log.info("pid=" + processorId + "got the new job model =" + jobModel);
- /*
+
containerController.startContainer(
jobModel.getContainers().get(processorId),
jobModel.getConfig(),
jobModel.maxChangeLogStreamPartitions);
- */
}
}