You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/23 06:07:50 UTC
[samza] branch master updated: SAMZA-2260: Standalone - coordinator
stream metadata store lifecycle (#1090)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 106286f SAMZA-2260: Standalone - coordinator stream metadata store lifecycle (#1090)
106286f is described below
commit 106286fc2161bc093f85bf785d56530467ceff45
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Mon Jul 22 23:07:44 2019 -0700
SAMZA-2260: Standalone - coordinator stream metadata store lifecycle (#1090)
* SAMZA-2260: Standalone and coordinator metadata store lifecycle
* Trigger build again
* Address PR comments from @shanthoosh
---
.../coordinator/AzureJobCoordinatorFactory.java | 4 +-
.../samza/coordinator/JobCoordinatorFactory.java | 5 +-
.../apache/samza/processor/StreamProcessor.java | 74 ++++++++++++---
.../samza/runtime/LocalApplicationRunner.java | 78 +++++++++++----
.../PassthroughJobCoordinatorFactory.java | 4 +-
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 71 ++++++--------
.../apache/samza/zk/ZkJobCoordinatorFactory.java | 17 ++--
.../samza/processor/TestStreamProcessor.java | 35 ++++---
.../samza/runtime/TestLocalApplicationRunner.java | 105 ++++++++++++++++++---
.../org/apache/samza/zk/TestZkJobCoordinator.java | 38 +++++---
.../sql/runner/SamzaSqlApplicationRunner.java | 4 +-
.../apache/samza/sql/util/SamzaSqlTestConfig.java | 6 +-
.../apache/samza/test/framework/TestRunner.java | 2 +-
.../samza/processor/TestZkStreamProcessorBase.java | 5 +-
.../samzasql/SamzaSqlIntegrationTestHarness.java | 24 ++++-
15 files changed, 341 insertions(+), 131 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
index ff8925a..206e0d4 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinatorFactory.java
@@ -20,11 +20,13 @@
package org.apache.samza.coordinator;
import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
public class AzureJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
- public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+ public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+ MetadataStore metadataStore) {
return new AzureJobCoordinator(processorId, config, metricsRegistry);
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index 8f3d96e..b555bae 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -20,6 +20,7 @@ package org.apache.samza.coordinator;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
@InterfaceStability.Evolving
@@ -29,7 +30,9 @@ public interface JobCoordinatorFactory {
* @param processorId a unique logical identifier assigned to the {@link org.apache.samza.processor.StreamProcessor}.
* @param config the configuration of the samza application.
* @param metricsRegistry used to publish the coordination specific metrics.
+ * @param metadataStore used to read and write metadata for the samza application.
* @return the {@link JobCoordinator} instance.
*/
- JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry);
+ JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+ MetadataStore metadataStore);
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 9877a62..815725e 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -51,10 +51,13 @@ import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.util.DiagnosticsUtil;
import org.apache.samza.util.ReflectionUtil;
@@ -133,6 +136,7 @@ public class StreamProcessor {
private final ExecutorService containerExcecutorService;
private final Object lock = new Object();
private final MetricsRegistryMap metricsRegistry;
+ private final MetadataStore metadataStore;
private volatile Throwable containerException = null;
@@ -181,9 +185,8 @@ public class StreamProcessor {
* @param customMetricsReporters registered with the metrics system to report metrics.
* @param taskFactory the task factory to instantiate the Task.
* @param processorListener listener to the StreamProcessor life cycle.
- *
- * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
- * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+ * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
*/
@Deprecated
public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
@@ -193,7 +196,7 @@ public class StreamProcessor {
/**
* Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
- * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)}, with the following differences:
* <ol>
* <li>Passes null for application-defined context factories</li>
* <li>Accepts a {@link ProcessorLifecycleListener} directly instead of a
@@ -206,15 +209,14 @@ public class StreamProcessor {
* @param taskFactory task factory to instantiate the Task
* @param processorListener listener to the StreamProcessor life cycle
* @param jobCoordinator the instance of {@link JobCoordinator}
- *
- * Deprecated: Use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
- * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+ * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
*/
@Deprecated
public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
this(processorId, config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener,
- jobCoordinator);
+ jobCoordinator, null);
}
/**
@@ -228,13 +230,44 @@ public class StreamProcessor {
* @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
* @param externalContextOptional optional {@link ExternalContext} to pass through to the application
* @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
- * @param jobCoordinator the instance of {@link JobCoordinator}
+ * @param jobCoordinator the instance of {@link JobCoordinator}. If null, the jobCoordinator instance will be created.
+ * If the jobCoordinator is passed in externally, the jobCoordinator and StreamProcessor may not
+ * share the same instance of the {@link MetadataStore}.
+ * @deprecated use {@link #StreamProcessor(String, Config, Map, TaskFactory, Optional, Optional, Optional,
+ * StreamProcessorLifecycleListenerFactory, JobCoordinator, MetadataStore)} instead.
*/
+ @Deprecated
public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
JobCoordinator jobCoordinator) {
+ this(processorId, config, customMetricsReporters, taskFactory, applicationDefinedContainerContextFactoryOptional,
+ applicationDefinedTaskContextFactoryOptional, externalContextOptional, listenerFactory,
+ jobCoordinator, null);
+ }
+
+ /**
+ * Builds a {@link StreamProcessor} with full specification of processing components.
+ *
+ * @param processorId a unique logical identifier assigned to the stream processor.
+ * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
+ * @param customMetricsReporters registered with the metrics system to report metrics
+ * @param taskFactory task factory to instantiate the Task
+ * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
+ * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
+ * @param externalContextOptional optional {@link ExternalContext} to pass through to the application
+ * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
+ * @param jobCoordinator the instance of {@link JobCoordinator}. If null, the jobCoordinator instance will be created.
+ * If the jobCoordinator is passed in externally, the jobCoordinator and StreamProcessor may not
+ * share the same instance of the {@link MetadataStore}.
+ * @param metadataStore the instance of {@link MetadataStore} used by managers such as {@link StartpointManager}
+ */
+ public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
+ Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
+ Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
+ Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
+ JobCoordinator jobCoordinator, MetadataStore metadataStore) {
Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "ProcessorId cannot be null.");
this.config = config;
@@ -249,7 +282,10 @@ public class StreamProcessor {
this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
this.externalContextOptional = externalContextOptional;
this.taskShutdownMs = new TaskConfig(config).getShutdownMs();
- this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
+ this.metadataStore = metadataStore;
+ this.jobCoordinator = (jobCoordinator != null)
+ ? jobCoordinator
+ : createJobCoordinator(config, processorId, metricsRegistry, metadataStore);
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
@@ -349,19 +385,28 @@ public class StreamProcessor {
this.customMetricsReporter.put(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS, diagnosticsManagerReporterPair.get().getValue());
}
+ // Metadata store lifecycle managed outside of the SamzaContainer.
+ // All manager lifecycles are managed in the SamzaContainer including startpointManager
+ StartpointManager startpointManager = null;
+ if (metadataStore != null) {
+ startpointManager = new StartpointManager(metadataStore);
+ } else {
+ LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
+ }
+
return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
- Option.apply(this.externalContextOptional.orElse(null)), getClass().getClassLoader(), null, null,
+ Option.apply(this.externalContextOptional.orElse(null)), getClass().getClassLoader(), null, startpointManager,
diagnosticsManager);
}
- private JobCoordinator createJobCoordinator() {
+ private static JobCoordinator createJobCoordinator(Config config, String processorId, MetricsRegistry metricsRegistry, MetadataStore metadataStore) {
String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
JobCoordinatorFactory jobCoordinatorFactory =
- ReflectionUtil.getObj(getClass().getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class);
- return jobCoordinatorFactory.getJobCoordinator(processorId, config, metricsRegistry);
+ ReflectionUtil.getObj(StreamProcessor.class.getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class);
+ return jobCoordinatorFactory.getJobCoordinator(processorId, config, metricsRegistry, metadataStore);
}
/**
@@ -453,7 +498,6 @@ public class StreamProcessor {
processorListener.afterFailure(containerException);
else
processorListener.afterStop();
-
}
@Override
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index adbb4c6..5549570 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -21,7 +21,6 @@ package org.apache.samza.runtime;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,12 +32,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
-import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
@@ -48,6 +49,7 @@ import org.apache.samza.context.ExternalContext;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.RunIdGenerator;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.metadatastore.MetadataStore;
@@ -75,12 +77,13 @@ public class LocalApplicationRunner implements ApplicationRunner {
public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
- private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
+ private final Set<Pair<StreamProcessor, MetadataStore>> processors = ConcurrentHashMap.newKeySet();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
private final AtomicReference<Throwable> failure = new AtomicReference<>();
private final boolean isAppModeBatch;
private final Optional<CoordinationUtils> coordinationUtils;
+ private final MetadataStoreFactory metadataStoreFactory;
private Optional<String> runId = Optional.empty();
private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
@@ -93,9 +96,21 @@ public class LocalApplicationRunner implements ApplicationRunner {
* @param config configuration for the application
*/
public LocalApplicationRunner(SamzaApplication app, Config config) {
+ this(app, config, new CoordinatorStreamMetadataStoreFactory());
+ }
+
+ /**
+ * Constructs a {@link LocalApplicationRunner} to run the {@code app} with the {@code config}.
+ *
+ * @param app application to run
+ * @param config configuration for the application
+ * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} to read and write to coordinator stream.
+ */
+ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStoreFactory metadataStoreFactory) {
this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
- isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
- coordinationUtils = getCoordinationUtils(config, getClass().getClassLoader());
+ this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+ this.coordinationUtils = getCoordinationUtils(config, getClass().getClassLoader());
+ this.metadataStoreFactory = metadataStoreFactory;
}
/**
@@ -104,8 +119,9 @@ public class LocalApplicationRunner implements ApplicationRunner {
@VisibleForTesting
LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
this.appDesc = appDesc;
- isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+ this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
this.coordinationUtils = coordinationUtils;
+ this.metadataStoreFactory = new CoordinatorStreamMetadataStoreFactory();
}
private Optional<CoordinationUtils> getCoordinationUtils(Config config, ClassLoader classLoader) {
@@ -145,7 +161,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
}
try {
- MetadataStore metadataStore = getMetadataStore();
+ MetadataStore metadataStore = getMetadataStoreForRunID();
runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
} catch (Exception e) {
@@ -172,14 +188,16 @@ public class LocalApplicationRunner implements ApplicationRunner {
}
jobConfigs.forEach(jobConfig -> {
LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+ MetadataStore coordinatorStreamStore = createCoordinatorStreamStore(jobConfig);
+ coordinatorStreamStore.init();
StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
- sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext));
- processors.add(processor);
+ sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext), coordinatorStreamStore);
+ processors.add(Pair.of(processor, coordinatorStreamStore));
});
numProcessorsToStart.set(processors.size());
// start the StreamProcessors
- processors.forEach(StreamProcessor::start);
+ processors.forEach(sp -> sp.getLeft().start());
} catch (Throwable throwable) {
cleanup();
appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
@@ -191,7 +209,10 @@ public class LocalApplicationRunner implements ApplicationRunner {
@Override
public void kill() {
- processors.forEach(StreamProcessor::stop);
+ processors.forEach(sp -> {
+ sp.getLeft().stop(); // Stop StreamProcessor
+ sp.getRight().close(); // Close associated coordinator metadata store
+ });
cleanup();
}
@@ -231,7 +252,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
@VisibleForTesting
protected Set<StreamProcessor> getProcessors() {
- return Collections.unmodifiableSet(processors);
+ return processors.stream().map(sp -> sp.getLeft()).collect(Collectors.toSet());
}
@VisibleForTesting
@@ -240,16 +261,26 @@ public class LocalApplicationRunner implements ApplicationRunner {
}
@VisibleForTesting
+ MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+ MetadataStore coordinatorStreamStore =
+ metadataStoreFactory.getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
+ return coordinatorStreamStore;
+ }
+
+ @VisibleForTesting
StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
- Optional<ExternalContext> externalContextOptional) {
+ Optional<ExternalContext> externalContextOptional, MetadataStore coordinatorStreamStore) {
TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
Map<String, MetricsReporter> reporters = new HashMap<>();
String processorId = createProcessorId(new ApplicationConfig(config), getClass().getClassLoader());
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
reporters.put(name, factory.getMetricsReporter(name, processorId, config)));
- return new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
- appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null);
+
+ StreamProcessor streamProcessor = new StreamProcessor(processorId, config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+ appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null, coordinatorStreamStore);
+
+ return streamProcessor;
}
/**
@@ -282,7 +313,13 @@ public class LocalApplicationRunner implements ApplicationRunner {
coordinationUtils.ifPresent(CoordinationUtils::close);
}
- private MetadataStore getMetadataStore() {
+ /**
+ * This is not to be confused with the metadata store created from the member {@link #metadataStoreFactory}.
+ * The reason for the two Metadata store types (ZK and coordinator stream) is that the job model needs to be stored in
+ * ZK because of the versioning requirements. Configs and startpoints are stored in the coordinator stream. This
+ * disparity will be resolved with the next gen metadata store abstraction.
+ */
+ private MetadataStore getMetadataStoreForRunID() {
String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY);
MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
@@ -316,7 +353,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
@Override
public void afterStop() {
- processors.remove(processor);
+ processors.removeIf(pair -> pair.getLeft().equals(processor));
// successful shutdown
handleProcessorShutdown(null);
@@ -324,13 +361,16 @@ public class LocalApplicationRunner implements ApplicationRunner {
@Override
public void afterFailure(Throwable t) {
- processors.remove(processor);
+ processors.removeIf(pair -> pair.getLeft().equals(processor));
// the processor stopped with failure, this is logging the first processor's failure as the cause of
// the whole application failure
if (failure.compareAndSet(null, t)) {
// shutdown the other processors
- processors.forEach(StreamProcessor::stop);
+ processors.forEach(sp -> {
+ sp.getLeft().stop(); // Stop StreamProcessor
+ sp.getRight().close(); // Close associated coordinator metadata store
+ });
}
// handle the current processor's shutdown failure.
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
index 5d5fecf..bacf312 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinatorFactory.java
@@ -21,11 +21,13 @@ package org.apache.samza.standalone;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
public class PassthroughJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
- public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+ public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+ MetadataStore metadataStore) {
return new PassthroughJobCoordinator(processorId, config, metricsRegistry);
}
}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 167d458..b4a2481 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -60,11 +61,9 @@ import org.apache.samza.runtime.LocationIdProvider;
import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -109,6 +108,7 @@ public class ZkJobCoordinator implements JobCoordinator {
private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
private final LocationId locationId;
private final MetadataStore jobModelMetadataStore;
+ private final CoordinatorStreamStore coordinatorStreamStore;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
@@ -124,11 +124,14 @@ public class ZkJobCoordinator implements JobCoordinator {
@VisibleForTesting
StreamPartitionCountMonitor streamPartitionCountMonitor = null;
- ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore) {
+ ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils, MetadataStore jobModelMetadataStore, MetadataStore coordinatorStreamStore) {
+ // TODO: When we consolidate metadata stores for standalone, this check can be removed. For now, we expect this type.
+ // Keeping method signature as MetadataStore to avoid public API changes in the future
+ Preconditions.checkArgument(coordinatorStreamStore instanceof CoordinatorStreamStore);
+
this.config = config;
this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
-
this.processorId = processorId;
this.zkUtils = zkUtils;
// setup a listener for a session state change
@@ -150,6 +153,7 @@ public class ZkJobCoordinator implements JobCoordinator {
LocationIdProviderFactory.class);
LocationIdProvider locationIdProvider = locationIdProviderFactory.getLocationIdProvider(config);
this.locationId = locationIdProvider.getLocationId();
+ this.coordinatorStreamStore = (CoordinatorStreamStore) coordinatorStreamStore;
this.jobModelMetadataStore = jobModelMetadataStore;
}
@@ -308,38 +312,33 @@ public class ZkJobCoordinator implements JobCoordinator {
*/
@VisibleForTesting
void loadMetadataResources(JobModel jobModel) {
- CoordinatorStreamStore coordinatorStreamStore = null;
try {
- // Creates the coordinator stream if it does not exists.
- coordinatorStreamStore = createCoordinatorStreamStore();
- coordinatorStreamStore.init();
-
MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, getClass().getClassLoader());
metadataResourceUtil.createResources();
- CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
- NamespaceAwareCoordinatorStreamStore configStore =
- new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
- for (Map.Entry<String, String> entry : config.entrySet()) {
- byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
- configStore.put(entry.getKey(), serializedValue);
- }
+ if (coordinatorStreamStore != null) {
+ // TODO: SAMZA-2273 - publish configs async
+ CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ NamespaceAwareCoordinatorStreamStore configStore =
+ new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
+ configStore.put(entry.getKey(), serializedValue);
+ }
- // fan out the startpoints
- StartpointManager startpointManager = createStartpointManager(coordinatorStreamStore);
- startpointManager.start();
- try {
- startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
- } finally {
- startpointManager.stop();
+ // fan out the startpoints
+ StartpointManager startpointManager = createStartpointManager();
+ startpointManager.start();
+ try {
+ startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
+ } finally {
+ startpointManager.stop();
+ }
+ } else {
+ LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
}
} catch (IOException ex) {
throw new SamzaException(String.format("IO exception while loading metadata resources."), ex);
- } finally {
- if (coordinatorStreamStore != null) {
- LOG.info("Stopping the coordinator stream metadata store.");
- coordinatorStreamStore.close();
- }
}
}
@@ -349,17 +348,6 @@ public class ZkJobCoordinator implements JobCoordinator {
}
/**
- * Creates a coordinator stream kafka topic.
- */
- @VisibleForTesting
- CoordinatorStreamStore createCoordinatorStreamStore() {
- SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
- SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
- CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
- return new CoordinatorStreamStore(config, metrics.getMetricsRegistry());
- }
-
- /**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
@VisibleForTesting
@@ -398,8 +386,9 @@ public class ZkJobCoordinator implements JobCoordinator {
}
@VisibleForTesting
- StartpointManager createStartpointManager(CoordinatorStreamStore metadataStore) {
- return new StartpointManager(metadataStore);
+ StartpointManager createStartpointManager() {
+ // This method is for easy mocking.
+ return new StartpointManager(coordinatorStreamStore);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 8aacb96..2ee116e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -26,6 +26,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,20 +38,18 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
private static final String DEFAULT_JOB_NAME = "defaultJob";
private static final String PROTOCOL_VERSION = "2.0";
- /**
- * Instantiates an {@link ZkJobCoordinator} using the {@link Config}.
- *
- * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator}
- * @return An instance of {@link ZkJobCoordinator}
- */
@Override
- public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
+ public JobCoordinator getJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry,
+ MetadataStore coordinatorStreamStore) {
// TODO: Separate JC related configs into a "ZkJobCoordinatorConfig"
String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config);
ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath);
LOG.debug("Creating ZkJobCoordinator with config: {}.", config);
- ZkMetadataStore metadataStore = new ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
- return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils, metadataStore);
+
+ // TODO: This should be merged with coordinatorStreamStore - SAMZA-2272
+ ZkMetadataStore zkMetadataStore = new ZkMetadataStore(zkUtils.getKeyBuilder().getRootPath(), config, metricsRegistry);
+
+ return new ZkJobCoordinator(processorId, config, metricsRegistry, zkUtils, zkMetadataStore, coordinatorStreamStore);
}
private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) {
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index b809ead..6d78b77 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.processor;
+import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
@@ -29,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.ImmutableMap;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -40,6 +40,7 @@ import org.apache.samza.container.SamzaContainerStatus;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.processor.StreamProcessor.State;
import org.apache.samza.runtime.ProcessorLifecycleListener;
@@ -87,7 +88,7 @@ public class TestStreamProcessor {
processorListenerState.clear();
}
- class TestableStreamProcessor extends StreamProcessor {
+ static class TestableStreamProcessor extends StreamProcessor {
private final CountDownLatch containerStop = new CountDownLatch(1);
private final CountDownLatch runLoopStartForMain = new CountDownLatch(1);
private SamzaContainer container = null;
@@ -110,7 +111,9 @@ public class TestStreamProcessor {
JobCoordinator jobCoordinator,
SamzaContainer container,
Duration runLoopShutdownDuration) {
- super("TEST_PROCESSOR_ID", config, customMetricsReporters, streamTaskFactory, processorListener, jobCoordinator);
+
+ super("TEST_PROCESSOR_ID", config, customMetricsReporters, streamTaskFactory, Optional.empty(), Optional.empty(), Optional.empty(), sp -> processorListener,
+ jobCoordinator, Mockito.mock(MetadataStore.class));
this.container = container;
this.runLoopShutdownDuration = runLoopShutdownDuration;
}
@@ -151,6 +154,7 @@ public class TestStreamProcessor {
when(mockJobModel.getContainers()).thenReturn(containers);
return mockJobModel;
}
+
/**
* Tests stop() method when Container AND JobCoordinator are running
*/
@@ -217,7 +221,7 @@ public class TestStreamProcessor {
}).when(mockJobCoordinator).start();
processor.start();
- processorListenerStart.await();
+ processorListenerStart.await(10, TimeUnit.SECONDS);
assertEquals(SamzaContainerStatus.STARTED, processor.getContainerStatus());
@@ -406,7 +410,9 @@ public class TestStreamProcessor {
JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
Mockito.doNothing().when(mockJobCoordinator).start();
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
- StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", new MapConfig(), new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+ StreamProcessor streamProcessor = Mockito.spy(new StreamProcessor("TestProcessorId", new MapConfig(),
+ new HashMap<>(), null, Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener,
+ mockJobCoordinator, Mockito.mock(MetadataStore.class)));
assertEquals(State.NEW, streamProcessor.getState());
streamProcessor.start();
@@ -425,7 +431,8 @@ public class TestStreamProcessor {
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
- StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+ StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+ Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
/**
* Without a SamzaContainer running in StreamProcessor and current StreamProcessor state is STARTED,
@@ -493,7 +500,9 @@ public class TestStreamProcessor {
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
- StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
+ StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor("TestProcessorId", config, new HashMap<>(),
+ null, Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator,
+ Mockito.mock(MetadataStore.class)));
Mockito.doNothing().when(mockJobCoordinator).stop();
Mockito.doNothing().when(mockSamzaContainer).shutdown();
@@ -515,7 +524,8 @@ public class TestStreamProcessor {
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
- StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+ StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+ Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
Exception failureException = new Exception("dummy exception");
@@ -536,7 +546,8 @@ public class TestStreamProcessor {
JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
- StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+ StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null,
+ Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class));
streamProcessor.state = State.RUNNING;
streamProcessor.jobCoordinatorListener.onCoordinatorStop();
@@ -549,10 +560,10 @@ public class TestStreamProcessor {
public void testStreamProcessorWithStreamProcessorListenerFactory() {
AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>();
StreamProcessor streamProcessor =
- new StreamProcessor("TestProcessorId", mock(Config.class), new HashMap<>(), mock(TaskFactory.class), Optional.empty(),
- Optional.empty(), Optional.empty(),
+ new StreamProcessor("TestProcessorId", mock(Config.class), new HashMap<>(), mock(TaskFactory.class),
+ Optional.empty(), Optional.empty(), Optional.empty(),
sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)),
- mock(JobCoordinator.class));
+ mock(JobCoordinator.class), Mockito.mock(MetadataStore.class));
assertEquals(streamProcessor, mockListener.get().processor);
}
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index ea5f8a0..a5fd760 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -25,12 +25,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import org.apache.samza.application.descriptors.ApplicationDescriptor;
-import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
@@ -41,9 +41,10 @@ import org.apache.samza.coordinator.ClusterMembership;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.DistributedLock;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.execution.LocalJobPlanner;
import org.apache.samza.task.IdentityStreamTask;
import org.apache.samza.zk.ZkMetadataStore;
import org.apache.samza.zk.ZkMetadataStoreFactory;
@@ -60,8 +61,17 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@@ -94,6 +104,7 @@ public class TestLocalApplicationRunner {
prepareTest();
StreamProcessor sp = mock(StreamProcessor.class);
+ CoordinatorStreamStore metadataStore = mock(CoordinatorStreamStore.class);
ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -108,11 +119,16 @@ public class TestLocalApplicationRunner {
ExternalContext externalContext = mock(ExternalContext.class);
doReturn(sp).when(runner)
- .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(
+ CoordinatorStreamStore.class));
+ doReturn(metadataStore).when(runner).createCoordinatorStreamStore(any(Config.class));
doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
runner.run(externalContext);
+ verify(metadataStore).init();
+ verify(metadataStore, never()).close();
+
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -127,6 +143,7 @@ public class TestLocalApplicationRunner {
prepareTest();
StreamProcessor sp = mock(StreamProcessor.class);
+ CoordinatorStreamStore metadataStore = mock(CoordinatorStreamStore.class);
ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -139,11 +156,16 @@ public class TestLocalApplicationRunner {
return null;
}).when(sp).start();
- doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.empty()));
+ doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(),
+ captor.capture(), eq(Optional.empty()), any(CoordinatorStreamStore.class));
+ doReturn(metadataStore).when(runner).createCoordinatorStreamStore(any(Config.class));
doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
runner.run();
+ verify(metadataStore).init();
+ verify(metadataStore, never()).close();
+
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -162,6 +184,7 @@ public class TestLocalApplicationRunner {
doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
StreamProcessor sp = mock(StreamProcessor.class);
+ CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -175,11 +198,15 @@ public class TestLocalApplicationRunner {
ExternalContext externalContext = mock(ExternalContext.class);
doReturn(sp).when(runner)
- .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+ doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
runner.run(externalContext);
runner.waitForFinish();
+ verify(coordinatorStreamStore).init();
+ verify(coordinatorStreamStore, never()).close();
+
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}
@@ -198,6 +225,7 @@ public class TestLocalApplicationRunner {
doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
StreamProcessor sp = mock(StreamProcessor.class);
+ CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
@@ -208,7 +236,9 @@ public class TestLocalApplicationRunner {
ExternalContext externalContext = mock(ExternalContext.class);
doReturn(sp).when(runner)
- .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(
+ CoordinatorStreamStore.class));
+ doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
try {
runner.run(externalContext);
@@ -217,10 +247,60 @@ public class TestLocalApplicationRunner {
assertNotNull(th);
}
+ verify(coordinatorStreamStore).init();
+ verify(coordinatorStreamStore, never()).close();
+
assertEquals(runner.status(), ApplicationStatus.UnsuccessfulFinish);
}
@Test
+ public void testKill() throws Exception {
+ Map<String, String> cfgs = new HashMap<>();
+ cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+ config = new MapConfig(cfgs);
+ ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> mock(ProcessorLifecycleListener.class);
+ mockApp = (StreamApplication) appDesc -> {
+ appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+ };
+ prepareTest();
+
+ // return the jobConfigs from the planner
+ doReturn(Collections.singletonList(new JobConfig(new MapConfig(config)))).when(localPlanner).prepareJobs();
+
+ StreamProcessor sp = mock(StreamProcessor.class);
+ CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+ ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+ ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+ doAnswer(i ->
+ {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStart();
+ return null;
+ }).when(sp).start();
+
+ doAnswer(i ->
+ {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }).when(sp).stop();
+
+ ExternalContext externalContext = mock(ExternalContext.class);
+ doReturn(sp).when(runner)
+ .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)), any(CoordinatorStreamStore.class));
+ doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
+
+ runner.run(externalContext);
+ runner.kill();
+
+ verify(coordinatorStreamStore).init();
+ verify(coordinatorStreamStore).close();
+
+ assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
+ }
+
+ @Test
public void testWaitForFinishReturnsBeforeTimeout() {
long timeoutInMs = 1000;
@@ -351,7 +431,10 @@ public class TestLocalApplicationRunner {
localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
doReturn(localPlanner).when(runner).getPlanner(getClass().getClassLoader());
StreamProcessor sp = mock(StreamProcessor.class);
- doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject());
+ CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
+ doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject(), any(
+ CoordinatorStreamStore.class));
+ doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index 94a45d6..1b3ee12 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -69,7 +69,8 @@ public class TestZkJobCoordinator {
private final Config config;
private final JobModel jobModel;
- private final MetadataStore metadataStore;
+ private final MetadataStore zkMetadataStore;
+ private final CoordinatorStreamStore coordinatorStreamStore;
public TestZkJobCoordinator() {
Map<String, String> configMap = ImmutableMap.of(
@@ -85,7 +86,8 @@ public class TestZkJobCoordinator {
new TaskName("t1"), new TaskModel(new TaskName("t1"), ssps, new Partition(0)));
ContainerModel containerModel = new ContainerModel("0", tasksForContainer);
jobModel = new JobModel(config, ImmutableMap.of("0", containerModel));
- metadataStore = Mockito.mock(MetadataStore.class);
+ zkMetadataStore = Mockito.mock(MetadataStore.class);
+ coordinatorStreamStore = Mockito.mock(CoordinatorStreamStore.class);
}
@Test
@@ -99,7 +101,8 @@ public class TestZkJobCoordinator {
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
doReturn(new JobModel(new MapConfig(), new HashMap<>())).when(zkJobCoordinator).readJobModelFromMetadataStore(TEST_JOB_MODEL_VERSION);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
@@ -127,7 +130,8 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -153,7 +157,8 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
@@ -184,7 +189,8 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
zkJobCoordinator.streamPartitionCountMonitor = monitor;
@@ -207,7 +213,8 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -234,7 +241,8 @@ public class TestZkJobCoordinator {
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils, metadataStore));
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(),
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
zkJobCoordinator.debounceTimer = mockDebounceTimer;
@@ -257,9 +265,10 @@ public class TestZkJobCoordinator {
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils, metadataStore));
- doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
- doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
+
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils,
+ zkMetadataStore, coordinatorStreamStore));
+ doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
MetadataResourceUtil mockMetadataResourceUtil = mock(MetadataResourceUtil.class);
doReturn(mockMetadataResourceUtil).when(zkJobCoordinator)
@@ -287,9 +296,10 @@ public class TestZkJobCoordinator {
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel);
StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class);
- ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils, metadataStore));
- doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(any(CoordinatorStreamStore.class));
- doReturn(mock(CoordinatorStreamStore.class)).when(zkJobCoordinator).createCoordinatorStreamStore();
+
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config,
+ new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore));
+ doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager();
doReturn(jobModel).when(zkJobCoordinator).generateNewJobModel(any());
doNothing().when(zkJobCoordinator).loadMetadataResources(jobModel);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 3ff170a..ce42bee 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -65,8 +65,8 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
this(app, false, config);
}
- public SamzaSqlApplicationRunner(Boolean localRunner, Config config) {
- this(new SamzaSqlApplication(), localRunner, config);
+ public SamzaSqlApplicationRunner(Boolean isLocalRunner, Config config) {
+ this(new SamzaSqlApplication(), isLocalRunner, config);
}
private SamzaSqlApplicationRunner(SamzaApplication app, Boolean localRunner, Config config) {
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
index a5b03a0..3714fcc 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/util/SamzaSqlTestConfig.java
@@ -56,6 +56,8 @@ public class SamzaSqlTestConfig {
public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
+ public static final String SQL_JOB = "sql-job";
+ public static final String SQL_JOB_PROCESSOR_ID = "1";
public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -79,8 +81,8 @@ public class SamzaSqlTestConfig {
boolean includeNullForeignKeys, boolean includeNullSimpleRecords, long windowDurationMs) {
HashMap<String, String> staticConfigs = new HashMap<>();
- staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
- staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
+ staticConfigs.put(JobConfig.JOB_NAME(), SQL_JOB);
+ staticConfigs.put(JobConfig.PROCESSOR_ID(), SQL_JOB_PROCESSOR_ID);
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
staticConfigs.put(TaskConfig.GROUPER_FACTORY, SingleContainerGrouperFactory.class.getName());
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 42ebf32..d1e5a8a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -283,7 +283,7 @@ public class TestRunner {
// Cleaning store directories to ensure current run does not pick up state from previous run
deleteStoreDirectories();
Config config = new MapConfig(JobPlanner.generateSingleJobConfig(configs));
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(app, config, new InMemoryMetadataStoreFactory());
runner.run(externalContext);
if (!runner.waitForFinish(timeout)) {
throw new SamzaException("Timed out waiting for application to finish");
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 81ac963..6239c61 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -45,6 +45,7 @@ import org.apache.samza.config.ZkConfig;
import org.apache.samza.context.Context;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.runtime.ProcessorLifecycleListener;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -63,6 +64,7 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
import org.junit.Before;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +135,8 @@ public class TestZkStreamProcessorBase extends IntegrationTestHarness {
String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
JobCoordinator jobCoordinator =
ReflectionUtil.getObj(getClass().getClassLoader(), jobCoordinatorFactoryClassName, JobCoordinatorFactory.class)
- .getJobCoordinator(pId, config, new MetricsRegistryMap());
+ .getJobCoordinator(pId, config, new MetricsRegistryMap(), Mockito.mock(
+ CoordinatorStreamStore.class));
ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {
@Override
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
index 4ab543d..9ce1ea4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/SamzaSqlIntegrationTestHarness.java
@@ -18,14 +18,36 @@
*/
package org.apache.samza.test.samzasql;
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.samza.Partition;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SystemConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.apache.samza.system.MockSystemFactory;
+import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.util.CoordinatorStreamUtil;
public class SamzaSqlIntegrationTestHarness extends IntegrationTestHarness {
+
+ public static final String MOCK_METADATA_SYSTEM = "mockmetadatasystem";
+
protected void runApplication(Config config) {
- SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, config);
+ // Use MockSystemFactory for the coordinator system
+ MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition(MOCK_METADATA_SYSTEM,
+ CoordinatorStreamUtil.getCoordinatorStreamName(SamzaSqlTestConfig.SQL_JOB, SamzaSqlTestConfig.SQL_JOB_PROCESSOR_ID),
+ new Partition(0)), new ArrayList<>());
+ HashMap<String, String> mapConfig = new HashMap<>();
+ mapConfig.put(JobConfig.JOB_COORDINATOR_SYSTEM(), MOCK_METADATA_SYSTEM);
+ mapConfig.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_METADATA_SYSTEM), MockSystemFactory.class.getName());
+ mapConfig.putAll(config);
+
+ SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(mapConfig));
executeRun(runner, config);
runner.waitForFinish();
}