You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:23 UTC

[23/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package private.

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
new file mode 100644
index 0000000..dab9518
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spout implementation that wraps a Beam UnboundedSource.
+ * TODO: add wrapper to support metrics in UnboundedSource.
+ */
+public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+  private final String description;
+  private final UnboundedSource source;
+  private final SerializedPipelineOptions serializedOptions;
+  private final TupleTag<?> outputTag;
+
+  private transient JStormPipelineOptions pipelineOptions;
+  private transient UnboundedSource.UnboundedReader reader;
+  private transient SpoutOutputCollector collector;
+
+  private volatile boolean hasNextRecord;
+  private AtomicBoolean activated = new AtomicBoolean();
+
+  private KryoSerializer<WindowedValue> serializer;
+
+  private long lastWaterMark = 0L;
+
+  public UnboundedSourceSpout(
+      String description,
+      UnboundedSource source,
+      JStormPipelineOptions options,
+      TupleTag<?> outputTag) {
+    this.description = checkNotNull(description, "description");
+    this.source = checkNotNull(source, "source");
+    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+    this.outputTag = checkNotNull(outputTag, "outputTag");
+  }
+
+  @Override
+  public synchronized void close() {
+    activated.set(false); // make nextTuple() return early from now on
+    try {
+      this.reader.close();
+    } catch (IOException e) {
+      LOG.warn("Failed to close the unbounded reader of " + description, e);
+    }
+  }
+
+  @Override
+  public void activate() {
+    activated.set(true);
+
+  }
+
+  @Override
+  public void deactivate() {
+    activated.set(false);
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      this.collector = collector;
+      this.pipelineOptions =
+          this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+      createSourceReader(null);
+
+      this.serializer = new KryoSerializer<>(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create unbounded reader.", e);
+    }
+  }
+
+  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+    hasNextRecord = this.reader.start();
+  }
+
+  @Override
+  public synchronized void nextTuple() {
+    if (!activated.get()) {
+      return;
+    }
+    try {
+      if (!hasNextRecord) {
+        hasNextRecord = reader.advance();
+      }
+      // Emit records until the reader reports none available or the spout is deactivated.
+      while (hasNextRecord && activated.get()) {
+        Object value = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+        // Every source record is emitted in the global window with PaneInfo.NO_FIRING.
+        WindowedValue wv =
+            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+        LOG.debug("Source output: {}", wv.getValue());
+        if (keyedEmit(outputTag.getId())) {
+          KV kv = (KV) wv.getValue();
+          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+        } else {
+          byte[] immutableValue = serializer.serialize(wv);
+          collector.emit(outputTag.getId(), new Values(immutableValue));
+        }
+
+        // move to next record
+        hasNextRecord = reader.advance();
+      }
+      // Emit the watermark only when it is later than the last one that was emitted.
+      Instant waterMark = reader.getWatermark();
+      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+        lastWaterMark = waterMark.getMillis();
+        collector.flush();
+        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Exception reading values from source.", e);
+    }
+  }
+
+  public UnboundedSource getUnboundedSource() {
+    return source;
+  }
+
+  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+    return reader;
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..54c9b94
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Unbounded} into a JStorm {@link UnboundedSourceSpout}.
+ * @param <T> type parameter of the translated {@link Read.Unbounded} transform
+ */
+class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
+  @Override
+  public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+    TupleTag<?> tag = userGraphContext.getOutputTag();
+    PValue output = userGraphContext.getOutput();
+
+    UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        description,
+        transform.getSource(), userGraphContext.getOptions(), tag);
+    context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
new file mode 100644
index 0000000..822ed8a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * JStorm {@link Executor} for {@link View}.
+ */
+class ViewExecutor implements Executor {
+
+  private final String description;
+  private final TupleTag outputTag;
+  private ExecutorsBolt executorsBolt;
+
+  public ViewExecutor(String description, TupleTag outputTag) {
+    this.description = description;
+    this.outputTag = outputTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    executorsBolt.processExecutorElem(outputTag, elem);
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
new file mode 100644
index 0000000..9ab5784
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+/**
+ * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
+ */
+class ViewTranslator
+    extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+  @Override
+  public void translateNode(
+      CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description = describeTransform(
+        transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+    ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
+    context.addTransformExecutor(viewExecutor);
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}.
+   */
+  public static class ViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsMap(View.AsMap<K, V> transform) {
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // TODO: log warning as other runners.
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * View.AsMultimap View.AsMultimap}.
+   */
+  public static class ViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // TODO: log warning as other runners.
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsList View.AsList}.
+   */
+  public static class ViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsList(View.AsList<T> transform) {
+    }
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsIterable View.AsIterable} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsIterable(View.AsIterable<T> transform) {
+    }
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link View.AsSingleton View.AsSingleton} for the
+   * JStorm runner in streaming mode.
+   */
+  public static class ViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public ViewAsSingleton(View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
+   * @param <InputT> element type of the input {@link PCollection}
+   * @param <OutputT> type of the combined value exposed by the resulting view
+   */
+  public static class CombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+    public CombineGloballyAsSingletonView(
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+      // A default value is handed to the view only when the transform requests one.
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined,
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   * It requires that the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   * For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateJStormPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateJStormPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
new file mode 100644
index 0000000..8d60392
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
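+ * @param <T> element type accepted by the wrapped {@link WindowFn}
+ * @param <W> window type assigned by the wrapped {@link WindowFn}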
+ */
+class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+  private final String description;
+  private WindowFn<T, W> windowFn;
+  private ExecutorsBolt executorsBolt;
+  private TupleTag outputTag;
+
+  class JStormAssignContext<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+      fn.super();
+      checkArgument(
+          Iterables.size(value.getWindows()) == 1,
+          String.format(
+              "%s passed to window assignment must be in a single window, but it was in %s: %s",
+              WindowedValue.class.getSimpleName(),
+              Iterables.size(value.getWindows()),
+              value.getWindows()));
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
+    }
+  }
+
+  public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+    this.description = description;
+    this.windowFn = windowFn;
+    this.outputTag = outputTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    // Any exception raised below is logged at WARN level and not rethrown.
+    try {
+      Collection<W> assigned = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+      for (W target : assigned) {
+        executorsBolt.processExecutorElem(
+            outputTag,
+            WindowedValue.of(elem.getValue(), elem.getTimestamp(), target, elem.getPane()));
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to assign windows for elem=" + elem, e);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..86cb638
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignTranslator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
+ * JStorm {@link WindowAssignExecutor}.
+ * @param <T>
+ */
+class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+    context.getUserGraphContext().setWindowed();
+    WindowAssignExecutor executor = new WindowAssignExecutor(
+        description,
+        transform.getWindowFn(),
+        userGraphContext.getOutputTag());
+    context.addTransformExecutor(executor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
new file mode 100644
index 0000000..f8f2f3f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the translation of Beam pipelines into JStorm topologies.
+ */
+package org.apache.beam.runners.jstorm.translation;

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
deleted file mode 100644
index 3d7fab8..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-
-/**
- * Enable user to add output stream definitions by API, rather than hard-code.
- */
-public abstract class AbstractComponent implements IComponent {
-  private Map<String, Fields> streamToFields = new HashMap<>();
-  private Map<String, Boolean> keyStreams = new HashMap<>();
-  private int parallelismNum = 0;
-
-  public void addOutputField(String streamId) {
-    addOutputField(streamId, new Fields(CommonInstance.VALUE));
-  }
-
-  public void addOutputField(String streamId, Fields fields) {
-    streamToFields.put(streamId, fields);
-    keyStreams.put(streamId, false);
-  }
-
-  public void addKVOutputField(String streamId) {
-    streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
-    keyStreams.put(streamId, true);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
-      declarer.declareStream(entry.getKey(), entry.getValue());
-    }
-  }
-
-  public boolean keyedEmit(String streamId) {
-    Boolean isKeyedStream = keyStreams.get(streamId);
-    return isKeyedStream == null ? false : isKeyedStream;
-  }
-
-  public int getParallelismNum() {
-    return parallelismNum;
-  }
-
-  public void setParallelismNum(int num) {
-    parallelismNum = num;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
deleted file mode 100644
index e07d890..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link DoFn}.
- * @param <InputT> input type
- * @param <OutputT> output type
- */
-public class DoFnExecutor<InputT, OutputT> implements Executor {
-  private static final long serialVersionUID = 5297603063991078668L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
-
-  /**
-   * Implements {@link OutputManager} in a DoFn executor.
-   */
-  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
-    private static final long serialVersionUID = -661113364735206170L;
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  protected transient DoFnRunner<InputT, OutputT> runner = null;
-  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
-  protected final String stepName;
-
-  protected int internalDoFnExecutorId;
-
-  protected final String description;
-
-  protected final TupleTag<OutputT> mainTupleTag;
-  protected final List<TupleTag<?>> sideOutputTags;
-
-  protected SerializedPipelineOptions serializedOptions;
-  protected transient JStormPipelineOptions pipelineOptions;
-
-  protected DoFn<InputT, OutputT> doFn;
-  protected final Coder<WindowedValue<InputT>> inputCoder;
-  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
-  protected OutputManager outputManager;
-  protected WindowingStrategy<?, ?> windowingStrategy;
-  protected final TupleTag<InputT> mainInputTag;
-  protected Collection<PCollectionView<?>> sideInputs;
-  protected SideInputHandler sideInputHandler;
-  protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
-  // Initialize during runtime
-  protected ExecutorContext executorContext;
-  protected ExecutorsBolt executorsBolt;
-  protected TimerInternals timerInternals;
-  protected transient StateInternals pushbackStateInternals;
-  protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
-  protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
-  protected transient IKvStoreManager kvStoreManager;
-  protected DefaultStepContext stepContext;
-  protected transient MetricClient metricClient;
-
-  public DoFnExecutor(
-      String stepName,
-      String description,
-      JStormPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
-      Coder<WindowedValue<InputT>> inputCoder,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<InputT> mainInputTag,
-      Collection<PCollectionView<?>> sideInputs,
-      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-      TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags) {
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.description = checkNotNull(description, "description");
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-    this.doFn = doFn;
-    this.inputCoder = inputCoder;
-    this.outputManager = new DoFnExecutorOutputManager();
-    this.windowingStrategy = windowingStrategy;
-    this.mainInputTag = mainInputTag;
-    this.sideInputs = sideInputs;
-    this.mainTupleTag = mainTupleTag;
-    this.sideOutputTags = sideOutputTags;
-    this.sideInputTagToView = sideInputTagToView;
-  }
-
-  protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
-    return new DoFnRunnerWithMetrics<>(
-        stepName,
-        DoFnRunners.simpleRunner(
-            this.pipelineOptions,
-            this.doFn,
-            this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
-            this.outputManager,
-            this.mainTupleTag,
-            this.sideOutputTags,
-            this.stepContext,
-            this.windowingStrategy),
-        MetricsReporter.create(metricClient));
-  }
-
-  protected void initService(ExecutorContext context) {
-    // TODO: what should be set for key in here?
-    timerInternals = new JStormTimerInternals(
-        null /* key */, this, context.getExecutorsBolt().timerService());
-    kvStoreManager = context.getKvStoreManager();
-    stepContext = new DefaultStepContext(timerInternals,
-        new JStormStateInternals(
-            null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-    metricClient = new MetricClient(executorContext.getTopologyContext());
-  }
-
-  @Override
-  public void init(ExecutorContext context) {
-    this.executorContext = context;
-    this.executorsBolt = context.getExecutorsBolt();
-    this.pipelineOptions =
-        this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
-    initService(context);
-
-    // Side inputs setup
-    if (sideInputs != null && !sideInputs.isEmpty()) {
-      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-      watermarkHoldTag =
-          StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
-      pushbackStateInternals = new JStormStateInternals(
-          null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-      sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
-      runner = getDoFnRunner();
-      pushbackRunner =
-          SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
-    } else {
-      runner = getDoFnRunner();
-    }
-
-    // Process user's setup
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
-  }
-
-  @Override
-  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
-        tag, mainInputTag, sideInputs, elem.getValue()));
-    if (mainInputTag.equals(tag)) {
-      processMainInput(elem);
-    } else {
-      processSideInput(tag, elem);
-    }
-  }
-
-  protected <T> void processMainInput(WindowedValue<T> elem) {
-    if (sideInputs.isEmpty()) {
-      runner.processElement((WindowedValue<InputT>) elem);
-    } else {
-      Iterable<WindowedValue<InputT>> justPushedBack =
-          pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
-      BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-      Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-        if (pushedBackValue.getTimestamp().isBefore(min)) {
-          min = pushedBackValue.getTimestamp();
-        }
-        min = earlier(min, pushedBackValue.getTimestamp());
-        pushedBack.add(pushedBackValue);
-      }
-      pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
-    }
-  }
-
-  protected void processSideInput(TupleTag tag, WindowedValue elem) {
-    LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
-
-    PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
-    sideInputHandler.addSideInputValue(sideInputView, elem);
-
-    BagState<WindowedValue<InputT>> pushedBack =
-        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
-    Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
-    if (pushedBackInputs != null) {
-      for (WindowedValue<InputT> input : pushedBackInputs) {
-
-        Iterable<WindowedValue<InputT>> justPushedBack =
-            pushbackRunner.processElementInReadyWindows(input);
-        Iterables.addAll(newPushedBack, justPushedBack);
-      }
-    }
-    pushedBack.clear();
-
-    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-      min = earlier(min, pushedBackValue.getTimestamp());
-      pushedBack.add(pushedBackValue);
-    }
-
-    WatermarkHoldState watermarkHold =
-        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-    // TODO: clear-then-add is not thread-safe.
-    watermarkHold.clear();
-    watermarkHold.add(min);
-  }
-
-  /**
-   * Process all pushed back elements when receiving watermark with max timestamp.
-   */
-  public void processAllPushBackElements() {
-    if (sideInputs != null && !sideInputs.isEmpty()) {
-      BagState<WindowedValue<InputT>> pushedBackElements =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-      if (pushedBackElements != null) {
-        for (WindowedValue<InputT> elem : pushedBackElements.read()) {
-          LOG.info("Process pushback elem={}", elem);
-          runner.processElement(elem);
-        }
-        pushedBackElements.clear();
-      }
-
-      WatermarkHoldState watermarkHold =
-          pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-      watermarkHold.clear();
-      watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    }
-  }
-
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    StateNamespace namespace = timerData.getNamespace();
-    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-    if (pushbackRunner != null) {
-      pushbackRunner.onTimer(
-          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-    } else {
-      runner.onTimer(
-          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    doFnInvoker.invokeTeardown();
-  }
-
-  @Override
-  public String toString() {
-    return description;
-  }
-
-  private Instant earlier(Instant left, Instant right) {
-    return left.isBefore(right) ? left : right;
-  }
-
-  public void startBundle() {
-    if (pushbackRunner != null) {
-      pushbackRunner.startBundle();
-    } else {
-      runner.startBundle();
-    }
-  }
-
-  public void finishBundle() {
-    if (pushbackRunner != null) {
-      pushbackRunner.finishBundle();
-    } else {
-      runner.finishBundle();
-    }
-  }
-
-  public void setInternalDoFnExecutorId(int id) {
-    this.internalDoFnExecutorId = id;
-  }
-
-  public int getInternalDoFnExecutorId() {
-    return internalDoFnExecutorId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
deleted file mode 100644
index 1610a8a..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * DoFnRunner decorator which registers {@link MetricsContainer}.
- */
-public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  private final String stepName;
-  private final DoFnRunner<InputT, OutputT> delegate;
-  private final MetricsReporter metricsReporter;
-
-  DoFnRunnerWithMetrics(
-      String stepName,
-      DoFnRunner<InputT, OutputT> delegate,
-      MetricsReporter metricsReporter) {
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.delegate = checkNotNull(delegate, "delegate");
-    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
-  }
-
-  @Override
-  public void startBundle() {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.startBundle();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.processElement(elem);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.finishBundle();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    metricsReporter.updateMetrics();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
deleted file mode 100644
index 0ec4fdd..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * An executor is a basic executable unit in a JStorm task.
- */
-public interface Executor extends Serializable {
-  /**
-   * Initialization during runtime.
-   */
-  void init(ExecutorContext context);
-
-  <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
-
-  void cleanup();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
deleted file mode 100644
index 55ca171..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.task.TopologyContext;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.google.auto.value.AutoValue;
-
-/**
- * Context of a executors bolt when runtime.
- */
-@AutoValue
-public abstract class ExecutorContext {
-  public static ExecutorContext of(
-      TopologyContext topologyContext,
-      ExecutorsBolt bolt,
-      IKvStoreManager kvStoreManager) {
-    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
-  }
-
-  public abstract TopologyContext getTopologyContext();
-
-  public abstract ExecutorsBolt getExecutorsBolt();
-
-  public abstract IKvStoreManager getKvStoreManager();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
deleted file mode 100644
index 0366c13..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBatchBolt;
-import backtype.storm.tuple.ITupleExt;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG.
- */
-public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
-  private static final long serialVersionUID = -7751043327801735211L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
-
-  protected ExecutorContext executorContext;
-
-  protected TimerService timerService;
-
-  // map from input tag to executor inside bolt
-  protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
-  // set of all output tags that will be emit outside bolt
-  protected final Set<TupleTag> outputTags = Sets.newHashSet();
-  protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
-  protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
-  protected int internalDoFnExecutorId = 1;
-  protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
-
-  protected OutputCollector collector;
-
-  protected boolean isStatefulBolt = false;
-
-  protected KryoSerializer<WindowedValue> serializer;
-
-  public ExecutorsBolt() {
-
-  }
-
-  public void setStatefulBolt(boolean isStateful) {
-    isStatefulBolt = isStateful;
-  }
-
-  public void addExecutor(TupleTag inputTag, Executor executor) {
-    inputTagToExecutor.put(
-        checkNotNull(inputTag, "inputTag"),
-        checkNotNull(executor, "executor"));
-  }
-
-  public Map<TupleTag, Executor> getExecutors() {
-    return inputTagToExecutor;
-  }
-
-  public void registerExecutor(Executor executor) {
-    if (executor instanceof DoFnExecutor) {
-      DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
-      idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
-      doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
-      internalDoFnExecutorId++;
-    }
-  }
-
-  public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
-    return idToDoFnExecutor;
-  }
-
-  public void addOutputTags(TupleTag tag) {
-    outputTags.add(tag);
-  }
-
-  public void addExternalOutputTag(TupleTag<?> tag) {
-    externalOutputTags.add(tag);
-  }
-
-  public Set<TupleTag> getOutputTags() {
-    return outputTags;
-  }
-
-  public ExecutorContext getExecutorContext() {
-    return executorContext;
-  }
-
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
-    try {
-      this.collector = collector;
-
-      // init kv store manager
-      String storeName = String.format("task-%d", context.getThisTaskId());
-      String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-      IKvStoreManager kvStoreManager = isStatefulBolt
-              ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
-              context, storeName, stateStorePath, isStatefulBolt)
-              : KvStoreManagerFactory.getKvStoreManager(
-              stormConf, storeName, stateStorePath, isStatefulBolt);
-      this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
-      // init time service
-      timerService = initTimerService();
-
-      // init all internal executors
-      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-        executor.init(executorContext);
-        if (executor instanceof DoFnExecutor) {
-          doFnExecutors.add((DoFnExecutor) executor);
-        }
-      }
-
-      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
-      LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
-      LOG.info("inputTagToExecutor={}", inputTagToExecutor);
-      LOG.info("outputTags={}", outputTags);
-      LOG.info("externalOutputTags={}", externalOutputTags);
-      LOG.info("doFnExecutors={}", doFnExecutors);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to prepare executors bolt", e);
-    }
-  }
-
-  public TimerService initTimerService() {
-    TopologyContext context = executorContext.getTopologyContext();
-    List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
-        .transformAndConcat(
-            new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
-              @Override
-              public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
-                if (Common.isSystemComponent(value.getKey())) {
-                  return Collections.EMPTY_LIST;
-                } else {
-                  return value.getValue();
-                }
-              }
-            })
-        .toList();
-    TimerService ret = new TimerServiceImpl(executorContext);
-    ret.init(tasks);
-    return ret;
-  }
-
-  @Override
-  public void execute(Tuple input) {
-    // process a batch
-    String streamId = input.getSourceStreamId();
-    ITupleExt tuple = (ITupleExt) input;
-    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
-    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
-      while (valueIterator.hasNext()) {
-        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
-      }
-    } else {
-      doFnStartBundle();
-      while (valueIterator.hasNext()) {
-        processElement(valueIterator.next(), streamId);
-      }
-      doFnFinishBundle();
-    }
-  }
-
-  private void processWatermark(long watermarkTs, int sourceTask) {
-    long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
-    LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
-        (new Instant(watermarkTs)).toDateTime(),
-        sourceTask,
-        (new Instant(newWaterMark)).toDateTime());
-    if (newWaterMark != 0) {
-      // Some buffer windows are going to be triggered.
-      doFnStartBundle();
-      timerService.fireTimers(newWaterMark);
-
-      // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
-      // to be received from now on. So we are going to process all push back data.
-      if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-          doFnExecutor.processAllPushBackElements();
-        }
-      }
-
-      doFnFinishBundle();
-    }
-
-    long currentWaterMark = timerService.currentOutputWatermark();
-    if (!externalOutputTags.isEmpty()) {
-      collector.flush();
-      collector.emit(
-          CommonInstance.BEAM_WATERMARK_STREAM_ID,
-          new Values(currentWaterMark));
-      LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
-    }
-  }
-
-  private void processElement(List<Object> values, String streamId) {
-    TupleTag inputTag = new TupleTag(streamId);
-    WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
-    processExecutorElem(inputTag, windowedValue);
-  }
-
-  public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-    LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
-    if (elem != null) {
-      Executor executor = inputTagToExecutor.get(inputTag);
-      if (executor != null) {
-        executor.process(inputTag, elem);
-      }
-      if (externalOutputTags.contains(inputTag)) {
-        emitOutsideBolt(inputTag, elem);
-      }
-    } else {
-      LOG.info("Received null elem for tag={}", inputTag);
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-      executor.cleanup();
-    }
-    executorContext.getKvStoreManager().close();
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  public TimerService timerService() {
-    return timerService;
-  }
-
-  public void setTimerService(TimerService service) {
-    timerService = service;
-  }
-
-  private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
-    WindowedValue wv = null;
-    if (values.size() > 1) {
-      Object key = values.get(0);
-      WindowedValue value = serializer.deserialize((byte[]) values.get(1));
-      wv = value.withValue(KV.of(key, value.getValue()));
-    } else {
-      wv = serializer.deserialize((byte[]) values.get(0));
-    }
-    return wv;
-  }
-
-  protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
-    LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
-    if (keyedEmit(outputTag.getId())) {
-      KV kv = (KV) outputValue.getValue();
-      byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
-      // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-      if (kv.getKey() == null) {
-        // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
-        collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
-      } else {
-        collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
-      }
-    } else {
-      byte[] immutableOutputValue = serializer.serialize(outputValue);
-      collector.emit(outputTag.getId(), new Values(immutableOutputValue));
-    }
-  }
-
-  private void doFnStartBundle() {
-    for (DoFnExecutor doFnExecutor : doFnExecutors) {
-      doFnExecutor.startBundle();
-    }
-  }
-
-  private void doFnFinishBundle() {
-    for (DoFnExecutor doFnExecutor : doFnExecutors) {
-      doFnExecutor.finishBundle();
-    }
-  }
-
-  @Override
-  public String toString() {
-    // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
-    List<String> ret = new ArrayList<>();
-        /*ret.add("inputTags");
-        for (TupleTag inputTag : inputTagToExecutor.keySet()) {
-            ret.add(inputTag.getId());
-        }*/
-    ret.add("internalExecutors");
-    for (Executor executor : inputTagToExecutor.values()) {
-      ret.add(executor.toString());
-    }
-    ret.add("externalOutputTags");
-    for (TupleTag output : externalOutputTags) {
-      ret.add(output.getId());
-    }
-    return Joiner.on('\n').join(ret).concat("\n");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
deleted file mode 100644
index caf1e47..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
- * @param <InputT>
- */
-public class FlattenExecutor<InputT> implements Executor {
-
-  private final String description;
-  private TupleTag mainOutputTag;
-  private ExecutorContext context;
-  private ExecutorsBolt executorsBolt;
-
-  public FlattenExecutor(String description, TupleTag mainTupleTag) {
-    this.description = checkNotNull(description, "description");
-    this.mainOutputTag = mainTupleTag;
-  }
-
-  @Override
-  public void init(ExecutorContext context) {
-    this.context = context;
-    this.executorsBolt = context.getExecutorsBolt();
-  }
-
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    executorsBolt.processExecutorElem(mainOutputTag, elem);
-  }
-
-  @Override
-  public void cleanup() {
-  }
-
-  @Override
-  public String toString() {
-    return description;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
deleted file mode 100644
index 0dd1af9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
- * @param <K>
- * @param <V>
- */
-public class GroupByWindowExecutor<K, V>
-    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
-  private static final long serialVersionUID = -7563050475488610553L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
-  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      executorsBolt.processExecutorElem(tag, output);
-    }
-  }
-
-  private KvCoder<K, V> inputKvCoder;
-  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
-  public GroupByWindowExecutor(
-      String stepName,
-      String description,
-      TranslationContext context,
-      JStormPipelineOptions pipelineOptions,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-    // The doFn will be created when runtime. Just pass "null" here
-    super(
-        stepName,
-        description,
-        pipelineOptions,
-        null,
-        null,
-        windowingStrategy,
-        null,
-        null,
-        null,
-        mainTupleTag,
-        sideOutputTags);
-
-    this.outputManager = new GroupByWindowOutputManager();
-    UserGraphContext userGraphContext = context.getUserGraphContext();
-    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-    this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
-  }
-
-  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
-    final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
-      @Override
-      public StateInternals stateInternalsForKey(K key) {
-        return new JStormStateInternals<K>(
-            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-      }
-    };
-    TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
-      @Override
-      public TimerInternals timerInternalsForKey(K key) {
-        return new JStormTimerInternals<>(
-            key,
-            GroupByWindowExecutor.this,
-            executorContext.getExecutorsBolt().timerService());
-      }
-    };
-
-    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-    DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
-        GroupAlsoByWindowViaWindowSetNewDoFn.create(
-            windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
-            (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
-    return doFn;
-  }
-
-  @Override
-  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
-    doFn = getGroupByWindowDoFn();
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner(
-        this.pipelineOptions,
-        this.doFn,
-        NullSideInputReader.empty(),
-        this.outputManager,
-        this.mainTupleTag,
-        this.sideOutputTags,
-        this.stepContext,
-        this.windowingStrategy);
-
-    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner =
-        DoFnRunners.lateDataDroppingRunner(
-            simpleRunner,
-            this.stepContext,
-            this.windowingStrategy);
-    return new DoFnRunnerWithMetrics<>(
-        stepName, doFnRunner, MetricsReporter.create(metricClient));
-  }
-
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    /**
-     *  For GroupByKey, KV type elem is received. We need to convert the KV elem
-     *  into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
-     */
-    KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
-    runner.processElement(elem.withValue(keyedWorkItem));
-  }
-
-  @Override
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    StateNamespace namespace = timerData.getNamespace();
-    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-
-    runner.processElement(
-        WindowedValue.valueInGlobalWindow(
-            KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
deleted file mode 100644
index a022440..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-
-import com.alibaba.jstorm.common.metric.AsmCounter;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-
-/**
- * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
- */
-public class MetricsReporter {
-
-  private static final String METRIC_KEY_SEPARATOR = "__";
-  private static final String COUNTER_PREFIX = "__counter";
-
-  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
-  private final Map<String, Long> reportedCounters = Maps.newHashMap();
-  private final MetricClient metricClient;
-
-  public static MetricsReporter create(MetricClient metricClient) {
-    return new MetricsReporter(metricClient);
-  }
-
-  private MetricsReporter(MetricClient metricClient) {
-    this.metricClient = checkNotNull(metricClient, "metricClient");
-  }
-
-  public MetricsContainer getMetricsContainer(String stepName) {
-    return metricsContainers.getContainer(stepName);
-  }
-
-  public void updateMetrics() {
-    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
-    MetricQueryResults metricQueryResults =
-        metricResults.queryMetrics(MetricsFilter.builder().build());
-    updateCounters(metricQueryResults.counters());
-  }
-
-  private void updateCounters(Iterable<MetricResult<Long>> counters) {
-    System.out.print("updateCounters");
-    for (MetricResult<Long> metricResult : counters) {
-      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
-      System.out.print("metricName: " + metricName);
-      Long updateValue = metricResult.attempted();
-      Long oldValue = reportedCounters.get(metricName);
-
-      if (oldValue == null || oldValue < updateValue) {
-        AsmCounter counter = metricClient.registerCounter(metricName);
-        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
-        counter.update(incValue);
-      }
-    }
-  }
-
-  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
-    return prefix
-        + METRIC_KEY_SEPARATOR + metricResult.step()
-        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
-        + METRIC_KEY_SEPARATOR + metricResult.name().name();
-  }
-}