You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/06/01 01:49:56 UTC

[2/5] samza git commit: SAMZA-1313 : Rename StandaloneJobCoordinator to PassthroughJobCoordinator

SAMZA-1313 : Rename StandaloneJobCoordinator to PassthroughJobCoordinator

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #206 from sborya/RenameStandAloneGJobCoordinator


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a31010ca
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a31010ca
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a31010ca

Branch: refs/heads/0.13.0
Commit: a31010ca895cf7f5dd6ab236cb6636bcfaea3f6a
Parents: 5549ab8
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 26 15:45:52 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed May 31 18:10:31 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |   2 +-
 .../standalone/PassthroughJobCoordinator.java   | 157 +++++++++++++++++++
 .../PassthroughJobCoordinatorFactory.java       |  30 ++++
 .../standalone/StandaloneJobCoordinator.java    | 157 -------------------
 .../StandaloneJobCoordinatorFactory.java        |  30 ----
 .../apache/samza/test/StandaloneTestUtils.java  |   4 +-
 6 files changed, 190 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index f34146c..3eea37d 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -420,7 +420,7 @@
                     <td class="description">
 			Class to use for job coordination. Currently available values are:
                        <dl>
-                            <dt><code>org.apache.samza.standalone.StandaloneJobCoordinatorFactory</code></dt>
+                            <dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
                             <dd>Fixed partition mapping. No Zoookeeper. </dd>
                             <dt><code>org.apache.samza.zk.ZkJobCoordinatorFactory</code></dt>
                             <dd>Zookeeper-based coordination. </dd>

http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
new file mode 100644
index 0000000..87a1cfa
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.coordinator.JobCoordinatorListener;
+import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Passthrough Job Coordinator does not implement any leader elector module or cluster manager
+ *
+ * It generates the JobModel using the Config passed into the constructor.
+ *
+ * Since the passthrough JobCoordinator does not perform partition management, it allows two kinds of partition
+ * distribution mechanism:
+ * <ul>
+ *   <li>
+ *     Consumer-managed Partition Distribution - For example, using the kafka consumer which also handles partition
+ *   load balancing across its consumers. In such a case, all input SystemStreamPartition(s) can be grouped to the same
+ *   task instance using {@link org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and the
+ *   task can be added to a single container using
+ *   {@link org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}.
+ *   </li>
+ *   <li>
+ *     User-defined Fixed Partition Distribution - For example, the application may always run a fixed number of
+ *   processors and use a static distribution of partitions that doesn't change. This can be achieved by adding custom
+ *   {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and
+ *   {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
+ *   </li>
+ * </ul>
+ * */
+public class PassthroughJobCoordinator implements JobCoordinator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PassthroughJobCoordinator.class);
+  private final String processorId;
+  private final Config config;
+  private JobCoordinatorListener coordinatorListener = null;
+
+  public PassthroughJobCoordinator(Config config) {
+    this.processorId = createProcessorId(config);
+    this.config = config;
+  }
+
+  @Override
+  public void start() {
+    // Builds the JobModel and notifies the listener; a failed or non-matching model falls through to stop().
+    JobModel jobModel = null;
+    try {
+      jobModel = getJobModel();
+    } catch (Exception e) {
+      LOGGER.error("Exception while trying to getJobModel.", e);
+      if (coordinatorListener != null) {
+        coordinatorListener.onCoordinatorFailure(e);
+      }
+    }
+    if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
+      if (coordinatorListener != null) {
+        coordinatorListener.onNewJobModel(processorId, jobModel);
+      }
+    } else {
+      stop();
+    }
+  }
+
+  @Override
+  public void stop() {
+    // Nothing to tear down here; only tells the listener the job model expired and the coordinator stopped.
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+      coordinatorListener.onCoordinatorStop();
+    }
+  }
+
+  @Override
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+      }
+      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+    }
+
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
+        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+
+    /** TODO:
+     Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
+     in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
+     TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
+     (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
+     */
+    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
+  }
+
+  @Override
+  public String getProcessorId() {
+    return this.processorId;
+  }
+
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
new file mode 100644
index 0000000..2ba56a6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.standalone;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+
+public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory {
+  @Override
+  public JobCoordinator getJobCoordinator(Config config) {
+    return new PassthroughJobCoordinator(config);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
deleted file mode 100644
index 68a89ab..0000000
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.standalone;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.coordinator.JobCoordinatorListener;
-import org.apache.samza.runtime.ProcessorIdGenerator;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Standalone Job Coordinator does not implement any leader elector module or cluster manager
- *
- * It generates the JobModel using the Config passed into the constructor.
- *
- * Since the standalone JobCoordinator does not perform partition management, it allows two kinds of partition
- * distribution mechanism:
- * <ul>
- *   <li>
- *     Consumer-managed Partition Distribution - For example, using the kafka consumer which also handles partition
- *   load balancing across its consumers. In such a case, all input SystemStreamPartition(s) can be grouped to the same
- *   task instance using {@link org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory} and the
- *   task can be added to a single container using
- *   {@link org.apache.samza.container.grouper.task.SingleContainerGrouperFactory}.
- *   </li>
- *   <li>
- *     User-defined Fixed Partition Distribution - For example, the application may always run a fixed number of
- *   processors and use a static distribution of partitions that doesn't change. This can be achieved by adding custom
- *   {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper} and
- *   {@link org.apache.samza.container.grouper.task.TaskNameGrouper}.
- *   </li>
- * </ul>
- * */
-public class StandaloneJobCoordinator implements JobCoordinator {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
-  private final String processorId;
-  private final Config config;
-  private JobCoordinatorListener coordinatorListener = null;
-
-  public StandaloneJobCoordinator(Config config) {
-    this.processorId = createProcessorId(config);
-    this.config = config;
-  }
-
-  @Override
-  public void start() {
-    // No-op
-    JobModel jobModel = null;
-    try {
-      jobModel = getJobModel();
-    } catch (Exception e) {
-      LOGGER.error("Exception while trying to getJobModel.", e);
-      if (coordinatorListener != null) {
-        coordinatorListener.onCoordinatorFailure(e);
-      }
-    }
-    if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
-      if (coordinatorListener != null) {
-        coordinatorListener.onNewJobModel(processorId, jobModel);
-      }
-    } else {
-      stop();
-    }
-  }
-
-  @Override
-  public void stop() {
-    // No-op
-    if (coordinatorListener != null) {
-      coordinatorListener.onJobModelExpired();
-      coordinatorListener.onCoordinatorStop();
-    }
-  }
-
-  @Override
-  public void setListener(JobCoordinatorListener listener) {
-    this.coordinatorListener = listener;
-  }
-
-  @Override
-  public JobModel getJobModel() {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-      }
-      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
-    }
-
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
-        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
-
-    /** TODO:
-     Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
-     in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
-     TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
-     (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
-     */
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, null);
-  }
-
-  @Override
-  public String getProcessorId() {
-    return this.processorId;
-  }
-
-  private String createProcessorId(Config config) {
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      return appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
-      ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
-      return idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(String
-          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
deleted file mode 100644
index 764dc4c..0000000
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.standalone;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.coordinator.JobCoordinator;
-import org.apache.samza.coordinator.JobCoordinatorFactory;
-
-public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
-  @Override
-  public JobCoordinator getJobCoordinator(Config config) {
-    return new StandaloneJobCoordinator(config);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a31010ca/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java b/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
index 2057a8b..93605f5 100644
--- a/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
+++ b/samza-test/src/test/java/org/apache/samza/test/StandaloneTestUtils.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class StandaloneTestUtils {
-  private static final String STANDALONE_JOB_COORDINATOR_FACTORY = "org.apache.samza.standalone.StandaloneJobCoordinatorFactory";
+  private static final String PASSTHROUGH_JOB_COORDINATOR_FACTORY = "org.apache.samza.standalone.PassthroughJobCoordinatorFactory";
   private static final String STANDALONE_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";
   private static final String STANDALONE_TASK_NAME_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory";
   private static final String KAFKA_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
@@ -55,7 +55,7 @@ public class StandaloneTestUtils {
       {
         put(JOB_NAME, jobName);
         put(TASK_CLASS, taskClass);
-        put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, STANDALONE_JOB_COORDINATOR_FACTORY);
+        put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PASSTHROUGH_JOB_COORDINATOR_FACTORY);
         put(SSP_GROUPER_FACTORY, STANDALONE_SSP_GROUPER_FACTORY);
         put(TASK_NAME_GROUPER_FACTORY, STANDALONE_TASK_NAME_GROUPER_FACTORY);
       }