You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:24 UTC

[24/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduece their visibility to package private.

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
new file mode 100644
index 0000000..fce870f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
+import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link StateInternals}.
+ */
+class JStormStateInternals<K> implements StateInternals {
+
+  private static final String STATE_INFO = "state-info:";
+
+  @Nullable
+  private final K key;
+  private final IKvStoreManager kvStoreManager;
+  private final TimerService timerService;
+  private final int executorId;
+
+  public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
+                              TimerService timerService, int executorId) {
+    this.key = key;
+    this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+    this.timerService = checkNotNull(timerService, "timerService");
+    this.executorId = executorId;
+  }
+
+  @Nullable
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+    // throw new UnsupportedOperationException("StateContext is not supported.");
+    /**
+     * TODOļ¼š
+     * Same implementation as state() which is without StateContext. This might be updated after
+     * we figure out if we really need StateContext for JStorm state internals.
+     */
+    return state(namespace, address);
+  }
+
+  @Override
+  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+    return address.getSpec().bind(address.getId(), new StateBinder() {
+      @Override
+      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+        try {
+          return new JStormValueState<>(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+        try {
+          return new JStormBagState(
+              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          String id,
+          StateSpec<MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder,
+          Coder<ValueT> mapValueCoder) {
+        try {
+          return new JStormMapState<>(
+              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState bindCombining(
+          String id,
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        try {
+          BagState<AccumT> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+          return new JStormCombiningState<>(accumBagState, combineFn);
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+
+
+      @Override
+      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+      bindCombiningWithContext(
+          String id,
+          StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+          CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public WatermarkHoldState bindWatermark(
+          String id,
+          StateSpec<WatermarkHoldState> spec,
+          final TimestampCombiner timestampCombiner) {
+        try {
+          BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+              getKey(), namespace,
+              kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+          Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+              new BinaryCombineFn<Instant>() {
+                @Override
+                public Instant apply(Instant left, Instant right) {
+                  return timestampCombiner.combine(left, right);
+                }
+              };
+          return new JStormWatermarkHoldState(
+              namespace,
+              new JStormCombiningState<>(
+                  accumBagState,
+                  outputTimeCombineFn),
+              timestampCombiner,
+              timerService);
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
+      }
+    });
+  }
+
+  private String getStoreId(String stateId) {
+    return String.format("%s-%s", stateId, executorId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
new file mode 100644
index 0000000..4c96541
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link TimerInternals}.
+ */
+class JStormTimerInternals<K> implements TimerInternals {
+
+  private final K key;
+  private final DoFnExecutor<?, ?> doFnExecutor;
+  private final TimerService timerService;
+
+  public JStormTimerInternals(
+      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+    this.key = key;
+    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  @Override
+  public void setTimer(
+      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+  }
+
+  @Override
+  @Deprecated
+  public void setTimer(TimerData timerData) {
+    timerService.setTimer(key, timerData, doFnExecutor);
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  @Deprecated
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  @Deprecated
+  public void deleteTimer(TimerData timerData) {
+    throw new UnsupportedOperationException(
+        "Canceling of a timer is not yet supported.");
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return null;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return new Instant(timerService.currentInputWatermark());
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return new Instant(timerService.currentOutputWatermark());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
new file mode 100644
index 0000000..5d79d21
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormValueState.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.ValueState;
+
+/**
+ * JStorm implementation of {@link ValueState}.
+ */
+class JStormValueState<K, T> implements ValueState<T> {
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+
+  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvState = kvState;
+  }
+
+  @Override
+  public void write(T t) {
+    try {
+      kvState.put(getComposedKey(), t);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
+    }
+  }
+
+  @Override
+  public T read() {
+    try {
+      return kvState.get(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to read key: %s, namespace: %s.", key, namespace));
+    }
+  }
+
+  @Override
+  public ValueState<T> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      kvState.remove(getComposedKey());
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Failed to clear key: %s, namespace: %s.", key, namespace));
+    }
+  }
+
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
new file mode 100644
index 0000000..7e1c28f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormWatermarkHoldState.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link WatermarkHoldState}.
+ */
+class JStormWatermarkHoldState implements WatermarkHoldState {
+
+  private final StateNamespace namespace;
+  private final GroupingState<Instant, Instant> watermarkHoldsState;
+  private final TimestampCombiner timestampCombiner;
+  private final TimerService timerService;
+
+  JStormWatermarkHoldState(
+      StateNamespace namespace,
+      GroupingState<Instant, Instant> watermarkHoldsState,
+      TimestampCombiner timestampCombiner,
+      TimerService timerService) {
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+    this.timerService = checkNotNull(timerService, "timerService");
+  }
+
+  @Override
+  public TimestampCombiner getTimestampCombiner() {
+    return timestampCombiner;
+  }
+
+  @Override
+  public void add(Instant instant) {
+    timerService.addWatermarkHold(namespace.stringKey(), instant);
+    watermarkHoldsState.add(instant);
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return watermarkHoldsState.isEmpty();
+  }
+
+  @Override
+  public Instant read() {
+    return watermarkHoldsState.read();
+  }
+
+  @Override
+  public WatermarkHoldState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    timerService.clearWatermarkHold(namespace.stringKey());
+    watermarkHoldsState.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
new file mode 100644
index 0000000..82d8bdc
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
+ */
+class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String COUNTER_PREFIX = "__counter";
+
+  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+  private final MetricClient metricClient;
+
+  public static MetricsReporter create(MetricClient metricClient) {
+    return new MetricsReporter(metricClient);
+  }
+
+  private MetricsReporter(MetricClient metricClient) {
+    this.metricClient = checkNotNull(metricClient, "metricClient");
+  }
+
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    System.out.print("updateCounters");
+    for (MetricResult<Long> metricResult : counters) {
+      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
+      System.out.print("metricName: " + metricName);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(metricName);
+
+      if (oldValue == null || oldValue < updateValue) {
+        AsmCounter counter = metricClient.registerCounter(metricName);
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        counter.update(incValue);
+      }
+    }
+  }
+
+  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
+    return prefix
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
new file mode 100644
index 0000000..49b0f85
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link DoFn} with multi-output.
+ * @param <InputT>
+ * @param <OutputT>
+ */
+class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+
+  /**
+   * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
+   * tag is used in downstream consumer. So before output, we need to map this "local" tag to
+   * "external" tag. See PCollectionTuple for details.
+   */
+  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      if (localTupleTagMap.containsKey(tag)) {
+        executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
+      } else {
+        executorsBolt.processExecutorElem(tag, output);
+      }
+    }
+  }
+
+  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+
+  public MultiOutputDoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+  ) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+    this.localTupleTagMap = localTupleTagMap;
+    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+    LOG.info("localTupleTagMap: {}", localTupleTagMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
new file mode 100644
index 0000000..a3ffc30
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
+ * @param <OutputT>
+ */
+class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
+
+  public MultiStatefulDoFnExecutor(
+      String stepName, String description,
+      JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+  }
+
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (mainInputTag.equals(tag)) {
+      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+          executorContext.getExecutorsBolt().timerService()));
+      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
+    }
+  }
+
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    stepContext.setStateInternals(new JStormStateInternals<>(key,
+        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..7daa1cb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
+ */
+class ParDoBoundMultiTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
+    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+    Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
+    Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
+    for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
+      Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
+      localToExternalTupleTagMap.put(entry.getKey(), itr.next());
+    }
+
+    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+    List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
+    sideOutputTags.remove(mainOutputTag);
+
+    Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+    String description = describeTransform(
+        transform,
+        allInputs,
+        allOutputs);
+
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+
+    DoFnExecutor executor;
+    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+      executor = new MultiStatefulDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<KV>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags,
+          localToExternalTupleTagMap);
+    } else {
+      executor = new MultiOutputDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags,
+          localToExternalTupleTagMap);
+    }
+
+    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..6feb7f8
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
+ */
+class ParDoBoundTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+  @Override
+  public void translateNode(
+      ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+    final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    final TupleTag<?> inputTag = userGraphContext.getInputTag();
+    PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+    TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+    List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+
+    Map<TupleTag<?>, PValue> allInputs =
+        avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+    String description = describeTransform(
+        transform,
+        allInputs,
+        userGraphContext.getOutputs());
+
+    ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+    for (PCollectionView pCollectionView : transform.getSideInputs()) {
+      sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+    }
+
+    DoFnExecutor executor;
+    DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    if (signature.stateDeclarations().size() > 0
+        || signature.timerDeclarations().size() > 0) {
+      executor = new StatefulDoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          (DoFn<KV, OutputT>) transform.getFn(),
+          (Coder) WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<KV>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
+    } else {
+      executor = new DoFnExecutor<>(
+          userGraphContext.getStepName(),
+          description,
+          userGraphContext.getOptions(),
+          transform.getFn(),
+          WindowedValue.getFullCoder(
+              input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+          input.getWindowingStrategy(),
+          (TupleTag<InputT>) inputTag,
+          transform.getSideInputs(),
+          sideInputTagToView.build(),
+          mainOutputTag,
+          sideOutputTags);
+    }
+
+    context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
new file mode 100644
index 0000000..4f469f3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/RunnerUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Utils for JStorm runner.
+ */
+class RunnerUtils {
+  /**
+   * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
+   * @param elem
+   * @return
+   */
+  public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+    WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
+    SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
+        kvElem.getValue().getKey(),
+        kvElem.withValue(kvElem.getValue().getValue()));
+    return workItem;
+  }
+
+  public static boolean isGroupByKeyExecutor(Executor executor) {
+    if (executor instanceof GroupByWindowExecutor) {
+      return true;
+    } else if (executor instanceof StatefulDoFnExecutor
+            || executor instanceof MultiStatefulDoFnExecutor) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
new file mode 100644
index 0000000..14d2972
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SerializedPipelineOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private final byte[] serializedOptions;
+
+  /**
+   * Lazily initialized copy of deserialized options.
+   */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..b321c76
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/SingletonKeyedWorkItem.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Singleton keyed word item.
+ * @param <K>
+ * @param <ElemT>
+ */
+class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
+    return new SingletonKeyedWorkItem<K, ElemT>(key, value);
+  }
+
+  @Override
+  public K key() {
+    return key;
+  }
+
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
new file mode 100644
index 0000000..911f259
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn}.
+ * @param <OutputT>
+ */
+class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
+  public StatefulDoFnExecutor(
+      String stepName, String description, JStormPipelineOptions pipelineOptions,
+      DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
+          sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
+        mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+  }
+
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (mainInputTag.equals(tag)) {
+      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+          executorContext.getExecutorsBolt().timerService()));
+      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
+    }
+  }
+
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    stepContext.setStateInternals(new JStormStateInternals<>(key,
+        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
new file mode 100644
index 0000000..30ff18c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Stream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Class that defines the stream connection between upstream and downstream components.
+ */
+@AutoValue
+public abstract class Stream {
+
+  public abstract Producer getProducer();
+
+  public abstract Consumer getConsumer();
+
+  public static Stream of(Producer producer, Consumer consumer) {
+    return new AutoValue_Stream(
+        producer, consumer);
+  }
+
+  /**
+   * JStorm producer.
+   */
+  @AutoValue
+  public abstract static class Producer {
+    public abstract String getComponentId();
+
+    public abstract String getStreamId();
+
+    public abstract String getStreamName();
+
+    public static Producer of(String componentId, String streamId, String streamName) {
+      return new AutoValue_Stream_Producer(
+          componentId, streamId, streamName);
+    }
+  }
+
+  /**
+   * JStorm consumer.
+   */
+  @AutoValue
+  public abstract static class Consumer {
+    public abstract String getComponentId();
+
+    public abstract Grouping getGrouping();
+
+    public static Consumer of(String componentId, Grouping grouping) {
+      return new AutoValue_Stream_Consumer(
+          componentId, grouping);
+    }
+  }
+
+  /**
+   * JStorm grouping, which define how to transfer message between two nodes.
+   */
+  @AutoValue
+  public abstract static class Grouping {
+    public abstract Type getType();
+
+    @Nullable
+    public abstract List<String> getFields();
+
+    public static Grouping of(Type type) {
+      checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
+      return new AutoValue_Stream_Grouping(
+          type, null /* fields */);
+    }
+
+    public static Grouping byFields(List<String> fields) {
+      checkNotNull(fields, "fields");
+      checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+      return new AutoValue_Stream_Grouping(
+          Type.FIELDS, fields);
+    }
+
+    /**
+     * Types of stream groupings Storm allows.
+     */
+    public enum Type {
+      ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
new file mode 100644
index 0000000..29345aa
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
+
+/**
+ * Interface that tracks input watermarks and manages timers in each bolt.
+ */
+interface TimerService extends Serializable {
+
+  void init(List<Integer> upStreamTasks);
+
+  /**
+   *
+   * @param task
+   * @param inputWatermark
+   * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
+   */
+  long updateInputWatermark(Integer task, long inputWatermark);
+
+  long currentInputWatermark();
+
+  long currentOutputWatermark();
+
+  void clearWatermarkHold(String namespace);
+
+  void addWatermarkHold(String namespace, Instant watermarkHold);
+
+  void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+
+  void fireTimers(long newWatermark);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
new file mode 100644
index 0000000..c2600e5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.alibaba.jstorm.utils.Pair;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+
+/**
+ * Default implementation of {@link TimerService}.
+ */
+class TimerServiceImpl implements TimerService {
+  private transient ExecutorContext executorContext;
+  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+  private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
+      new ConcurrentHashMap<>();
+  private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+  private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+  private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+      new PriorityQueue<>();
+  private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+      timerDataToKeyedExecutors = Maps.newHashMap();
+
+  private boolean initialized = false;
+
+  public TimerServiceImpl() {
+  }
+
+  public TimerServiceImpl(ExecutorContext executorContext) {
+    this.executorContext = executorContext;
+    this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+  }
+
+  @Override
+  public void init(List<Integer> upStreamTasks) {
+    for (Integer task : upStreamTasks) {
+      upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+      inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    }
+    initialized = true;
+  }
+
+  @Override
+  public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+    checkState(initialized, "TimerService has not been initialized.");
+    Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+    // Make sure the input watermark don't go backward.
+    if (taskInputWatermark > oldTaskInputWatermark) {
+      upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+      inputWatermarks.add(taskInputWatermark);
+      inputWatermarks.remove(oldTaskInputWatermark);
+
+      long newLocalInputWatermark = currentInputWatermark();
+      if (newLocalInputWatermark > oldTaskInputWatermark) {
+        return newLocalInputWatermark;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public void fireTimers(long newWatermark) {
+    TimerInternals.TimerData timerData;
+    while ((timerData = eventTimeTimersQueue.peek()) != null
+        && timerData.getTimestamp().getMillis() <= newWatermark) {
+      for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
+        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+        executor.onTimer(keyedExecutor.getSecond(), timerData);
+      }
+      eventTimeTimersQueue.remove();
+      timerDataToKeyedExecutors.remove(timerData);
+    }
+  }
+
+  @Override
+  public long currentInputWatermark() {
+    return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  @Override
+  public long currentOutputWatermark() {
+    if (watermarkHolds.isEmpty()) {
+      return currentInputWatermark();
+    } else {
+      return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
+    }
+  }
+
+  @Override
+  public void clearWatermarkHold(String namespace) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold != null) {
+      watermarkHolds.remove(currentHold);
+      namespaceToWatermarkHold.remove(namespace);
+    }
+  }
+
+  @Override
+  public void addWatermarkHold(String namespace, Instant watermarkHold) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold == null) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+    } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+      watermarkHolds.remove(currentHold);
+    }
+  }
+
+  @Override
+  public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+    checkArgument(
+        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+        String.format("Does not support domain: %s.", timerData.getDomain()));
+    Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+    if (keyedExecutors == null) {
+      keyedExecutors = Sets.newHashSet();
+      eventTimeTimersQueue.add(timerData);
+    }
+    keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+    timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
new file mode 100644
index 0000000..edd3d8a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Interface for classes capable of tranforming Beam PTransforms into Storm primitives.
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  void translateNode(T transform, TranslationContext context);
+
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
+
+    /**
+     * Default translator.
+     * @param <T1>
+     */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+
+    static String describeTransform(
+        PTransform<?, ?> transform,
+        Map<TupleTag<?>, PValue> inputs,
+        Map<TupleTag<?>, PValue> outputs) {
+      return String.format("%s --> %s --> %s",
+          Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
+              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                @Override
+                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
+                  return taggedPValue.getKey().getId();
+                  // return taggedPValue.getValue().getName();
+                }
+              })),
+          transform.getName(),
+          Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
+              .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                @Override
+                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
+                  return taggedPvalue.getKey().getId();
+                  //return taggedPValue.getValue().getName();
+                }
+              })));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 28d102d..b84fd4a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -34,12 +34,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index 316186e..9eaa13a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -19,15 +19,6 @@ package org.apache.beam.runners.jstorm.translation;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -40,7 +31,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Lookup table mapping PTransform types to associated TransformTranslator implementations.
  */
-public class TranslatorRegistry {
+class TranslatorRegistry {
   private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
 
   private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
new file mode 100644
index 0000000..2159cfa
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxExecutorsBolt.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional executors bolt handles the checkpoint and restore of state and timer.
+ */
+public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+  private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
+  private static final String TIMER_SERVICE_KET = "timer_service_key";
+
+  private ExecutorsBolt executorsBolt;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, TimerService> timerServiceStore;
+
+  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+    this.executorsBolt = executorsBolt;
+    this.executorsBolt.setStatefulBolt(true);
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      executorsBolt.prepare(stormConf, context, collector);
+      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+    } catch (IOException e) {
+      LOG.error("Failed to prepare stateful bolt", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    executorsBolt.execute(input);
+  }
+
+  @Override
+  public void cleanup() {
+    executorsBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    executorsBolt.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return executorsBolt.getComponentConfiguration();
+  }
+
+  @Override
+  public void initState(Object userState) {
+    LOG.info("Begin to init from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long batchId) {
+    try {
+      timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
+    } catch (IOException e) {
+      LOG.error("Failed to store current timer service status", e);
+      throw new RuntimeException(e.getMessage());
+    }
+    kvStoreManager.checkpoint(batchId);
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    LOG.info("Begin to rollback from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    kvStoreManager.remove(batchId);
+  }
+
+  private void restore(Object userState) {
+    try {
+      // restore all states
+      kvStoreManager.restore(userState);
+
+      // init timer service
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
+      if (timerService == null) {
+        timerService = executorsBolt.initTimerService();
+      }
+      executorsBolt.setTimerService(timerService);
+    } catch (IOException e) {
+      LOG.error("Failed to restore state", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
new file mode 100644
index 0000000..382cb50
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TxUnboundedSourceSpout.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
+ */
+public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+  private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+  private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+  private UnboundedSourceSpout sourceSpout;
+  private UnboundedSource.UnboundedReader reader;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+  public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+    this.sourceSpout = sourceSpout;
+  }
+
+  private void restore(Object userState) {
+    try {
+      kvStoreManager.restore(userState);
+      sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+      UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+      sourceSpout.createSourceReader(checkpointMark);
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to init state", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void initState(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long checkpointId) {
+    try {
+      // Store check point mark from unbounded source reader
+      UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+      sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+      // checkpoint all kv stores in current manager
+      kvStoreManager.checkpoint(checkpointId);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+      throw new RuntimeException(e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    // backup kv stores to remote state backend
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    // remove obsolete state in bolt local and remote state backend
+    kvStoreManager.remove(batchId);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    sourceSpout.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return sourceSpout.getComponentConfiguration();
+  }
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      sourceSpout.open(conf, context, collector);
+      String storeName = String.format("task-%s", context.getThisTaskId());
+      String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+          context, storeName, storePath, true);
+
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open transactional unbounded source spout", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void close() {
+    sourceSpout.close();
+  }
+
+  @Override
+  public void activate() {
+    sourceSpout.activate();
+  }
+
+  @Override
+  public void deactivate() {
+    sourceSpout.deactivate();
+  }
+
+  @Override
+  public void nextTuple() {
+    sourceSpout.nextTuple();
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+}