You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/27 21:01:39 UTC
[1/3] samza git commit: SAMZA-1093: instantiating StreamOperatorTask
in SamzaContainer (WIP)
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 dde754246 -> a9b213c13
SAMZA-1093: instantiating StreamOperatorTask in SamzaContainer (WIP)
SAMZA-1093: Instantiating StreamOperatorTask in SamzaContainer
SAMZA-1093: fix import
SAMZA-1093: code ready, more unit tests pending
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/82b839f9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/82b839f9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/82b839f9
Branch: refs/heads/samza-fluent-api-v1
Commit: 82b839f99de832aed9f2f4dd935d9f6bbf6932ec
Parents: 7e7747f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Feb 21 16:18:25 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 24 23:33:26 2017 -0800
----------------------------------------------------------------------
.../samza/operators/StreamGraphBuilder.java | 2 +
.../apache/samza/container/RunLoopFactory.java | 16 +-
.../org/apache/samza/task/TaskFactories.java | 95 +++++++
.../org/apache/samza/util/ScalaToJavaUtils.java | 41 +++
.../apache/samza/container/SamzaContainer.scala | 81 +++---
.../apache/samza/task/TestTaskFactories.java | 256 +++++++++++++++++++
.../testUtils/InvalidStreamGraphBuilder.java | 25 ++
.../samza/testUtils/TestAsyncStreamTask.java | 35 +++
.../samza/testUtils/TestStreamGraphBuilder.java | 33 +++
.../apache/samza/testUtils/TestStreamTask.java | 34 +++
.../apache/samza/zk/TestZkLeaderElector.java | 2 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 2 +-
.../test/integration/StreamTaskTestUtil.scala | 3 +-
13 files changed, 564 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
index b415cf8..23e8625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
@@ -27,6 +27,8 @@ import org.apache.samza.config.Config;
*/
@InterfaceStability.Unstable
public interface StreamGraphBuilder {
+ static final String BUILDER_CLASS_CONFIG = "job.graph.builder.class";
+
/**
* Users are required to implement this abstract method to initialize the processing logic of the application, in terms
* of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..f81266b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -27,12 +27,12 @@ import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import java.util.concurrent.ExecutorService;
import static org.apache.samza.util.Util.asScalaClock;
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
/**
* Factory class to create runloop for a Samza task, based on the type
@@ -109,18 +109,4 @@ public class RunLoopFactory {
}
}
- /**
- * Returns a default value object for scala option.getOrDefault() to use
- * @param value default value
- * @param <T> value type
- * @return object containing default value
- */
- public static <T> AbstractFunction0<T> defaultValue(final T value) {
- return new AbstractFunction0<T>() {
- @Override
- public T apply() {
- return value;
- }
- };
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
new file mode 100644
index 0000000..0bfa8c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.TaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+
+/**
+ * Static helpers that build a task factory ({@link StreamTaskFactory} or {@link AsyncStreamTaskFactory})
+ * from the job configuration.
+ */
+public class TaskFactories {
+  private static final Logger log = LoggerFactory.getLogger(TaskFactories.class);
+
+  /**
+   * Creates a task factory from the task-related settings in {@code config}.
+   *
+   * <p>When {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} names a loadable class that implements
+   * {@link StreamGraphBuilder}, the returned factory produces {@link StreamOperatorTask} instances and the
+   * configured task class is ignored. Otherwise the task class from {@link TaskConfig#getTaskClass()} is used.
+   *
+   * @param config the job configuration
+   * @return an {@link AsyncStreamTaskFactory} if the resolved task class implements {@link AsyncStreamTask},
+   *         otherwise a {@link StreamTaskFactory}
+   * @throws ConfigException if neither a valid graph builder class nor a task class is configured
+   * @throws ClassNotFoundException if the resolved task class cannot be loaded
+   */
+  public static Object fromTaskClassConfig(Config config) throws ClassNotFoundException {
+    // Decide once whether this job is driven by a StreamGraphBuilder; the factory below reuses this flag
+    // instead of comparing class-name strings.
+    final boolean isOperatorTask = isStreamOperatorTask(config);
+
+    String taskClassName;
+    if (isOperatorTask) {
+      taskClassName = StreamOperatorTask.class.getName();
+    } else {
+      taskClassName = new TaskConfig(config).getTaskClass().getOrElse(defaultValue(null));
+    }
+
+    if (taskClassName == null) {
+      throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
+    }
+    log.info("Got task class name: {}", taskClassName);
+
+    // Resolve the class eagerly so that a bad class name fails here, and reuse it for every instance.
+    final Class<?> taskClass = Class.forName(taskClassName);
+
+    if (AsyncStreamTask.class.isAssignableFrom(taskClass)) {
+      return new AsyncStreamTaskFactory() {
+        @Override
+        public AsyncStreamTask createInstance() {
+          try {
+            return (AsyncStreamTask) taskClass.newInstance();
+          } catch (Exception e) {
+            // The exception is passed as the trailing argument so that SLF4J logs its stack trace.
+            log.error("Error loading AsyncStreamTask class: {}", taskClassName, e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+
+    return new StreamTaskFactory() {
+      @Override
+      public StreamTask createInstance() {
+        try {
+          return isOperatorTask ? createStreamOperatorTask(config) : (StreamTask) taskClass.newInstance();
+        } catch (Exception e) {
+          // The exception is passed as the trailing argument so that SLF4J logs its stack trace.
+          log.error("Error loading StreamTask class: {}", taskClassName, e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Instantiates the configured {@link StreamGraphBuilder} and wraps it in a {@link StreamOperatorTask}.
+   *
+   * @param config the job configuration; {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} supplies the class name
+   * @return a new {@link StreamOperatorTask} backed by a fresh builder instance
+   * @throws Exception if the builder class cannot be loaded, instantiated or cast
+   */
+  private static StreamTask createStreamOperatorTask(Config config) throws Exception {
+    StreamGraphBuilder graphBuilder = (StreamGraphBuilder) Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance();
+    return new StreamOperatorTask(graphBuilder);
+  }
+
+  /**
+   * Tells whether the configuration names a loadable class that implements {@link StreamGraphBuilder}.
+   *
+   * @param config the job configuration
+   * @return {@code true} only if {@link StreamGraphBuilder#BUILDER_CLASS_CONFIG} is set to a non-empty class
+   *         name that loads and is assignable to {@link StreamGraphBuilder}; {@code false} in every other case,
+   *         including when the lookup or class loading throws
+   */
+  private static boolean isStreamOperatorTask(Config config) {
+    String builderClassName = null;
+    try {
+      builderClassName = config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG);
+      // Compare by content: a reference comparison against "" only matches the interned literal.
+      if (builderClassName == null || builderClassName.isEmpty()) {
+        return false;
+      }
+      return StreamGraphBuilder.class.isAssignableFrom(Class.forName(builderClassName));
+    } catch (Exception e) {
+      log.error("Failed to validate StreamGraphBuilder class from the config. {}={}",
+          StreamGraphBuilder.BUILDER_CLASS_CONFIG, builderClassName, e);
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
new file mode 100644
index 0000000..6c7fc2d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import scala.runtime.AbstractFunction0;
+
+/**
+ * Common utility methods that help to convert or use Scala objects in Java code.
+ */
+public class ScalaToJavaUtils {
+ /**
+ * Wraps {@code value} in a Scala {@link AbstractFunction0} whose {@code apply()} returns it, so that Java
+ * code can supply a default to Scala's {@code Option.getOrElse(...)}.
+ * @param value default value; {@code apply()} on the returned function hands back this same reference
+ * @param <T> value type
+ * @return function object that returns {@code value}
+ */
+ public static <T> AbstractFunction0<T> defaultValue(final T value) {
+ return new AbstractFunction0<T>() {
+ @Override
+ public T apply() {
+ return value;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..5476fb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -25,6 +25,8 @@ import java.util
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.lang.Thread.UncaughtExceptionHandler
import java.net.{URL, UnknownHostException}
+import javax.servlet.SingleThreadModel
+
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
@@ -64,13 +66,7 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MessageChooserFactory
import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.AsyncRunLoop
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.AsyncStreamTaskAdapter
-import org.apache.samza.task.AsyncStreamTaskFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.StreamTaskFactory
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task._
import org.apache.samza.util.HighResolutionClock
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.Logging
@@ -432,28 +428,6 @@ object SamzaContainer extends Logging {
val singleThreadMode = config.getSingleThreadMode
info("Got single thread mode: " + singleThreadMode)
- val taskClassName = config.getTaskClass.orNull
- info("Got task class name: %s" format taskClassName)
-
- if (taskClassName == null && taskFactory == null) {
- throw new SamzaException("Either the task class name or the task factory instance is required.")
- }
-
- val isAsyncTask: Boolean =
- if (taskFactory != null) {
- taskFactory.isInstanceOf[AsyncStreamTaskFactory]
- } else {
- classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
- }
-
- if (isAsyncTask) {
- info("Got an AsyncStreamTask implementation.")
- }
-
- if(singleThreadMode && isAsyncTask) {
- throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
- }
-
val threadPoolSize = config.getThreadPoolSize
info("Got thread pool size: " + threadPoolSize)
@@ -462,6 +436,39 @@ object SamzaContainer extends Logging {
else
null
+    def maybeAsyncStreamTaskFactory : Object = {
+      // Prefer the factory handed to the container; otherwise derive one from the job configuration.
+      val resolvedFactory =
+        if (taskFactory != null) taskFactory else TaskFactories.fromTaskClassConfig(config)
+
+      if (resolvedFactory == null) {
+        throw new SamzaException("Either the task class name or the task factory instance is required.")
+      }
+
+      val producesAsyncTasks: Boolean = resolvedFactory.isInstanceOf[AsyncStreamTaskFactory]
+
+      if (producesAsyncTasks) {
+        info("Got an AsyncStreamTask implementation.")
+        if (singleThreadMode) {
+          throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
+        }
+      }
+
+      if (singleThreadMode || producesAsyncTasks) {
+        // Either already asynchronous, or a StreamTask that runs on the single container thread: use as is.
+        resolvedFactory
+      } else {
+        // A synchronous StreamTask in multi-threaded mode is wrapped so that it runs on the task thread pool.
+        info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask w/ multiple threads")
+        new AsyncStreamTaskFactory {
+          override def createInstance() = new AsyncStreamTaskAdapter(resolvedFactory.asInstanceOf[StreamTaskFactory].createInstance(), taskThreadPool)
+        }
+      }
+    }
+
+ val finalTaskFactory = maybeAsyncStreamTaskFactory
+
// Wire up all task-instance-level (unshared) objects.
val taskNames = containerModel
.getTasks
@@ -483,24 +490,14 @@ object SamzaContainer extends Logging {
val taskName = taskModel.getTaskName
- val taskObj = if (taskFactory != null) {
- debug("Using task factory to create task instance")
- taskFactory match {
+ debug("Using task factory to create task instance")
+ val task =
+ finalTaskFactory match {
case tf: AsyncStreamTaskFactory => tf.createInstance()
case tf: StreamTaskFactory => tf.createInstance()
case _ =>
throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory")
}
- } else {
- debug("Using task class name: %s to create instance" format taskClassName)
- Class.forName(taskClassName).newInstance
- }
-
- val task = if (!singleThreadMode && !isAsyncTask)
- // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
- new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
- else
- taskObj
val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName)
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
new file mode 100644
index 0000000..29741a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.testUtils.TestAsyncStreamTask;
+import org.apache.samza.testUtils.TestStreamTask;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that {@link TaskFactories#fromTaskClassConfig(Config)} creates the right {@link StreamTaskFactory} or
+ * {@link AsyncStreamTaskFactory} for each combination of task class and graph builder configuration.
+ */
+public class TestTaskFactories {
+  private static final String TASK_CLASS_CONFIG = "task.class";
+  private static final String STREAM_TASK = "org.apache.samza.testUtils.TestStreamTask";
+  private static final String ASYNC_STREAM_TASK = "org.apache.samza.testUtils.TestAsyncStreamTask";
+  private static final String VALID_BUILDER = "org.apache.samza.testUtils.TestStreamGraphBuilder";
+  private static final String INVALID_BUILDER = "org.apache.samza.testUtils.InvalidStreamGraphBuilder";
+  private static final String MISSING_CLASS = "no.such.class";
+
+  /** Builds a config from alternating key/value arguments; a value may be {@code null}. */
+  private static Config configOf(String... keyValues) {
+    HashMap<String, String> entries = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      entries.put(keyValues[i], keyValues[i + 1]);
+    }
+    return new MapConfig(entries);
+  }
+
+  /** Asserts that the config yields a {@link StreamTaskFactory} whose instances are of the expected type. */
+  private static void assertCreatesStreamTask(Config config, Class<?> expectedTaskClass) throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(expectedTaskClass.isInstance(((StreamTaskFactory) retFactory).createInstance()));
+  }
+
+  /** Asserts that the config yields an {@link AsyncStreamTaskFactory} creating {@link TestAsyncStreamTask}s. */
+  private static void assertCreatesAsyncStreamTask(Config config) throws ClassNotFoundException {
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+  }
+
+  /** Asserts that creating a factory from the config fails with a {@link ConfigException}. */
+  private static void assertFailsWithConfigException(String scenario, Config config) throws ClassNotFoundException {
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ " + scenario);
+    } catch (ConfigException ce) {
+      // expected
+    }
+  }
+
+  /** Asserts that creating a factory from the config fails with a {@link ClassNotFoundException}. */
+  private static void assertFailsWithClassNotFound(String scenario, Config config) {
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ " + scenario);
+    } catch (ClassNotFoundException cnfe) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testStreamTaskClass() throws ClassNotFoundException {
+    assertCreatesStreamTask(configOf(TASK_CLASS_CONFIG, STREAM_TASK), TestStreamTask.class);
+    assertFailsWithClassNotFound(MISSING_CLASS, configOf(TASK_CLASS_CONFIG, MISSING_CLASS));
+  }
+
+  @Test
+  public void testStreamOperatorTaskClass() throws ClassNotFoundException {
+    assertCreatesStreamTask(configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, VALID_BUILDER), StreamOperatorTask.class);
+
+    // Without a task class to fall back on, every unusable builder setting must be rejected.
+    assertFailsWithConfigException("a builder class that does not implement StreamGraphBuilder",
+        configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, INVALID_BUILDER));
+    assertFailsWithConfigException(MISSING_CLASS,
+        configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, MISSING_CLASS));
+    assertFailsWithConfigException("empty class name for StreamGraphBuilder",
+        configOf(StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""));
+    assertFailsWithConfigException(
+        String.format("non-existing entry for %s", StreamGraphBuilder.BUILDER_CLASS_CONFIG),
+        configOf());
+  }
+
+  @Test
+  public void testStreamOperatorTaskClassWithTaskClass() throws ClassNotFoundException {
+    // A valid builder takes precedence over whatever task class is configured, even an unloadable one.
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, VALID_BUILDER),
+        StreamOperatorTask.class);
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, ASYNC_STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, VALID_BUILDER),
+        StreamOperatorTask.class);
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, MISSING_CLASS, StreamGraphBuilder.BUILDER_CLASS_CONFIG, VALID_BUILDER),
+        StreamOperatorTask.class);
+  }
+
+  @Test
+  public void testStreamTaskClassWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+    // An unusable builder setting falls back to the configured task class.
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, INVALID_BUILDER),
+        TestStreamTask.class);
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""),
+        TestStreamTask.class);
+    assertCreatesStreamTask(
+        configOf(TASK_CLASS_CONFIG, STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, null),
+        TestStreamTask.class);
+
+    assertFailsWithClassNotFound("an empty task class name",
+        configOf(TASK_CLASS_CONFIG, "", StreamGraphBuilder.BUILDER_CLASS_CONFIG, INVALID_BUILDER));
+  }
+
+  @Test
+  public void testAsyncStreamTask() throws ClassNotFoundException {
+    assertCreatesAsyncStreamTask(configOf(TASK_CLASS_CONFIG, ASYNC_STREAM_TASK));
+    assertFailsWithClassNotFound(MISSING_CLASS, configOf(TASK_CLASS_CONFIG, MISSING_CLASS));
+  }
+
+  @Test
+  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+    // An unusable builder setting falls back to the configured async task class.
+    assertCreatesAsyncStreamTask(
+        configOf(TASK_CLASS_CONFIG, ASYNC_STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, INVALID_BUILDER));
+    assertCreatesAsyncStreamTask(
+        configOf(TASK_CLASS_CONFIG, ASYNC_STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, ""));
+    assertCreatesAsyncStreamTask(
+        configOf(TASK_CLASS_CONFIG, ASYNC_STREAM_TASK, StreamGraphBuilder.BUILDER_CLASS_CONFIG, null));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
new file mode 100644
index 0000000..2fe6946
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+/**
+ * Test fixture that deliberately does NOT implement {@link org.apache.samza.operators.StreamGraphBuilder}.
+ * Tests refer to it by class name to supply an invalid graph builder class in the configuration.
+ */
+public class InvalidStreamGraphBuilder {
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
new file mode 100644
index 0000000..81f3fd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * No-op test implementation of {@link AsyncStreamTask}: {@code processAsync} has an empty body and
+ * never touches the supplied callback. Tests refer to it by class name via the task class configuration.
+ */
+public class TestAsyncStreamTask implements AsyncStreamTask {
+ @Override
+ public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
new file mode 100644
index 0000000..10eb02f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+
+/**
+ * No-op test implementation of {@link StreamGraphBuilder}: {@code init} has an empty body and leaves the
+ * supplied graph untouched. Tests refer to it by class name as a valid graph builder class.
+ */
+public class TestStreamGraphBuilder implements StreamGraphBuilder {
+ @Override
+ public void init(StreamGraph graph, Config config) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
new file mode 100644
index 0000000..ce0980a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * No-op test implementation of {@link StreamTask}: {@code process} has an empty body.
+ * Tests refer to it by class name via the task class configuration.
+ */
+public class TestStreamTask implements StreamTask {
+ @Override
+ public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b999ec5..9b5033b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -57,7 +57,7 @@ public class TestZkLeaderElector {
@Before
public void testSetup() {
- testZkConnectionString = "localhost:" + zkServer.getPort();
+ testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
try {
testZkUtils = getZkUtilsWithNewClient();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 855d29d..1cf901e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -47,7 +47,7 @@ public class TestZkUtils {
public void testSetup() {
try {
zkClient = new ZkClient(
- new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
+ new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
} catch (Exception e) {
Assert.fail("Client connection setup failed. Aborting tests..");
http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 5d82b92..b803dfe 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -89,8 +89,6 @@ object StreamTaskTestUtil {
"systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
"systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
"systems.kafka.samza.msg.serde" -> "string",
- "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
- "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
// Since using state, need a checkpoint manager
"task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
"task.checkpoint.system" -> "kafka",
@@ -122,6 +120,7 @@ object StreamTaskTestUtil {
val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+ // set up the ZooKeeper and bootstrap servers for the local Kafka cluster
jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
"systems.kafka.producer.bootstrap.servers" -> brokers)
[3/3] samza git commit: Merge branch 'samza-fluent-api-v1' of
http://git-wip-us.apache.org/repos/asf/samza into samza-fluent-api-v1
Posted by ni...@apache.org.
Merge branch 'samza-fluent-api-v1' of http://git-wip-us.apache.org/repos/asf/samza into samza-fluent-api-v1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a9b213c1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a9b213c1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a9b213c1
Branch: refs/heads/samza-fluent-api-v1
Commit: a9b213c1389fe46c7f5575b22dc4e1d20ac7b367
Parents: cf267f8 dde7542
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Feb 27 13:00:57 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 27 13:00:57 2017 -0800
----------------------------------------------------------------------
.../org/apache/samza/operators/StreamGraph.java | 2 +-
.../org/apache/samza/operators/StreamSpec.java | 46 -----------
.../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++-----------
.../samza/processorgraph/ExecutionPlanner.java | 34 ++++----
.../samza/example/KeyValueStoreExample.java | 24 +-----
.../samza/example/NoContextStreamExample.java | 33 +-------
.../samza/example/OrderShipmentJoinExample.java | 35 +--------
.../samza/example/PageViewCounterExample.java | 23 +-----
.../samza/example/RepartitionExample.java | 23 +-----
.../samza/example/TestBroadcastExample.java | 15 +---
.../apache/samza/example/TestJoinExample.java | 26 ++----
.../apache/samza/example/TestWindowExample.java | 15 +---
.../operators/impl/TestStreamOperatorImpl.java | 1 +
13 files changed, 89 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
[2/3] samza git commit: Merge branch 'SAMZA-1093' into
samza-fluent-api-v1
Posted by ni...@apache.org.
Merge branch 'SAMZA-1093' into samza-fluent-api-v1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cf267f8a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cf267f8a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cf267f8a
Branch: refs/heads/samza-fluent-api-v1
Commit: cf267f8a5e21aff4b37feb30557231515753b840
Parents: d39bce9 82b839f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Feb 27 12:59:49 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 27 12:59:49 2017 -0800
----------------------------------------------------------------------
.../samza/operators/StreamGraphBuilder.java | 2 +
.../apache/samza/container/RunLoopFactory.java | 16 +-
.../org/apache/samza/task/TaskFactories.java | 95 +++++++
.../org/apache/samza/util/ScalaToJavaUtils.java | 41 +++
.../apache/samza/container/SamzaContainer.scala | 81 +++---
.../apache/samza/task/TestTaskFactories.java | 256 +++++++++++++++++++
.../testUtils/InvalidStreamGraphBuilder.java | 25 ++
.../samza/testUtils/TestAsyncStreamTask.java | 35 +++
.../samza/testUtils/TestStreamGraphBuilder.java | 33 +++
.../apache/samza/testUtils/TestStreamTask.java | 34 +++
.../apache/samza/zk/TestZkLeaderElector.java | 2 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 2 +-
.../test/integration/StreamTaskTestUtil.scala | 3 +-
13 files changed, 564 insertions(+), 61 deletions(-)
----------------------------------------------------------------------