You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/06/26 19:26:01 UTC
samza git commit: SAMZA-1337: Use StreamTask with the
LocalApplicationRunner
Repository: samza
Updated Branches:
refs/heads/master 8aa75467e -> 4875842b3
SAMZA-1337: Use StreamTask with the LocalApplicationRunner
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>, Xinyu Liu <xi...@apache.org>, Bharath Kumarasubramanian <co...@gmail.com>
Closes #231 from sborya/LocalAppRunnerWithStreamTask
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4875842b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4875842b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4875842b
Branch: refs/heads/master
Commit: 4875842b37e0285100e2d1d753a7bc4a1448e897
Parents: 8aa7546
Author: Boris Shkolnik <bo...@apache.org>
Authored: Mon Jun 26 12:25:48 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Jun 26 12:25:48 2017 -0700
----------------------------------------------------------------------
.../apache/samza/runtime/ApplicationRunner.java | 11 +++++++
.../samza/runtime/LocalApplicationRunner.java | 27 ++++++++++++++++--
.../samza/runtime/LocalContainerRunner.java | 10 +++++--
.../samza/runtime/RemoteApplicationRunner.java | 13 ++++++---
.../org/apache/samza/zk/ZkJobCoordinator.java | 3 +-
.../runtime/TestAbstractApplicationRunner.java | 5 ++++
.../runtime/TestApplicationRunnerMain.java | 5 ++++
.../runtime/TestLocalApplicationRunner.java | 30 ++++++++++++++++++++
8 files changed, 93 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 0586e9e..440dd33 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -71,6 +71,17 @@ public abstract class ApplicationRunner {
}
/**
+ * Deploy and run the Samza jobs to execute {@link org.apache.samza.task.StreamTask}.
+ * It is non-blocking so it doesn't wait for the application running.
+ * This method assumes your task.class is specified in the configs.
+ *
+ * NOTE. this interface will most likely change in the future.
+ */
+ @InterfaceStability.Evolving
+ public abstract void runTask();
+
+
+ /**
* Deploy and run the Samza jobs to execute {@link StreamApplication}.
* It is non-blocking so it doesn't wait for the application running.
*
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index b1f0aba..b0bfc8a 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -32,7 +32,9 @@ import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
@@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory;
*/
public class LocalApplicationRunner extends AbstractApplicationRunner {
- private static final Logger log = LoggerFactory.getLogger(LocalApplicationRunner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
// Latch id that's used for awaiting the init of application before creating the StreamProcessors
private static final String INIT_LATCH_ID = "init";
// Latch timeout is set to 10 min
@@ -134,6 +136,25 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
}
@Override
+ public void runTask() {
+ JobConfig jobConfig = new JobConfig(this.config);
+
+ // validation: task.class must be set, since no StreamApplication is supplied on this path
+ String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
+ if (taskName == null) {
+ throw new SamzaException("Neither APP nor task.class are defined");
+ }
+ LOG.info("LocalApplicationRunner will run {}", taskName);
+ LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
+ // no StreamApplication is passed (null) for a plain task run
+ StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);
+ // exactly one processor is created and started by this method
+ numProcessorsToStart.set(1);
+ listener.setProcessor(processor);
+ processor.start();
+ }
+
+ @Override
public void run(StreamApplication app) {
try {
// 1. initialize and plan
@@ -148,7 +169,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
throw new SamzaException("No jobs to run.");
}
plan.getJobConfigs().forEach(jobConfig -> {
- log.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
+ LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
StreamProcessor processor = createStreamProcessor(jobConfig, app, listener);
listener.setProcessor(processor);
@@ -180,7 +201,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
try {
shutdownLatch.await();
} catch (Exception e) {
- log.error("Wait is interrupted by exception", e);
+ LOG.error("Wait is interrupted by exception", e);
throw new SamzaException(e);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index d690c80..5d0e455 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,6 +19,8 @@
package org.apache.samza.runtime;
+import java.util.HashMap;
+import java.util.Random;
import org.apache.log4j.MDC;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
@@ -41,9 +43,6 @@ import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Random;
-
/**
* LocalContainerRunner is the local runner for Yarn {@link SamzaContainer}s. It is an intermediate step to
* have a local runner for yarn before we consolidate the Yarn container and coordination into a
@@ -68,6 +67,11 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
}
@Override
+ public void runTask() {
+ throw new UnsupportedOperationException("Running StreamTask is not implemented for LocalContainerRunner");
+ }
+
+ @Override
public void run(StreamApplication streamApp) {
ContainerModel containerModel = jobModel.getContainers().get(containerId);
Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 309d8c8..53cd2f6 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory;
*/
public class RemoteApplicationRunner extends AbstractApplicationRunner {
- private static final Logger log = LoggerFactory.getLogger(RemoteApplicationRunner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteApplicationRunner.class);
public RemoteApplicationRunner(Config config) {
super(config);
}
+ @Override
+ public void runTask() {
+ throw new UnsupportedOperationException("Running StreamTask is not implemented for RemoteApplicationRunner"); // message names this class
+ }
+
/**
* Run the {@link StreamApplication} on the remote cluster
* @param app a StreamApplication
@@ -57,7 +62,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
// 3. submit jobs for remote execution
plan.getJobConfigs().forEach(jobConfig -> {
- log.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
+ LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
JobRunner runner = new JobRunner(jobConfig);
runner.run(true);
});
@@ -72,7 +77,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
ExecutionPlan plan = getExecutionPlan(app);
plan.getJobConfigs().forEach(jobConfig -> {
- log.info("Killing job {}", jobConfig.getName());
+ LOG.info("Killing job {}", jobConfig.getName());
JobRunner runner = new JobRunner(jobConfig);
runner.kill();
});
@@ -92,7 +97,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
for (JobConfig jobConfig : plan.getJobConfigs()) {
JobRunner runner = new JobRunner(jobConfig);
ApplicationStatus status = runner.status();
- log.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
+ LOG.debug("Status is {} for job {}", new Object[]{status, jobConfig.getName()});
switch (status.getStatusCode()) {
case New:
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index a0558ef..cb32252 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
@@ -201,7 +202,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
ApplicationConfig appConfig = new ApplicationConfig(config);
if (appConfig.getProcessorId() != null) {
return appConfig.getProcessorId();
- } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
+ } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
ProcessorIdGenerator idGenerator =
ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
return idGenerator.generateProcessorId(config);
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
index aaacd6e..ed13b5b 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
@@ -368,6 +368,11 @@ public class TestAbstractApplicationRunner {
}
@Override
+ public void runTask() {
+ throw new UnsupportedOperationException("runTask is not supported in this test");
+ }
+
+ @Override
public void run(StreamApplication streamApp) {
// do nothing. We're only testing the stream creation methods at this point.
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 05f3cc2..d22fbae 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -87,6 +87,11 @@ public class TestApplicationRunnerMain {
}
@Override
+ public void runTask() {
+ throw new UnsupportedOperationException("runTask() not supported in this test");
+ }
+
+ @Override
public void run(StreamApplication streamApp) {
runCount++;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4875842b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 9d15211..a04bd3b 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -23,6 +23,7 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
@@ -205,6 +206,35 @@ public class TestLocalApplicationRunner {
}
@Test
+ public void testRunStreamTask() throws Exception {
+ final Map<String, String> config = new HashMap<>();
+ config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+ config.put(TaskConfig.TASK_CLASS(), "org.apache.samza.test.processor.IdentityStreamTask");
+
+
+ LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(config));
+
+ StreamProcessor sp = mock(StreamProcessor.class);
+ ArgumentCaptor<StreamProcessorLifecycleListener> captor =
+ ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+
+ doAnswer(i ->
+ {
+ StreamProcessorLifecycleListener listener = captor.getValue();
+ listener.onStart();
+ listener.onShutdown();
+ return null;
+ }).when(sp).start();
+
+ LocalApplicationRunner spy = spy(runner);
+ doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+
+ spy.runTask();
+
+ assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null));
+
+ }
+ @Test
public void testRunComplete() throws Exception {
final Map<String, String> config = new HashMap<>();
config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());