You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/27 21:01:39 UTC

[1/3] samza git commit: SAMZA-1093: instantiating StreamOperatorTask in SamzaContainer (WIP)

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 dde754246 -> a9b213c13


SAMZA-1093: instantiating StreamOperatorTask in SamzaContainer (WIP)

SAMZA-1093: Instantiating StreamOperatorTask in SamzaContainer

SAMZA-1093: fix import

SAMZA-1093: code ready, more unit tests pending


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/82b839f9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/82b839f9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/82b839f9

Branch: refs/heads/samza-fluent-api-v1
Commit: 82b839f99de832aed9f2f4dd935d9f6bbf6932ec
Parents: 7e7747f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Feb 21 16:18:25 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Feb 24 23:33:26 2017 -0800

----------------------------------------------------------------------
 .../samza/operators/StreamGraphBuilder.java     |   2 +
 .../apache/samza/container/RunLoopFactory.java  |  16 +-
 .../org/apache/samza/task/TaskFactories.java    |  95 +++++++
 .../org/apache/samza/util/ScalaToJavaUtils.java |  41 +++
 .../apache/samza/container/SamzaContainer.scala |  81 +++---
 .../apache/samza/task/TestTaskFactories.java    | 256 +++++++++++++++++++
 .../testUtils/InvalidStreamGraphBuilder.java    |  25 ++
 .../samza/testUtils/TestAsyncStreamTask.java    |  35 +++
 .../samza/testUtils/TestStreamGraphBuilder.java |  33 +++
 .../apache/samza/testUtils/TestStreamTask.java  |  34 +++
 .../apache/samza/zk/TestZkLeaderElector.java    |   2 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   |   2 +-
 .../test/integration/StreamTaskTestUtil.scala   |   3 +-
 13 files changed, 564 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
index b415cf8..23e8625 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
@@ -27,6 +27,8 @@ import org.apache.samza.config.Config;
  */
 @InterfaceStability.Unstable
 public interface StreamGraphBuilder {
+  static final String BUILDER_CLASS_CONFIG = "job.graph.builder.class";
+
   /**
    * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
    * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 32ab47a..f81266b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -27,12 +27,12 @@ import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction0;
 import scala.runtime.AbstractFunction1;
 
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.samza.util.Util.asScalaClock;
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -109,18 +109,4 @@ public class RunLoopFactory {
     }
   }
 
-  /**
-   * Returns a default value object for scala option.getOrDefault() to use
-   * @param value default value
-   * @param <T> value type
-   * @return object containing default value
-   */
-  public static <T> AbstractFunction0<T> defaultValue(final T value) {
-    return new AbstractFunction0<T>() {
-      @Override
-      public T apply() {
-        return value;
-      }
-    };
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
new file mode 100644
index 0000000..0bfa8c4
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactories.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.TaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+
+/** Builds {@link StreamTaskFactory} / {@link AsyncStreamTaskFactory} instances from the job configuration. */
+public class TaskFactories {
+  private static final Logger log = LoggerFactory.getLogger(TaskFactories.class);
+
+  /**
+   * Returns a {@link StreamTaskFactory}, or an {@link AsyncStreamTaskFactory} if the task class is an {@link AsyncStreamTask}.
+   * A valid {@link StreamGraphBuilder} in the config selects {@link StreamOperatorTask}; else {@link TaskConfig}'s task class.
+   */
+  public static Object fromTaskClassConfig(Config config) throws ClassNotFoundException {
+    String taskClassName;
+    if (isStreamOperatorTask(config)) {
+      taskClassName = StreamOperatorTask.class.getName();
+    } else {
+      taskClassName = new TaskConfig(config).getTaskClass().getOrElse(defaultValue(null));
+    }
+    if (taskClassName == null) {
+      throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
+    }
+    log.info("Got task class name: {}", taskClassName);
+    if (AsyncStreamTask.class.isAssignableFrom(Class.forName(taskClassName))) {
+      return new AsyncStreamTaskFactory() {
+        @Override
+        public AsyncStreamTask createInstance() {
+          try {
+            return (AsyncStreamTask) Class.forName(taskClassName).newInstance();
+          } catch (Exception e) {
+            log.error("Error loading AsyncStreamTask class: {}. error: {}", taskClassName, e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+
+    return new StreamTaskFactory() {
+      @Override
+      public StreamTask createInstance() {
+        try {
+          return StreamOperatorTask.class.getName().equals(taskClassName) ? createStreamOperatorTask(config) :
+              (StreamTask) Class.forName(taskClassName).newInstance();
+        } catch (Exception e) {
+          log.error("Error loading StreamTask class: {}. error: {}", taskClassName, e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /** Instantiates the configured {@link StreamGraphBuilder} reflectively and wraps it in a {@link StreamOperatorTask}. */
+  private static StreamTask createStreamOperatorTask(Config config) throws Exception {
+    StreamGraphBuilder graphBuilder = (StreamGraphBuilder) Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance();
+    return new StreamOperatorTask(graphBuilder);
+  }
+
+  /** Returns true only if the config names a loadable class assignable to {@link StreamGraphBuilder}; failures are logged. */
+  private static boolean isStreamOperatorTask(Config config) {
+    try {
+      String builderClassName = config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG);
+      return builderClassName != null && !builderClassName.isEmpty()
+          && StreamGraphBuilder.class.isAssignableFrom(Class.forName(builderClassName));
+    } catch (Exception e) {
+      log.error("Failed to validate StreamGraphBuilder class from the config. {}={}",
+          StreamGraphBuilder.BUILDER_CLASS_CONFIG, config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG));
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
new file mode 100644
index 0000000..6c7fc2d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import scala.runtime.AbstractFunction0;
+
+/**
+ * Common utility methods that help to convert or use Scala objects in Java code
+ */
+public class ScalaToJavaUtils {
+  /**
+   * Wraps a constant in a Scala function object, e.g. as the default argument passed to {@code Option.getOrElse()}
+   * @param value default value
+   * @param <T> value type
+   * @return a function object whose {@code apply()} returns {@code value}
+   */
+  public static <T> AbstractFunction0<T> defaultValue(final T value) {
+    return new AbstractFunction0<T>() {
+      @Override
+      public T apply() {
+        return value;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..5476fb5 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -25,6 +25,8 @@ import java.util
 import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.net.{URL, UnknownHostException}
+import javax.servlet.SingleThreadModel
+
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -64,13 +66,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.AsyncRunLoop
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.AsyncStreamTaskAdapter
-import org.apache.samza.task.AsyncStreamTaskFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.StreamTaskFactory
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.task._
 import org.apache.samza.util.HighResolutionClock
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
@@ -432,28 +428,6 @@ object SamzaContainer extends Logging {
     val singleThreadMode = config.getSingleThreadMode
     info("Got single thread mode: " + singleThreadMode)
 
-    val taskClassName = config.getTaskClass.orNull
-    info("Got task class name: %s" format taskClassName)
-
-    if (taskClassName == null && taskFactory == null) {
-      throw new SamzaException("Either the task class name or the task factory instance is required.")
-    }
-
-    val isAsyncTask: Boolean =
-      if (taskFactory != null) {
-        taskFactory.isInstanceOf[AsyncStreamTaskFactory]
-      } else {
-        classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
-      }
-
-    if (isAsyncTask) {
-      info("Got an AsyncStreamTask implementation.")
-    }
-
-    if(singleThreadMode && isAsyncTask) {
-      throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
-    }
-
     val threadPoolSize = config.getThreadPoolSize
     info("Got thread pool size: " + threadPoolSize)
 
@@ -462,6 +436,39 @@ object SamzaContainer extends Logging {
     else
       null
 
+    def maybeAsyncStreamTaskFactory : Object = {
+      // Resolve the factory once: an explicitly supplied taskFactory wins, otherwise derive one from the config
+      val taskClassFactory = taskFactory match {
+        case null => TaskFactories.fromTaskClassConfig(config)
+        case _ => taskFactory
+      }
+      // NOTE(review): fromTaskClassConfig throws ConfigException instead of returning null, so this guard looks unreachable
+      if (taskClassFactory == null) {
+        throw new SamzaException("Either the task class name or the task factory instance is required.")
+      }
+
+      val isAsyncTaskClass: Boolean = taskClassFactory.isInstanceOf[AsyncStreamTaskFactory]
+
+      if (isAsyncTaskClass) {
+        info("Got an AsyncStreamTask implementation.")
+      }
+
+      if (singleThreadMode && isAsyncTaskClass) {
+        throw new SamzaException("AsyncStreamTask cannot run on single thread mode.")
+      }
+      // Outside single-thread mode, wrap each created StreamTask in an AsyncStreamTaskAdapter backed by taskThreadPool
+      if (!singleThreadMode && !isAsyncTaskClass) {
+        info("Converting StreamTask to AsyncStreamTaskAdapter when running StreamTask w/ multiple threads")
+        new AsyncStreamTaskFactory {
+          override def createInstance() = new AsyncStreamTaskAdapter(taskClassFactory.asInstanceOf[StreamTaskFactory].createInstance(), taskThreadPool)
+        }
+      } else {
+        taskClassFactory
+      }
+    }
+
+    val finalTaskFactory = maybeAsyncStreamTaskFactory
+
     // Wire up all task-instance-level (unshared) objects.
     val taskNames = containerModel
       .getTasks
@@ -483,24 +490,14 @@ object SamzaContainer extends Logging {
 
       val taskName = taskModel.getTaskName
 
-      val taskObj = if (taskFactory != null) {
-        debug("Using task factory to create task instance")
-        taskFactory match {
+      debug("Using task factory to create task instance")
+      val task =
+        finalTaskFactory match {
           case tf: AsyncStreamTaskFactory => tf.createInstance()
           case tf: StreamTaskFactory => tf.createInstance()
           case _ =>
             throw new SamzaException("taskFactory must be an instance of StreamTaskFactory or AsyncStreamTaskFactory")
         }
-      } else {
-        debug("Using task class name: %s to create instance" format taskClassName)
-        Class.forName(taskClassName).newInstance
-      }
-
-      val task = if (!singleThreadMode && !isAsyncTask)
-        // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
-        new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
-      else
-        taskObj
 
       val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
new file mode 100644
index 0000000..29741a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactories.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.testUtils.TestAsyncStreamTask;
+import org.apache.samza.testUtils.TestStreamTask;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test methods to create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based on task class configuration
+ */
+public class TestTaskFactories {
+
+  @Test
+  public void testStreamTaskClass() throws ClassNotFoundException {
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "no.such.class");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException cfe) {
+      // expected: the unknown task class cannot be loaded
+    }
+  }
+
+  @Test
+  public void testStreamOperatorTaskClass() throws ClassNotFoundException {
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ invalid StreamGraphBuilder class");
+    } catch (ConfigException ce) {
+      // expected: the builder class is rejected and no task.class is configured
+    }
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "no.such.class");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ConfigException ce) {
+      // expected: the builder class cannot be loaded and no task.class is configured
+    }
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ empty class name for StreamGraphBuilder");
+    } catch (ConfigException ce) {
+      // expected: the empty builder class name is ignored and no task.class is configured
+    }
+
+    config = new MapConfig(new HashMap<String, String>());
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail(String.format("Should have failed w/ non-existing entry for %s", StreamGraphBuilder.BUILDER_CLASS_CONFIG));
+    } catch (ConfigException ce) {
+      // expected: neither a builder class nor a task.class is configured
+    }
+  }
+
+  @Test
+  public void testStreamOperatorTaskClassWithTaskClass() throws ClassNotFoundException {
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "no.such.class");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.TestStreamGraphBuilder");
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof StreamOperatorTask);
+  }
+
+  @Test
+  public void testStreamTaskClassWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, null);
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof StreamTaskFactory);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof TestStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ empty task class name");
+    } catch (ClassNotFoundException cne) {
+      // expected: the builder class is rejected and the empty task.class cannot be loaded
+    }
+  }
+
+  @Test
+  public void testAsyncStreamTask() throws ClassNotFoundException {
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "no.such.class");
+      }
+    });
+    try {
+      TaskFactories.fromTaskClassConfig(config);
+      fail("Should have failed w/ no.such.class");
+    } catch (ClassNotFoundException cfe) {
+      // expected: the unknown task class cannot be loaded
+    }
+  }
+
+  @Test
+  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws ClassNotFoundException {
+
+    Config config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "org.apache.samza.testUtils.InvalidStreamGraphBuilder");
+      }
+    });
+    Object retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, "");
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+
+    config = new MapConfig(new HashMap<String, String>() {
+      {
+        this.put("task.class", "org.apache.samza.testUtils.TestAsyncStreamTask");
+        this.put(StreamGraphBuilder.BUILDER_CLASS_CONFIG, null);
+      }
+    });
+    retFactory = TaskFactories.fromTaskClassConfig(config);
+    assertTrue(retFactory instanceof AsyncStreamTaskFactory);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() instanceof TestAsyncStreamTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
new file mode 100644
index 0000000..2fe6946
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/InvalidStreamGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+/**
+ * Test class that deliberately does NOT implement {@link org.apache.samza.operators.StreamGraphBuilder}
+ */
+public class InvalidStreamGraphBuilder {
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
new file mode 100644
index 0000000..81f3fd4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.AsyncStreamTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Test implementation class for {@link AsyncStreamTask}
+ */
+public class TestAsyncStreamTask implements AsyncStreamTask {
+  @Override
+  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) {
+    // no-op: TestTaskFactories only checks that this class can be instantiated as an AsyncStreamTask
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
new file mode 100644
index 0000000..10eb02f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamGraphBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+
+/**
+ * Test implementation class for {@link StreamGraphBuilder}
+ */
+public class TestStreamGraphBuilder implements StreamGraphBuilder {
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // no-op: leaves the graph untouched; TestTaskFactories only needs a loadable StreamGraphBuilder
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
new file mode 100644
index 0000000..ce0980a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Test implementation class for {@link StreamTask}
+ */
+public class TestStreamTask implements StreamTask {
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b999ec5..9b5033b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -57,7 +57,7 @@ public class TestZkLeaderElector {
 
   @Before
   public void testSetup() {
-    testZkConnectionString = "localhost:" + zkServer.getPort();
+    testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
     try {
       testZkUtils = getZkUtilsWithNewClient();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 855d29d..1cf901e 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -47,7 +47,7 @@ public class TestZkUtils {
   public void testSetup() {
     try {
       zkClient = new ZkClient(
-          new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
+          new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
           CONNECTION_TIMEOUT_MS);
     } catch (Exception e) {
       Assert.fail("Client connection setup failed. Aborting tests..");

http://git-wip-us.apache.org/repos/asf/samza/blob/82b839f9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 5d82b92..b803dfe 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -89,8 +89,6 @@ object StreamTaskTestUtil {
     "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
     "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
     "systems.kafka.samza.msg.serde" -> "string",
-    "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
-    "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
     // Since using state, need a checkpoint manager
     "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
     "task.checkpoint.system" -> "kafka",
@@ -122,6 +120,7 @@ object StreamTaskTestUtil {
     val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
     brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
 
+    // setup the zookeeper and bootstrap servers for local kafka cluster
     jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
       "systems.kafka.producer.bootstrap.servers" -> brokers)
 


[3/3] samza git commit: Merge branch 'samza-fluent-api-v1' of http://git-wip-us.apache.org/repos/asf/samza into samza-fluent-api-v1

Posted by ni...@apache.org.
Merge branch 'samza-fluent-api-v1' of http://git-wip-us.apache.org/repos/asf/samza into samza-fluent-api-v1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a9b213c1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a9b213c1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a9b213c1

Branch: refs/heads/samza-fluent-api-v1
Commit: a9b213c1389fe46c7f5575b22dc4e1d20ac7b367
Parents: cf267f8 dde7542
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Feb 27 13:00:57 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 27 13:00:57 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/operators/StreamGraph.java |  2 +-
 .../org/apache/samza/operators/StreamSpec.java  | 46 -----------
 .../apache/samza/operators/StreamGraphImpl.java | 83 +++++++++-----------
 .../samza/processorgraph/ExecutionPlanner.java  | 34 ++++----
 .../samza/example/KeyValueStoreExample.java     | 24 +-----
 .../samza/example/NoContextStreamExample.java   | 33 +-------
 .../samza/example/OrderShipmentJoinExample.java | 35 +--------
 .../samza/example/PageViewCounterExample.java   | 23 +-----
 .../samza/example/RepartitionExample.java       | 23 +-----
 .../samza/example/TestBroadcastExample.java     | 15 +---
 .../apache/samza/example/TestJoinExample.java   | 26 ++----
 .../apache/samza/example/TestWindowExample.java | 15 +---
 .../operators/impl/TestStreamOperatorImpl.java  |  1 +
 13 files changed, 89 insertions(+), 271 deletions(-)
----------------------------------------------------------------------



[2/3] samza git commit: Merge branch 'SAMZA-1093' into samza-fluent-api-v1

Posted by ni...@apache.org.
Merge branch 'SAMZA-1093' into samza-fluent-api-v1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cf267f8a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cf267f8a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cf267f8a

Branch: refs/heads/samza-fluent-api-v1
Commit: cf267f8a5e21aff4b37feb30557231515753b840
Parents: d39bce9 82b839f
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Feb 27 12:59:49 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Mon Feb 27 12:59:49 2017 -0800

----------------------------------------------------------------------
 .../samza/operators/StreamGraphBuilder.java     |   2 +
 .../apache/samza/container/RunLoopFactory.java  |  16 +-
 .../org/apache/samza/task/TaskFactories.java    |  95 +++++++
 .../org/apache/samza/util/ScalaToJavaUtils.java |  41 +++
 .../apache/samza/container/SamzaContainer.scala |  81 +++---
 .../apache/samza/task/TestTaskFactories.java    | 256 +++++++++++++++++++
 .../testUtils/InvalidStreamGraphBuilder.java    |  25 ++
 .../samza/testUtils/TestAsyncStreamTask.java    |  35 +++
 .../samza/testUtils/TestStreamGraphBuilder.java |  33 +++
 .../apache/samza/testUtils/TestStreamTask.java  |  34 +++
 .../apache/samza/zk/TestZkLeaderElector.java    |   2 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   |   2 +-
 .../test/integration/StreamTaskTestUtil.scala   |   3 +-
 13 files changed, 564 insertions(+), 61 deletions(-)
----------------------------------------------------------------------