You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/19 19:33:46 UTC

samza git commit: SAMZA-1213 - StreamProcessorLifeCycleAware interface should not use processorId

Repository: samza
Updated Branches:
  refs/heads/master 1f3db9cc6 -> 2a92980de


SAMZA-1213 - StreamProcessorLifeCycleAware interface should not use processorId

Refactoring LocalApplicationRunner so that each processor has its own listener instance, instead of a single listener keeping track of all processors.

Author: Navina Ramesh <na...@apache.org>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Xinyu Liu <xi...@linkedin.com>

Closes #125 from navina/SAMZA-1213


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2a92980d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2a92980d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2a92980d

Branch: refs/heads/master
Commit: 2a92980de321e8c7a73296fcead1f8ca4e12860c
Parents: 1f3db9c
Author: Navina Ramesh <na...@apache.org>
Authored: Wed Apr 19 12:33:37 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Wed Apr 19 12:33:37 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../processor/SamzaContainerController.java     |  16 ++-
 .../apache/samza/processor/StreamProcessor.java |  54 ++++------
 .../StreamProcessorLifeCycleAware.java          |  51 ---------
 .../StreamProcessorLifecycleListener.java       |  48 +++++++++
 .../samza/runtime/LocalApplicationRunner.java   |  67 +++++++++---
 .../runtime/TestLocalApplicationRunner.java     |  43 ++++----
 .../test/processor/TestStreamProcessor.java     | 104 +++++++++++++++----
 8 files changed, 237 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 204c470..b6abd47 100644
--- a/build.gradle
+++ b/build.gradle
@@ -161,6 +161,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "com.101tec:zkclient:$zkClientVersion"
     compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
+    compile "org.apache.commons:commons-lang3:$commonsLang3Version"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index a7e4982..4af413a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -48,8 +48,7 @@ public class SamzaContainerController {
   private final Map<String, MetricsReporter> metricsReporterMap;
   private final Object taskFactory;
   private final long containerShutdownMs;
-  private final String processorId;
-  private final StreamProcessorLifeCycleAware lifeCycleAware;
+  private final StreamProcessorLifecycleListener lifecycleListener;
 
   // Internal Member Variables
   private Future containerFuture;
@@ -58,20 +57,17 @@ public class SamzaContainerController {
    * Creates an instance of a controller for instantiating, starting and/or stopping {@link SamzaContainer}
    * Requests to execute a container are submitted to the {@link ExecutorService}
    *
-   * @param processorId         {@link StreamProcessor} ID
    * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
    *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
    * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
-   * @param lifeCycleAware {@link StreamProcessorLifeCycleAware}
+   * @param lifecycleListener {@link StreamProcessorLifecycleListener}
    */
   public SamzaContainerController(
-      String processorId,
       Object taskFactory,
       long containerShutdownMs,
       Map<String, MetricsReporter> metricsReporterMap,
-      StreamProcessorLifeCycleAware lifeCycleAware) {
-    this.processorId = processorId;
+      StreamProcessorLifecycleListener lifecycleListener) {
     this.taskFactory = taskFactory;
     this.metricsReporterMap = metricsReporterMap;
     if (containerShutdownMs == -1) {
@@ -80,7 +76,7 @@ public class SamzaContainerController {
       this.containerShutdownMs = containerShutdownMs;
     }
     // life cycle callbacks when shutdown and failure happens
-    this.lifeCycleAware = lifeCycleAware;
+    this.lifecycleListener = lifecycleListener;
   }
 
   /**
@@ -116,9 +112,9 @@ public class SamzaContainerController {
     containerFuture = executorService.submit(() -> {
         try {
           container.run();
-          lifeCycleAware.onShutdown(processorId);
+          lifecycleListener.onShutdown();
         } catch (Throwable t) {
-          lifeCycleAware.onFailure(processorId, t);
+          lifecycleListener.onFailure(t);
         }
       });
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 3ee7519..1910594 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -18,22 +18,19 @@
  */
 package org.apache.samza.processor;
 
-import java.util.Map;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.Util;
 
+import java.util.Map;
+
 /**
  * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
  * independent process.
@@ -60,7 +57,7 @@ import org.apache.samza.util.Util;
 @InterfaceStability.Evolving
 public class StreamProcessor {
   private final JobCoordinator jobCoordinator;
-  private final StreamProcessorLifeCycleAware lifeCycleAware;
+  private final StreamProcessorLifecycleListener lifecycleListener;
   private final String processorId;
 
   /**
@@ -76,51 +73,36 @@ public class StreamProcessor {
    * @param config                 Instance of config object - contains all configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
-   * @param lifeCycleAware         listener to the StreamProcessor life cycle
+   * @param lifecycleListener         listener to the StreamProcessor life cycle
    */
-  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
-                         AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
-    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifeCycleAware);
+  public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+                         AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory, lifecycleListener);
   }
 
 
   /**
-   *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifeCycleAware)}, except task
+   *Same as {@link #StreamProcessor(String, Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
    * instances are created using the provided {@link StreamTaskFactory}.
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
-   * @param lifeCycleAware  listener to the StreamProcessor life cycle
+   * @param lifecycleListener  listener to the StreamProcessor life cycle
    */
-  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
-                         StreamTaskFactory streamTaskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
-    this(config, customMetricsReporters, (Object) streamTaskFactory, lifeCycleAware);
+  public StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+                         StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory, lifecycleListener);
   }
 
-  private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
-                          Object taskFactory, StreamProcessorLifeCycleAware lifeCycleAware) {
-
-    // TODO: This check to be removed after 0.13+
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getProcessorId() != null) {
-      this.processorId = appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() == null) {
-      ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(),
-              ProcessorIdGenerator.class);
-      this.processorId = idGenerator.generateProcessorId(config);
-    } else {
-      throw new ConfigException(
-          String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
-              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
-    }
+  private StreamProcessor(String processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+                          Object taskFactory, StreamProcessorLifecycleListener lifecycleListener) {
+    this.processorId = processorId;
 
     SamzaContainerController containerController = new SamzaContainerController(
-        processorId,
         taskFactory,
         new TaskConfigJava(config).getShutdownMs(),
         customMetricsReporters,
-        lifeCycleAware);
+        lifecycleListener);
 
     this.jobCoordinator = Util.
         <JobCoordinatorFactory>getObj(
@@ -128,7 +110,7 @@ public class StreamProcessor {
                 .getJobCoordinatorFactoryClassName())
         .getJobCoordinator(processorId, config, containerController);
 
-    this.lifeCycleAware = lifeCycleAware;
+    this.lifecycleListener = lifecycleListener;
   }
 
   /**
@@ -142,7 +124,7 @@ public class StreamProcessor {
    */
   public void start() {
     jobCoordinator.start();
-    lifeCycleAware.onStart(processorId);
+    lifecycleListener.onStart();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
deleted file mode 100644
index cc89154..0000000
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifeCycleAware.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.processor;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This class listens to the life cycle events in a {@link StreamProcessor},
- * and triggers the corresponding callbacks.
- */
-
-@InterfaceStability.Evolving
-public interface StreamProcessorLifeCycleAware {
-  /**
-   * Callback when the {@link StreamProcessor} is started
-   * @param processorId id of the StreamProcessor
-   */
-  void onStart(String processorId);
-
-  /**
-   * Callback when the {@link StreamProcessor} is shut down.
-   * @param processorId id of the StreamProcessor
-   */
-  void onShutdown(String processorId);
-
-  /**
-   * Callback when the {@link StreamProcessor} fails
-   * @param processorId id of the StreamProcessor
-   * @param t exception of the failure
-   */
-  void onFailure(String processorId, Throwable t);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
new file mode 100644
index 0000000..7bca074
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Listener interface for the life cycle events of a {@link StreamProcessor};
+ * implementations receive the corresponding callbacks.
+ */
+
+@InterfaceStability.Evolving
+public interface StreamProcessorLifecycleListener {
+  /**
+   * Callback when the {@link StreamProcessor} is started
+   */
+  void onStart();
+
+  /**
+   * Callback when the {@link StreamProcessor} is shut down.
+   */
+  void onShutdown();
+
+  /**
+   * Callback when the {@link StreamProcessor} fails
+   * @param t Cause of the failure
+   */
+  void onFailure(Throwable t);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 9987fff..adc76d5 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -26,17 +26,20 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
@@ -61,19 +64,27 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   private final List<StreamProcessor> processors = new ArrayList<>();
   private final CountDownLatch latch = new CountDownLatch(1);
   private final AtomicReference<Throwable> throwable = new AtomicReference<>();
+  private final ConcurrentHashSet<String> processorIds = new ConcurrentHashSet<>();
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
-  final class LocalProcessorListener implements StreamProcessorLifeCycleAware {
-    private final ConcurrentHashSet<String> processorIds = new ConcurrentHashSet<>();
+  final class LocalStreamProcessorListener implements StreamProcessorLifecycleListener {
+    public final String processorId;
+
+    public LocalStreamProcessorListener(String processorId) {
+      if (StringUtils.isEmpty(processorId)) {
+        throw new NullPointerException("processorId has to be defined in LocalStreamProcessorListener.");
+      }
+      this.processorId = processorId;
+    }
 
     @Override
-    public void onStart(String processorId) {
+    public void onStart() {
       processorIds.add(processorId);
     }
 
     @Override
-    public void onShutdown(String processorId) {
+    public void onShutdown() {
       processorIds.remove(processorId);
       if (processorIds.isEmpty()) {
         latch.countDown();
@@ -81,7 +92,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     }
 
     @Override
-    public void onFailure(String processorId, Throwable t) {
+    public void onFailure(Throwable t) {
       processorIds.remove(processorId);
       throwable.compareAndSet(null, t);
       latch.countDown();
@@ -107,10 +118,11 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       if (plan.getJobConfigs().isEmpty()) {
         throw new SamzaException("No jobs to run.");
       }
-      LocalProcessorListener listener = new LocalProcessorListener();
       plan.getJobConfigs().forEach(jobConfig -> {
           log.info("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
-          StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
+          String processorId = getProcessorId(config);
+          StreamProcessor processor =
+              createStreamProcessor(processorId, jobConfig, app, new LocalStreamProcessorListener(processorId));
           processor.start();
           processors.add(processor);
         });
@@ -179,20 +191,51 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   /**
+   * Returns the configured processorId, or else generates one via the configured {@link ProcessorIdGenerator}.
+   * @param config Application config
+   * @return String that uniquely represents an instance of {@link StreamProcessor} in the current JVM
+   * @throws ConfigException if neither the processor id nor the id generator class is configured
+   */
+  /* package private */
+  String getProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+      // Only look the generator up by class name when one is configured; a null name cannot be loaded.
+      ProcessorIdGenerator idGenerator = ClassLoaderHelper.fromClassName(
+          appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(
+          String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
+  /**
    * Create {@link StreamProcessor} based on {@link StreamApplication} and the config
    * @param config config
    * @param app {@link StreamApplication}
    * @return {@link StreamProcessor]}
    */
-  /* package private */ StreamProcessor createStreamProcessor(Config config, StreamApplication app, StreamProcessorLifeCycleAware listener) {
+  /* package private */
+  StreamProcessor createStreamProcessor(
+      String processorId,
+      Config config,
+      StreamApplication app,
+      StreamProcessorLifecycleListener listener) {
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, this);
     if (taskFactory instanceof StreamTaskFactory) {
-      return new StreamProcessor(config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
+      return new StreamProcessor(
+          processorId, config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
     } else if (taskFactory instanceof AsyncStreamTaskFactory) {
-      return new StreamProcessor(config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
+      return new StreamProcessor(
+          processorId, config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
     } else {
       throw new SamzaException(String.format("%s is not a valid task factory",
-          taskFactory.getClass().getCanonicalName().toString()));
+          taskFactory.getClass().getCanonicalName()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index ac3bc69..210336f 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,14 +19,8 @@
 
 package org.apache.samza.runtime;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
@@ -38,11 +32,19 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -197,6 +199,7 @@ public class TestLocalApplicationRunner {
   @Test
   public void testRunComplete() throws Exception {
     final Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.PROCESSOR_ID, "0");
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
@@ -226,18 +229,19 @@ public class TestLocalApplicationRunner {
     when(planner.plan(anyObject())).thenReturn(plan);
 
     StreamProcessor sp = mock(StreamProcessor.class);
-    ArgumentCaptor<StreamProcessorLifeCycleAware> captor =
-        ArgumentCaptor.forClass(StreamProcessorLifeCycleAware.class);
+    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
 
-    doAnswer(i -> {
-        StreamProcessorLifeCycleAware listener = captor.getValue();
-        listener.onShutdown("0");
+    doAnswer(i ->
+      {
+        StreamProcessorLifecycleListener listener = captor.getValue();
+        listener.onShutdown();
         return null;
       }).when(sp).start();
 
 
     LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
 
     spy.run(app);
 
@@ -247,6 +251,7 @@ public class TestLocalApplicationRunner {
   @Test
   public void testRunFailure() throws Exception {
     final Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.PROCESSOR_ID, "0");
     LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
     StreamApplication app = mock(StreamApplication.class);
     doNothing().when(app).init(anyObject(), anyObject());
@@ -277,18 +282,18 @@ public class TestLocalApplicationRunner {
 
     Throwable t = new Throwable("test failure");
     StreamProcessor sp = mock(StreamProcessor.class);
-    ArgumentCaptor<StreamProcessorLifeCycleAware> captor =
-        ArgumentCaptor.forClass(StreamProcessorLifeCycleAware.class);
+    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
 
     doAnswer(i -> {
-        StreamProcessorLifeCycleAware listener = captor.getValue();
-        listener.onFailure("0", t);
+        StreamProcessorLifecycleListener listener = captor.getValue();
+        listener.onFailure(t);
         return null;
       }).when(sp).start();
 
 
     LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    doReturn(sp).when(spy).createStreamProcessor(anyString(), anyObject(), anyObject(), captor.capture());
 
     try {
       spy.run(app);

http://git-wip-us.apache.org/repos/asf/samza/blob/2a92980d/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 365b31a..f37a224 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -40,7 +40,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifeCycleAware;
+import org.apache.samza.processor.StreamProcessorLifecycleListener;
 import org.apache.samza.task.AsyncStreamTaskAdapter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
@@ -52,20 +52,6 @@ import org.junit.Test;
 import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
-  private final StreamProcessorLifeCycleAware listener = new StreamProcessorLifeCycleAware() {
-    @Override
-    public void onStart(String processorId) {
-    }
-
-    @Override
-    public void onShutdown(String processorId) {
-    }
-
-    @Override
-    public void onFailure(String processorId, Throwable t) {
-    }
-  };
-
   /**
    * Testing a basic identity stream task - reads data from a topic and writes it to another topic
    * (without any modifications)
@@ -85,7 +71,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
     // TopicExistsException since StreamProcessor auto-creates them.
     createTopics(inputTopic, outputTopic);
-    final StreamProcessor processor = new StreamProcessor(new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new, listener);
+    final StreamProcessor processor = new StreamProcessor(
+        "1",
+        new MapConfig(configs),
+        new HashMap<>(),
+        IdentityStreamTask::new,
+        new StreamProcessorLifecycleListener() {
+          @Override
+          public void onStart() {
+
+          }
+
+          @Override
+          public void onShutdown() {
+
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+
+          }
+        });
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -105,7 +111,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
     createTopics(inputTopic, outputTopic);
     final StreamTaskFactory stf = IdentityStreamTask::new;
-    final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf, listener);
+    final StreamProcessor processor =
+        new StreamProcessor("1", configs, new HashMap<>(), stf, new StreamProcessorLifecycleListener() {
+          /**
+           * Callback when the {@link StreamProcessor} is started
+           */
+          @Override
+          public void onStart() { }
+          /**
+           * Callback when the {@link StreamProcessor} is shut down.
+           */
+          @Override
+          public void onShutdown() { }
+
+          /**
+           * Callback when the {@link StreamProcessor} fails
+           *
+           * @param t exception of the failure
+           */
+          @Override
+          public void onFailure(Throwable t) { }
+        });
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -126,7 +152,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
     createTopics(inputTopic, outputTopic);
     final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
-    final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf, listener);
+    final StreamProcessor processor = new StreamProcessor(
+        "1",
+        configs,
+        new HashMap<>(),
+        stf,
+        new StreamProcessorLifecycleListener() {
+          @Override
+          public void onStart() {
+
+          }
+
+          @Override
+          public void onShutdown() {
+
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+
+          }
+        });
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -148,7 +194,27 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     configMap.remove("task.class");
     final Config configs = new MapConfig(configMap);
 
-    StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), (StreamTaskFactory) null, listener);
+    StreamProcessor processor = new StreamProcessor(
+        "1",
+        configs,
+        new HashMap<>(),
+        (StreamTaskFactory) null,
+        new StreamProcessorLifecycleListener() {
+          @Override
+          public void onStart() {
+
+          }
+
+          @Override
+          public void onShutdown() {
+
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+
+          }
+        });
     run(processor, endLatch);
   }