You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/08 16:39:53 UTC
samza git commit: SAMZA-1267: ApplicationRunner#getLocalRunner returns null
Repository: samza
Updated Branches:
refs/heads/master e6cc3b713 -> fa2f47559
SAMZA-1267: ApplicationRunner#getLocalRunner returns null
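Remove ApplicationRunner#getLocalRunner, which always returned null, and update the usage examples to construct LocalApplicationRunner directly. Also switch the Mockito stub in TestSamzaContainer from then(...) to thenAnswer(...).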
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Jake Maes <jm...@apache.org>
Closes #168 from xinyuiscool/SAMZA-1267
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa2f4755
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa2f4755
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa2f4755
Branch: refs/heads/master
Commit: fa2f475595dcef174ac880a4c21a6c83239fec55
Parents: e6cc3b7
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon May 8 09:39:43 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon May 8 09:39:43 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/runtime/ApplicationRunner.java | 10 ----------
.../java/org/apache/samza/example/BroadcastExample.java | 4 ++--
.../org/apache/samza/example/KeyValueStoreExample.java | 4 ++--
.../apache/samza/example/OrderShipmentJoinExample.java | 4 ++--
.../org/apache/samza/example/PageViewCounterExample.java | 4 ++--
.../java/org/apache/samza/example/RepartitionExample.java | 4 ++--
.../test/java/org/apache/samza/example/WindowExample.java | 4 ++--
.../org/apache/samza/container/TestSamzaContainer.scala | 2 +-
8 files changed, 13 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index bf2c643..eda09a2 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -40,16 +40,6 @@ public abstract class ApplicationRunner {
protected final Config config;
/**
- * Static method to create the local {@link ApplicationRunner}.
- *
- * @param config configuration passed in to initialize the Samza local process
- * @return the local {@link ApplicationRunner} to run the user-defined stream applications
- */
- public static ApplicationRunner getLocalRunner(Config config) {
- return null;
- }
-
- /**
* Static method to load the {@link ApplicationRunner}
*
* @param config configuration passed in to initialize the Samza processes
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
index a09247a..73a89af 100644
--- a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -24,7 +24,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
@@ -53,7 +53,7 @@ public class BroadcastExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new BroadcastExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 6b913c4..5be3046 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -25,7 +25,7 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;
@@ -57,7 +57,7 @@ public class KeyValueStoreExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new KeyValueStoreExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 80d0e16..f65c4ed 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -24,7 +24,7 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -50,7 +50,7 @@ public class OrderShipmentJoinExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new OrderShipmentJoinExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 547cac6..a3471a2 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -60,7 +60,7 @@ public class PageViewCounterExample implements StreamApplication {
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new PageViewCounterExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 37375cd..7bf939b 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -25,7 +25,7 @@ import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -55,7 +55,7 @@ public class RepartitionExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new RepartitionExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 159dba2..1fd3be5 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -62,7 +62,7 @@ public class WindowExample implements StreamApplication {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
localRunner.run(new WindowExample());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa2f4755/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a3e70b8..980c2a2 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -422,7 +422,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@volatile var onContainerFailedThrowable: Throwable = null
val mockRunLoop = mock[RunLoop]
- when(mockRunLoop.run).then(new Answer[Unit] {
+ when(mockRunLoop.run).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
Thread.sleep(100)
}