You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:41 UTC

[41/53] [abbrv] beam git commit: jstorm-runner: Fixup for review comments

jstorm-runner: Fixup for review comments


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90ed2ef3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90ed2ef3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90ed2ef3

Branch: refs/heads/jstorm-runner
Commit: 90ed2ef344d19ca730429e9eb7c71779f995fc47
Parents: 6078cbc
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Aug 14 16:20:03 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:00 2017 +0800

----------------------------------------------------------------------
 .../runners/jstorm/JStormPipelineOptions.java   |  12 +--
 .../beam/runners/jstorm/JStormRunner.java       |  21 ++--
 .../beam/runners/jstorm/JStormRunnerResult.java |  21 ++--
 .../beam/runners/jstorm/TestJStormRunner.java   |  19 +++-
 .../serialization/JavaUtilsSerializer.java      |   3 +-
 .../translation/BoundedSourceTranslator.java    |   4 +-
 .../jstorm/translation/DoFnExecutor.java        |  27 +++--
 .../runners/jstorm/translation/Executor.java    |   6 ++
 .../jstorm/translation/ExecutorsBolt.java       |   8 +-
 .../jstorm/translation/FlattenExecutor.java     |   1 -
 .../jstorm/translation/FlattenTranslator.java   |  23 ++--
 .../translation/GroupByKeyTranslator.java       |  12 ---
 .../translation/GroupByWindowExecutor.java      |  12 ---
 .../translation/JStormStateInternals.java       |  29 +++--
 .../jstorm/translation/MetricsReporter.java     |   2 -
 .../translation/MultiOutputDoFnExecutor.java    |  22 +---
 .../translation/MultiStatefulDoFnExecutor.java  |   5 +-
 .../translation/ParDoBoundMultiTranslator.java  |  14 +--
 .../translation/ParDoBoundTranslator.java       | 108 -------------------
 .../translation/StatefulDoFnExecutor.java       |   1 -
 .../jstorm/translation/TimerService.java        |   2 +-
 .../jstorm/translation/TimerServiceImpl.java    |   8 +-
 .../jstorm/translation/TransformTranslator.java |  16 ++-
 .../jstorm/translation/TranslationContext.java  |  18 +++-
 .../jstorm/translation/TranslatorRegistry.java  |   1 -
 .../translation/UnboundedSourceSpout.java       |   8 +-
 .../jstorm/translation/ViewTranslator.java      |   4 +-
 .../translation/WindowAssignExecutor.java       |   2 -
 .../translation/JStormStateInternalsTest.java   |   2 +-
 29 files changed, 141 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
index 114877a..e494757 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
@@ -36,8 +36,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
 
   @Description("Executing time(sec) of topology on local mode. Default is 1min.")
   @Default.Long(60)
-  Long getLocalModeExecuteTime();
-  void setLocalModeExecuteTime(Long time);
+  Long getLocalModeExecuteTimeSec();
+  void setLocalModeExecuteTimeSec(Long time);
 
   @Description("Worker number of topology")
   @Default.Integer(1)
@@ -46,8 +46,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
 
   @Description("Global parallelism number of a component")
   @Default.Integer(1)
-  Integer getParallelismNumber();
-  void setParallelismNumber(Integer number);
+  Integer getParallelism();
+  void setParallelism(Integer number);
 
   @Description("System topology config of JStorm")
   @Default.InstanceFactory(DefaultMapValueFactory.class)
@@ -61,8 +61,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
 
   @Description("Parallelism number of a specified composite PTransform")
   @Default.InstanceFactory(DefaultMapValueFactory.class)
-  Map getParallelismNumMap();
-  void setParallelismNumMap(Map parallelismNumMap);
+  Map getParallelismMap();
+  void setParallelismMap(Map parallelismNumMap);
 
   /**
    * Default value factory for topology configuration of JStorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 47de42c..21a8fae 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -79,15 +79,15 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
   }
 
   public static JStormRunner fromOptions(PipelineOptions options) {
-    JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(
-        JStormPipelineOptions.class, options);
+    JStormPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(JStormPipelineOptions.class, options);
     return new JStormRunner(pipelineOptions);
   }
 
   /**
-   * convert pipeline options to storm configuration format.
+   * Convert pipeline options to JStorm configuration format.
    * @param options
-   * @return
+   * @return JStorm configuration
    */
   private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
     Config config = new Config();
@@ -103,6 +103,8 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
 
     // Setup config for runtime env
     config.put("worker.external", "beam");
+    // We use "com.alibaba.jstorm.transactional" API for "at least once" and "exactly once",
+    // so we don't need acker task for beam job any more, and set related number to 0.
     config.put("topology.acker.executors", 0);
 
     // Register serializers of Kryo
@@ -271,7 +273,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
         LocalCluster localCluster = LocalCluster.getInstance();
         localCluster.submitTopology(topologyName, config, topology);
         return JStormRunnerResult.local(
-            topologyName, config, localCluster, options.getLocalModeExecuteTime());
+            topologyName, config, localCluster, options.getLocalModeExecuteTimeSec());
       } else {
         StormSubmitter.submitTopology(topologyName, config, topology);
         return null;
@@ -298,11 +300,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
     TopologyBuilder builder =
         isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
 
-    int parallelismNumber = options.getParallelismNumber();
+    int parallelismNumber = options.getParallelism();
     Map<String, UnboundedSourceSpout> spouts = context.getSpouts();
-    for (String id : spouts.keySet()) {
-      IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
-      builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
+    for (Map.Entry<String, UnboundedSourceSpout> entry : spouts.entrySet()) {
+      IRichSpout spout = getSpout(isExactlyOnce, entry.getValue());
+      builder.setSpout(
+          entry.getKey(), spout, getParallelismNum(entry.getValue(), parallelismNumber));
     }
 
     HashMap<String, BoltDeclarer> declarers = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index 797c899..4b1850e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -38,7 +38,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
       Config config,
       LocalCluster localCluster,
       long localModeExecuteTimeSecs) {
-    return new LocalStormPipelineResult(
+    return new LocalJStormPipelineResult(
         topologyName, config, localCluster, localModeExecuteTimeSecs);
   }
 
@@ -62,12 +62,12 @@ public abstract class JStormRunnerResult implements PipelineResult {
     return topologyName;
   }
 
-  private static class LocalStormPipelineResult extends JStormRunnerResult {
+  private static class LocalJStormPipelineResult extends JStormRunnerResult {
 
     private LocalCluster localCluster;
     private long localModeExecuteTimeSecs;
 
-    LocalStormPipelineResult(
+    LocalJStormPipelineResult(
         String topologyName,
         Config config,
         LocalCluster localCluster,
@@ -78,7 +78,6 @@ public abstract class JStormRunnerResult implements PipelineResult {
 
     @Override
     public State cancel() throws IOException {
-      //localCluster.deactivate(getTopologyName());
       localCluster.killTopology(getTopologyName());
       localCluster.shutdown();
       JStormUtils.sleepMs(1000);
@@ -87,12 +86,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
 
     @Override
     public State waitUntilFinish(Duration duration) {
-      return waitUntilFinish();
-    }
-
-    @Override
-    public State waitUntilFinish() {
-      JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+      JStormUtils.sleepMs(duration.getMillis());
       try {
         return cancel();
       } catch (IOException e) {
@@ -101,8 +95,13 @@ public abstract class JStormRunnerResult implements PipelineResult {
     }
 
     @Override
+    public State waitUntilFinish() {
+      return waitUntilFinish(Duration.standardSeconds(localModeExecuteTimeSecs));
+    }
+
+    @Override
     public MetricResults metrics() {
-      return null;
+      throw new UnsupportedOperationException("This method is not yet supported.");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index 21a58e3..c9990e4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -50,13 +50,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
     return new TestJStormRunner(options.as(JStormPipelineOptions.class));
   }
 
+  // waiting time when job with assertion
+  private static final int ASSERTION_WAITING_TIME_MS = 20 * 1000;
+  // waiting time when job without assertion
+  private static final int RESULT_WAITING_TIME_MS = 5 * 1000;
+  private static final int RESULT_CHECK_INTERVAL_MS = 500;
+
   private final JStormRunner stormRunner;
   private final JStormPipelineOptions options;
 
   private TestJStormRunner(JStormPipelineOptions options) {
     this.options = options;
     Map conf = Maps.newHashMap();
-    //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+    // Default state backend is RocksDB, for the users who could not run RocksDB on local testing
+    // env, following config is used to configure state backend to memory.
+    // conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
     options.setTopologyConfig(conf);
     options.setLocalMode(true);
     stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
@@ -73,8 +81,9 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
       LOG.info("Running JStorm job {} with {} expected assertions.",
                result.getTopologyName(), numberOfAssertions);
 
-      int maxTimeoutSec = numberOfAssertions > 0 ? 20 : 5;
-      for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) {
+      int maxTimeoutMs =
+          numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : RESULT_WAITING_TIME_MS;
+      for (int waitTime = 0; waitTime <= maxTimeoutMs; ) {
         Optional<Boolean> success = numberOfAssertions > 0
                 ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent();
         Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
@@ -86,8 +95,8 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
           LOG.info("Exception was found.", taskExceptionRec);
           throw new RuntimeException(taskExceptionRec.getCause());
         } else {
-          JStormUtils.sleepMs(500);
-          waitTime += 500;
+          JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS);
+          waitTime += RESULT_CHECK_INTERVAL_MS;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
index 5df686c..fa46fdb 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
@@ -45,7 +45,7 @@ import java.util.TreeSet;
 public class JavaUtilsSerializer {
 
   /**
-   * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}.
+   * Specific {@link Kryo} serializer for {@code java.util.Collections.SingletonList}.
    */
   public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
     public CollectionsSingletonListSerializer() {
@@ -222,7 +222,6 @@ public class JavaUtilsSerializer {
    * @see Collections#unmodifiableSortedMap(SortedMap)
    */
   private static void registerUnmodifableCollectionSerializers(Config config) {
-    UnmodifiableCollection.values();
     for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
       config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
index 53555c9..77d0823 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -24,9 +24,7 @@ import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Translates a {@link Read.Bounded} into a Storm spout.
- *
- * @param <T>
+ * Translates a {@link Read.Bounded} into a JStorm spout.
  */
 class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 2148f34..72c386a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -71,9 +71,15 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   /**
    * Implements {@link OutputManager} in a DoFn executor.
    */
-  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+  protected static class DoFnExecutorOutputManager implements OutputManager, Serializable {
     private static final long serialVersionUID = -661113364735206170L;
 
+    private ExecutorsBolt executorsBolt;
+
+    public DoFnExecutorOutputManager(ExecutorsBolt executorsBolt) {
+      this.executorsBolt = executorsBolt;
+    }
+
     @Override
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
       executorsBolt.processExecutorElem(tag, output);
@@ -97,23 +103,23 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
 
   protected DoFn<InputT, OutputT> doFn;
   protected final Coder<WindowedValue<InputT>> inputCoder;
-  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
-  protected OutputManager outputManager;
+  protected transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+  protected transient OutputManager outputManager;
   protected WindowingStrategy<?, ?> windowingStrategy;
   protected final TupleTag<InputT> mainInputTag;
   protected Collection<PCollectionView<?>> sideInputs;
-  protected SideInputHandler sideInputHandler;
+  protected transient SideInputHandler sideInputHandler;
   protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
 
   // Initialize during runtime
-  protected ExecutorContext executorContext;
+  protected transient ExecutorContext executorContext;
   protected ExecutorsBolt executorsBolt;
-  protected TimerInternals timerInternals;
+  protected transient TimerInternals timerInternals;
   protected transient StateInternals pushbackStateInternals;
   protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
   protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
   protected transient IKvStoreManager kvStoreManager;
-  protected DefaultStepContext stepContext;
+  protected transient DefaultStepContext stepContext;
   protected transient MetricClient metricClient;
 
   public DoFnExecutor(
@@ -133,7 +139,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
     this.doFn = doFn;
     this.inputCoder = inputCoder;
-    this.outputManager = new DoFnExecutorOutputManager();
     this.windowingStrategy = windowingStrategy;
     this.mainInputTag = mainInputTag;
     this.sideInputs = sideInputs;
@@ -174,6 +179,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
     this.executorsBolt = context.getExecutorsBolt();
     this.pipelineOptions =
         this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+    this.outputManager = new DoFnExecutorOutputManager(executorsBolt);
 
     initService(context);
 
@@ -199,8 +205,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
 
   @Override
   public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
-        tag, mainInputTag, sideInputs, elem.getValue()));
     if (mainInputTag.equals(tag)) {
       processMainInput(elem);
     } else if (sideInputTagToView.containsKey(tag)) {
@@ -213,6 +217,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   }
 
   protected <T> void processMainInput(WindowedValue<T> elem) {
+    LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem));
     if (sideInputs.isEmpty()) {
       runner.processElement((WindowedValue<InputT>) elem);
     } else {
@@ -234,7 +239,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
   }
 
   protected void processSideInput(TupleTag tag, WindowedValue elem) {
-    LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
+    LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem));
 
     PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
     sideInputHandler.addSideInputValue(sideInputView, elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
index 8812988..fd7af7d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -30,7 +30,13 @@ public interface Executor extends Serializable {
    */
   void init(ExecutorContext context);
 
+  /**
+   * Process element form "tag" stream.
+   */
   <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
 
+  /**
+   * Cleanup when task is shutdown.
+   */
   void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index f8e09be..449ecb5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -58,9 +58,9 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
 
-  protected ExecutorContext executorContext;
+  protected transient ExecutorContext executorContext;
 
-  protected TimerService timerService;
+  protected transient TimerService timerService;
 
   // map from input tag to executor inside bolt
   protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
@@ -73,7 +73,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
   protected int internalDoFnExecutorId = 1;
   protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
 
-  protected OutputCollector collector;
+  protected transient OutputCollector collector;
 
   protected boolean isStatefulBolt = false;
 
@@ -265,8 +265,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
   }
 
   public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-    LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
     if (elem != null) {
+      LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
       Executor executor = inputTagToExecutor.get(inputTag);
       if (executor != null) {
         executor.process(inputTag, elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
index 928fa24..9d4184c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
- * @param <InputT>
  */
 class FlattenExecutor<InputT> implements Executor {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index ebe8bc3..62621d0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -46,20 +45,18 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
   public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
 
-    // Since a new tag is created in PCollectionList, retrieve the real tag here.
+    // Flatten supports to consume multi-copy from a same PCollection, so we need to record
+    // the copy number here.
     Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
     Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap();
-    for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
-      PCollection<V> pc = (PCollection<V>) entry.getValue();
-      //inputs.putAll(pc.expand());
-      for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) {
-        if (inputs.containsKey(entry1.getKey())) {
-          int copyNum = tagToCopyNum.get(entry1.getKey());
-          tagToCopyNum.put(entry1.getKey(), ++copyNum);
-        } else {
-          inputs.put(entry1.getKey(), entry1.getValue());
-          tagToCopyNum.put(entry1.getKey(), 1);
-        }
+    for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getTransformInputs().entrySet()) {
+      TupleTag tag = userGraphContext.findTupleTag(entry.getValue());
+      if (inputs.containsKey(tag)) {
+        int copyNum = tagToCopyNum.get(tag);
+        tagToCopyNum.put(tag, ++copyNum);
+      } else {
+        inputs.put(tag, entry.getValue());
+        tagToCopyNum.put(tag, 1);
       }
     }
     String description = describeTransform(transform, inputs, userGraphContext.getOutputs());

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
index 85c958a..02f42bd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
@@ -18,28 +18,21 @@
 package org.apache.beam.runners.jstorm.translation;
 
 import com.google.common.collect.Lists;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
- * @param <K>
- * @param <V>
  */
 class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
   // information of transform
   protected PCollection<KV<K, V>> input;
-  protected PCollection<KV<K, Iterable<V>>> output;
-  protected List<TupleTag<?>> inputTags;
   protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
   protected List<TupleTag<?>> sideOutputTags;
-  protected List<PCollectionView<?>> sideInputs;
   protected WindowingStrategy<?, ?> windowingStrategy;
 
   @Override
@@ -49,13 +42,8 @@ class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<
         describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
 
     input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-    output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
-
-    inputTags = userGraphContext.getInputTags();
     mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
     sideOutputTags = Lists.newArrayList();
-
-    sideInputs = Collections.<PCollectionView<?>>emptyList();
     windowingStrategy = input.getWindowingStrategy();
 
     GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
index 1c858b7..cae1bc3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -51,8 +50,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
- * @param <K>
- * @param <V>
  */
 class GroupByWindowExecutor<K, V>
     extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
@@ -60,14 +57,6 @@ class GroupByWindowExecutor<K, V>
 
   private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
 
-  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
   private KvCoder<K, V> inputKvCoder;
   private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
 
@@ -92,7 +81,6 @@ class GroupByWindowExecutor<K, V>
         mainTupleTag,
         sideOutputTags);
 
-    this.outputManager = new GroupByWindowOutputManager();
     UserGraphContext userGraphContext = context.getUserGraphContext();
     PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
     this.inputKvCoder = (KvCoder<K, V>) input.getCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 292b771..e2139d8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -180,12 +180,8 @@ class JStormStateInternals<K> implements StateInternals {
               kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
 
           Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
-              new BinaryCombineFn<Instant>() {
-                @Override
-                public Instant apply(Instant left, Instant right) {
-                  return timestampCombiner.combine(left, right);
-                }
-              };
+              new WatermarkCombineFn(timestampCombiner);
+
           return new JStormWatermarkHoldState(
               id, spec, namespace,
               new JStormCombiningState<>(
@@ -203,6 +199,19 @@ class JStormStateInternals<K> implements StateInternals {
     });
   }
 
+  private static class WatermarkCombineFn extends BinaryCombineFn<Instant> {
+    private final TimestampCombiner timestampCombiner;
+
+    public WatermarkCombineFn(TimestampCombiner timestampCombiner) {
+      this.timestampCombiner = timestampCombiner;
+    }
+
+    @Override
+    public Instant apply(Instant left, Instant right) {
+      return timestampCombiner.combine(left, right);
+    }
+  };
+
   /**
    * JStorm implementation of {@link ValueState}.
    */
@@ -623,7 +632,7 @@ class JStormStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<V> get(K var1) {
-      ReadableState<V> ret = new MapReadableState<>(null);
+      ReadableState<V> ret = null;
       try {
         ret = new MapReadableState(kvStore.get(var1));
       } catch (IOException e) {
@@ -634,7 +643,7 @@ class JStormStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<Iterable<K>> keys() {
-      ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+      ReadableState<Iterable<K>> ret = null;
       try {
         ret = new MapReadableState<>(kvStore.keys());
       } catch (IOException e) {
@@ -645,7 +654,7 @@ class JStormStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<Iterable<V>> values() {
-      ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+      ReadableState<Iterable<V>> ret = null;
       try {
         ret = new MapReadableState<>(kvStore.values());
       } catch (IOException e) {
@@ -656,7 +665,7 @@ class JStormStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-      ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+      ReadableState<Iterable<Map.Entry<K, V>>> ret = null;
       try {
         ret = new MapReadableState<>(kvStore.entries());
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
index 5b60b03..e7f3285 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -63,10 +63,8 @@ class MetricsReporter {
   }
 
   private void updateCounters(Iterable<MetricResult<Long>> counters) {
-    System.out.print("updateCounters");
     for (MetricResult<Long> metricResult : counters) {
       String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
-      System.out.print("metricName: " + metricName);
       Long updateValue = metricResult.attempted();
       Long oldValue = reportedCounters.get(metricName);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
index 138a5dc..f318a89 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -32,26 +32,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * JStorm {@link Executor} for {@link DoFn} with multi-output.
- * @param <InputT>
- * @param <OutputT>
  */
 class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
   private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
 
-  /**
-   * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
-   * tag is used in downstream consumer. So before output, we need to map this "local" tag to
-   * "external" tag. See PCollectionTuple for details.
-   */
-  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
   public MultiOutputDoFnExecutor(
       String stepName,
       String description,
@@ -63,13 +47,9 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp
       Collection<PCollectionView<?>> sideInputs,
       Map<TupleTag, PCollectionView<?>> sideInputTagToView,
       TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags,
-      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+      List<TupleTag<?>> sideOutputTags
   ) {
     super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
         sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-    this.localTupleTagMap = localTupleTagMap;
-    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
-    LOG.info("localTupleTagMap: {}", localTupleTagMap);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
index a3ffc30..44c0765 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
- * @param <OutputT>
  */
 class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
 
@@ -42,9 +41,9 @@ class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, Out
       Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
       TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
       Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+      List<TupleTag<?>> sideOutputTags) {
     super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
-        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
index 7daa1cb..986af43 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,7 +32,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -50,12 +48,6 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
 
     Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
-    Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
-    for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
-      Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
-      localToExternalTupleTagMap.put(entry.getKey(), itr.next());
-    }
-
     TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
     List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
     sideOutputTags.remove(mainOutputTag);
@@ -90,8 +82,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
           transform.getSideInputs(),
           sideInputTagToView.build(),
           mainOutputTag,
-          sideOutputTags,
-          localToExternalTupleTagMap);
+          sideOutputTags);
     } else {
       executor = new MultiOutputDoFnExecutor<>(
           userGraphContext.getStepName(),
@@ -105,8 +96,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
           transform.getSideInputs(),
           sideInputTagToView.build(),
           mainOutputTag,
-          sideOutputTags,
-          localToExternalTupleTagMap);
+          sideOutputTags);
     }
 
     context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
deleted file mode 100644
index e6d09c4..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
- */
-class ParDoBoundTranslator<InputT, OutputT>
-    extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
-  @Override
-  public void translateNode(
-      ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
-    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    final TupleTag<?> inputTag = userGraphContext.getInputTag();
-    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
-    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-    List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
-    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-    String description = describeTransform(
-        transform,
-        allInputs,
-        userGraphContext.getOutputs());
-
-    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-    for (PCollectionView pCollectionView : transform.getSideInputs()) {
-      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-    }
-
-    DoFnExecutor executor;
-    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-    if (signature.stateDeclarations().size() > 0
-        || signature.timerDeclarations().size() > 0) {
-      executor = new StatefulDoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          (DoFn<KV, OutputT>) transform.getFn(),
-          (Coder) WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          (TupleTag<KV>) inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags);
-    } else {
-      executor = new DoFnExecutor<>(
-          userGraphContext.getStepName(),
-          description,
-          userGraphContext.getOptions(),
-          transform.getFn(),
-          WindowedValue.getFullCoder(
-              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-          input.getWindowingStrategy(),
-          (TupleTag<InputT>) inputTag,
-          transform.getSideInputs(),
-          sideInputTagToView.build(),
-          mainOutputTag,
-          sideOutputTags);
-    }
-
-    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
index 911f259..70e2570 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * JStorm {@link Executor} for stateful {@link DoFn}.
- * @param <OutputT>
  */
 class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
   public StatefulDoFnExecutor(

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
index 24a9050..159fe70 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -30,7 +30,7 @@ interface TimerService extends Serializable {
   void init(List<Integer> upStreamTasks);
 
   /**
-   *
+   * Update watermark when receiving watermark from a upstream task.
    * @param task
    * @param inputWatermark
    * @return new watermark if any timer is triggered during the update of watermark, otherwise 0

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
index 6b463db..027fc14 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -39,15 +39,15 @@ import org.joda.time.Instant;
  * Default implementation of {@link TimerService}.
  */
 class TimerServiceImpl implements TimerService {
-  private transient ExecutorContext executorContext;
-  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+  private ExecutorContext executorContext;
+  private Map<Integer, DoFnExecutor> idToDoFnExecutor;
 
   private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
       new ConcurrentHashMap<>();
   private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
   private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
   private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
-  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+  private final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
       new PriorityQueue<>();
   private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
       timerDataToKeyedExecutors = Maps.newHashMap();
@@ -132,7 +132,7 @@ class TimerServiceImpl implements TimerService {
     if (currentHold == null) {
       namespaceToWatermarkHold.put(namespace, watermarkHold);
       watermarkHolds.add(watermarkHold);
-    } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
+    } else if (watermarkHold.isBefore(currentHold)) {
       namespaceToWatermarkHold.put(namespace, watermarkHold);
       watermarkHolds.add(watermarkHold);
       watermarkHolds.remove(currentHold);

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
index 4d431d3..f0b8f74 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
@@ -38,8 +38,8 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
   boolean canTranslate(T transform, TranslationContext context);
 
     /**
-     * Default translator.
-     * @param <T1>
+     * Default translator does NOT translate anything, but just generate
+     * the description of PTransform.
      */
   class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
     @Override
@@ -61,7 +61,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
               .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
                 @Override
                 public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
-                  return taggedPValue.getKey().getId();
+                  if (taggedPValue != null) {
+                    return taggedPValue.getKey().getId();
+                  } else {
+                    return null;
+                  }
                 }
               })),
           transform.getName(),
@@ -69,7 +73,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
               .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
                 @Override
                 public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
-                  return taggedPvalue.getKey().getId();
+                  if (taggedPvalue != null) {
+                    return taggedPvalue.getKey().getId();
+                  } else {
+                    return null;
+                  }
                 }
               })));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 0991448..4407f15 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -198,8 +198,6 @@ public class TranslationContext {
       }
       bolt.addExecutor(tag, executor, userGraphContext.getStepName());
 
-      // filter all connections inside bolt
-      //if (!bolt.getOutputTags().contains(tag)) {
       Stream.Grouping grouping;
       if (isGBK) {
         grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
@@ -207,7 +205,6 @@ public class TranslationContext {
         grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
       }
       addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
-      //}
     }
 
     for (PValue sideInput : sideInputs) {
@@ -223,7 +220,7 @@ public class TranslationContext {
     // set parallelismNumber
     String pTransformfullName = userGraphContext.currentTransform.getFullName();
     String compositeName = pTransformfullName.split("/")[0];
-    Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+    Map parallelismNumMap = userGraphContext.getOptions().getParallelismMap();
     if (parallelismNumMap.containsKey(compositeName)) {
       int configNum = (Integer) parallelismNumMap.get(compositeName);
       int currNum = bolt.getParallelismNum();
@@ -262,10 +259,21 @@ public class TranslationContext {
       return (T) currentTransform.getInputs().values().iterator().next();
     }
 
-    public Map<TupleTag<?>, PValue> getInputs() {
+    public Map<TupleTag<?>, PValue> getTransformInputs() {
       return currentTransform.getInputs();
     }
 
+    /**
+     * Get input PValues with the output tags of upstream node.
+     */
+    public Map<TupleTag<?>, PValue> getInputs() {
+      Map<TupleTag<?>, PValue> ret = Maps.newHashMap();
+      for (PValue pValue : currentTransform.getInputs().values()) {
+        ret.put(findTupleTag(pValue), pValue);
+      }
+      return ret;
+    }
+
     public TupleTag<?> getInputTag() {
       return pValueToTupleTag.get(this.getInput());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index 9eaa13a..c8ea545 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -40,7 +40,6 @@ class TranslatorRegistry {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
-    TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
     TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index 4ae28e6..627a834 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -61,7 +62,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
 
   private KryoSerializer<WindowedValue> serializer;
 
-  private long lastWaterMark = 0L;
+  private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
 
   public UnboundedSourceSpout(
       String name,
@@ -113,7 +114,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
   }
 
   @Override
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+  public synchronized void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
     try {
       this.collector = collector;
       this.pipelineOptions =
@@ -127,7 +128,8 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
     }
   }
 
-  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+  public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark)
+      throws IOException {
     if (reader != null) {
       reader.close();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
index 9ab5784..de3f568 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
@@ -256,9 +256,7 @@ class ViewTranslator
   /**
    * Specialized expansion for
    * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
-   * @param <InputT>
-   * @param <OutputT>
-     */
+   */
   public static class CombineGloballyAsSingletonView<InputT, OutputT>
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
     Combine.GloballyAsSingletonView<InputT, OutputT> transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
index 8d60392..832c95c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- * @param <T>
- * @param <W>
  */
 class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
   private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
index b2ca267..3acf662 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
@@ -64,7 +64,7 @@ public class JStormStateInternalsTest {
     IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
         Maps.newHashMap(),
         "test",
-        tmp.toString(),
+        tmp.getRoot().toString(),
         new KryoSerializer(Maps.newHashMap()));
     jstormStateInternals = new JStormStateInternals(
         "key-1", kvStoreManager, new TimerServiceImpl(), 0);