You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/05 18:57:04 UTC
[3/3] samza git commit: SAMZA-1973: Unify the TaskNameGrouper
interface for yarn and standalone.
SAMZA-1973: Unify the TaskNameGrouper interface for yarn and standalone.
This patch consists of the following changes:
* Unify the different methods present in the TaskNameGrouper interface. This will enable us to have a single interface method usable for both the yarn and standalone models.
* Generate locationId aware task assignment to processors in standalone.
* Move the task assignment persistence logic from a custom `TaskNameGrouper` implementation to `JobModelManager`, so that this works for any kind of custom grouper.
* General code clean up in `JobModelManager`, `TaskAssignmentManager` and in other samza internal classes.
* Read/write taskLocality of the processors in standalone.
* Updated the existing javadocs and added javadocs where they were missing.
Testing:
* Fixed the existing unit-tests due to the changes.
* Added new unit tests for the functionality changed or added as a part of this patch.
* Tested this patch with a sample job from `hello-samza` project and verified that it works as expected.
Please refer to [SEP-11](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309) for more details.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: svenkata <sv...@linkedin.com>
Reviewers: Prateek M<pm...@linkedin.com>
Closes #790 from shanthoosh/task_name_grouper_changes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5ea72584
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5ea72584
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5ea72584
Branch: refs/heads/master
Commit: 5ea72584f6b92937ec130f486d6f70603b7188c2
Parents: c7e5dcb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Wed Dec 5 10:56:55 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Dec 5 10:56:55 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/job/model/TaskModel.java | 1 -
.../samza/coordinator/AzureJobCoordinator.java | 7 +-
.../ClusterBasedJobCoordinator.java | 3 +-
.../samza/config/ClusterManagerConfig.java | 14 +-
.../grouper/task/BalancingTaskNameGrouper.java | 5 +-
.../grouper/task/GroupByContainerCount.java | 228 ++++---------
.../task/GroupByContainerCountFactory.java | 3 +-
.../grouper/task/GroupByContainerIds.java | 171 ++++++++--
.../container/grouper/task/GrouperMetadata.java | 58 ++++
.../grouper/task/GrouperMetadataImpl.java | 72 +++++
.../grouper/task/TaskAssignmentManager.java | 3 -
.../samza/container/grouper/task/TaskGroup.java | 85 +++++
.../container/grouper/task/TaskNameGrouper.java | 39 ++-
.../grouper/task/TaskNameGrouperFactory.java | 2 +-
.../samza/execution/ExecutionPlanner.java | 2 +-
.../apache/samza/processor/StreamProcessor.java | 3 +-
.../samza/runtime/LocalContainerRunner.java | 18 +-
.../standalone/PassthroughJobCoordinator.java | 30 +-
.../apache/samza/storage/StorageRecovery.java | 5 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 74 ++++-
.../main/java/org/apache/samza/zk/ZkUtils.java | 27 ++
.../apache/samza/container/SamzaContainer.scala | 12 +-
.../samza/coordinator/JobModelManager.scala | 231 +++++++++----
.../samza/job/local/ProcessJobFactory.scala | 2 +-
.../samza/job/local/ThreadJobFactory.scala | 2 +-
.../grouper/task/TestGroupByContainerCount.java | 320 ++++++-------------
.../grouper/task/TestGroupByContainerIds.java | 292 +++++++++++++++--
.../grouper/task/TestTaskAssignmentManager.java | 3 -
.../samza/container/mock/ContainerMocks.java | 6 +-
.../coordinator/JobModelManagerTestUtil.java | 17 +-
.../samza/coordinator/TestJobModelManager.java | 114 ++++++-
.../java/org/apache/samza/zk/TestZkUtils.java | 73 ++++-
.../samza/container/TestSamzaContainer.scala | 38 ++-
.../apache/samza/test/framework/TestRunner.java | 2 +-
.../processor/TestZkStreamProcessorBase.java | 2 -
.../processor/TestZkLocalApplicationRunner.java | 10 +-
.../samza/validation/YarnJobValidationTool.java | 6 +-
.../webapp/TestApplicationMasterRestClient.java | 4 +-
38 files changed, 1348 insertions(+), 636 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
index 7ee7609..36917cf 100644
--- a/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -99,7 +99,6 @@ public class TaskModel implements Comparable<TaskModel> {
}
@Override
-
public String toString() {
return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 96f628c..076ab54 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -30,6 +30,8 @@ import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.data.BarrierState;
import org.apache.samza.coordinator.data.ProcessorEntity;
import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
@@ -54,7 +56,6 @@ import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -365,8 +366,8 @@ public class AzureJobCoordinator implements JobCoordinator {
}
// Generate the new JobModel
- JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(),
- null, streamMetadataCache, currentProcessorIds);
+ GrouperMetadata grouperMetadata = new GrouperMetadataImpl(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+ JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
// Publish the new job model
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 4c5a34b..0eddbf2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -185,8 +185,7 @@ public class ClusterBasedJobCoordinator {
// build a JobModelManager and ChangelogStreamManager and perform partition assignments.
changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
- jobModelManager =
- JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
+ jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metrics);
config = jobModelManager.jobModel().getConfig();
hasDurableStores = new StorageConfig(config).hasDurableStores();
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index cb86a58..eda1be8 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -52,10 +52,14 @@ public class ClusterManagerConfig extends MapConfig {
private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
/**
- * Flag to indicate if host-affinity is enabled for the job or not
+ * NOTE: This field is deprecated.
*/
public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
- public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
+
+ /**
+ * Flag to indicate if host-affinity is enabled for the job or not
+ */
+ public static final String JOB_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
/**
* Number of CPU cores to request from the cluster manager per container
@@ -145,10 +149,10 @@ public class ClusterManagerConfig extends MapConfig {
}
public boolean getHostAffinityEnabled() {
- if (containsKey(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED)) {
- return getBoolean(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED);
+ if (containsKey(JOB_HOST_AFFINITY_ENABLED)) {
+ return getBoolean(JOB_HOST_AFFINITY_ENABLED);
} else if (containsKey(HOST_AFFINITY_ENABLED)) {
- log.info("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, CLUSTER_MANAGER_HOST_AFFINITY_ENABLED);
+ log.warn("Configuration {} is deprecated. Please use {}", HOST_AFFINITY_ENABLED, JOB_HOST_AFFINITY_ENABLED);
return getBoolean(HOST_AFFINITY_ENABLED);
} else {
return false;
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
index f8295c8..91eab54 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
@@ -54,5 +54,8 @@ public interface BalancingTaskNameGrouper extends TaskNameGrouper {
* @param localityManager provides a persisted task to container map to use as a baseline
* @return the grouped tasks in the form of ContainerModels
*/
- Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager);
+ @Deprecated
+ default Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
+ return group(tasks);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 759f82e..8a741db 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,19 +27,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Group the SSP taskNames by dividing the number of taskNames into the number
* of containers (n) and assigning n taskNames to each container as returned by
@@ -51,19 +45,21 @@ import org.slf4j.LoggerFactory;
* TODO: SAMZA-1197 - need to modify balance to work with processorId strings
*/
public class GroupByContainerCount implements BalancingTaskNameGrouper {
- private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
+ private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerCount.class);
private final int containerCount;
- private final Config config;
- public GroupByContainerCount(Config config) {
- this.containerCount = new JobConfig(config).getContainerCount();
- this.config = config;
- if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container");
+ public GroupByContainerCount(int containerCount) {
+ if (containerCount <= 0) {
+ throw new IllegalArgumentException("Must have at least one container");
+ }
+ this.containerCount = containerCount;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Set<ContainerModel> group(Set<TaskModel> tasks) {
-
validateTasks(tasks);
// Sort tasks by taskName.
@@ -89,79 +85,63 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
return Collections.unmodifiableSet(containerModels);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public Set<ContainerModel> balance(Set<TaskModel> tasks, LocalityManager localityManager) {
-
+ public Set<ContainerModel> group(Set<TaskModel> tasks, GrouperMetadata grouperMetadata) {
validateTasks(tasks);
- if (localityManager == null) {
- log.info("Locality manager is null. Cannot read or write task assignments. Invoking grouper.");
+ List<TaskGroup> containers = getPreviousContainers(grouperMetadata, tasks.size());
+ if (containers == null || containers.size() == 1 || containerCount == 1) {
+ LOG.info("Balancing does not apply. Invoking grouper.");
return group(tasks);
}
- TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
- taskAssignmentManager.init();
- try {
- List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
- if (containers == null || containers.size() == 1 || containerCount == 1) {
- log.info("Balancing does not apply. Invoking grouper.");
- Set<ContainerModel> models = group(tasks);
- saveTaskAssignments(models, taskAssignmentManager);
- return models;
- }
-
- int prevContainerCount = containers.size();
- int containerDelta = containerCount - prevContainerCount;
- if (containerDelta == 0) {
- log.info("Container count has not changed. Reusing previous container models.");
- return buildContainerModels(tasks, containers);
- }
- log.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
-
- // Calculate the expected task count per container
- int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
+ int prevContainerCount = containers.size();
+ int containerDelta = containerCount - prevContainerCount;
+ if (containerDelta == 0) {
+ LOG.info("Container count has not changed. Reusing previous container models.");
+ return TaskGroup.buildContainerModels(tasks, containers);
+ }
+ LOG.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
- // Collect excess tasks from over-assigned containers
- List<String> taskNamesToReassign = new LinkedList<>();
- for (int i = 0; i < prevContainerCount; i++) {
- TaskGroup taskGroup = containers.get(i);
- while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
- taskNamesToReassign.add(taskGroup.removeTask());
- }
- }
+ // Calculate the expected task count per container
+ int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
- // Assign tasks to the under-assigned containers
- if (containerDelta > 0) {
- List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
- containers.addAll(newContainers);
- } else {
- containers = containers.subList(0, containerCount);
+ // Collect excess tasks from over-assigned containers
+ List<String> taskNamesToReassign = new LinkedList<>();
+ for (int i = 0; i < prevContainerCount; i++) {
+ TaskGroup taskGroup = containers.get(i);
+ while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
+ taskNamesToReassign.add(taskGroup.removeLastTaskName());
}
- assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
+ }
- // Transform containers to containerModel
- Set<ContainerModel> models = buildContainerModels(tasks, containers);
+ // Assign tasks to the under-assigned containers
+ if (containerDelta > 0) {
+ List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
+ containers.addAll(newContainers);
+ } else {
+ containers = containers.subList(0, containerCount);
+ }
- // Save the results
- saveTaskAssignments(models, taskAssignmentManager);
+ assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
- return models;
- } finally {
- taskAssignmentManager.close();
- }
+ return TaskGroup.buildContainerModels(tasks, containers);
}
/**
- * Reads the task-container mapping from the provided {@link TaskAssignmentManager} and returns a
+ * Reads the task-container mapping from the provided {@link GrouperMetadata} and returns a
* list of TaskGroups, ordered ascending by containerId.
*
- * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to retrieve the previous mapping.
- * @param taskCount the number of tasks, for validation against the persisted tasks.
- * @return a list of TaskGroups, ordered ascending by containerId or {@code null}
- * if the previous mapping doesn't exist or isn't usable.
+ * @param grouperMetadata the {@link GrouperMetadata} will be used to retrieve the previous task to container assignments.
+ * @param taskCount the number of tasks, for validation against the persisted tasks.
+ * @return a list of TaskGroups, ordered ascending by containerId or {@code null}
+ * if the previous mapping doesn't exist or isn't usable.
*/
- private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
- Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+ private List<TaskGroup> getPreviousContainers(GrouperMetadata grouperMetadata, int taskCount) {
+ Map<TaskName, String> taskToContainerId = grouperMetadata.getPreviousTaskToProcessorAssignment();
taskToContainerId.values().forEach(id -> {
try {
int intId = Integer.parseInt(id);
@@ -169,19 +149,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
}
});
+
if (taskToContainerId.isEmpty()) {
- log.info("No task assignment map was saved.");
+ LOG.info("No task assignment map was saved.");
return null;
} else if (taskCount != taskToContainerId.size()) {
- log.warn(
- "Current task count {} does not match saved task count {}. Stateful jobs may observe misalignment of keys!",
- taskCount, taskToContainerId.size());
- // If the tasks changed, then the partition-task grouping is also likely changed and we can't handle that
- // without a much more complicated mapping. Further, the partition count may have changed, which means
- // input message keys are likely reshuffled w.r.t. partitions, so the local state may not contain necessary
- // data associated with the incoming keys. Warn the user and default to grouper
- // In this scenario the tasks may have been reduced, so we need to delete all the existing messages
- taskAssignmentManager.deleteTaskContainerMappings(taskToContainerId.keySet());
return null;
}
@@ -189,27 +161,13 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
try {
containers = getOrderedContainers(taskToContainerId);
} catch (Exception e) {
- log.error("Exception while parsing task mapping", e);
+ LOG.error("Exception while parsing task mapping", e);
return null;
}
return containers;
}
/**
- * Saves the task assignments specified by containers using the provided TaskAssignementManager.
- *
- * @param containers the set of containers from which the task assignments will be saved.
- * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to save the mappings.
- */
- private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
- for (ContainerModel container : containers) {
- for (TaskName taskName : container.getTasks().keySet()) {
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getId());
- }
- }
- }
-
- /**
* Verifies the input tasks argument and throws {@link IllegalArgumentException} if it is invalid.
*
* @param tasks the tasks to validate.
@@ -252,13 +210,12 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param containers the containers (as {@link TaskGroup}) to which the tasks will be assigned.
*/
// TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
- private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
- List<TaskGroup> containers) {
+ private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign, List<TaskGroup> containers) {
for (TaskGroup taskGroup : containers) {
for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
String taskName = taskNamesToAssign.remove(0);
taskGroup.addTaskName(taskName);
- log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
+ LOG.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
}
}
}
@@ -288,53 +245,20 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
}
/**
- * Translates the list of TaskGroup instances to a set of ContainerModel instances, using the
- * set of TaskModel instances.
- *
- * @param tasks the TaskModels to assign to the ContainerModels.
- * @param containerTasks the TaskGroups defining how the tasks should be grouped.
- * @return a mutable set of ContainerModels.
- */
- private Set<ContainerModel> buildContainerModels(Set<TaskModel> tasks, List<TaskGroup> containerTasks) {
- // Map task names to models
- Map<String, TaskModel> taskNameToModel = new HashMap<>();
- for (TaskModel model : tasks) {
- taskNameToModel.put(model.getTaskName().getTaskName(), model);
- }
-
- // Build container models
- Set<ContainerModel> containerModels = new HashSet<>();
- for (TaskGroup container : containerTasks) {
- Map<TaskName, TaskModel> containerTaskModels = new HashMap<>();
- for (String taskName : container.taskNames) {
- TaskModel model = taskNameToModel.get(taskName);
- containerTaskModels.put(model.getTaskName(), model);
- }
- containerModels.add(
- new ContainerModel(container.containerId, containerTaskModels));
- }
- return Collections.unmodifiableSet(containerModels);
- }
-
- /**
* Converts the task->containerId map to an ordered list of {@link TaskGroup} instances.
*
* @param taskToContainerId a map from each task name to the containerId to which it is assigned.
* @return a list of TaskGroups ordered ascending by containerId.
*/
- private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
- log.debug("Got task to container map: {}", taskToContainerId);
+ private List<TaskGroup> getOrderedContainers(Map<TaskName, String> taskToContainerId) {
+ LOG.debug("Got task to container map: {}", taskToContainerId);
// Group tasks by container Id
- HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
- for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
- String taskName = entry.getKey();
+ Map<String, List<String>> containerIdToTaskNames = new HashMap<>();
+ for (Map.Entry<TaskName, String> entry : taskToContainerId.entrySet()) {
+ String taskName = entry.getKey().getTaskName();
String containerId = entry.getValue();
- List<String> taskNames = containerIdToTaskNames.get(containerId);
- if (taskNames == null) {
- taskNames = new ArrayList<>();
- containerIdToTaskNames.put(containerId, taskNames);
- }
+ List<String> taskNames = containerIdToTaskNames.computeIfAbsent(containerId, k -> new ArrayList<>());
taskNames.add(taskName);
}
@@ -347,36 +271,4 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
return containerTasks;
}
-
- /**
- * A mutable group of tasks and an associated container id.
- *
- * Used as a temporary mutable container until the final ContainerModel is known.
- */
- private static class TaskGroup {
- private final List<String> taskNames = new LinkedList<>();
- private final String containerId;
-
- private TaskGroup(String containerId, List<String> taskNames) {
- this.containerId = containerId;
- Collections.sort(taskNames); // For consistency because the taskNames came from a Map
- this.taskNames.addAll(taskNames);
- }
-
- public String getContainerId() {
- return containerId;
- }
-
- public void addTaskName(String taskName) {
- taskNames.add(taskName);
- }
-
- public String removeTask() {
- return taskNames.remove(taskNames.size() - 1);
- }
-
- public int size() {
- return taskNames.size();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
index 06aba33..5acf5b8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
@@ -19,6 +19,7 @@
package org.apache.samza.container.grouper.task;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
/**
* Factory to build the GroupByContainerCount class.
@@ -26,6 +27,6 @@ import org.apache.samza.config.Config;
public class GroupByContainerCountFactory implements TaskNameGrouperFactory {
@Override
public TaskNameGrouper build(Config config) {
- return new GroupByContainerCount(config);
+ return new GroupByContainerCount(new JobConfig(config).getContainerCount());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 9dab943..7c11da4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -19,27 +19,34 @@
package org.apache.samza.container.grouper.task;
-import java.util.Arrays;
-import java.util.stream.Collectors;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.TaskModel;
-
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.runtime.LocationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * Simple grouper.
- * It exposes two group methods - one that assumes sequential container numbers and one that gets a set of container
- * IDs as an argument. Please note - this first implementation ignores locality information.
+ * A {@link TaskNameGrouper} implementation that provides two different grouping strategies:
+ *
+ * - One that assigns the tasks to the available containerIds in a round robin fashion.
+ * - The other that generates an equidistributed and locality-aware task to container assignment.
*/
public class GroupByContainerIds implements TaskNameGrouper {
private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class);
@@ -49,6 +56,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
this.startContainerCount = count;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Set<ContainerModel> group(Set<TaskModel> tasks) {
List<String> containerIds = new ArrayList<>(startContainerCount);
@@ -58,30 +68,40 @@ public class GroupByContainerIds implements TaskNameGrouper {
return group(tasks, containerIds);
}
- public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
- if (containersIds == null)
+ /**
+ * {@inheritDoc}
+ *
+ * When the number of taskModels is less than the number of available containerIds,
+ * then selects the lexicographically least `x` containerIds.
+ *
+ * Otherwise, assigns the tasks to the available containerIds in a round robin fashion
+ * preserving the containerId in the final assignment.
+ */
+ @Override
+ public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containerIds) {
+ if (containerIds == null)
return this.group(tasks);
- if (containersIds.isEmpty())
+ if (containerIds.isEmpty())
throw new IllegalArgumentException("Must have at least one container");
if (tasks.isEmpty())
- throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
- .toString(containersIds.toArray()));
+ throw new IllegalArgumentException("cannot group an empty set. containerIds=" + Arrays
+ .toString(containerIds.toArray()));
- if (containersIds.size() > tasks.size()) {
- LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containersIds.size(), tasks.size());
+ if (containerIds.size() > tasks.size()) {
+ LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containerIds.size(), tasks.size());
/**
* Choose lexicographically least `x` containerIds(where x = tasks.size()).
*/
- containersIds = containersIds.stream()
+ containerIds = containerIds.stream()
.sorted()
.limit(tasks.size())
.collect(Collectors.toList());
- LOG.info("Generating containerModel with containers: {}.", containersIds);
+ LOG.info("Generating containerModel with containers: {}.", containerIds);
}
- int containerCount = containersIds.size();
+ int containerCount = containerIds.size();
// Sort tasks by taskName.
List<TaskModel> sortedTasks = new ArrayList<>(tasks);
@@ -100,9 +120,118 @@ public class GroupByContainerIds implements TaskNameGrouper {
// Convert to a Set of ContainerModel
Set<ContainerModel> containerModels = new HashSet<>();
for (int i = 0; i < containerCount; i++) {
- containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+ containerModels.add(new ContainerModel(containerIds.get(i), taskGroups[i]));
}
return Collections.unmodifiableSet(containerModels);
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * When there are `t` tasks and `p` processors, where t <= p, a fair task distribution should ideally assign
+ * (t / p) tasks to each processor. In addition to guaranteeing a fair distribution, this {@link TaskNameGrouper}
+ * implementation generates a locationId aware task assignment to processors where it makes best efforts in assigning
+ * the tasks to processors with the same locality.
+ *
+ * Task assignment to processors is accomplished through the following two phases:
+ *
+ * 1. In the first phase, each task(T) is assigned to a processor(P) that satisfies the following constraints:
+ * A. The processor(P) should have the same locality of the task(T).
+ * B. Number of tasks already assigned to the processor should be less than the (number of tasks / number of processors).
+ *
+ * 2. Each unassigned task from phase 1 is then mapped to any processor with task count less than the
+ * (number of tasks / number of processors). When no such processor exists, then the unassigned
+ * task is mapped to any processor from available processors in a round robin fashion.
+ */
+ @Override
+ public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+ // Validate that the task models are not empty.
+ Map<TaskName, LocationId> taskLocality = grouperMetadata.getTaskLocality();
+ Preconditions.checkArgument(!taskModels.isEmpty(), "No tasks found. Likely due to no input partitions. Can't run a job with no tasks.");
+
+ // Invoke the default grouper when the processor locality does not exist.
+ if (MapUtils.isEmpty(grouperMetadata.getProcessorLocality())) {
+ LOG.info("ProcessorLocality is empty. Generating with the default group method.");
+ return group(taskModels, new ArrayList<>());
+ }
+
+ Map<String, LocationId> processorLocality = new TreeMap<>(grouperMetadata.getProcessorLocality());
+ /**
+ * When there are more processors than task models, choose the lexicographically least `x` processors (where x = taskModels.size()).
+ */
+ if (processorLocality.size() > taskModels.size()) {
+ processorLocality = processorLocality.entrySet()
+ .stream()
+ .limit(taskModels.size())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ Map<LocationId, List<String>> locationIdToProcessors = new HashMap<>();
+ Map<String, TaskGroup> processorIdToTaskGroup = new HashMap<>();
+
+ // Generate the {@see LocationId} to processors mapping and processorId to {@see TaskGroup} mapping.
+ processorLocality.forEach((processorId, locationId) -> {
+ List<String> processorIds = locationIdToProcessors.getOrDefault(locationId, new ArrayList<>());
+ processorIds.add(processorId);
+ locationIdToProcessors.put(locationId, processorIds);
+ processorIdToTaskGroup.put(processorId, new TaskGroup(processorId, new ArrayList<>()));
+ });
+
+ int numTasksPerProcessor = taskModels.size() / processorLocality.size();
+ Set<TaskName> assignedTasks = new HashSet<>();
+
+ /**
+ * A processor is considered under-assigned when number of tasks assigned to it is less than
+ * (number of tasks / number of processors).
+ * Map the tasks to the under-assigned processors with same locality.
+ */
+ for (TaskModel taskModel : taskModels) {
+ LocationId taskLocationId = taskLocality.get(taskModel.getTaskName());
+ if (taskLocationId != null) {
+ List<String> processorIds = locationIdToProcessors.getOrDefault(taskLocationId, new ArrayList<>());
+ for (String processorId : processorIds) {
+ TaskGroup taskGroup = processorIdToTaskGroup.get(processorId);
+ if (taskGroup.size() < numTasksPerProcessor) {
+ taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
+ assignedTasks.add(taskModel.getTaskName());
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * In some scenarios, the task either might not have any previous locality or might not have any
+ * processor that maps to its previous locality. This cyclic iterator over the processorIds helps us in
+ * those scenarios to assign such tasks to the processors in a round robin fashion.
+ */
+ Iterator<String> processorIdsCyclicIterator = Iterators.cycle(processorLocality.keySet());
+
+ // Order the taskGroups to choose a task group in a deterministic fashion for unassigned tasks.
+ List<TaskGroup> taskGroups = new ArrayList<>(processorIdToTaskGroup.values());
+ taskGroups.sort(Comparator.comparing(TaskGroup::getContainerId));
+
+ /**
+ * For the tasks left over from the previous stage, map them to any under-assigned processor.
+ * When an under-assigned processor doesn't exist, then map them to any processor from the
+ * available processors in a round robin manner.
+ */
+ for (TaskModel taskModel : taskModels) {
+ if (!assignedTasks.contains(taskModel.getTaskName())) {
+ Optional<TaskGroup> underAssignedTaskGroup = taskGroups.stream()
+ .filter(taskGroup -> taskGroup.size() < numTasksPerProcessor)
+ .findFirst();
+ if (underAssignedTaskGroup.isPresent()) {
+ underAssignedTaskGroup.get().addTaskName(taskModel.getTaskName().getTaskName());
+ } else {
+ TaskGroup taskGroup = processorIdToTaskGroup.get(processorIdsCyclicIterator.next());
+ taskGroup.addTaskName(taskModel.getTaskName().getTaskName());
+ }
+ assignedTasks.add(taskModel.getTaskName());
+ }
+ }
+
+ return TaskGroup.buildContainerModels(taskModels, taskGroups);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
new file mode 100644
index 0000000..ef919d1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadata.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.SystemStreamPartition;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides the historical metadata of the samza application.
+ */
+@InterfaceStability.Evolving
+public interface GrouperMetadata {
+
+ /**
+ * Gets the current processor locality of the job.
+ * @return the processorId to the {@link LocationId} assignment.
+ */
+ Map<String, LocationId> getProcessorLocality();
+
+ /**
+ * Gets the current task locality of the job.
+ * @return the current {@link TaskName} to {@link LocationId} assignment.
+ */
+ Map<TaskName, LocationId> getTaskLocality();
+
+ /**
+ * Gets the previous {@link TaskName} to {@link SystemStreamPartition} assignment of the job.
+ * @return the previous {@link TaskName} to {@link SystemStreamPartition} assignment.
+ */
+ Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment();
+
+
+ /**
+ * Gets the previous {@link TaskName} to processorId assignments of the job.
+ * @return the previous task to processorId assignment.
+ */
+ Map<TaskName, String> getPreviousTaskToProcessorAssignment();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
new file mode 100644
index 0000000..bc40bc4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GrouperMetadataImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.system.SystemStreamPartition;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link GrouperMetadata} that holds the necessary historical metadata of
+ * the samza job. This is used by the {@link TaskNameGrouper} to generate optimal task assignments.
+ */
+public class GrouperMetadataImpl implements GrouperMetadata {
+
+ // Map of processorId to LocationId.
+ private final Map<String, LocationId> processorLocality;
+
+ // Map of TaskName to LocationId.
+ private final Map<TaskName, LocationId> taskLocality;
+
+ // Map of TaskName to a list of the input SystemStreamPartition's assigned to it.
+ private final Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignment;
+
+ // Map of TaskName to ProcessorId.
+ private final Map<TaskName, String> previousTaskToProcessorAssignment;
+
+ public GrouperMetadataImpl(Map<String, LocationId> processorLocality, Map<TaskName, LocationId> taskLocality, Map<TaskName, List<SystemStreamPartition>> previousTaskToSSPAssignments, Map<TaskName, String> previousTaskToProcessorAssignment) {
+ this.processorLocality = Collections.unmodifiableMap(processorLocality);
+ this.taskLocality = Collections.unmodifiableMap(taskLocality);
+ this.previousTaskToSSPAssignment = Collections.unmodifiableMap(previousTaskToSSPAssignments);
+ this.previousTaskToProcessorAssignment = Collections.unmodifiableMap(previousTaskToProcessorAssignment);
+ }
+
+ @Override
+ public Map<String, LocationId> getProcessorLocality() {
+ return processorLocality;
+ }
+
+ @Override
+ public Map<TaskName, LocationId> getTaskLocality() {
+ return taskLocality;
+ }
+
+ @Override
+ public Map<TaskName, List<SystemStreamPartition>> getPreviousTaskToSSPAssignment() {
+ return previousTaskToSSPAssignment;
+ }
+
+ @Override
+ public Map<TaskName, String> getPreviousTaskToProcessorAssignment() {
+ return this.previousTaskToProcessorAssignment;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 32bbf29..b6e946c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -81,9 +81,6 @@ public class TaskAssignmentManager {
this.valueSerde = valueSerde;
MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
- }
-
- public void init() {
this.metadataStore.init();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
new file mode 100644
index 0000000..1fe0f40
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskGroup.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+
+import java.util.*;
+
+/**
+ * A mutable group of tasks and an associated container id.
+ *
+ * Used as a temporary mutable container until the final ContainerModel is known.
+ */
+class TaskGroup {
+ private final List<String> taskNames = new LinkedList<>();
+ private final String containerId;
+
+ TaskGroup(String containerId, List<String> taskNames) {
+ this.containerId = containerId;
+ this.taskNames.addAll(taskNames);
+ Collections.sort(this.taskNames); // For consistency because the taskNames came from a Map
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ public void addTaskName(String taskName) {
+ taskNames.add(taskName);
+ }
+
+ public String removeLastTaskName() {
+ return taskNames.remove(taskNames.size() - 1);
+ }
+
+ public int size() {
+ return taskNames.size();
+ }
+
+ /**
+ * Converts the {@link TaskGroup} list to a set of ContainerModel.
+ *
+ * @param taskModels the TaskModels to assign to the ContainerModels.
+ * @param taskGroups the TaskGroups defining how the tasks should be grouped.
+ * @return a set of ContainerModels.
+ */
+ public static Set<ContainerModel> buildContainerModels(Set<TaskModel> taskModels, Collection<TaskGroup> taskGroups) {
+ // Map task names to models
+ Map<String, TaskModel> taskNameToModel = new HashMap<>();
+ for (TaskModel model : taskModels) {
+ taskNameToModel.put(model.getTaskName().getTaskName(), model);
+ }
+
+ // Build container models
+ Set<ContainerModel> containerModels = new HashSet<>();
+ for (TaskGroup container : taskGroups) {
+ Map<TaskName, TaskModel> containerTaskModels = new HashMap<>();
+ for (String taskName : container.taskNames) {
+ TaskModel model = taskNameToModel.get(taskName);
+ containerTaskModels.put(model.getTaskName(), model);
+ }
+ containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+ }
+
+ return Collections.unmodifiableSet(containerModels);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index 71b80cc..2124dfc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -18,10 +18,9 @@
*/
package org.apache.samza.container.grouper.task;
-import java.util.List;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
-
+import java.util.List;
import java.util.Set;
/**
@@ -44,15 +43,39 @@ import java.util.Set;
* </p>
*/
public interface TaskNameGrouper {
+
/**
- * Group tasks into the containers they will share.
+ * Groups the taskModels into set of {@link ContainerModel} using the metadata of
+ * the job from {@link GrouperMetadata}.
*
- * @param tasks Set of tasks to group into containers.
- * @return Set of containers, which contain the tasks that were passed in.
+ * @param taskModels the set of tasks to group into containers.
+ * @param grouperMetadata provides the historical metadata of the samza job.
+ * @return the grouped {@link ContainerModel} built from the provided taskModels.
*/
- Set<ContainerModel> group(Set<TaskModel> tasks);
+ default Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
+ return group(taskModels);
+ }
- default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
- return group(tasks);
+ /**
+ * Group the taskModels into set of {@link ContainerModel}.
+ *
+ * @param taskModels the set of {@link TaskModel} to group into containers.
+ * @return the grouped {@link ContainerModel} built from the provided taskModels.
+ */
+ @Deprecated
+ default Set<ContainerModel> group(Set<TaskModel> taskModels) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Group the taskModels into set of {@link ContainerModel}.
+ *
+ * @param taskModels the set of {@link TaskModel} to group into containers.
+ * @param containersIds the list of container ids that have to be used in the {@link ContainerModel}.
+ * @return the grouped {@link ContainerModel} built from the provided taskModels.
+ */
+ @Deprecated
+ default Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
+ return group(taskModels);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
index 8b967b7..37684f4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouperFactory.java
@@ -28,7 +28,7 @@ public interface TaskNameGrouperFactory {
* Builds a {@link TaskNameGrouper}. The config can be used to read the necessary values which are needed int the
* process of building the {@link TaskNameGrouper}
*
- * @param config configuration to which values can be used to build a {@link TaskNameGrouper}
+ * @param config configuration to use for building the {@link TaskNameGrouper}
* @return a {@link TaskNameGrouper} implementation
*/
TaskNameGrouper build(Config config);
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 0110551..0c5e368 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -104,7 +104,7 @@ public class ExecutionPlanner {
// currently we don't support host-affinity in batch mode
if (appConfig.getAppMode() == ApplicationConfig.ApplicationMode.BATCH && clusterConfig.getHostAffinityEnabled()) {
throw new SamzaException(String.format("Host affinity is not supported in batch mode. Please configure %s=false.",
- ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED));
+ ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 7328bc7..34d67cc 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -317,7 +317,8 @@ public class StreamProcessor {
return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
- Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)));
+ Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
+ null);
}
private JobCoordinator createJobCoordinator() {
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index c5c0d78..a5a45ba 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -22,6 +22,14 @@ package org.apache.samza.runtime;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.MDC;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
@@ -31,11 +39,6 @@ import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.container.ContainerHeartbeatClient;
-import org.apache.samza.container.ContainerHeartbeatMonitor;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsReporter;
@@ -47,7 +50,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-
/**
* Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
*/
@@ -93,6 +95,7 @@ public class LocalContainerRunner {
private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
JobModel jobModel, Config config) {
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+ LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
SamzaContainer container = SamzaContainer$.MODULE$.apply(
containerId,
jobModel,
@@ -100,7 +103,8 @@ public class LocalContainerRunner {
taskFactory,
JobContextImpl.fromConfigWithDefaults(config),
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
- Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)));
+ Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
+ localityManager);
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
.createInstance(new ProcessorContext() { }, config);
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 737ac3e..44fd811 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -18,16 +18,22 @@
*/
package org.apache.samza.standalone;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.runtime.LocationIdProvider;
+import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
@@ -35,7 +41,6 @@ import org.apache.samza.system.SystemAdmins;
import org.apache.samza.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.Collections;
/**
@@ -65,11 +70,15 @@ public class PassthroughJobCoordinator implements JobCoordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(PassthroughJobCoordinator.class);
private final String processorId;
private final Config config;
+ private final LocationId locationId;
private JobCoordinatorListener coordinatorListener = null;
public PassthroughJobCoordinator(Config config) {
this.processorId = createProcessorId(config);
this.config = config;
+ LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
+ LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
+ this.locationId = locationIdProvider.getLocationId();
}
@Override
@@ -119,18 +128,13 @@ public class PassthroughJobCoordinator implements JobCoordinator {
SystemAdmins systemAdmins = new SystemAdmins(config);
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
systemAdmins.start();
- String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
-
- /** TODO:
- Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
- in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
- TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
- (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
- */
- JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
- Collections.singletonList(containerId));
- systemAdmins.stop();
- return jobModel;
+ try {
+ String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
+ GrouperMetadata grouperMetadata = new GrouperMetadataImpl(ImmutableMap.of(String.valueOf(containerId), locationId), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+ return JobModelManager.readJobModel(this.config, Collections.emptyMap(), streamMetadataCache, grouperMetadata);
+ } finally {
+ systemAdmins.stop();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 64ae310..5442d6e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -125,12 +125,13 @@ public class StorageRecovery extends CommandLine {
* map
*/
private void getContainerModels() {
- CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, new MetricsRegistryMap());
+ MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
+ CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(jobConfig, metricsRegistryMap);
coordinatorStreamManager.register(getClass().getSimpleName());
coordinatorStreamManager.start();
coordinatorStreamManager.bootstrap();
ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
- JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel();
+ JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metricsRegistryMap).jobModel();
containers = jobModel.getContainers();
coordinatorStreamManager.stop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 81c0465..8c5a3ba 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
@@ -40,6 +41,8 @@ import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.GrouperMetadata;
+import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
@@ -47,17 +50,23 @@ import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.runtime.LocationId;
+import org.apache.samza.runtime.LocationIdProvider;
+import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkUtils.ProcessorNode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +107,7 @@ public class ZkJobCoordinator implements JobCoordinator {
private final SystemAdmins systemAdmins;
private final int debounceTimeMs;
private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
+ private final LocationId locationId;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
@@ -131,14 +141,16 @@ public class ZkJobCoordinator implements JobCoordinator {
this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
systemAdmins = new SystemAdmins(config);
streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
+ LocationIdProviderFactory locationIdProviderFactory = Util.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
+ LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
+ this.locationId = locationIdProvider.getLocationId();
}
@Override
public void start() {
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
zkUtils.validateZkVersion();
- zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
- .getJobModelPathPrefix()});
+ zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder.getJobModelPathPrefix(), keyBuilder.getTaskLocalityPath()});
startMetrics();
systemAdmins.start();
@@ -246,7 +258,13 @@ public class ZkJobCoordinator implements JobCoordinator {
}
void doOnProcessorChange() {
- List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
+ List<ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes();
+
+ List<String> currentProcessorIds = new ArrayList<>();
+ for (ProcessorNode processorNode : processorNodes) {
+ currentProcessorIds.add(processorNode.getProcessorData().getProcessorId());
+ }
+
Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
if (currentProcessorIds.size() != uniqueProcessorIds.size()) {
@@ -256,7 +274,7 @@ public class ZkJobCoordinator implements JobCoordinator {
// Generate the JobModel
LOG.info("Generating new JobModel with processors: {}.", currentProcessorIds);
- JobModel jobModel = generateNewJobModel(currentProcessorIds);
+ JobModel jobModel = generateNewJobModel(processorNodes);
// Create checkpoint and changelog streams if they don't exist
if (!hasCreatedStreams) {
@@ -308,7 +326,7 @@ public class ZkJobCoordinator implements JobCoordinator {
/**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
- private JobModel generateNewJobModel(List<String> processors) {
+ private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {
String zkJobModelVersion = zkUtils.getJobModelVersion();
// If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper.
if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) {
@@ -318,11 +336,9 @@ public class ZkJobCoordinator implements JobCoordinator {
}
cachedJobModelVersion = zkJobModelVersion;
}
- /**
- * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container
- * to host mapping) is passed in as null when building the jobModel.
- */
- JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors);
+
+ GrouperMetadata grouperMetadata = getGrouperMetadata(zkJobModelVersion, processorNodes);
+ JobModel model = JobModelManager.readJobModel(config, changeLogPartitionMap, streamMetadataCache, grouperMetadata);
return new JobModel(new MapConfig(), model.getContainers());
}
@@ -343,6 +359,39 @@ public class ZkJobCoordinator implements JobCoordinator {
});
}
+ /**
+ * Builds the {@link GrouperMetadataImpl} from the provided {@code jobModelVersion}
+ * and {@code processorNodes}.
+ * @param jobModelVersion the most recent jobModelVersion available in the zookeeper; when null, no previous task assignment is read.
+ * @param processorNodes the list of live processors in the zookeeper.
+ * @return the built grouper metadata.
+ */
+ private GrouperMetadataImpl getGrouperMetadata(String jobModelVersion, List<ProcessorNode> processorNodes) {
+ Map<TaskName, String> taskToProcessorId = new HashMap<>();
+ Map<TaskName, List<SystemStreamPartition>> taskToSSPs = new HashMap<>();
+ if (jobModelVersion != null) { // a null version leaves both task maps empty
+ JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
+ for (ContainerModel containerModel : jobModel.getContainers().values()) {
+ for (TaskModel taskModel : containerModel.getTasks().values()) {
+ taskToProcessorId.put(taskModel.getTaskName(), containerModel.getId()); // the container id is recorded as the task's processor id
+ for (SystemStreamPartition partition : taskModel.getSystemStreamPartitions()) {
+ taskToSSPs.computeIfAbsent(taskModel.getTaskName(), k -> new ArrayList<>()); // entry is created only here, so a task without partitions gets no entry in taskToSSPs
+ taskToSSPs.get(taskModel.getTaskName()).add(partition);
+ }
+ }
+ }
+ }
+
+ Map<String, LocationId> processorLocality = new HashMap<>();
+ for (ProcessorNode processorNode : processorNodes) {
+ ProcessorData processorData = processorNode.getProcessorData();
+ processorLocality.put(processorData.getProcessorId(), processorData.getLocationId());
+ }
+
+ Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality(); // read regardless of whether a previous job model version exists
+ return new GrouperMetadataImpl(processorLocality, taskLocality, taskToSSPs, taskToProcessorId);
+ }
+
class LeaderElectorListenerImpl implements LeaderElectorListener {
@Override
public void onBecomingLeader() {
@@ -390,6 +439,11 @@ public class ZkJobCoordinator implements JobCoordinator {
JobModel jobModel = getJobModel();
// start the container with the new model
if (coordinatorListener != null) {
+ for (ContainerModel containerModel : jobModel.getContainers().values()) {
+ for (TaskName taskName : containerModel.getTasks().keySet()) {
+ zkUtils.writeTaskLocality(taskName, locationId);
+ }
+ }
coordinatorListener.onNewJobModel(processorId, jobModel);
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 6349432..56ea577 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -37,8 +39,10 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.runtime.LocationId;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.map.ObjectMapper;
@@ -239,6 +243,29 @@ public class ZkUtils {
return processorNodes;
}
+ public void writeTaskLocality(TaskName taskName, LocationId locationId) { // stores locationId.getId() under <taskLocalityPath>/<taskName>
+ String taskLocalityPath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), taskName);
+ validatePaths(new String[] {taskLocalityPath}); // NOTE(review): presumably ensures the per-task znode exists before the write; confirm against validatePaths
+ writeData(taskLocalityPath, locationId.getId());
+ }
+
+ public Map<TaskName, LocationId> readTaskLocality() { // returns one entry per child znode of the task locality path that holds data; empty map when there are none
+ Map<TaskName, LocationId> taskLocality = new HashMap<>();
+ String taskLocalityPath = keyBuilder.getTaskLocalityPath();
+ List<String> tasks = new ArrayList<>();
+ if (zkClient.exists(taskLocalityPath)) { // a missing parent path yields an empty result rather than a lookup on a non-existent node
+ tasks = zkClient.getChildren(taskLocalityPath);
+ }
+ for (String taskName : tasks) {
+ String taskPath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), taskName);
+ String locationId = zkClient.readData(taskPath, true); // NOTE(review): the boolean is presumably ZkClient's returnNullIfPathNotExists flag; confirm
+ if (locationId != null) { // tasks whose read returned null are left out of the result
+ taskLocality.put(new TaskName(taskName), new LocationId(locationId));
+ }
+ }
+ return taskLocality;
+ }
+
/**
* Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
*
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 03effe6..865658f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -127,8 +127,8 @@ object SamzaContainer extends Logging {
taskFactory: TaskFactory[_],
jobContext: JobContext,
applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
- applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
- ) = {
+ applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
+ localityManager: LocalityManager = null) = {
val config = jobContext.getConfig
val containerModel = jobModel.getContainers.get(containerId)
val containerName = "samza-container-%s" format containerId
@@ -735,6 +735,7 @@ object SamzaContainer extends Logging {
systemAdmins = systemAdmins,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
+ localityManager = localityManager,
offsetManager = offsetManager,
securityManager = securityManager,
metrics = samzaContainerMetrics,
@@ -990,16 +991,13 @@ class SamzaContainer(
def storeContainerLocality {
val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled
- if (isHostAffinityEnabled) {
- val localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry)
+ if (isHostAffinityEnabled && localityManager != null) {
val containerId = containerContext.getContainerModel.getId
val containerName = "SamzaContainer-" + containerId
info("Registering %s with metadata store" format containerName)
try {
val hostInet = Util.getLocalHost
- val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
- val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
- info("Writing container locality and JMX address to metadata store")
+ info("Writing container locality to metadata store")
localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName)
} catch {
case uhe: UnknownHostException =>