You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/26 22:46:04 UTC
samza git commit: SAMZA-1313 : Rename StandaloneJobCoordinator to
PassthroughJobCoordinator
Repository: samza
Updated Branches:
refs/heads/master e5e9705c1 -> 8f1609d0b
SAMZA-1313 : Rename StandaloneJobCoordinator to PassthroughJobCoordinator
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #206 from sborya/RenameStandAloneGJobCoordinator
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8f1609d0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8f1609d0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8f1609d0
Branch: refs/heads/master
Commit: 8f1609d0b196f1aa64265d0768825ff7a4e3359c
Parents: e5e9705
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 26 15:45:52 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri May 26 15:45:52 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 2 +-
.../standalone/PassthroughJobCoordinator.java | 157 +++++++++++++++++++
.../PassthroughJobCoordinatorFactory.java | 30 ++++
.../standalone/StandaloneJobCoordinator.java | 157 -------------------
.../StandaloneJobCoordinatorFactory.java | 30 ----
.../apache/samza/test/StandaloneTestUtils.java | 4 +-
6 files changed, 190 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index f34146c..3eea37d 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -420,7 +420,7 @@
<td class="description">
Class to use for job coordination. Currently available values are:
<dl>
- <dt><code>org.apache.samza.standalone.StandaloneJobCoordinatorFactory</code></dt>
+ <dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
<dd>Fixed partition mapping. No Zoookeeper. </dd>
<dt><code>org.apache.samza.zk.ZkJobCoordinatorFactory</code></dt>
<dd>Zookeeper-based coordination. </dd>
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
new file mode 100644
index 0000000..87a1cfa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standalone Job Coordinator does not implement any leader elector module or cluster manager
+ *
+ * It generates the JobModel using the Config passed into the constructor.
+ *
+ * Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition
+ * distribution mechanism:
+ * <ul>
+ * <li>
+ * Consumer-managed Partition Distribution - For example, using the kafka consumer which also handles partition
+ * load balancing across its consumers. In such a case, all input SystemStreamPartition(s) can be grouped to the same
+ * task instance using {@link org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and the
+ * task can be added to a single container using
+ * {@link org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}.
+ * </li>
+ * <li>
+ * User-defined Fixed Partition Distribution - For example, the application may always run a fixed number of
+ * processors and use a static distribution of partitions that doesn't change. This can be achieved by adding custom
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and
+ * {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
+ * </li>
+ * </ul>
+ * */
+public class PassthroughJobCoordinator implements JobCoordinator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PassthroughJobCoordinator.class);
+ private final String processorId;
+ private final Config config;
+ private JobCoordinatorListener coordinatorListener = null;
+
+ public PassthroughJobCoordinator(Config config) {
+ this.processorId = createProcessorId(config);
+ this.config = config;
+ }
+
+ @Override
+ public void start() {
+ // No-op
+ JobModel jobModel = null;
+ try {
+ jobModel = getJobModel();
+ } catch (Exception e) {
+ LOGGER.error("Exception while trying to getJobModel.", e);
+ if (coordinatorListener != null) {
+ coordinatorListener.onCoordinatorFailure(e);
+ }
+ }
+ if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
+ if (coordinatorListener != null) {
+ coordinatorListener.onNewJobModel(processorId, jobModel);
+ }
+ } else {
+ stop();
+ }
+ }
+
+ @Override
+ public void stop() {
+ // No-op
+ if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ coordinatorListener.onCoordinatorStop();
+ }
+ }
+
+ @Override
+ public void setListener(JobCoordinatorListener listener) {
+ this.coordinatorListener = listener;
+ }
+
+ @Override
+ public JobModel getJobModel() {
+ JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+ Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+ for (String systemName: systemConfig.getSystemNames()) {
+ String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+ if (systemFactoryClassName == null) {
+ LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+ throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+ }
+ SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
+ systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+ }
+
+ StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
+ Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+
+ /** TODO:
+ Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
+ in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
+ TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
+ (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
+ */
+ return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
+ }
+
+ @Override
+ public String getProcessorId() {
+ return this.processorId;
+ }
+
+ private String createProcessorId(Config config) {
+ // TODO: This check to be removed after 0.13+
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ if (appConfig.getProcessorId() != null) {
+ return appConfig.getProcessorId();
+ } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+ ProcessorIdGenerator idGenerator =
+ ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+ return idGenerator.generateProcessorId(config);
+ } else {
+ throw new ConfigException(String
+ .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+ ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
new file mode 100644
index 0000000..2ba56a6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+
+public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory {
+ @Override
+ public JobCoordinator getJobCoordinator(Config config) {
+ return new PassthroughJobCoordinator(config);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
deleted file mode 100644
index 68a89ab..0000000
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.standalone;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.runtime.ProcessorIdGenerator;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Standalone Job Coordinator does not implement any leader elector module or cluster manager
- *
- * It generates the JobModel using the Config passed into the constructor.
- *
- * Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition
- * distribution mechanism:
- * <ul>
- * <li>
- * Consumer-managed Partition Distribution - For example, using the kafka consumer which also handles partition
- * load balancing across its consumers. In such a case, all input SystemStreamPartition(s) can be grouped to the same
- * task instance using {@link org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and the
- * task can be added to a single container using
- * {@link org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}.
- * </li>
- * <li>
- * User-defined Fixed Partition Distribution - For example, the application may always run a fixed number of
- * processors and use a static distribution of partitions that doesn't change. This can be achieved by adding custom
- * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and
- * {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
- * </li>
- * </ul>
- * */
-public class StandaloneJobCoordinator implements JobCoordinator {
- private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
- private final String processorId;
- private final Config config;
- private JobCoordinatorListener coordinatorListener = null;
-
- public StandaloneJobCoordinator(Config config) {
- this.processorId = createProcessorId(config);
- this.config = config;
- }
-
- @Override
- public void start() {
- // No-op
- JobModel jobModel = null;
- try {
- jobModel = getJobModel();
- } catch (Exception e) {
- LOGGER.error("Exception while trying to getJobModel.", e);
- if (coordinatorListener != null) {
- coordinatorListener.onCoordinatorFailure(e);
- }
- }
- if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
- if (coordinatorListener != null) {
- coordinatorListener.onNewJobModel(processorId, jobModel);
- }
- } else {
- stop();
- }
- }
-
- @Override
- public void stop() {
- // No-op
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
- coordinatorListener.onCoordinatorStop();
- }
- }
-
- @Override
- public void setListener(JobCoordinatorListener listener) {
- this.coordinatorListener = listener;
- }
-
- @Override
- public JobModel getJobModel() {
- JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
- Map<String, SystemAdmin> systemAdmins = new HashMap<>();
- for (String systemName: systemConfig.getSystemNames()) {
- String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
- if (systemFactoryClassName == null) {
- LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
- throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
- }
- SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
- systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
- }
-
- StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
- Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
-
- /** TODO:
- Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
- in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
- TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
- (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
- */
- return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
- }
-
- @Override
- public String getProcessorId() {
- return this.processorId;
- }
-
- private String createProcessorId(Config config) {
- // TODO: This check to be removed after 0.13+
- ApplicationConfig appConfig = new ApplicationConfig(config);
- if (appConfig.getProcessorId() != null) {
- return appConfig.getProcessorId();
- } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
- ProcessorIdGenerator idGenerator =
- ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
- return idGenerator.generateProcessorId(config);
- } else {
- throw new ConfigException(String
- .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
- ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
deleted file mode 100644
index 764dc4c..0000000
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.standalone;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.JobCoordinatorFactory;
-
-public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
- @Override
- public JobCoordinator getJobCoordinator(Config config) {
- return new StandaloneJobCoordinator(config);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/8f1609d0/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java b/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
index 2057a8b..93605f5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
+++ b/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Map;
public class StandaloneTestUtils {
- private static final String STANDALONE_JOB_COORDINATOR_FACTORY = "org.apache.samza.standalone.StandaloneJobCoordinatorFactory";
+ private static final String PASSTHROUGH_JOB_COORDINATOR_FACTORY = "org.apache.samza.standalone.PassthroughJobCoordinatorFactory";
private static final String STANDALONE_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";
private static final String STANDALONE_TASK_NAME_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory";
private static final String KAFKA_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
@@ -55,7 +55,7 @@ public class StandaloneTestUtils {
{
put(JOB_NAME, jobName);
put(TASK_CLASS, taskClass);
- put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, STANDALONE_JOB_COORDINATOR_FACTORY);
+ put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PASSTHROUGH_JOB_COORDINATOR_FACTORY);
put(SSP_GROUPER_FACTORY, STANDALONE_SSP_GROUPER_FACTORY);
put(TASK_NAME_GROUPER_FACTORY, STANDALONE_TASK_NAME_GROUPER_FACTORY);
}