You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:22 UTC

[22/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduece their visibility to package private.

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
deleted file mode 100644
index a26472c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link DoFn} with multi-output.
- * @param <InputT>
- * @param <OutputT>
- */
-public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
-  private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
-
-  /**
-   * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
-   * tag is used in downstream consumer. So before output, we need to map this "local" tag to
-   * "external" tag. See PCollectionTuple for details.
-   */
-  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      if (localTupleTagMap.containsKey(tag)) {
-        executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
-      } else {
-        executorsBolt.processExecutorElem(tag, output);
-      }
-    }
-  }
-
-  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
-  public MultiOutputDoFnExecutor(
-      String stepName,
-      String description,
-      JStormPipelineOptions pipelineOptions,
-      DoFn<InputT, OutputT> doFn,
-      Coder<WindowedValue<InputT>> inputCoder,
-      WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<InputT> mainInputTag,
-      Collection<PCollectionView<?>> sideInputs,
-      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-      TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags,
-      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
-  ) {
-    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
-        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-    this.localTupleTagMap = localTupleTagMap;
-    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
-    LOG.info("localTupleTagMap: {}", localTupleTagMap);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
deleted file mode 100644
index 5e87cff..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
- * @param <OutputT>
- */
-public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
-
-  public MultiStatefulDoFnExecutor(
-      String stepName, String description,
-      JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
-      Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
-      TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
-      Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
-      List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
-    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
-        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
-  }
-
-  @Override
-  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    if (mainInputTag.equals(tag)) {
-      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-          executorContext.getExecutorsBolt().timerService()));
-      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-      processMainInput(elem);
-    } else {
-      processSideInput(tag, elem);
-    }
-  }
-
-  @Override
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    stepContext.setStateInternals(new JStormStateInternals<>(key,
-        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-    super.onTimer(key, timerData);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
deleted file mode 100644
index 77ae844..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * JStorm {@link Executor} for stateful {@link DoFn}.
- * @param <OutputT>
- */
-public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
-  public StatefulDoFnExecutor(
-      String stepName, String description, JStormPipelineOptions pipelineOptions,
-      DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
-      WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
-      Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
-          sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
-        mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-  }
-
-  @Override
-  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    if (mainInputTag.equals(tag)) {
-      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-          executorContext.getExecutorsBolt().timerService()));
-      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-      processMainInput(elem);
-    } else {
-      processSideInput(tag, elem);
-    }
-  }
-
-  @Override
-  public void onTimer(Object key, TimerInternals.TimerData timerData) {
-    stepContext.setStateInternals(new JStormStateInternals<>(key,
-        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-    super.onTimer(key, timerData);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
deleted file mode 100644
index 5c41bda..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.runners.core.TimerInternals;
-import org.joda.time.Instant;
-
-/**
- * Interface that tracks input watermarks and manages timers in each bolt.
- */
-public interface TimerService extends Serializable {
-
-  void init(List<Integer> upStreamTasks);
-
-  /**
-   *
-   * @param task
-   * @param inputWatermark
-   * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
-   */
-  long updateInputWatermark(Integer task, long inputWatermark);
-
-  long currentInputWatermark();
-
-  long currentOutputWatermark();
-
-  void clearWatermarkHold(String namespace);
-
-  void addWatermarkHold(String namespace, Instant watermarkHold);
-
-  void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
-
-  void fireTimers(long newWatermark);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
deleted file mode 100644
index 0103095..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-
-/**
- * Default implementation of {@link TimerService}.
- */
-public class TimerServiceImpl implements TimerService {
-  private transient ExecutorContext executorContext;
-  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
-
-  private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
-      new ConcurrentHashMap<>();
-  private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
-  private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
-  private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
-  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
-      new PriorityQueue<>();
-  private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
-      timerDataToKeyedExecutors = Maps.newHashMap();
-
-  private boolean initialized = false;
-
-  public TimerServiceImpl() {
-  }
-
-  public TimerServiceImpl(ExecutorContext executorContext) {
-    this.executorContext = executorContext;
-    this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
-  }
-
-  @Override
-  public void init(List<Integer> upStreamTasks) {
-    for (Integer task : upStreamTasks) {
-      upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-      inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-    }
-    initialized = true;
-  }
-
-  @Override
-  public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
-    checkState(initialized, "TimerService has not been initialized.");
-    Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
-    // Make sure the input watermark don't go backward.
-    if (taskInputWatermark > oldTaskInputWatermark) {
-      upStreamTaskToInputWatermark.put(task, taskInputWatermark);
-      inputWatermarks.add(taskInputWatermark);
-      inputWatermarks.remove(oldTaskInputWatermark);
-
-      long newLocalInputWatermark = currentInputWatermark();
-      if (newLocalInputWatermark > oldTaskInputWatermark) {
-        return newLocalInputWatermark;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public void fireTimers(long newWatermark) {
-    TimerInternals.TimerData timerData;
-    while ((timerData = eventTimeTimersQueue.peek()) != null
-        && timerData.getTimestamp().getMillis() <= newWatermark) {
-      for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
-        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
-        executor.onTimer(keyedExecutor.getSecond(), timerData);
-      }
-      eventTimeTimersQueue.remove();
-      timerDataToKeyedExecutors.remove(timerData);
-    }
-  }
-
-  @Override
-  public long currentInputWatermark() {
-    return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
-  }
-
-  @Override
-  public long currentOutputWatermark() {
-    if (watermarkHolds.isEmpty()) {
-      return currentInputWatermark();
-    } else {
-      return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
-    }
-  }
-
-  @Override
-  public void clearWatermarkHold(String namespace) {
-    Instant currentHold = namespaceToWatermarkHold.get(namespace);
-    if (currentHold != null) {
-      watermarkHolds.remove(currentHold);
-      namespaceToWatermarkHold.remove(namespace);
-    }
-  }
-
-  @Override
-  public void addWatermarkHold(String namespace, Instant watermarkHold) {
-    Instant currentHold = namespaceToWatermarkHold.get(namespace);
-    if (currentHold == null) {
-      namespaceToWatermarkHold.put(namespace, watermarkHold);
-      watermarkHolds.add(watermarkHold);
-    } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
-      namespaceToWatermarkHold.put(namespace, watermarkHold);
-      watermarkHolds.add(watermarkHold);
-      watermarkHolds.remove(currentHold);
-    }
-  }
-
-  @Override
-  public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
-    checkArgument(
-        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
-        String.format("Does not support domain: %s.", timerData.getDomain()));
-    Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
-    if (keyedExecutors == null) {
-      keyedExecutors = Sets.newHashSet();
-      eventTimeTimersQueue.add(timerData);
-    }
-    keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
-    timerDataToKeyedExecutors.put(timerData, keyedExecutors);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
deleted file mode 100644
index 8dc51b5..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
-import java.io.IOException;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Transactional executors bolt handles the checkpoint and restore of state and timer.
- */
-public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
-  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
-
-  private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
-  private static final String TIMER_SERVICE_KET = "timer_service_key";
-
-  private ExecutorsBolt executorsBolt;
-  private IKvStoreManager kvStoreManager;
-  private IKvStore<String, TimerService> timerServiceStore;
-
-  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
-    this.executorsBolt = executorsBolt;
-    this.executorsBolt.setStatefulBolt(true);
-  }
-
-  @Override
-  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    try {
-      executorsBolt.prepare(stormConf, context, collector);
-      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
-      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-    } catch (IOException e) {
-      LOG.error("Failed to prepare stateful bolt", e);
-      throw new RuntimeException(e.getMessage());
-    }
-  }
-
-  @Override
-  public void execute(Tuple input) {
-    executorsBolt.execute(input);
-  }
-
-  @Override
-  public void cleanup() {
-    executorsBolt.cleanup();
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    executorsBolt.declareOutputFields(declarer);
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return executorsBolt.getComponentConfiguration();
-  }
-
-  @Override
-  public void initState(Object userState) {
-    LOG.info("Begin to init from state: {}", userState);
-    restore(userState);
-  }
-
-  @Override
-  public Object finishBatch(long batchId) {
-    try {
-      timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
-    } catch (IOException e) {
-      LOG.error("Failed to store current timer service status", e);
-      throw new RuntimeException(e.getMessage());
-    }
-    kvStoreManager.checkpoint(batchId);
-    return null;
-  }
-
-  @Override
-  public Object commit(long batchId, Object state) {
-    return kvStoreManager.backup(batchId);
-  }
-
-  @Override
-  public void rollBack(Object userState) {
-    LOG.info("Begin to rollback from state: {}", userState);
-    restore(userState);
-  }
-
-  @Override
-  public void ackCommit(long batchId, long timeStamp) {
-    kvStoreManager.remove(batchId);
-  }
-
-  private void restore(Object userState) {
-    try {
-      // restore all states
-      kvStoreManager.restore(userState);
-
-      // init timer service
-      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
-      if (timerService == null) {
-        timerService = executorsBolt.initTimerService();
-      }
-      executorsBolt.setTimerService(timerService);
-    } catch (IOException e) {
-      LOG.error("Failed to restore state", e);
-      throw new RuntimeException(e.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
deleted file mode 100644
index 48b410f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.slf4j.LoggerFactory;
-
-/**
- * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
- */
-public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
-  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
-
-  private static final String SOURCE_STORE_ID = "SourceCheckpoint";
-  private static final String CHECKPOINT_MARK = "CheckpointMark";
-
-  private UnboundedSourceSpout sourceSpout;
-  private UnboundedSource.UnboundedReader reader;
-  private IKvStoreManager kvStoreManager;
-  private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
-
-  public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
-    this.sourceSpout = sourceSpout;
-  }
-
-  private void restore(Object userState) {
-    try {
-      kvStoreManager.restore(userState);
-      sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
-      UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
-      sourceSpout.createSourceReader(checkpointMark);
-      reader = sourceSpout.getUnboundedSourceReader();
-    } catch (IOException e) {
-      LOG.error("Failed to init state", e);
-      throw new RuntimeException(e.getMessage());
-    }
-  }
-
-  @Override
-  public void initState(Object userState) {
-    restore(userState);
-  }
-
-  @Override
-  public Object finishBatch(long checkpointId) {
-    try {
-      // Store check point mark from unbounded source reader
-      UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
-      sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
-
-      // checkpoint all kv stores in current manager
-      kvStoreManager.checkpoint(checkpointId);
-    } catch (IOException e) {
-      LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
-      throw new RuntimeException(e.getMessage());
-    }
-    return null;
-  }
-
-  @Override
-  public Object commit(long batchId, Object state) {
-    // backup kv stores to remote state backend
-    return kvStoreManager.backup(batchId);
-  }
-
-  @Override
-  public void rollBack(Object userState) {
-    restore(userState);
-  }
-
-  @Override
-  public void ackCommit(long batchId, long timeStamp) {
-    // remove obsolete state in bolt local and remote state backend
-    kvStoreManager.remove(batchId);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    sourceSpout.declareOutputFields(declarer);
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return sourceSpout.getComponentConfiguration();
-  }
-
-  @Override
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    try {
-      sourceSpout.open(conf, context, collector);
-      String storeName = String.format("task-%s", context.getThisTaskId());
-      String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-      kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
-          context, storeName, storePath, true);
-
-      reader = sourceSpout.getUnboundedSourceReader();
-    } catch (IOException e) {
-      LOG.error("Failed to open transactional unbounded source spout", e);
-      throw new RuntimeException(e.getMessage());
-    }
-  }
-
-  @Override
-  public void close() {
-    sourceSpout.close();
-  }
-
-  @Override
-  public void activate() {
-    sourceSpout.activate();
-  }
-
-  @Override
-  public void deactivate() {
-    sourceSpout.deactivate();
-  }
-
-  @Override
-  public void nextTuple() {
-    sourceSpout.nextTuple();
-  }
-
-  @Override
-  public void ack(Object msgId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void fail(Object msgId) {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
deleted file mode 100644
index 690824d..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Values;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Spout implementation that wraps a Beam UnboundedSource.
- * TODO: add wrapper to support metrics in UnboundedSource.
- */
-public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
-  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
-
-  private final String description;
-  private final UnboundedSource source;
-  private final SerializedPipelineOptions serializedOptions;
-  private final TupleTag<?> outputTag;
-
-  private transient JStormPipelineOptions pipelineOptions;
-  private transient UnboundedSource.UnboundedReader reader;
-  private transient SpoutOutputCollector collector;
-
-  private volatile boolean hasNextRecord;
-  private AtomicBoolean activated = new AtomicBoolean();
-
-  private KryoSerializer<WindowedValue> serializer;
-
-  private long lastWaterMark = 0L;
-
-  public UnboundedSourceSpout(
-      String description,
-      UnboundedSource source,
-      JStormPipelineOptions options,
-      TupleTag<?> outputTag) {
-    this.description = checkNotNull(description, "description");
-    this.source = checkNotNull(source, "source");
-    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
-    this.outputTag = checkNotNull(outputTag, "outputTag");
-  }
-
-  @Override
-  public synchronized void close() {
-    try {
-      activated.set(false);
-      this.reader.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void activate() {
-    activated.set(true);
-
-  }
-
-  @Override
-  public void deactivate() {
-    activated.set(false);
-  }
-
-  @Override
-  public void ack(Object msgId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void fail(Object msgId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<String, Object> getComponentConfiguration() {
-    return null;
-  }
-
-  @Override
-  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    try {
-      this.collector = collector;
-      this.pipelineOptions =
-          this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
-      createSourceReader(null);
-
-      this.serializer = new KryoSerializer<>(conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create unbounded reader.", e);
-    }
-  }
-
-  public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-    reader = this.source.createReader(this.pipelineOptions, checkpointMark);
-    hasNextRecord = this.reader.start();
-  }
-
-  @Override
-  public synchronized void nextTuple() {
-    if (!activated.get()) {
-      return;
-    }
-    try {
-      if (!hasNextRecord) {
-        hasNextRecord = reader.advance();
-      }
-
-      while (hasNextRecord && activated.get()) {
-        Object value = reader.getCurrent();
-        Instant timestamp = reader.getCurrentTimestamp();
-
-        WindowedValue wv =
-            WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-        LOG.debug("Source output: " + wv.getValue());
-        if (keyedEmit(outputTag.getId())) {
-          KV kv = (KV) wv.getValue();
-          // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-          byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
-          collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
-        } else {
-          byte[] immutableValue = serializer.serialize(wv);
-          collector.emit(outputTag.getId(), new Values(immutableValue));
-        }
-
-        // move to next record
-        hasNextRecord = reader.advance();
-      }
-
-      Instant waterMark = reader.getWatermark();
-      if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
-        lastWaterMark = waterMark.getMillis();
-        collector.flush();
-        collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
-        LOG.debug("Source output: WM-{}", waterMark.toDateTime());
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Exception reading values from source.", e);
-    }
-  }
-
-  public UnboundedSource getUnboundedSource() {
-    return source;
-  }
-
-  public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
-    return reader;
-  }
-
-  @Override
-  public String toString() {
-    return description;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
deleted file mode 100644
index 4320967..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * JStorm {@link Executor} for {@link View}.
- */
-public class ViewExecutor implements Executor {
-
-  private final String description;
-  private final TupleTag outputTag;
-  private ExecutorsBolt executorsBolt;
-
-  public ViewExecutor(String description, TupleTag outputTag) {
-    this.description = description;
-    this.outputTag = outputTag;
-  }
-
-  @Override
-  public void init(ExecutorContext context) {
-    this.executorsBolt = context.getExecutorsBolt();
-  }
-
-  @Override
-  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-    executorsBolt.processExecutorElem(outputTag, elem);
-  }
-
-  @Override
-  public void cleanup() {
-  }
-
-  @Override
-  public String toString() {
-    return description;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
deleted file mode 100644
index 3cd0aa9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- * @param <T>
- * @param <W>
- */
-public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
-  private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
-
-  private final String description;
-  private WindowFn<T, W> windowFn;
-  private ExecutorsBolt executorsBolt;
-  private TupleTag outputTag;
-
-  class JStormAssignContext<InputT, W extends BoundedWindow>
-      extends WindowFn<InputT, W>.AssignContext {
-    private final WindowedValue<InputT> value;
-
-    JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
-      fn.super();
-      checkArgument(
-          Iterables.size(value.getWindows()) == 1,
-          String.format(
-              "%s passed to window assignment must be in a single window, but it was in %s: %s",
-              WindowedValue.class.getSimpleName(),
-              Iterables.size(value.getWindows()),
-              value.getWindows()));
-      this.value = value;
-    }
-
-    @Override
-    public InputT element() {
-      return value.getValue();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return value.getTimestamp();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return Iterables.getOnlyElement(value.getWindows());
-    }
-  }
-
-  public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
-    this.description = description;
-    this.windowFn = windowFn;
-    this.outputTag = outputTag;
-  }
-
-  @Override
-  public void init(ExecutorContext context) {
-    this.executorsBolt = context.getExecutorsBolt();
-  }
-
-  @Override
-  public void process(TupleTag tag, WindowedValue elem) {
-    Collection<W> windows = null;
-    try {
-      windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
-      for (W window : windows) {
-        executorsBolt.processExecutorElem(
-            outputTag,
-            WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-      }
-    } catch (Exception e) {
-      LOG.warn("Failed to assign windows for elem=" + elem, e);
-    }
-  }
-
-  @Override
-  public void cleanup() {
-  }
-
-
-  @Override
-  public String toString() {
-    return description;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
deleted file mode 100644
index df54383..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link BagState} in JStorm runner.
- */
-class JStormBagState<K, T> implements BagState<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-  private final IKvStore<ComposedKey, Object> stateInfoKvState;
-  private int elemIndex;
-
-  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
-                        IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
-    this.key = key;
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.kvState = checkNotNull(kvState, "kvState");
-    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
-    Integer index = (Integer) stateInfoKvState.get(getComposedKey());
-    this.elemIndex = index != null ? ++index : 0;
-  }
-
-  @Override
-  public void add(T input) {
-    try {
-      kvState.put(getComposedKey(elemIndex), input);
-      stateInfoKvState.put(getComposedKey(), elemIndex);
-      elemIndex++;
-    } catch (IOException e) {
-      throw new RuntimeException(e.getCause());
-    }
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return new ReadableState<Boolean>() {
-      @Override
-      public Boolean read() {
-        return elemIndex <= 0;
-      }
-
-      @Override
-      public ReadableState<Boolean> readLater() {
-        // TODO: support prefetch.
-        return this;
-      }
-    };
-  }
-
-  @Override
-  public Iterable<T> read() {
-    return new BagStateIterable(elemIndex);
-  }
-
-  @Override
-  public BagState readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      for (int i = 0; i < elemIndex; i++) {
-        kvState.remove(getComposedKey(i));
-      }
-      stateInfoKvState.remove(getComposedKey());
-      elemIndex = 0;
-    } catch (IOException e) {
-      throw new RuntimeException(e.getCause());
-    }
-  }
-
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-
-  private ComposedKey getComposedKey(int elemIndex) {
-    return ComposedKey.of(key, namespace, elemIndex);
-  }
-
-  /**
-   * Implementation of Bag state Iterable.
-   */
-  private class BagStateIterable implements KvStoreIterable<T> {
-
-    private class BagStateIterator implements Iterator<T> {
-      private final int size;
-      private int cursor = 0;
-
-      BagStateIterator() {
-        Integer s = null;
-        try {
-          s = (Integer) stateInfoKvState.get(getComposedKey());
-        } catch (IOException e) {
-          LOG.error("Failed to get elemIndex for key={}", getComposedKey());
-        }
-        this.size = s != null ? ++s : 0;
-      }
-
-      @Override
-      public boolean hasNext() {
-        return cursor < size;
-      }
-
-      @Override
-      public T next() {
-        if (cursor >= size) {
-          throw new NoSuchElementException();
-        }
-
-        T value = null;
-        try {
-          value = kvState.get(getComposedKey(cursor));
-        } catch (IOException e) {
-          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
-        }
-        cursor++;
-        return value;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    }
-
-    private final int size;
-
-    BagStateIterable(int size) {
-      this.size = size;
-    }
-
-    @Override
-    public Iterator<T> iterator() {
-      return new BagStateIterator();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
deleted file mode 100644
index 7c6a239..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * JStorm implementation of {@link CombiningState}.
- */
-public class JStormCombiningState<InputT, AccumT, OutputT>
-    implements CombiningState<InputT, AccumT, OutputT> {
-
-  @Nullable
-  private final BagState<AccumT> accumBagState;
-  private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
-  JStormCombiningState(
-      BagState<AccumT> accumBagState,
-      Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-    this.accumBagState = checkNotNull(accumBagState, "accumBagState");
-    this.combineFn = checkNotNull(combineFn, "combineFn");
-  }
-
-  @Override
-  public AccumT getAccum() {
-    // TODO: replacing the accumBagState with the merged accum.
-    return combineFn.mergeAccumulators(accumBagState.read());
-  }
-
-  @Override
-  public void addAccum(AccumT accumT) {
-    accumBagState.add(accumT);
-  }
-
-  @Override
-  public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
-    return combineFn.mergeAccumulators(iterable);
-  }
-
-  @Override
-  public void add(InputT input) {
-    accumBagState.add(
-        combineFn.addInput(combineFn.createAccumulator(), input));
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return accumBagState.isEmpty();
-  }
-
-  @Override
-  public OutputT read() {
-    return combineFn.extractOutput(
-        combineFn.mergeAccumulators(accumBagState.read()));
-  }
-
-  @Override
-  public CombiningState<InputT, AccumT, OutputT> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    accumBagState.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
deleted file mode 100644
index ac3f91f..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of {@link MapState} in JStorm runner.
- * @param <K>
- * @param <V>
- */
-public class JStormMapState<K, V> implements MapState<K, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
-  private final K key;
-  private final StateNamespace namespace;
-  private IKvStore<K, V> kvStore;
-
-  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvStore = kvStore;
-  }
-
-  @Override
-  public void put(K var1, V var2) {
-    try {
-      kvStore.put(var1, var2);
-    } catch (IOException e) {
-      reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
-    }
-  }
-
-  @Override
-  public ReadableState<V> putIfAbsent(K var1, V var2) {
-    ReadableState<V> ret = null;
-    try {
-      V value = kvStore.get(var1);
-      if (value == null) {
-        kvStore.put(var1, var2);
-        ret = new MapReadableState<>(null);
-      } else {
-        ret = new MapReadableState<>(value);
-      }
-    } catch (IOException e) {
-      reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public void remove(K var1) {
-    try {
-      kvStore.remove(var1);
-    } catch (IOException e) {
-      reportError(String.format("Failed to remove key=%s", var1), e);
-    }
-  }
-
-  @Override
-  public ReadableState<V> get(K var1) {
-    ReadableState<V> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState(kvStore.get(var1));
-    } catch (IOException e) {
-      reportError(String.format("Failed to get value for key=%s", var1), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<K>> keys() {
-    ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.keys());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get keys"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<V>> values() {
-    ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.values());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get values"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-    ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
-    try {
-      ret = new MapReadableState<>(kvStore.entries());
-    } catch (IOException e) {
-      reportError(String.format("Failed to get values"), e);
-    }
-    return ret;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      Iterable<K> keys = kvStore.keys();
-      kvStore.removeBatch(keys);
-    } catch (IOException e) {
-      reportError(String.format("Failed to clear map state"), e);
-    }
-  }
-
-  private void reportError(String errorInfo, IOException e) {
-    LOG.error(errorInfo, e);
-    throw new RuntimeException(errorInfo);
-  }
-
-  private class MapReadableState<T> implements ReadableState<T> {
-    private T value;
-
-    public MapReadableState(T value) {
-      this.value = value;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public ReadableState<T> readLater() {
-      return this;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
deleted file mode 100644
index 80ef3a2..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.StateBinder;
-import org.apache.beam.sdk.state.StateContext;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link StateInternals}.
- */
-public class JStormStateInternals<K> implements StateInternals {
-
-  private static final String STATE_INFO = "state-info:";
-
-  @Nullable
-  private final K key;
-  private final IKvStoreManager kvStoreManager;
-  private final TimerService timerService;
-  private final int executorId;
-
-  public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
-                              TimerService timerService, int executorId) {
-    this.key = key;
-    this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
-    this.timerService = checkNotNull(timerService, "timerService");
-    this.executorId = executorId;
-  }
-
-  @Nullable
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
-    // throw new UnsupportedOperationException("StateContext is not supported.");
-    /**
-     * TODOļ¼š
-     * Same implementation as state() which is without StateContext. This might be updated after
-     * we figure out if we really need StateContext for JStorm state internals.
-     */
-    return state(namespace, address);
-  }
-
-  @Override
-  public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
-    return address.getSpec().bind(address.getId(), new StateBinder() {
-      @Override
-      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
-        try {
-          return new JStormValueState<>(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException();
-        }
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
-        try {
-          return new JStormBagState(
-              getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException();
-        }
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          String id,
-          StateSpec<MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder,
-          Coder<ValueT> mapValueCoder) {
-        try {
-          return new JStormMapState<>(
-              getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT> CombiningState bindCombining(
-          String id,
-          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        try {
-          BagState<AccumT> accumBagState = new JStormBagState(
-              getKey(), namespace,
-              kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-          return new JStormCombiningState<>(accumBagState, combineFn);
-        } catch (IOException e) {
-          throw new RuntimeException();
-        }
-      }
-
-
-      @Override
-      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-      bindCombiningWithContext(
-          String id,
-          StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
-          CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public WatermarkHoldState bindWatermark(
-          String id,
-          StateSpec<WatermarkHoldState> spec,
-          final TimestampCombiner timestampCombiner) {
-        try {
-          BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
-              getKey(), namespace,
-              kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
-              kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-
-          Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
-              new BinaryCombineFn<Instant>() {
-                @Override
-                public Instant apply(Instant left, Instant right) {
-                  return timestampCombiner.combine(left, right);
-                }
-              };
-          return new JStormWatermarkHoldState(
-              namespace,
-              new JStormCombiningState<>(
-                  accumBagState,
-                  outputTimeCombineFn),
-              timestampCombiner,
-              timerService);
-        } catch (IOException e) {
-          throw new RuntimeException();
-        }
-      }
-    });
-  }
-
-  private String getStoreId(String stateId) {
-    return String.format("%s-%s", stateId, executorId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
deleted file mode 100644
index 79ff6b4..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.ValueState;
-
-/**
- * JStorm implementation of {@link ValueState}.
- */
-public class JStormValueState<K, T> implements ValueState<T> {
-
-  @Nullable
-  private final K key;
-  private final StateNamespace namespace;
-  private final IKvStore<ComposedKey, T> kvState;
-
-  JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
-    this.key = key;
-    this.namespace = namespace;
-    this.kvState = kvState;
-  }
-
-  @Override
-  public void write(T t) {
-    try {
-      kvState.put(getComposedKey(), t);
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
-    }
-  }
-
-  @Override
-  public T read() {
-    try {
-      return kvState.get(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to read key: %s, namespace: %s.", key, namespace));
-    }
-  }
-
-  @Override
-  public ValueState<T> readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    try {
-      kvState.remove(getComposedKey());
-    } catch (IOException e) {
-      throw new RuntimeException(String.format(
-          "Failed to clear key: %s, namespace: %s.", key, namespace));
-    }
-  }
-
-  private ComposedKey getComposedKey() {
-    return ComposedKey.of(key, namespace);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
deleted file mode 100644
index dc3ba43..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link WatermarkHoldState}.
- */
-public class JStormWatermarkHoldState implements WatermarkHoldState {
-
-  private final StateNamespace namespace;
-  private final GroupingState<Instant, Instant> watermarkHoldsState;
-  private final TimestampCombiner timestampCombiner;
-  private final TimerService timerService;
-
-  JStormWatermarkHoldState(
-      StateNamespace namespace,
-      GroupingState<Instant, Instant> watermarkHoldsState,
-      TimestampCombiner timestampCombiner,
-      TimerService timerService) {
-    this.namespace = checkNotNull(namespace, "namespace");
-    this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
-    this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
-    this.timerService = checkNotNull(timerService, "timerService");
-  }
-
-  @Override
-  public TimestampCombiner getTimestampCombiner() {
-    return timestampCombiner;
-  }
-
-  @Override
-  public void add(Instant instant) {
-    timerService.addWatermarkHold(namespace.stringKey(), instant);
-    watermarkHoldsState.add(instant);
-  }
-
-  @Override
-  public ReadableState<Boolean> isEmpty() {
-    return watermarkHoldsState.isEmpty();
-  }
-
-  @Override
-  public Instant read() {
-    return watermarkHoldsState.read();
-  }
-
-  @Override
-  public WatermarkHoldState readLater() {
-    // TODO: support prefetch.
-    return this;
-  }
-
-  @Override
-  public void clear() {
-    timerService.clearWatermarkHold(namespace.stringKey());
-    watermarkHoldsState.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
deleted file mode 100644
index 184a957..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.runtime.timer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
-import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link TimerInternals}.
- */
-public class JStormTimerInternals<K> implements TimerInternals {
-
-  private final K key;
-  private final DoFnExecutor<?, ?> doFnExecutor;
-  private final TimerService timerService;
-
-
-  public JStormTimerInternals(
-      @Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
-    this.key = key;
-    this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
-    this.timerService = checkNotNull(timerService, "timerService");
-  }
-
-  @Override
-  public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
-  }
-
-  @Override
-  @Deprecated
-  public void setTimer(TimerData timerData) {
-    timerService.setTimer(key, timerData, doFnExecutor);
-  }
-
-  @Override
-  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-    throw new UnsupportedOperationException(
-        "Canceling of a timer is not yet supported.");
-  }
-
-  @Override
-  @Deprecated
-  public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException(
-        "Canceling of a timer is not yet supported.");
-  }
-
-  @Override
-  @Deprecated
-  public void deleteTimer(TimerData timerData) {
-    throw new UnsupportedOperationException(
-        "Canceling of a timer is not yet supported.");
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return Instant.now();
-  }
-
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return null;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return new Instant(timerService.currentInputWatermark());
-  }
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return new Instant(timerService.currentOutputWatermark());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
deleted file mode 100644
index 7e7a54a..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a {@link Read.Bounded} into a Storm spout.
- *
- * @param <T>
- */
-public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
-
-  @Override
-  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    String description =
-        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-    TupleTag<?> outputTag = userGraphContext.getOutputTag();
-    PValue outputValue = userGraphContext.getOutput();
-    UnboundedSourceSpout spout = new UnboundedSourceSpout(
-        description,
-        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
-        userGraphContext.getOptions(), outputTag);
-
-    context.getExecutionGraphContext().registerSpout(
-        spout, TaggedPValue.of(outputTag, outputValue));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
deleted file mode 100644
index 44ce8d8..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
- * @param <V>
- */
-public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
-
-  @Override
-  public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-
-    // Since a new tag is created in PCollectionList, retrieve the real tag here.
-    Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
-    for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
-      PCollection<V> pc = (PCollection<V>) entry.getValue();
-      inputs.putAll(pc.expand());
-    }
-    System.out.println("Real inputs: " + inputs);
-    System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
-    String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
deleted file mode 100644
index 85cb85d..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
- * @param <K>
- * @param <V>
- */
-public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
-  // information of transform
-  protected PCollection<KV<K, V>> input;
-  protected PCollection<KV<K, Iterable<V>>> output;
-  protected List<TupleTag<?>> inputTags;
-  protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
-  protected List<TupleTag<?>> sideOutputTags;
-  protected List<PCollectionView<?>> sideInputs;
-  protected WindowingStrategy<?, ?> windowingStrategy;
-
-  @Override
-  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
-    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-    String description =
-        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-    input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-    output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
-
-    inputTags = userGraphContext.getInputTags();
-    mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
-    sideOutputTags = Lists.newArrayList();
-
-    sideInputs = Collections.<PCollectionView<?>>emptyList();
-    windowingStrategy = input.getWindowingStrategy();
-
-    GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
-        userGraphContext.getStepName(),
-        description,
-        context,
-        context.getUserGraphContext().getOptions(),
-        windowingStrategy,
-        mainOutputTag,
-        sideOutputTags);
-    context.addTransformExecutor(groupByWindowExecutor);
-  }
-}