You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/28 16:44:44 UTC
[samza] branch master updated: SAMZA-2215 : StartpointManager fix
for previous CoordinatorStreamStore refactor (#1048)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3da0d06 SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor (#1048)
3da0d06 is described below
commit 3da0d06b4ba0a2a3550929c53729c9a2dd3954cd
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Tue May 28 09:44:40 2019 -0700
SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor (#1048)
* SAMZA-2215 : StartpointManager fix for previous CoordinatorStreamStore refactor
* Optimize imports on ContainerLaunchUtil
---
.../java/org/apache/samza/runtime/ContainerLaunchUtil.java | 10 ++++------
.../src/main/scala/org/apache/samza/config/JobConfig.scala | 2 --
2 files changed, 4 insertions(+), 8 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 978ab24..aa0d9f6 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -27,7 +27,6 @@ import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.container.ContainerHeartbeatClient;
@@ -93,10 +92,9 @@ public class ContainerLaunchUtil {
try {
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));
- Optional<StartpointManager> startpointManager = Optional.empty();
- if (new JobConfig(config).getStartpointMetadataStoreFactory() != null) {
- startpointManager = Optional.of(new StartpointManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, StartpointManager.NAMESPACE)));
- }
+
+ // StartpointManager wraps the coordinatorStreamStore in the namespaces internally
+ StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);
@@ -115,7 +113,7 @@ public class ContainerLaunchUtil {
JobContextImpl.fromConfigWithDefaults(config),
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
- Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager.orElse(null), diagnosticsManager);
+ Option.apply(externalContextOptional.orElse(null)), localityManager, startpointManager, diagnosticsManager);
ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
.createInstance(new ProcessorContext() { }, config);
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 24140d9..f8f6851 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -260,8 +260,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamMetadataStoreFactory].getCanonicalName)
- def getStartpointMetadataStoreFactory = getOption(JobConfig.STARTPOINT_METADATA_STORE_FACTORY).getOrElse(null)
-
def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
def getJMXEnabled = {