You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/07 22:22:30 UTC
[3/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza
SAMZA-1126 - Semantics of processorId in Samza
Implementation based on [SEP-1](https://cwiki.apache.org/confluence/display/SAMZA/SEP-1%3A+Semantics+of+ProcessorId+in+Samza)
Author: navina <na...@apache.org>
Reviewers: Yi Pan <ni...@gmail.com>, Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>
Closes #103 from navina/SAMZA-1126
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7da1840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7da1840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7da1840
Branch: refs/heads/master
Commit: a7da1840f17a182b49c30451f35991c97fc51068
Parents: c74722b
Author: Navina Ramesh <na...@apache.org>
Authored: Fri Apr 7 15:22:13 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Fri Apr 7 15:22:13 2017 -0700
----------------------------------------------------------------------
.../samza/container/SamzaContainerContext.java | 4 +-
.../org/apache/samza/job/CommandBuilder.java | 4 +-
.../samza/runtime/ProcessorIdGenerator.java | 51 +++
.../AbstractContainerAllocator.java | 12 +-
.../clustermanager/ContainerProcessManager.java | 19 +-
.../HostAwareContainerAllocator.java | 2 +-
.../clustermanager/SamzaApplicationState.java | 2 +-
.../clustermanager/SamzaResourceRequest.java | 6 +-
.../apache/samza/config/ApplicationConfig.java | 60 +++
.../apache/samza/container/LocalityManager.java | 12 +-
.../grouper/task/GroupByContainerCount.java | 43 ++-
.../grouper/task/GroupByContainerIds.java | 10 +-
.../task/SingleContainerGrouperFactory.java | 8 +-
.../grouper/task/TaskAssignmentManager.java | 10 +-
.../container/grouper/task/TaskNameGrouper.java | 2 +-
.../samza/coordinator/JobCoordinator.java | 22 +-
.../coordinator/JobCoordinatorFactory.java | 3 +-
.../messages/SetTaskContainerMapping.java | 7 +-
.../apache/samza/job/model/ContainerModel.java | 31 +-
.../org/apache/samza/job/model/JobModel.java | 26 +-
.../processor/SamzaContainerController.java | 20 +-
.../apache/samza/processor/StreamProcessor.java | 53 +--
.../samza/runtime/LocalContainerRunner.java | 10 +-
.../org/apache/samza/runtime/UUIDGenerator.java | 41 ++
.../model/JsonContainerModelMixIn.java | 9 +-
.../serializers/model/JsonJobModelMixIn.java | 4 +-
.../serializers/model/SamzaObjectMapper.java | 30 +-
.../standalone/StandaloneJobCoordinator.java | 30 +-
.../StandaloneJobCoordinatorFactory.java | 4 +-
.../apache/samza/storage/StorageRecovery.java | 4 +-
.../apache/samza/util/ClassLoaderHelper.java | 19 +
.../samza/zk/ZkCoordinationServiceFactory.java | 4 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 40 +-
.../samza/zk/ZkJobCoordinatorFactory.java | 10 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 21 +-
.../samza/config/ShellCommandConfig.scala | 3 +-
.../apache/samza/container/SamzaContainer.scala | 8 +-
.../samza/coordinator/JobModelManager.scala | 6 +-
.../samza/job/local/ProcessJobFactory.scala | 2 +-
.../samza/job/local/ThreadJobFactory.scala | 6 +-
.../clustermanager/MockContainerAllocator.java | 2 +-
.../clustermanager/TestContainerAllocator.java | 28 +-
.../TestContainerProcessManager.java | 25 +-
.../TestContainerRequestState.java | 12 +-
.../TestHostAwareContainerAllocator.java | 45 +--
.../samza/container/TestLocalityManager.java | 12 +-
.../grouper/task/TestGroupByContainerCount.java | 377 ++++++++++---------
.../grouper/task/TestGroupByContainerIds.java | 54 ++-
.../grouper/task/TestTaskAssignmentManager.java | 45 +--
.../samza/container/mock/ContainerMocks.java | 18 +-
.../model/TestSamzaObjectMapper.java | 61 ++-
.../zk/TestZkBarrierForVersionUpgrade.java | 12 +-
.../apache/samza/zk/TestZkLeaderElector.java | 1 -
.../java/org/apache/samza/zk/TestZkUtils.java | 10 +-
.../samza/container/TestSamzaContainer.scala | 56 ++-
.../samza/container/TestTaskInstance.scala | 10 +-
.../samza/coordinator/TestJobCoordinator.scala | 13 +-
.../samza/job/TestShellCommandBuilder.scala | 4 +-
.../samza/storage/kv/RocksDbKeyValueReader.java | 2 +-
.../java/org/apache/samza/rest/model/Task.java | 12 +-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 4 +-
.../samza/monitor/TestLocalStoreMonitor.java | 4 +-
.../rest/resources/mock/MockTaskProxy.java | 8 +-
.../performance/TestKeyValuePerformance.scala | 2 +-
.../test/processor/TestStreamProcessor.java | 19 +-
.../test/integration/StreamTaskTestUtil.scala | 3 +-
.../org/apache/samza/job/yarn/YarnAppState.java | 2 +-
.../job/yarn/YarnClusterResourceManager.java | 23 +-
.../samza/job/yarn/YarnContainerRunner.java | 4 +-
.../samza/validation/YarnJobValidationTool.java | 6 +-
70 files changed, 898 insertions(+), 634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index fd7333b..4076a51 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -28,7 +28,7 @@ import java.util.Collections;
* A SamzaContainerContext maintains per-container information for the tasks it executes.
*/
public class SamzaContainerContext {
- public final int id;
+ public final String id;
public final Config config;
public final Collection<TaskName> taskNames;
@@ -40,7 +40,7 @@ public class SamzaContainerContext {
* @param taskNames The set of taskName keys for which this container is responsible.
*/
public SamzaContainerContext(
- int id,
+ String id,
Config config,
Collection<TaskName> taskNames) {
this.id = id;
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 6d46f5d..fc7438b 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config;
*/
public abstract class CommandBuilder {
protected Config config;
- protected int id;
+ protected String id;
protected URL url;
protected String commandPath;
@@ -61,7 +61,7 @@ public abstract class CommandBuilder {
* associated with a specific instantiation of a SamzaContainer.
* @return self to support a builder style of use.
*/
- public CommandBuilder setId(int id) {
+ public CommandBuilder setId(String id) {
this.id = id;
return this;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
new file mode 100644
index 0000000..8790d69
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+@InterfaceStability.Evolving
+public interface ProcessorIdGenerator {
+ /**
+ * Generates a String identifier for a single instance of StreamProcessor.
+ *
+ * This value can be representative of its current executing environment. It can also be custom-managed by the user,
+ * as long as it adheres to the specification below. More than one processor can co-exist within the same JVM,
+ * as long as their identifiers are guaranteed to be unique.
+ *
+ * <b>Specification of processor identifier</b>:
+ * <ul>
+ * <li>The processor identifier has to be unique among the processors within a job.</li>
+ * <li>When more than one processor co-exists within the same JVM, the processor identifier can be of the format
+ * {@code $x_$y}, where 'x' is a unique identifier for the executing JVM and 'y' is a unique identifier for the
+ * processor instance within the JVM. When there is only one processor within a JVM, 'x' should be sufficient to
+ * uniquely identify the processor instance.</li>
+ * </ul>
+ *
+ * <b>Note</b>:
+ * In case of more than one processor within the same JVM, the custom implementation of ProcessorIdGenerator can
+ * contain a static counter, which is incremented on each call to {@code generateProcessorId}. The counter value can
+ * be treated as the identifier for the processor instance within the JVM.
+ *
+ * @param config the {@link Config} instance passed to the generator
+ * @return the String identifier for the processor
+ */
+ String generateProcessorId(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index d47f217..b83d83c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -145,7 +145,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
// Update state
resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
- int containerID = request.getContainerID();
+ String containerID = request.getContainerID();
//run container on resource
log.info("Found available resources on {}. Assigning request for container_id {} with "
@@ -176,9 +176,9 @@ public abstract class AbstractContainerAllocator implements Runnable {
* - when host-affinity is not enabled, or
* - when host-affinity is enabled and job is run for the first time
*/
- public void requestResources(Map<Integer, String> resourceToHostMappings) {
- for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) {
- int containerId = entry.getKey();
+ public void requestResources(Map<String, String> resourceToHostMappings) {
+ for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
+ String containerId = entry.getKey();
String preferredHost = entry.getValue();
if (preferredHost == null)
preferredHost = ResourceRequestState.ANY_HOST;
@@ -211,7 +211,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
* this request
* @param preferredHost Name of the host that you prefer to run the container on
*/
- public final void requestResource(int containerID, String preferredHost) {
+ public final void requestResource(String containerID, String preferredHost) {
SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
preferredHost, containerID);
resourceRequestState.addResourceRequest(request);
@@ -242,7 +242,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
* @param samzaContainerId to configure the builder with.
* @return the constructed builder object
*/
- private CommandBuilder getCommandBuilder(int samzaContainerId) {
+ private CommandBuilder getCommandBuilder(String samzaContainerId) {
String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b4309d9..9b5e871 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* value is the {@link ResourceFailure} object that has a count of failures.
*
*/
- private final Map<Integer, ResourceFailure> containerFailures = new HashMap<>();
+ private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
private final ContainerProcessManagerMetrics metrics;
@@ -173,7 +173,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.neededContainers.set(containerCount);
// Request initial set of containers
- Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
+ Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
containerAllocator.requestResources(containerToHostMapping);
@@ -228,8 +228,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
public void onResourceCompleted(SamzaResourceStatus containerStatus) {
String containerIdStr = containerStatus.getResourceID();
- int containerId = -1;
- for (Map.Entry<Integer, SamzaResource> entry: state.runningContainers.entrySet()) {
+ String containerId = null;
+ for (Map.Entry<String, SamzaResource> entry: state.runningContainers.entrySet()) {
if (entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
@@ -237,10 +237,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
break;
}
}
- if (containerId == -1) {
+ if (containerId == null) {
log.info("No matching container id found for " + containerStatus.toString());
+ } else {
+ state.runningContainers.remove(containerId);
}
- state.runningContainers.remove(containerId);
int exitStatus = containerStatus.getExitCode();
switch (exitStatus) {
@@ -249,7 +250,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.completedContainers.incrementAndGet();
- if (containerId != -1) {
+ if (containerId != null) {
state.finishedContainers.incrementAndGet();
containerFailures.remove(containerId);
}
@@ -275,7 +276,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// clean up, and request a refactor container for the tasks. This only
// should happen if the container was 'lost' due to node failure, not
// if the AM released the container.
- if (containerId != -1) {
+ if (containerId != null) {
log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
state.neededContainers.incrementAndGet();
@@ -295,7 +296,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.failedContainersStatus.put(containerIdStr, containerStatus);
state.jobHealthy.set(false);
- if (containerId != -1) {
+ if (containerId != null) {
state.neededContainers.incrementAndGet();
// Find out previously running container location
String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index da73049..66e2246 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -64,7 +64,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
SamzaResourceRequest request = peekPendingRequest();
log.info("Handling request: " + request.getContainerID() + " " + request.getRequestTimestampMs() + " " + request.getPreferredHost());
String preferredHost = request.getPreferredHost();
- int containerID = request.getContainerID();
+ String containerID = request.getContainerID();
if (hasAllocatedResource(preferredHost)) {
// Found allocated container at preferredHost
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index cf91044..bde3fac 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,7 @@ public class SamzaApplicationState {
* Map of the samzaContainerId to the {@link SamzaResource} on which it is running
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
- public final ConcurrentMap<Integer, SamzaResource> runningContainers = new ConcurrentHashMap<Integer, SamzaResource>(0);
+ public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
/**
* Final status of the application
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 3d1560f..4159ff2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -55,14 +55,14 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
/**
* The ID of the StreamProcessor which this request is for.
*/
- private final int containerID;
+ private final String containerID;
/**
* The timestamp in millis when the request was created.
*/
private final long requestTimestampMs;
- public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String expectedContainerID) {
this.numCores = numCores;
this.memoryMB = memoryMB;
this.preferredHost = preferredHost;
@@ -72,7 +72,7 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
log.info("Resource Request created for {} on {} at {}", new Object[] {this.containerID, this.preferredHost, this.requestTimestampMs});
}
- public int getContainerID() {
+ public String getContainerID() {
return containerID;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
new file mode 100644
index 0000000..708daa6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+/**
+ * Accessors for configs associated with the Application scope.
+ */
+public class ApplicationConfig extends MapConfig {
+ /**
+ * <p>processor.id is similar to the logical containerId generated in Samza. However, in addition to identifying the JVM
+ * of the processor, it also contains a segment to identify the instance of the
+ * {@link org.apache.samza.processor.StreamProcessor} within the JVM. More detail can be found in
+ * {@link org.apache.samza.runtime.ProcessorIdGenerator}. </p>
+ * <p>
+ * This is an important distinction because Samza 0.13.0 in Yarn has a 1:1 mapping between the processor and the Yarn
+ * container (JVM). However, Samza in an embedded execution can contain more than one processor within the same JVM.
+ * </p>
+ * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors.
+ * TODO: Deprecated in 0.13. After 0.13+, this id is generated using {@link org.apache.samza.runtime.ProcessorIdGenerator}.
+ */
+ @Deprecated
+ public static final String PROCESSOR_ID = "processor.id";
+
+ /**
+ * Config key naming the class that implements the {@link org.apache.samza.runtime.ProcessorIdGenerator} interface.
+ * Used to generate a unique identifier for a {@link org.apache.samza.processor.StreamProcessor} based on the runtime
+ * environment. Hence, durability of the identifier is the same as the guarantees provided by the runtime environment.
+ */
+ public static final String APP_PROCESSOR_ID_GENERATOR_CLASS = "app.processor-id-generator.class";
+
+ public ApplicationConfig(Config config) {
+ super(config);
+ }
+
+ public String getAppProcessorIdGeneratorClass() {
+ return get(APP_PROCESSOR_ID_GENERATOR_CLASS, null);
+ }
+
+ @Deprecated
+ public String getProcessorId() {
+ return get(PROCESSOR_ID, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index a615d4f..22380d3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,7 +37,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
* */
public class LocalityManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
- private Map<Integer, Map<String, String>> containerToHostMapping = new HashMap<>();
+ private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
private final TaskAssignmentManager taskAssignmentManager;
private final boolean writeOnly;
@@ -92,23 +92,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
*
* @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
*/
- public Map<Integer, Map<String, String>> readContainerLocality() {
+ public Map<String, Map<String, String>> readContainerLocality() {
if (this.writeOnly) {
throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
}
- Map<Integer, Map<String, String>> allMappings = new HashMap<>();
+ Map<String, Map<String, String>> allMappings = new HashMap<>();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
SetContainerHostMapping mapping = new SetContainerHostMapping(message);
Map<String, String> localityMappings = new HashMap<>();
localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
- allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
+ allMappings.put(mapping.getKey(), localityMappings);
}
containerToHostMapping = Collections.unmodifiableMap(allMappings);
- for (Map.Entry<Integer, Map<String, String>> entry : containerToHostMapping.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
log.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
}
@@ -123,7 +123,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
* @param jmxAddress the JMX URL address
* @param jmxTunnelingAddress the JMX Tunnel URL address
*/
- public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
+ public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
Map<String, String> existingMappings = containerToHostMapping.get(containerId);
String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 5e6ccf8..246188e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import org.apache.samza.SamzaException;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
@@ -42,6 +44,8 @@ import org.slf4j.LoggerFactory;
* happens to be). No consideration is given towards locality, even distribution
* of aggregate SSPs within a container, even distribution of the number of
* taskNames between containers, etc.
+ *
+ * TODO: SAMZA-1197 - need to modify balance to work with processorId strings
*/
public class GroupByContainerCount implements BalancingTaskNameGrouper {
private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
@@ -74,7 +78,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
// Convert to a Set of ContainerModel
Set<ContainerModel> containerModels = new HashSet<>();
for (int i = 0; i < containerCount; i++) {
- containerModels.add(new ContainerModel(i, taskGroups[i]));
+ containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i]));
}
return Collections.unmodifiableSet(containerModels);
@@ -142,7 +146,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* if the previous mapping doesn't exist or isn't usable.
*/
private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
- Map<String, Integer> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+ taskToContainerId.values().forEach(id -> {
+ try {
+ int intId = Integer.parseInt(id);
+ } catch (NumberFormatException nfe) {
+ throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
+ }
+ });
if (taskToContainerId.isEmpty()) {
log.info("No task assignment map was saved.");
return null;
@@ -178,7 +189,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
for (ContainerModel container : containers) {
for (TaskName taskName : container.getTasks().keySet()) {
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());
+ taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId());
}
}
}
@@ -211,7 +222,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
private List<TaskGroup> createContainers(int startContainerId, int endContainerId) {
List<TaskGroup> containers = new ArrayList<>(endContainerId - startContainerId);
for (int i = startContainerId; i < endContainerId; i++) {
- TaskGroup taskGroup = new TaskGroup(i, new ArrayList<String>());
+ TaskGroup taskGroup = new TaskGroup(String.valueOf(i), new ArrayList<String>());
containers.add(taskGroup);
}
return containers;
@@ -225,10 +236,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param taskNamesToAssign the list of tasks to assign to the containers.
* @param containers the containers (as {@link TaskGroup}) to which the tasks will be assigned.
*/
+ // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
List<TaskGroup> containers) {
for (TaskGroup taskGroup : containers) {
- for (int j = taskGroup.size(); j < taskCountPerContainer[taskGroup.getContainerId()]; j++) {
+ for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
String taskName = taskNamesToAssign.remove(0);
taskGroup.addTaskName(taskName);
log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
@@ -283,7 +295,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
TaskModel model = taskNameToModel.get(taskName);
containerTaskModels.put(model.getTaskName(), model);
}
- containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+ containerModels.add(
+ new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels));
}
return Collections.unmodifiableSet(containerModels);
}
@@ -294,14 +307,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param taskToContainerId a map from each task name to the containerId to which it is assigned.
* @return a list of TaskGroups ordered ascending by containerId.
*/
- private List<TaskGroup> getOrderedContainers(Map<String, Integer> taskToContainerId) {
+ private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
log.debug("Got task to container map: {}", taskToContainerId);
// Group tasks by container Id
- HashMap<Integer, List<String>> containerIdToTaskNames = new HashMap<>();
- for (Map.Entry<String, Integer> entry : taskToContainerId.entrySet()) {
+ HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
+ for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
String taskName = entry.getKey();
- Integer containerId = entry.getValue();
+ String containerId = entry.getValue();
List<String> taskNames = containerIdToTaskNames.get(containerId);
if (taskNames == null) {
taskNames = new ArrayList<>();
@@ -313,8 +326,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
// Build container tasks
List<TaskGroup> containerTasks = new ArrayList<>(containerIdToTaskNames.size());
for (int i = 0; i < containerIdToTaskNames.size(); i++) {
- if (containerIdToTaskNames.get(i) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
- containerTasks.add(new TaskGroup(i, containerIdToTaskNames.get(i)));
+ if (containerIdToTaskNames.get(String.valueOf(i)) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
+ containerTasks.add(new TaskGroup(String.valueOf(i), containerIdToTaskNames.get(String.valueOf(i))));
}
return containerTasks;
@@ -327,15 +340,15 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
*/
private static class TaskGroup {
private final List<String> taskNames = new LinkedList<>();
- private final Integer containerId;
+ private final String containerId;
- private TaskGroup(Integer containerId, List<String> taskNames) {
+ private TaskGroup(String containerId, List<String> taskNames) {
this.containerId = containerId;
Collections.sort(taskNames); // For consistency because the taskNames came from a Map
this.taskNames.addAll(taskNames);
}
- public Integer getContainerId() {
+ public String getContainerId() {
return containerId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 6d3f673..f2d88cd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -52,14 +52,14 @@ public class GroupByContainerIds implements TaskNameGrouper {
if (startContainerCount > tasks.size())
throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size());
- List<Integer> containerIds = new ArrayList<>(startContainerCount);
+ List<String> containerIds = new ArrayList<>(startContainerCount);
for (int i = 0; i < startContainerCount; i++) {
- containerIds.add(i);
+ containerIds.add(String.valueOf(i));
}
return group(tasks, containerIds);
}
- public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+ public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
if (tasks.isEmpty())
throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
.toString(containersIds.toArray()));
@@ -89,7 +89,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
// Convert to a Set of ContainerModel
Set<ContainerModel> containerModels = new HashSet<>();
for (int i = 0; i < containerCount; i++) {
- containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+ // containerId in ContainerModel constructor is set to -1 because processorId can be any string and does
+ // not have an integer equivalent. So, we set it to -1. After 0.13, this parameter will be removed.
+ containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i]));
}
return Collections.unmodifiableSet(containerModels);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index 980f2a9..15cd224 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -33,14 +33,14 @@ import java.util.Set;
public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
@Override
public TaskNameGrouper build(Config config) {
- return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+ return new SingleContainerGrouper(config.get(JobConfig.PROCESSOR_ID()));
}
}
class SingleContainerGrouper implements TaskNameGrouper {
- private final int containerId;
+ private final String containerId;
- SingleContainerGrouper(int containerId) {
+ SingleContainerGrouper(String containerId) {
this.containerId = containerId;
}
@@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper {
for (TaskModel taskModel: taskModels) {
taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
}
- ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
+ ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap);
return Collections.singleton(containerModel);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 11207b2..d33a22b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* */
public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
- private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+ private final Map<String, String> taskNameToContainerId = new HashMap<>();
private boolean registered = false;
/**
@@ -70,7 +70,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
*
* @return the map of taskName: containerId
*/
- public Map<String, Integer> readTaskAssignment() {
+ public Map<String, String> readTaskAssignment() {
taskNameToContainerId.clear();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
if (message.isDelete()) {
@@ -83,7 +83,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
}
}
- for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
+ for (Map.Entry<String, String> entry : taskNameToContainerId.entrySet()) {
log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
}
@@ -96,8 +96,8 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
* @param taskName the task name
* @param containerId the SamzaContainer ID or {@code null} to delete the mapping
*/
- public void writeTaskContainerMapping(String taskName, Integer containerId) {
- Integer existingContainerId = taskNameToContainerId.get(taskName);
+ public void writeTaskContainerMapping(String taskName, String containerId) {
+ String existingContainerId = taskNameToContainerId.get(taskName);
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
log.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index d06bf62..71b80cc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -52,7 +52,7 @@ public interface TaskNameGrouper {
*/
Set<ContainerModel> group(Set<TaskModel> tasks);
- default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+ default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
return group(tasks);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index 252e56b..af2ef6a 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -23,9 +23,13 @@ import org.apache.samza.job.model.JobModel;
/**
* A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor.
- * In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require
- * coordination with JobCoordinators of other StreamProcessors.
- * */
+ *
+ * It is the responsibility of the JobCoordinator to assign a unique identifier to the StreamProcessor
+ * based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
+ * cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
+ *
+ * This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
+ */
@InterfaceStability.Evolving
public interface JobCoordinator {
/**
@@ -55,12 +59,16 @@ public interface JobCoordinator {
* @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up
*/
boolean awaitStart(long timeoutMs) throws InterruptedException;
+
/**
- * Returns the logical ID assigned to the processor
- * It is up to the user to ensure that different instances of StreamProcessor within a job have unique processor ID.
- * @return integer representing the logical processor ID
+ * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor.
+ *
+ * The semantics and format of the identifier returned should adhere to the specification defined in
+ * {@link org.apache.samza.runtime.ProcessorIdGenerator}
+ *
+ * @return String representing a unique logical processor ID
*/
- int getProcessorId();
+ String getProcessorId();
/**
* Returns the current JobModel
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index d15bce1..8553f59 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -26,11 +26,10 @@ import org.apache.samza.processor.SamzaContainerController;
@InterfaceStability.Evolving
public interface JobCoordinatorFactory {
/**
- * @param processorId Unique identifier for the processor
* @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
* @param containerController Controller interface for starting and stopping container. In future, it may simply
* pause the container and add/remove tasks
* @return An instance of {@link JobCoordinator}
*/
- JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+ JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
index 431c05d..f8d4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
@@ -43,8 +43,9 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
public static final String TYPE = "set-task-container-assignment";
public static final String CONTAINER_KEY = "containerId";
+
/**
- * SteContainerToHostMapping is used to set the container to host mapping information.
+ * SetTaskContainerMapping is used to set the task to container mapping information.
* @param message which holds the task to container assignment information.
*/
public SetTaskContainerMapping(CoordinatorStreamMessage message) {
@@ -64,8 +65,8 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
putMessageValue(CONTAINER_KEY, containerId);
}
- public Integer getTaskAssignment() {
- return Integer.parseInt(getMessageValue(CONTAINER_KEY));
+ public String getTaskAssignment() {
+ return getMessageValue(CONTAINER_KEY);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index ed721b1..bd4fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -19,9 +19,10 @@
package org.apache.samza.job.model;
+import org.apache.samza.container.TaskName;
+
import java.util.Collections;
import java.util.Map;
-import org.apache.samza.container.TaskName;
/**
* <p>
@@ -35,34 +36,49 @@ import org.apache.samza.container.TaskName;
* containers have tasks. Each data model contains relevant information, such as
* an id, partition information, etc.
* </p>
+ * <p>
+ * <b>Note</b>: This class no longer implements {@link Comparable} and therefore does not define a natural ordering.
+ * </p>
*/
-public class ContainerModel implements Comparable<ContainerModel> {
+public class ContainerModel {
+ @Deprecated
private final int containerId;
+ private final String processorId;
private final Map<TaskName, TaskModel> tasks;
- public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+ public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) {
this.containerId = containerId;
+ if (processorId == null) {
+ this.processorId = String.valueOf(containerId);
+ } else {
+ this.processorId = processorId;
+ }
this.tasks = Collections.unmodifiableMap(tasks);
}
+ @Deprecated
public int getContainerId() {
return containerId;
}
+ public String getProcessorId() {
+ return processorId;
+ }
+
public Map<TaskName, TaskModel> getTasks() {
return tasks;
}
@Override
public String toString() {
- return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+ return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + containerId;
+ result = prime * result + ((processorId == null) ? 0 : processorId.hashCode());
result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
return result;
}
@@ -76,7 +92,7 @@ public class ContainerModel implements Comparable<ContainerModel> {
if (getClass() != obj.getClass())
return false;
ContainerModel other = (ContainerModel) obj;
- if (containerId != other.containerId)
+ if (!processorId.equals(other.processorId))
return false;
if (tasks == null) {
if (other.tasks != null)
@@ -86,7 +102,4 @@ public class ContainerModel implements Comparable<ContainerModel> {
return true;
}
- public int compareTo(ContainerModel other) {
- return containerId - other.getContainerId();
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dbd6dcc..dbb3867 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -41,24 +41,24 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
public class JobModel {
private static final String EMPTY_STRING = "";
private final Config config;
- private final Map<Integer, ContainerModel> containers;
+ private final Map<String, ContainerModel> containers;
private final LocalityManager localityManager;
- private Map<Integer, String> localityMappings = new HashMap<Integer, String>();
+ private Map<String, String> localityMappings = new HashMap<String, String>();
public int maxChangeLogStreamPartitions;
- public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+ public JobModel(Config config, Map<String, ContainerModel> containers) {
this(config, containers, null);
}
- public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
+ public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
this.config = config;
this.containers = Collections.unmodifiableMap(containers);
this.localityManager = localityManager;
if (localityManager == null) {
- for (Integer containerId : containers.keySet()) {
+ for (String containerId : containers.keySet()) {
localityMappings.put(containerId, null);
}
} else {
@@ -89,7 +89,7 @@ public class JobModel {
* @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
* @return the value if it exists for a given container and key, otherwise an empty string
*/
- public String getContainerToHostValue(Integer containerId, String key) {
+ public String getContainerToHostValue(String containerId, String key) {
if (localityManager == null) {
return EMPTY_STRING;
}
@@ -103,12 +103,12 @@ public class JobModel {
return mappings.get(key);
}
- public Map<Integer, String> getAllContainerToHostValues(String key) {
+ public Map<String, String> getAllContainerToHostValues(String key) {
if (localityManager == null) {
return Collections.EMPTY_MAP;
}
- Map<Integer, String> allValues = new HashMap<>();
- for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+ Map<String, String> allValues = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
String value = entry.getValue().get(key);
if (value != null) {
allValues.put(entry.getKey(), value);
@@ -118,8 +118,8 @@ public class JobModel {
}
private void populateContainerLocalityMappings() {
- Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
- for (Integer containerId: containers.keySet()) {
+ Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
+ for (String containerId: containers.keySet()) {
if (allMappings.containsKey(containerId)) {
localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
} else {
@@ -128,14 +128,14 @@ public class JobModel {
}
}
- public Map<Integer, String> getAllContainerLocality() {
+ public Map<String, String> getAllContainerLocality() {
if (localityManager != null) {
populateContainerLocalityMappings();
}
return localityMappings;
}
- public Map<Integer, ContainerModel> getContainers() {
+ public Map<String, ContainerModel> getContainers() {
return containers;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 76e2053..c292067 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeoutException;
public class SamzaContainerController {
private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class);
- private final ExecutorService executorService;
+ private ExecutorService executorService;
private volatile SamzaContainer container;
private final Map<String, MetricsReporter> metricsReporterMap;
private final Object taskFactory;
@@ -60,16 +60,12 @@ public class SamzaContainerController {
* @param taskFactory Factory to be used to create instances of {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask}
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
- * @param processorId Id of the processor
* @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
*/
public SamzaContainerController(
Object taskFactory,
long containerShutdownMs,
- String processorId,
Map<String, MetricsReporter> metricsReporterMap) {
- this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("p" + processorId + "-container-thread-%d").build());
this.taskFactory = taskFactory;
this.metricsReporterMap = metricsReporterMap;
if (containerShutdownMs == -1) {
@@ -94,11 +90,11 @@ public class SamzaContainerController {
public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
LocalityManager localityManager = null;
if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
- localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
+ localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config);
}
- log.info("About to create container: " + containerModel.getContainerId());
+ log.info("About to create container: " + containerModel.getProcessorId());
container = SamzaContainer$.MODULE$.apply(
- containerModel.getContainerId(),
+ containerModel.getProcessorId(),
containerModel,
config,
maxChangelogStreamPartitions,
@@ -106,7 +102,9 @@ public class SamzaContainerController {
new JmxServer(),
Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
taskFactory);
- log.info("About to start container: " + containerModel.getContainerId());
+ log.info("About to start container: " + containerModel.getProcessorId());
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build());
containerFuture = executorService.submit(() -> container.run());
}
@@ -148,6 +146,8 @@ public class SamzaContainerController {
*/
public void shutdown() {
stopContainer();
- executorService.shutdown();
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index a39c3b9..5a8673a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,9 +19,10 @@
package org.apache.samza.processor;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -29,10 +30,7 @@ import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -60,17 +58,6 @@ import java.util.Map;
*/
@InterfaceStability.Evolving
public class StreamProcessor {
- private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
- /**
- * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor.
- * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For
- * example, Yarn provides a "containerId" for every resource it allocates.
- * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API.
- * <p>
- * <b>Note:</b>This identifier has to be unique across the instances of StreamProcessors.
- */
- private static final String PROCESSOR_ID = "processor.id";
- private final int processorId;
private final JobCoordinator jobCoordinator;
/**
@@ -83,51 +70,49 @@ public class StreamProcessor {
* <p>
* <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
*
- * @param processorId Unique identifier for a processor within the job. It has the same semantics as
- * "containerId" in Samza
* @param config Instance of config object - contains all configuration required for processing
* @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
* @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
*/
- public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
AsyncStreamTaskFactory asyncStreamTaskFactory) {
- this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
+ this(config, customMetricsReporters, (Object) asyncStreamTaskFactory);
}
/**
- *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+ * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory)}, except task instances are created
* using the provided {@link StreamTaskFactory}.
- * @param processorId - this processor Id
* @param config - config
* @param customMetricsReporters metric Reporter
* @param streamTaskFactory task factory to instantiate the Task
*/
- public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory) {
- this(processorId, config, customMetricsReporters, (Object) streamTaskFactory);
+ this(config, customMetricsReporters, (Object) streamTaskFactory);
}
- private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
Object taskFactory) {
- this.processorId = processorId;
-
- Map<String, String> updatedConfigMap = new HashMap<>();
- updatedConfigMap.putAll(config);
- updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
- Config updatedConfig = new MapConfig(updatedConfigMap);
+ // TODO: This check to be removed after 0.13+
+ ApplicationConfig applicationConfig = new ApplicationConfig(config);
+ if (applicationConfig.getProcessorId() == null &&
+ applicationConfig.getAppProcessorIdGeneratorClass() == null) {
+ throw new ConfigException(
+ String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+ ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+ }
SamzaContainerController containerController = new SamzaContainerController(
taskFactory,
- new TaskConfigJava(updatedConfig).getShutdownMs(),
- String.valueOf(processorId),
+ new TaskConfigJava(config).getShutdownMs(),
customMetricsReporters);
this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj(
- new JobCoordinatorConfig(updatedConfig)
+ new JobCoordinatorConfig(config)
.getJobCoordinatorFactoryClassName())
- .getJobCoordinator(processorId, updatedConfig, containerController);
+ .getJobCoordinator(config, containerController);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 49c3228..d790fb1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -52,9 +52,9 @@ import org.slf4j.LoggerFactory;
public class LocalContainerRunner extends AbstractApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
private final JobModel jobModel;
- private final int containerId;
+ private final String containerId;
- public LocalContainerRunner(JobModel jobModel, int containerId) {
+ public LocalContainerRunner(JobModel jobModel, String containerId) {
super(jobModel.getConfig());
this.jobModel = jobModel;
this.containerId = containerId;
@@ -69,13 +69,13 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
SamzaContainer container = SamzaContainer$.MODULE$.apply(
- containerModel.getContainerId(),
+ containerModel.getProcessorId(),
containerModel,
config,
jobModel.maxChangeLogStreamPartitions,
SamzaContainer.getLocalityManager(containerId, config),
jmxServer,
- Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()),
+ Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
taskFactory);
container.run();
@@ -104,7 +104,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
System.exit(1);
});
- Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
- log.info(String.format("Got container ID: %d", containerId));
+ String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+ log.info(String.format("Got container ID: %s", containerId)); // %s: the id is a String now, %d would throw
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
new file mode 100644
index 0000000..afc20b1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+import java.util.UUID;
+
+ public class UUIDGenerator implements ProcessorIdGenerator {
+   /**
+    * Produces an identifier for a processor instance by drawing a fresh random {@link UUID}.
+    * <p>
+    * <b>Note</b>: Because each UUID is already unique on its own, no static counter is needed to tell apart
+    * several processors that exist within the same JVM, i.e. the "$x_$y" format specified in
+    * {@link ProcessorIdGenerator} does not have to be followed by this generator.
+    *
+    * @param config Config instance (not consulted by this generator)
+    * @return String form of a newly generated random UUID, identifying the processor
+    */
+   @Override
+   public String generateProcessorId(Config config) {
+     final UUID freshId = UUID.randomUUID();
+     return freshId.toString();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
index f197a95..e19afec 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -19,23 +19,28 @@
package org.apache.samza.serializers.model;
-import java.util.Map;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskModel;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.Map;
+
/**
* A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
*/
public abstract class JsonContainerModelMixIn {
@JsonCreator
- public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+ public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
}
+ @Deprecated
@JsonProperty("container-id")
abstract int getContainerId();
+ @JsonProperty("processor-id")
+ abstract String getProcessorId();
+
@JsonProperty("tasks")
abstract Map<TaskName, TaskModel> getTasks();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
index 037b5e2..4b0c404 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -30,12 +30,12 @@ import org.codehaus.jackson.annotate.JsonProperty;
*/
public abstract class JsonJobModelMixIn {
@JsonCreator
- public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+ public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<String, ContainerModel> containers) {
}
@JsonProperty("config")
abstract Config getConfig();
@JsonProperty("containers")
- abstract Map<Integer, ContainerModel> getContainers();
+ abstract Map<String, ContainerModel> getContainers();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 83e6b8c..f8c4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -19,10 +19,8 @@
package org.apache.samza.serializers.model;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
@@ -50,6 +48,10 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod;
import org.codehaus.jackson.map.module.SimpleModule;
import org.codehaus.jackson.type.TypeReference;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* <p>
* A collection of utility classes and (de)serializers to make Samza's job model
@@ -89,10 +91,30 @@ public class SamzaObjectMapper {
mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
- mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+ module.addDeserializer(ContainerModel.class, new JsonDeserializer<ContainerModel>() {
+   @Override
+   public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+     ObjectCodec oc = jp.getCodec();
+     JsonNode node = oc.readTree(jp);
+     // Validate before dereferencing: get() yields null for an absent field, so reading the int first would NPE.
+     JsonNode containerIdNode = node.get("container-id");
+     if (containerIdNode == null) {
+       throw new SamzaException("JobModel did not contain a container-id. This can never happen. JobModel corrupt!");
+     }
+     int containerId = containerIdNode.getIntValue();
+     // When processor-id is absent, fall back to the string form of container-id.
+     JsonNode processorIdNode = node.get("processor-id");
+     String processorId =
+         processorIdNode == null ? String.valueOf(containerId) : processorIdNode.getTextValue();
+     Map<TaskName, TaskModel> tasksMapping =
+         OBJECT_MAPPER.readValue(node.get("tasks"), new TypeReference<Map<TaskName, TaskModel>>() { });
+     return new ContainerModel(processorId, containerId, tasksMapping);
+   }
+ });
+
// Convert camel case to hyphenated field names, and register the module.
mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
mapper.registerModule(module);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index b2927f4..7efc6df 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,22 +19,25 @@
package org.apache.samza.standalone;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -63,28 +66,37 @@ import java.util.Map;
* */
public class StandaloneJobCoordinator implements JobCoordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
- private final int processorId;
+ private final String processorId;
private final Config config;
private final JobModel jobModel;
private final SamzaContainerController containerController;
@VisibleForTesting
StandaloneJobCoordinator(
- int processorId,
+ ProcessorIdGenerator processorIdGenerator,
Config config,
SamzaContainerController containerController,
JobModel jobModel) {
- this.processorId = processorId;
+ this.processorId = processorIdGenerator.generateProcessorId(config);
this.config = config;
this.containerController = containerController;
this.jobModel = jobModel;
}
- public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
- this.processorId = processorId;
+ public StandaloneJobCoordinator(Config config, SamzaContainerController containerController) {
this.config = config;
this.containerController = containerController;
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ if (appConfig.getProcessorId() != null) { // TODO: This check to be removed after 0.13+
+ this.processorId = appConfig.getProcessorId();
+ } else {
+ ProcessorIdGenerator idGenerator =
+ ClassLoaderHelper.fromClassName(
+ new ApplicationConfig(config).getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+ this.processorId = idGenerator.generateProcessorId(config);
+ }
+
JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
Map<String, SystemAdmin> systemAdmins = new HashMap<>();
for (String systemName: systemConfig.getSystemNames()) {
@@ -113,7 +125,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
// No-op
JobModel jobModel = getJobModel();
containerController.startContainer(
- jobModel.getContainers().get(processorId),
+ jobModel.getContainers().get(getProcessorId()),
jobModel.getConfig(),
jobModel.maxChangeLogStreamPartitions);
}
@@ -137,8 +149,8 @@ public class StandaloneJobCoordinator implements JobCoordinator {
}
@Override
- public int getProcessorId() {
- return this.processorId;
+ public String getProcessorId() {
+ return processorId;
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 7ca85c0..eada6e9 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -25,7 +25,7 @@ import org.apache.samza.processor.SamzaContainerController;
public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
- public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
- return new StandaloneJobCoordinator(processorId, config, containerController);
+ public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
+ return new StandaloneJobCoordinator(config, containerController);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9471a23..e50f221 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -65,7 +65,7 @@ public class StorageRecovery extends CommandLine {
private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
- private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+ private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
@@ -211,7 +211,7 @@ public class StorageRecovery extends CommandLine {
for (ContainerModel containerModel : containers.values()) {
HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
- SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+ SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
.keySet());
for (TaskModel taskModel : containerModel.getTasks().values()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
index f2b389b..3680b4f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
@@ -19,6 +19,10 @@
package org.apache.samza.util;
+import org.apache.samza.config.ConfigException;
+
+import java.lang.reflect.Constructor;
+
public class ClassLoaderHelper {
public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -26,4 +30,19 @@ public class ClassLoaderHelper {
T instance = clazz.newInstance();
return instance;
}
+
+ /** Instantiates {@code className} through its public no-arg constructor, typed as {@code classType}. */
+ public static <T> T fromClassName(String className, Class<T> classType) {
+   try {
+     // asSubclass() throws ClassCastException on a type mismatch, so no unchecked cast is needed.
+     Constructor<? extends T> constructor = Class.forName(className).asSubclass(classType).getConstructor();
+     return constructor.newInstance();
+   } catch (ClassCastException e) {
+     // Reported on its own so the message is not buried under the generic loading failure below.
+     throw new ConfigException(String.format("Class %s is not of type %s", className, classType), e);
+   } catch (Exception e) {
+     throw new ConfigException(String.format(
+         "Problem in loading %s class %s", classType, className), e);
+   }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index cc454e3..21a6b03 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -26,12 +26,10 @@ import org.apache.samza.coordinator.CoordinationServiceFactory;
public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-
-
synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
ZkConfig zkConfig = new ZkConfig(config);
ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
- ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+ ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
}