You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/28 16:44:44 UTC

[samza] branch master updated: SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor (#1048)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da0d06  SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor (#1048)
3da0d06 is described below

commit 3da0d06b4ba0a2a3550929c53729c9a2dd3954cd
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Tue May 28 09:44:40 2019 -0700

    SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor (#1048)
    
    * SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor
    
    * Optimize imports on ContainerLaunchUtil
---
 .../java/org/apache/samza/runtime/ContainerLaunchUtil.java     | 10 ++++------
 .../src/main/scala/org/apache/samza/config/JobConfig.scala     |  2 --
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 978ab24..aa0d9f6 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -27,7 +27,6 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatClient;
@@ -93,10 +92,9 @@ public class ContainerLaunchUtil {
     try {
       TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
       LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
-      Optional<StartpointManager> startpointManager = Optional.empty();
-      if (new JobConfig(config).getStartpointMetadataStoreFactory() != null) {
-        startpointManager = Optional.of(new StartpointManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, StartpointManager.NAMESPACE)));
-      }
+
+      // StartpointManager wraps the coordinatorStreamStore in the namespaces internally
+      StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
 
       Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
 
@@ -115,7 +113,7 @@ public class ContainerLaunchUtil {
           JobContextImpl.fromConfigWithDefaults(config),
           Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
           Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
-          Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null), diagnosticsManager);
+          Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager, diagnosticsManager);
 
       ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
           .createInstance(new ProcessorContext() { }, config);
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 24140d9..f8f6851 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -260,8 +260,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamMetadataStoreFactory].getCanonicalName)
 
-  def getStartpointMetadataStoreFactory = getOption(JobConfig.STARTPOINT_METADATA_STORE_FACTORY).getOrElse(null)
-
   def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
 
   def getJMXEnabled = {