You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/19 19:33:46 UTC
samza git commit: SAMZA-1213 - StreamProcessorLifeCycleAware
interface should not use processorId
Repository: samza
Updated Branches:
refs/heads/master 1f3db9cc6 -> 2a92980de
SAMZA-1213 - StreamProcessorLifeCycleAware interface should not use processorId
Refactoring LocalApplicationRunner so that each processor has its own listener instance, instead of a single listener keeping track of all processors.
Author: Navina Ramesh <na...@apache.org>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Xinyu Liu <xi...@linkedin.com>
Closes #125 from navina/SAMZA-1213
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a92980d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a92980d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a92980d
Branch: refs/heads/master
Commit: 2a92980de321e8c7a73296fcead1f8ca4e12860c
Parents: 1f3db9c
Author: Navina Ramesh <na...@apache.org>
Authored: Wed Apr 19 12:33:37 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed Apr 19 12:33:37 2017 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../processor/SamzaContainerController.java | 16 ++-
.../apache/samza/processor/StreamProcessor.java | 54 ++++------
.../StreamProcessorLifeCycleAware.java | 51 ---------
.../StreamProcessorLifecycleListener.java | 48 +++++++++
.../samza/runtime/LocalApplicationRunner.java | 67 +++++++++---
.../runtime/TestLocalApplicationRunner.java | 43 ++++----
.../test/processor/TestStreamProcessor.java | 104 +++++++++++++++----
8 files changed, 237 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 204c470..b6abd47 100644
--- a/build.gradle
+++ b/build.gradle
@@ -161,6 +161,7 @@ project(":samza-core_$scalaVersion") {
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "com.101tec:zkclient:$zkClientVersion"
compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
+ compile "org.apache.commons:commons-lang3:$commonsLang3Version"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index a7e4982..4af413a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -48,8 +48,7 @@ public class SamzaContainerController {
private final Map<String, MetricsReporter> metricsReporterMap;
private final Object taskFactory;
private final long containerShutdownMs;
- private final String processorId;
- private final StreamProcessorLifeCycleAware lifeCycleAware;
+ private final StreamProcessorLifecycleListener lifecycleListener;
// Internal Member Variables
private Future containerFuture;
@@ -58,20 +57,17 @@ public class SamzaContainerController {
* Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}
* Requests to execute a container are submitted to the {@link ExecutorService}
*
- * @param processorId {@link StreamProcessor} ID
* @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask}
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
* @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
- * @param lifeCycleAware {@link StreamProcessorLifeCycleAware}
+ * @param lifecycleListener {@link StreamProcessorLifecycleListener}
*/
public SamzaContainerController(
- String processorId,
Object taskFactory,
long containerShutdownMs,
Map<String, MetricsReporter> metricsReporterMap,
- StreamProcessorLifeCycleAware lifeCycleAware) {
- this.processorId = processorId;
+ StreamProcessorLifecycleListener lifecycleListener) {
this.taskFactory = taskFactory;
this.metricsReporterMap = metricsReporterMap;
if (containerShutdownMs == -1) {
@@ -80,7 +76,7 @@ public class SamzaContainerController {
this.containerShutdownMs = containerShutdownMs;
}
// life cycle callbacks when shutdown and failure happens
- this.lifeCycleAware = lifeCycleAware;
+ this.lifecycleListener = lifecycleListener;
}
/**
@@ -116,9 +112,9 @@ public class SamzaContainerController {
containerFuture = executorService.submit(() -> {
try {
container.run();
- lifeCycleAware.onShutdown(processorId);
+ lifecycleListener.onShutdown();
} catch (Throwable t) {
- lifeCycleAware.onFailure(processorId, t);
+ lifecycleListener.onFailure(t);
}
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 3ee7519..1910594 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -18,22 +18,19 @@
*/
package org.apache.samza.processor;
-import java.util.Map;
import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.Util;
+import java.util.Map;
+
/**
* StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
* independent process.
@@ -60,7 +57,7 @@ import org.apache.samza.util.Util;
@InterfaceStability.Evolving
public class StreamProcessor {
private final JobCoordinator jobCoordinator;
- private final StreamProcessorLifeCycleAware lifeCycleAware;
+ private final StreamProcessorLifecycleListener lifecycleListener;
private final String processorId;
/**
@@ -76,51 +73,36 @@ public class StreamProcessor {
* @param config Instance of config object - contains all configuration required for processing
* @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
* @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
- * @param lifeCycleAware listener to the StreamProcessor life cycle
+ * @param lifecycleListener listener to the StreamProcessor life cycle
*/
- public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
- AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
- this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifeCycleAware);
+ public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+ this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifecycleListener);
}
/**
- *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifeCycleAware)}, except task
+ *Same as {@link #StreamProcessor(String, Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
* instances are created using the provided {@link StreamTaskFactory}.
* @param config - config
* @param customMetricsReporters metric Reporter
* @param streamTaskFactory task factory to instantiate the Task
- * @param lifeCycleAware listener to the StreamProcessor life cycle
+ * @param lifecycleListener listener to the StreamProcessor life cycle
*/
- public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
- StreamTaskFactory streamTaskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
- this(config, customMetricsReporters, (Object) streamTaskFactory, lifeCycleAware);
+ public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+ this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, lifecycleListener);
}
- private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
- Object taskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
-
- // TODO: This check to be removed after 0.13+
- ApplicationConfig appConfig = new ApplicationConfig(config);
- if (appConfig.getProcessorId() != null) {
- this.processorId = appConfig.getProcessorId();
- } else if (appConfig.getAppProcessorIdGeneratorClass() == null) {
- ProcessorIdGenerator idGenerator =
- ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(),
- ProcessorIdGenerator.class);
- this.processorId = idGenerator.generateProcessorId(config);
- } else {
- throw new ConfigException(
- String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
- ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
- }
+ private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ Object taskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+ this.processorId = processorId;
SamzaContainerController containerController = new SamzaContainerController(
- processorId,
taskFactory,
new TaskConfigJava(config).getShutdownMs(),
customMetricsReporters,
- lifeCycleAware);
+ lifecycleListener);
this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj(
@@ -128,7 +110,7 @@ public class StreamProcessor {
.getJobCoordinatorFactoryClassName())
.getJobCoordinator(processorId, config, containerController);
- this.lifeCycleAware = lifeCycleAware;
+ this.lifecycleListener = lifecycleListener;
}
/**
@@ -142,7 +124,7 @@ public class StreamProcessor {
*/
public void start() {
jobCoordinator.start();
- lifeCycleAware.onStart(processorId);
+ lifecycleListener.onStart();
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
deleted file mode 100644
index cc89154..0000000
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.processor;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This class listens to the life cycle events in a {@link StreamProcessor},
- * and triggers the corresponding callbacks.
- */
-
-@InterfaceStability.Evolving
-public interface StreamProcessorLifeCycleAware {
- /**
- * Callback when the {@link StreamProcessor} is started
- * @param processorId id of the StreamProcessor
- */
- void onStart(String processorId);
-
- /**
- * Callback when the {@link StreamProcessor} is shut down.
- * @param processorId id of the StreamProcessor
- */
- void onShutdown(String processorId);
-
- /**
- * Callback when the {@link StreamProcessor} fails
- * @param processorId id of the StreamProcessor
- * @param t exception of the failure
- */
- void onFailure(String processorId, Throwable t);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
new file mode 100644
index 0000000..7bca074
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This class listens to the life cycle events in a {@link StreamProcessor},
+ * and triggers the corresponding callbacks.
+ */
+
+@InterfaceStability.Evolving
+public interface StreamProcessorLifecycleListener {
+ /**
+ * Callback when the {@link StreamProcessor} is started
+ */
+ void onStart();
+
+ /**
+ * Callback when the {@link StreamProcessor} is shut down.
+ */
+ void onShutdown();
+
+ /**
+ * Callback when the {@link StreamProcessor} fails
+ * @param t Cause of the failure
+ */
+ void onFailure(Throwable t);
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9987fff..adc76d5 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -26,17 +26,20 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
@@ -61,19 +64,27 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
private final List<StreamProcessor> processors = new ArrayList<>();
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Throwable> throwable = new AtomicReference<>();
+ private final ConcurrentHashSet<String> processorIds = new ConcurrentHashSet<>();
private ApplicationStatus appStatus = ApplicationStatus.New;
- final class LocalProcessorListener implements StreamProcessorLifeCycleAware {
- private final ConcurrentHashSet<String> processorIds = new ConcurrentHashSet<>();
+ final class LocalStreamProcessorListener implements StreamProcessorLifecycleListener {
+ public final String processorId;
+
+ public LocalStreamProcessorListener(String processorId) {
+ if (StringUtils.isEmpty(processorId)) {
+ throw new NullPointerException("processorId has to be defined in LocalStreamProcessorListener.");
+ }
+ this.processorId = processorId;
+ }
@Override
- public void onStart(String processorId) {
+ public void onStart() {
processorIds.add(processorId);
}
@Override
- public void onShutdown(String processorId) {
+ public void onShutdown() {
processorIds.remove(processorId);
if (processorIds.isEmpty()) {
latch.countDown();
@@ -81,7 +92,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
}
@Override
- public void onFailure(String processorId, Throwable t) {
+ public void onFailure(Throwable t) {
processorIds.remove(processorId);
throwable.compareAndSet(null, t);
latch.countDown();
@@ -107,10 +118,11 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
if (plan.getJobConfigs().isEmpty()) {
throw new SamzaException("No jobs to run.");
}
- LocalProcessorListener listener = new LocalProcessorListener();
plan.getJobConfigs().forEach(jobConfig -> {
log.info("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
- StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
+ String processorId = getProcessorId(config);
+ StreamProcessor processor =
+ createStreamProcessor(processorId, jobConfig, app, new LocalStreamProcessorListener(processorId));
processor.start();
processors.add(processor);
});
@@ -179,20 +191,51 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
}
/**
+ * Generates processorId for each {@link StreamProcessor} using the configured {@link ProcessorIdGenerator}
+ *
+ * @param config Application config
+ * @return String that uniquely represents an instance of {@link StreamProcessor} in the current JVM
+ */
+ /* package private */
+ String getProcessorId(Config config) {
+ // TODO: This check to be removed after 0.13+
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ if (appConfig.getProcessorId() != null) {
+ return appConfig.getProcessorId();
+ } else if (appConfig.getAppProcessorIdGeneratorClass() == null) {
+ ProcessorIdGenerator idGenerator =
+ ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(),
+ ProcessorIdGenerator.class);
+ return idGenerator.generateProcessorId(config);
+ } else {
+ throw new ConfigException(
+ String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+ ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+ }
+ }
+
+ /**
* Create {@link StreamProcessor} based on {@link StreamApplication} and the config
* @param config config
* @param app {@link StreamApplication}
* @return {@link StreamProcessor]}
*/
- /* package private */ StreamProcessor createStreamProcessor(Config config, StreamApplication app, StreamProcessorLifeCycleAware listener) {
+ /* package private */
+ StreamProcessor createStreamProcessor(
+ String processorId,
+ Config config,
+ StreamApplication app,
+ StreamProcessorLifecycleListener listener) {
Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, this);
if (taskFactory instanceof StreamTaskFactory) {
- return new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
+ return new StreamProcessor(
+ processorId, config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
} else if (taskFactory instanceof AsyncStreamTaskFactory) {
- return new StreamProcessor(config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
+ return new StreamProcessor(
+ processorId, config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
} else {
throw new SamzaException(String.format("%s is not a valid task factory",
- taskFactory.getClass().getCanonicalName().toString()));
+ taskFactory.getClass().getCanonicalName()));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index ac3bc69..210336f 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,14 +19,8 @@
package org.apache.samza.runtime;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.CoordinationUtils;
@@ -38,11 +32,19 @@ import org.apache.samza.execution.ExecutionPlanner;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -197,6 +199,7 @@ public class TestLocalApplicationRunner {
@Test
public void testRunComplete() throws Exception {
final Map<String, String> config = new HashMap<>();
+ config.put(ApplicationConfig.PROCESSOR_ID, "0");
LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
StreamApplication app = mock(StreamApplication.class);
doNothing().when(app).init(anyObject(), anyObject());
@@ -226,18 +229,19 @@ public class TestLocalApplicationRunner {
when(planner.plan(anyObject())).thenReturn(plan);
StreamProcessor sp = mock(StreamProcessor.class);
- ArgumentCaptor<StreamProcessorLifeCycleAware> captor =
- ArgumentCaptor.forClass(StreamProcessorLifeCycleAware.class);
+ ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+ ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
- doAnswer(i -> {
- StreamProcessorLifeCycleAware listener = captor.getValue();
- listener.onShutdown("0");
+ doAnswer(i ->
+ {
+ StreamProcessorLifecycleListener listener = captor.getValue();
+ listener.onShutdown();
return null;
}).when(sp).start();
LocalApplicationRunner spy = spy(runner);
- doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+ doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
spy.run(app);
@@ -247,6 +251,7 @@ public class TestLocalApplicationRunner {
@Test
public void testRunFailure() throws Exception {
final Map<String, String> config = new HashMap<>();
+ config.put(ApplicationConfig.PROCESSOR_ID, "0");
LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
StreamApplication app = mock(StreamApplication.class);
doNothing().when(app).init(anyObject(), anyObject());
@@ -277,18 +282,18 @@ public class TestLocalApplicationRunner {
Throwable t = new Throwable("test failure");
StreamProcessor sp = mock(StreamProcessor.class);
- ArgumentCaptor<StreamProcessorLifeCycleAware> captor =
- ArgumentCaptor.forClass(StreamProcessorLifeCycleAware.class);
+ ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+ ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
doAnswer(i -> {
- StreamProcessorLifeCycleAware listener = captor.getValue();
- listener.onFailure("0", t);
+ StreamProcessorLifecycleListener listener = captor.getValue();
+ listener.onFailure(t);
return null;
}).when(sp).start();
LocalApplicationRunner spy = spy(runner);
- doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+ doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
try {
spy.run(app);
http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 365b31a..f37a224 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -40,7 +40,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
import org.apache.samza.task.AsyncStreamTaskAdapter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
@@ -52,20 +52,6 @@ import org.junit.Test;
import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
- private final StreamProcessorLifeCycleAware listener = new StreamProcessorLifeCycleAware() {
- @Override
- public void onStart(String processorId) {
- }
-
- @Override
- public void onShutdown(String processorId) {
- }
-
- @Override
- public void onFailure(String processorId, Throwable t) {
- }
- };
-
/**
* Testing a basic identity stream task - reads data from a topic and writes it to another topic
* (without any modifications)
@@ -85,7 +71,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
// Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
// TopicExistsException since StreamProcessor auto-creates them.
createTopics(inputTopic, outputTopic);
- final StreamProcessor processor = new StreamProcessor(new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new, listener);
+ final StreamProcessor processor = new StreamProcessor(
+ "1",
+ new MapConfig(configs),
+ new HashMap<>(),
+ IdentityStreamTask::new,
+ new StreamProcessorLifecycleListener() {
+ @Override
+ public void onStart() {
+
+ }
+
+ @Override
+ public void onShutdown() {
+
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+
+ }
+ });
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -105,7 +111,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
createTopics(inputTopic, outputTopic);
final StreamTaskFactory stf = IdentityStreamTask::new;
- final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf, listener);
+ final StreamProcessor processor =
+ new StreamProcessor("1", configs, new HashMap<>(), stf, new StreamProcessorLifecycleListener() {
+ /**
+ * Callback when the {@link StreamProcessor} is started
+ */
+ @Override
+ public void onStart() { }
+ /**
+ * Callback when the {@link StreamProcessor} is shut down.
+ */
+ @Override
+ public void onShutdown() { }
+
+ /**
+ * Callback when the {@link StreamProcessor} fails
+ *
+ * @param t exception of the failure
+ */
+ @Override
+ public void onFailure(Throwable t) { }
+ });
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -126,7 +152,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
createTopics(inputTopic, outputTopic);
final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
- final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf, listener);
+ final StreamProcessor processor = new StreamProcessor(
+ "1",
+ configs,
+ new HashMap<>(),
+ stf,
+ new StreamProcessorLifecycleListener() {
+ @Override
+ public void onStart() {
+
+ }
+
+ @Override
+ public void onShutdown() {
+
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+
+ }
+ });
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -148,7 +194,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
configMap.remove("task.class");
final Config configs = new MapConfig(configMap);
- StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), (StreamTaskFactory) null, listener);
+ StreamProcessor processor = new StreamProcessor(
+ "1",
+ configs,
+ new HashMap<>(),
+ (StreamTaskFactory) null,
+ new StreamProcessorLifecycleListener() {
+ @Override
+ public void onStart() {
+
+ }
+
+ @Override
+ public void onShutdown() {
+
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+
+ }
+ });
run(processor, endLatch);
}