You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/06/26 19:26:01 UTC

samza git commit: SAMZA-1337: Use StreamTask with the LocalApplicationRunner

Repository: samza
Updated Branches:
  refs/heads/master 8aa75467e -> 4875842b3


SAMZA-1337: Use StreamTask with the LocalApplicationRunner

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Xinyu Liu <xi...@apache.org>, Bharath Kumarasubramanian <co...@gmail.com>

Closes #231 from sborya/LocalAppRunnerWithStreamTask


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4875842b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4875842b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4875842b

Branch: refs/heads/master
Commit: 4875842b37e0285100e2d1d753a7bc4a1448e897
Parents: 8aa7546
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon Jun 26 12:25:48 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Jun 26 12:25:48 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/runtime/ApplicationRunner.java | 11 +++++++
 .../samza/runtime/LocalApplicationRunner.java   | 27 ++++++++++++++++--
 .../samza/runtime/LocalContainerRunner.java     | 10 +++++--
 .../samza/runtime/RemoteApplicationRunner.java  | 13 ++++++---
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  3 +-
 .../runtime/TestAbstractApplicationRunner.java  |  5 ++++
 .../runtime/TestApplicationRunnerMain.java      |  5 ++++
 .../runtime/TestLocalApplicationRunner.java     | 30 ++++++++++++++++++++
 8 files changed, 93 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 0586e9e..440dd33 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -71,6 +71,17 @@ public abstract class ApplicationRunner {
   }
 
   /**
+   * Deploy and run the Samza jobs to execute the {@link org.apache.samza.task.StreamTask}
+   * named by {@code task.class} in the config. Non-blocking: it does not wait for the job to finish.
+   * Runners that do not support StreamTask throw {@link UnsupportedOperationException}.
+   *
+   * NOTE: this interface will most likely change in the future.
+   */
+  @InterfaceStability.Evolving
+  public abstract void runTask();
+
+
+  /**
    * Deploy and run the Samza jobs to execute {@link StreamApplication}.
    * It is non-blocking so it doesn't wait for the application running.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index b1f0aba..b0bfc8a 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -32,7 +32,9 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalApplicationRunner extends AbstractApplicationRunner {
 
-  private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
   // Latch id that's used for awaiting the init of application before creating the StreamProcessors
   private static final String INIT_LATCH_ID = "init";
   // Latch timeout is set to 10 min
@@ -134,6 +136,25 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   @Override
+  public void runTask() {
+    // Validate first: running a StreamTask requires task.class to be configured.
+    String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
+    if (taskName == null) {
+      throw new SamzaException("Neither APP nor task.class are defined");
+    }
+    LOG.info("LocalApplicationRunner will run {}", taskName);
+
+    JobConfig jobConfig = new JobConfig(this.config);
+    LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
+    // No StreamApplication here (app == null); the processor is expected to use task.class.
+    StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);
+
+    numProcessorsToStart.set(1);
+    listener.setProcessor(processor);
+    processor.start();
+  }
+
+  @Override
   public void run(StreamApplication app) {
     try {
       // 1. initialize and plan
@@ -148,7 +169,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
         throw new SamzaException("No jobs to run.");
       }
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+          LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
           LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
           StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
           listener.setProcessor(processor);
@@ -180,7 +201,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     try {
       shutdownLatch.await();
     } catch (Exception e) {
-      log.error("Wait is interrupted by exception", e);
+      LOG.error("Wait is interrupted by exception", e);
       throw new SamzaException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index d690c80..5d0e455 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.runtime;
 
+import java.util.HashMap;
+import java.util.Random;
 import org.apache.log4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -41,9 +43,6 @@ import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Random;
-
 /**
  * LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
  * have a local runner for yarn before we consolidate the Yarn container and coordination into a
@@ -68,6 +67,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
   }
 
   @Override
+  public void runTask() { // StreamTask execution is not supported by this runner; always throws.
+    throw new UnsupportedOperationException("Running StreamTask is not implemented for LocalContainerRunner");
+  }
+
+  @Override
   public void run(StreamApplication streamApp) {
     ContainerModel containerModel = jobModel.getContainers().get(containerId);
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 309d8c8..53cd2f6 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory;
  */
 public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
-  private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class);
 
   public RemoteApplicationRunner(Config config) {
     super(config);
   }
 
+  @Override
+  public void runTask() { // StreamTask execution is not supported by this runner; always throws.
+    throw new UnsupportedOperationException("Running StreamTask is not implemented for RemoteApplicationRunner");
+  }
+
   /**
    * Run the {@link StreamApplication} on the remote cluster
    * @param app a StreamApplication
@@ -57,7 +62,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
       // 3. submit jobs for remote execution
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
+          LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
           JobRunner runner = new JobRunner(jobConfig);
           runner.run(true);
         });
@@ -72,7 +77,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       ExecutionPlan plan = getExecutionPlan(app);
 
       plan.getJobConfigs().forEach(jobConfig -> {
-          log.info("Killing job {}", jobConfig.getName());
+          LOG.info("Killing job {}", jobConfig.getName());
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
@@ -92,7 +97,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       for (JobConfig jobConfig : plan.getJobConfigs()) {
         JobRunner runner = new JobRunner(jobConfig);
         ApplicationStatus status = runner.status();
-        log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
+        LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
 
         switch (status.getStatusCode()) {
           case New:

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index a0558ef..cb32252 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
@@ -201,7 +202,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     ApplicationConfig appConfig = new ApplicationConfig(config);
     if (appConfig.getProcessorId() != null) {
       return appConfig.getProcessorId();
-    } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+    } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
       ProcessorIdGenerator idGenerator =
           ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
       return idGenerator.generateProcessorId(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
index aaacd6e..ed13b5b 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
@@ -368,6 +368,11 @@ public class TestAbstractApplicationRunner {
     }
 
     @Override
+    public void runTask() { // stub: this test runner does not support runTask()
+      throw new UnsupportedOperationException("runTask is not supported in this test");
+    }
+
+    @Override
     public void run(StreamApplication streamApp) {
       // do nothing. We're only testing the stream creation methods at this point.
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 05f3cc2..d22fbae 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -87,6 +87,11 @@ public class TestApplicationRunnerMain {
     }
 
     @Override
+    public void runTask() { // stub: this test runner does not support runTask()
+      throw new UnsupportedOperationException("runTask() not supported in this test");
+    }
+
+    @Override
     public void run(StreamApplication streamApp) {
       runCount++;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 9d15211..a04bd3b 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -23,6 +23,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
@@ -205,6 +206,35 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
+  public void testRunStreamTask() throws Exception {
+    final Map<String, String> config = new HashMap<>();
+    config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    // createStreamProcessor() is stubbed below, so the task class is not instantiated through it.
+    config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.test.processor.IdentityStreamTask");
+
+    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+
+    // Simulate a processor that starts and then shuts down cleanly.
+    doAnswer(i -> {
+        StreamProcessorLifecycleListener listener = captor.getValue();
+        listener.onStart();
+        listener.onShutdown();
+        return null;
+      }).when(sp).start();
+
+    LocalApplicationRunner spy = spy(runner);
+    doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+
+    spy.runTask();
+
+    assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null));
+  }
+
+  @Test
   public void testRunComplete() throws Exception {
     final Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());