You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:07 UTC

[07/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java
deleted file mode 100644
index 889977b..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/StatefulDoFnExecutor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
-    public StatefulDoFnExecutor(
-            String stepName, String description, StormPipelineOptions pipelineOptions,
-            DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
-                    sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
-                mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-    }
-
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        if (mainInputTag.equals(tag)) {
-            WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-            stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-                    executorContext.getExecutorsBolt().timerService()));
-            stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-                    kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
-    }
-
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        stepContext.setStateInternals(new JStormStateInternals<>(key,
-                kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        super.onTimer(key, timerData);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java
deleted file mode 100644
index 60d2f1a..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import org.apache.beam.runners.core.TimerInternals;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Interface that tracks input watermarks and manages timers in each bolt.
- */
-public interface TimerService extends Serializable {
-
-    void init(List<Integer> upStreamTasks);
-
-    /**
-     *
-     * @param task
-     * @param inputWatermark
-     * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
-     */
-    long updateInputWatermark(Integer task, long inputWatermark);
-
-    long currentInputWatermark();
-
-    long currentOutputWatermark();
-
-    void clearWatermarkHold(String namespace);
-
-    void addWatermarkHold(String namespace, Instant watermarkHold);
-
-    void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
-
-    void fireTimers(long newWatermark);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerServiceImpl.java
deleted file mode 100644
index 1411428..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TimerServiceImpl.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import avro.shaded.com.google.common.collect.Maps;
-import avro.shaded.com.google.common.collect.Sets;
-import com.alibaba.jstorm.utils.Pair;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.joda.time.Instant;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Default implementation of {@link TimerService}.
- */
-public class TimerServiceImpl implements TimerService {
-    private transient ExecutorContext executorContext;
-    private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
-
-    private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>();
-    private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
-    private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
-    private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
-    private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>();
-    private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
-            timerDataToKeyedExecutors = Maps.newHashMap();
-
-    private boolean initialized = false;
-
-    public TimerServiceImpl() {
-    }
-
-    public TimerServiceImpl(ExecutorContext executorContext) {
-        this.executorContext = executorContext;
-        this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
-    }
-
-    @Override
-    public void init(List<Integer> upStreamTasks) {
-        for (Integer task : upStreamTasks) {
-            upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-            inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-        }
-        initialized = true;
-    }
-
-    @Override
-    public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
-        checkState(initialized, "TimerService has not been initialized.");
-        Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
-        // Make sure the input watermark don't go backward.
-        if (taskInputWatermark > oldTaskInputWatermark) {
-            upStreamTaskToInputWatermark.put(task, taskInputWatermark);
-            inputWatermarks.add(taskInputWatermark);
-            inputWatermarks.remove(oldTaskInputWatermark);
-
-            long newLocalInputWatermark = currentInputWatermark();
-            if (newLocalInputWatermark > oldTaskInputWatermark) {
-                return newLocalInputWatermark;
-            }
-        }
-        return 0;
-    }
-
-    @Override
-    public void fireTimers(long newWatermark) {
-        TimerInternals.TimerData timerData;
-        while ((timerData = eventTimeTimersQueue.peek()) != null
-                && timerData.getTimestamp().getMillis() <= newWatermark) {
-            for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
-                DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
-                executor.onTimer(keyedExecutor.getSecond(), timerData);
-            }
-            eventTimeTimersQueue.remove();
-            timerDataToKeyedExecutors.remove(timerData);
-        }
-    }
-
-    @Override
-    public long currentInputWatermark() {
-        return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
-    }
-
-    @Override
-    public long currentOutputWatermark() {
-        if (watermarkHolds.isEmpty()) {
-            return currentInputWatermark();
-        } else {
-            return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
-        }
-    }
-
-    @Override
-    public void clearWatermarkHold(String namespace) {
-        Instant currentHold = namespaceToWatermarkHold.get(namespace);
-        if (currentHold != null) {
-            watermarkHolds.remove(currentHold);
-            namespaceToWatermarkHold.remove(namespace);
-        }
-    }
-
-    @Override
-    public void addWatermarkHold(String namespace, Instant watermarkHold) {
-        Instant currentHold = namespaceToWatermarkHold.get(namespace);
-        if (currentHold == null) {
-            namespaceToWatermarkHold.put(namespace, watermarkHold);
-            watermarkHolds.add(watermarkHold);
-        } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
-            namespaceToWatermarkHold.put(namespace, watermarkHold);
-            watermarkHolds.add(watermarkHold);
-            watermarkHolds.remove(currentHold);
-        }
-    }
-
-    @Override
-    public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
-        checkArgument(
-                TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
-                String.format("Does not support domain: %s.", timerData.getDomain()));
-        Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
-        if (keyedExecutors == null) {
-            keyedExecutors = Sets.newHashSet();
-            eventTimeTimersQueue.add(timerData);
-        }
-        keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
-        timerDataToKeyedExecutors.put(timerData, keyedExecutors);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxExecutorsBolt.java
deleted file mode 100644
index 5049d72..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxExecutorsBolt.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
-    private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
-
-    private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
-    private static final String TIMER_SERVICE_KET = "timer_service_key";
-
-    private ExecutorsBolt executorsBolt;
-    private IKvStoreManager kvStoreManager;
-    private IKvStore<String, TimerService> timerServiceStore;
-
-    public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
-        this.executorsBolt = executorsBolt;
-        this.executorsBolt.setStatefulBolt(true);
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        try {
-            executorsBolt.prepare(stormConf, context, collector);
-            kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
-            timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-        } catch (IOException e) {
-            LOG.error("Failed to prepare stateful bolt", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        executorsBolt.execute(input);
-    }
-
-    @Override
-    public void cleanup() {
-        executorsBolt.cleanup();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        executorsBolt.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return executorsBolt.getComponentConfiguration();
-    }
-
-    @Override
-    public void initState(Object userState) {
-        LOG.info("Begin to init from state: {}", userState);
-        restore(userState);
-    }
-
-    @Override
-    public Object finishBatch(long batchId) {
-        try {
-            timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
-        } catch (IOException e) {
-            LOG.error("Failed to store current timer service status", e);
-            throw new RuntimeException(e.getMessage());
-        }
-        kvStoreManager.checkpoint(batchId);
-        return null;
-    }
-
-    @Override
-    public Object commit(long batchId, Object state) {
-        return kvStoreManager.backup(batchId);
-    }
-
-    @Override
-    public void rollBack(Object userState) {
-        LOG.info("Begin to rollback from state: {}", userState);
-        restore(userState);
-    }
-
-    @Override
-    public void ackCommit(long batchId, long timeStamp) {
-        kvStoreManager.remove(batchId);
-    }
-
-    private void restore(Object userState) {
-        try {
-            // restore all states
-            kvStoreManager.restore(userState);
-
-            // init timer service
-            timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-            TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
-            if (timerService == null) {
-                timerService = executorsBolt.initTimerService();
-            }
-            executorsBolt.setTimerService(timerService);
-        } catch (IOException e) {
-            LOG.error("Failed to restore state", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxUnboundedSourceSpout.java
deleted file mode 100644
index 65ce814..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/TxUnboundedSourceSpout.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
-    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
-
-    private static final String SOURCE_STORE_ID = "SourceCheckpoint";
-    private static final String CHECKPOINT_MARK = "CheckpointMark";
-
-    private UnboundedSourceSpout sourceSpout;
-    private UnboundedSource.UnboundedReader reader;
-    private IKvStoreManager kvStoreManager;
-    private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
-
-    public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
-        this.sourceSpout = sourceSpout;
-    }
-
-    private void restore(Object userState) {
-        try {
-            kvStoreManager.restore(userState);
-            sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
-            UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
-            sourceSpout.createSourceReader(checkpointMark);
-            reader = sourceSpout.getUnboundedSourceReader();
-        } catch (IOException e) {
-            LOG.error("Failed to init state", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void initState(Object userState) {
-        restore(userState);
-    }
-
-    @Override
-    public Object finishBatch(long checkpointId) {
-        try {
-            // Store check point mark from unbounded source reader
-            UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
-            sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
-
-            // checkpoint all kv stores in current manager
-            kvStoreManager.checkpoint(checkpointId);
-        } catch (IOException e) {
-            LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
-            throw new RuntimeException(e.getMessage());
-        }
-        return null;
-    }
-
-    @Override
-    public Object commit(long batchId, Object state) {
-        // backup kv stores to remote state backend
-        return kvStoreManager.backup(batchId);
-    }
-
-    @Override
-    public void rollBack(Object userState) {
-        restore(userState);
-    }
-
-    @Override
-    public void ackCommit(long batchId, long timeStamp) {
-        // remove obsolete state in bolt local and remote state backend
-        kvStoreManager.remove(batchId);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        sourceSpout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return sourceSpout.getComponentConfiguration();
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        try {
-            sourceSpout.open(conf, context, collector);
-            String storeName = String.format("task-%s", context.getThisTaskId());
-            String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-            kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true);
-
-            reader = sourceSpout.getUnboundedSourceReader();
-        } catch (IOException e) {
-            LOG.error("Failed to open transactional unbounded source spout", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void close() {
-        sourceSpout.close();
-    }
-
-    @Override
-    public void activate() {
-        sourceSpout.activate();
-    }
-
-    @Override
-    public void deactivate() {
-        sourceSpout.deactivate();
-    }
-
-    @Override
-    public void nextTuple() {
-        sourceSpout.nextTuple();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/UnboundedSourceSpout.java
deleted file mode 100644
index 7a4b269..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/UnboundedSourceSpout.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.alibaba.jstorm.window.Watermark;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.util.SerializedPipelineOptions;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Spout implementation that wraps a Beam UnboundedSource
- *
- * TODO: add wrapper to support metrics in UnboundedSource.
- */
-public class UnboundedSourceSpout extends AdaptorBasicSpout {
-    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
-
-    private final String description;
-    private final UnboundedSource source;
-    private final SerializedPipelineOptions serializedOptions;
-    private final TupleTag<?> outputTag;
-
-    private transient StormPipelineOptions pipelineOptions;
-    private transient UnboundedSource.UnboundedReader reader;
-    private transient SpoutOutputCollector collector;
-
-    private volatile boolean hasNextRecord;
-    private AtomicBoolean activated = new AtomicBoolean();
-
-    private KryoSerializer<WindowedValue> serializer;
-
-    private long lastWaterMark = 0l;
-
-    public UnboundedSourceSpout(
-            String description,
-            UnboundedSource source,
-            StormPipelineOptions options,
-            TupleTag<?> outputTag) {
-        this.description = checkNotNull(description, "description");
-        this.source = checkNotNull(source, "source");
-        this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
-        this.outputTag = checkNotNull(outputTag, "outputTag");
-    }
-
-    @Override
-    public synchronized void close() {
-        try {
-            activated.set(false);
-            this.reader.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void activate() {
-        activated.set(true);
-        
-    }
-
-    @Override
-    public void deactivate() {
-        activated.set(false);
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        try {
-            this.collector = collector;
-            this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
-
-            createSourceReader(null);
-
-            this.serializer = new KryoSerializer<>(conf);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to create unbounded reader.", e);
-        }
-    }
-
-    public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
-        if (reader != null) {
-            reader.close();
-        }
-        reader = this.source.createReader(this.pipelineOptions, checkpointMark);
-        hasNextRecord = this.reader.start();
-    }
-
-    @Override
-    public synchronized void nextTuple() {
-        if (!activated.get()) {
-            return;
-        }
-        try {
-            if (!hasNextRecord) {
-                hasNextRecord = reader.advance();
-            }
-
-            while (hasNextRecord && activated.get()) {
-                Object value = reader.getCurrent();
-                Instant timestamp = reader.getCurrentTimestamp();
-
-                WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-                LOG.debug("Source output: " + wv.getValue());
-                if (keyedEmit(outputTag.getId())) {
-                    KV kv = (KV) wv.getValue();
-                    // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-                    byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
-                    collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
-                } else {
-                    byte[] immutableValue = serializer.serialize(wv);
-                    collector.emit(outputTag.getId(), new Values(immutableValue));
-                }
-
-                // move to next record
-                hasNextRecord = reader.advance();
-            }
-
-            Instant waterMark = reader.getWatermark();
-            if (waterMark != null && lastWaterMark <  waterMark.getMillis()) {
-                lastWaterMark = waterMark.getMillis();
-                collector.flush();
-                collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
-                LOG.debug("Source output: WM-{}", waterMark.toDateTime());
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Exception reading values from source.", e);
-        }
-    }
-
-    public UnboundedSource getUnboundedSource() {
-        return source;
-    }
-
-    public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
-        return reader;
-    }
-
-    @Override
-    public String toString() {
-        return description;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ViewExecutor.java
deleted file mode 100644
index a297b50..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ViewExecutor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * JStorm {@link Executor} for {@link View}.
- *
- * <p>Every element passed to {@link #process} is handed straight to the {@link ExecutorsBolt}
- * obtained from the {@link ExecutorContext}, tagged with this executor's output tag.
- */
-public class ViewExecutor implements Executor {
-
-    /** Text returned by {@link #toString()}. */
-    private final String description;
-    /** Tag under which elements are forwarded to the bolt. */
-    private final TupleTag outputTag;
-    /** Bolt that receives forwarded elements; assigned in {@link #init}. */
-    private ExecutorsBolt executorsBolt;
-
-    public ViewExecutor(String description, TupleTag outputTag) {
-        this.outputTag = outputTag;
-        this.description = description;
-    }
-
-    /** Captures the bolt exposed by {@code context} for later use by {@link #process}. */
-    @Override
-    public void init(ExecutorContext context) {
-        executorsBolt = context.getExecutorsBolt();
-    }
-
-    /** Forwards {@code elem} under the output tag; the incoming {@code tag} is not consulted. */
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        this.executorsBolt.processExecutorElem(this.outputTag, elem);
-    }
-
-    @Override
-    public void cleanup() {
-        // Nothing to release.
-    }
-
-    @Override
-    public String toString() {
-        return this.description;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/WindowAssignExecutor.java
deleted file mode 100644
index b26c3e5..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/WindowAssignExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
-
-    private final String description;
-    private WindowFn<T, W> windowFn;
-    private ExecutorsBolt executorsBolt;
-    private TupleTag outputTag;
-
-    class JStormAssignContext<InputT, W extends BoundedWindow>
-            extends WindowFn<InputT, W>.AssignContext {
-        private final WindowedValue<InputT> value;
-
-        JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
-            fn.super();
-            checkArgument(
-                    Iterables.size(value.getWindows()) == 1,
-                    String.format(
-                            "%s passed to window assignment must be in a single window, but it was in %s: %s",
-                            WindowedValue.class.getSimpleName(),
-                            Iterables.size(value.getWindows()),
-                            value.getWindows()));
-            this.value = value;
-        }
-
-        @Override
-        public InputT element() {
-            return value.getValue();
-        }
-
-        @Override
-        public Instant timestamp() {
-            return value.getTimestamp();
-        }
-
-        @Override
-        public BoundedWindow window() {
-            return Iterables.getOnlyElement(value.getWindows());
-        }
-    }
-
-    public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
-        this.description = description;
-        this.windowFn = windowFn;
-        this.outputTag = outputTag;
-    }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.executorsBolt = context.getExecutorsBolt();
-    }
-
-    @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        Collection<W> windows = null;
-        try {
-            windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
-            for (W window: windows) {
-                executorsBolt.processExecutorElem(
-                        outputTag,
-                        WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to assign windows for elem=" + elem, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {}
-
-
-    @Override
-    public String toString() {
-        return description;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormBagState.java
deleted file mode 100644
index 7f77a78..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormBagState.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * JStorm implementation of {@link BagState}.
- */
-class JStormBagState<K, T> implements BagState<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
-
-    @Nullable
-    private final K key;
-    private final StateNamespace namespace;
-    private final IKvStore<ComposedKey, T> kvState;
-    private final IKvStore<ComposedKey, Object> stateInfoKvState;
-    private int elemIndex;
-
-    public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
-                           IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
-        this.key = key;
-        this.namespace = checkNotNull(namespace, "namespace");
-        this.kvState = checkNotNull(kvState, "kvState");
-        this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
-
-        Integer index = (Integer) stateInfoKvState.get(getComposedKey());
-        this.elemIndex =  index != null ? ++index : 0;
-    }
-
-    @Override
-    public void add(T input) {
-        try {
-            kvState.put(getComposedKey(elemIndex), input);
-            stateInfoKvState.put(getComposedKey(), elemIndex);
-            elemIndex++;
-        } catch (IOException e) {
-            throw new RuntimeException(e.getCause());
-        }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return new ReadableState<Boolean>() {
-            @Override
-            public Boolean read() {
-                return elemIndex <= 0;
-            }
-
-            @Override
-            public ReadableState<Boolean> readLater() {
-                // TODO: support prefetch.
-                return this;
-            }
-        };
-    }
-
-    @Override
-    public Iterable<T> read() {
-        return new BagStateIterable(elemIndex);
-    }
-
-    @Override
-    public BagState readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
-
-    @Override
-    public void clear() {
-        try {
-            for (int i = 0; i < elemIndex; i++) {
-                kvState.remove(getComposedKey(i));
-            }
-            stateInfoKvState.remove(getComposedKey());
-            elemIndex = 0;
-        } catch (IOException e) {
-            throw new RuntimeException(e.getCause());
-        }
-    }
-
-    private ComposedKey getComposedKey() {
-        return ComposedKey.of(key, namespace);
-    }
-
-    private ComposedKey getComposedKey(int elemIndex) {
-        return ComposedKey.of(key, namespace, elemIndex);
-    }
-
-    private class BagStateIterable implements KvStoreIterable<T> {
-
-        private class BagStateIterator implements Iterator<T> {
-            private final int size;
-            private int cursor = 0;
-
-            BagStateIterator() {
-                Integer s = null;
-                try {
-                    s = (Integer) stateInfoKvState.get(getComposedKey());
-                } catch (IOException e) {
-                    LOG.error("Failed to get elemIndex for key={}", getComposedKey());
-                }
-                this.size = s != null ? ++s : 0;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return cursor < size;
-            }
-
-            @Override
-            public T next() {
-                if (cursor >= size) {
-                    throw new NoSuchElementException();
-                }
-
-                T value = null;
-                try {
-                    value = kvState.get(getComposedKey(cursor));
-                } catch (IOException e) {
-                    LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
-                }
-                cursor++;
-                return value;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        }
-
-        private final int size;
-
-        BagStateIterable(int size) {
-            this.size = size;
-        }
-
-        @Override
-        public Iterator<T> iterator() {
-            return new BagStateIterator();
-        }
-
-        @Override
-        public String toString() {
-            return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormCombiningState.java
deleted file mode 100644
index 496dc5e..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormCombiningState.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.transforms.Combine;
-
-/**
- * JStorm implementation of {@link CombiningState}.
- *
- * <p>Accumulators are kept in a {@link BagState}; each added input becomes its own accumulator
- * and the accumulators are merged with the {@link Combine.CombineFn} whenever a result is read.
- */
-public class JStormCombiningState<InputT, AccumT, OutputT>
-        implements CombiningState<InputT, AccumT, OutputT> {
-
-    // Never null: enforced by the constructor and dereferenced unconditionally below.
-    private final BagState<AccumT> accumBagState;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    /**
-     * @param accumBagState bag holding the accumulators, must not be {@code null}
-     * @param combineFn combine function used to create, extend and merge accumulators,
-     *     must not be {@code null}
-     */
-    JStormCombiningState(
-            BagState<AccumT> accumBagState,
-            Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-        this.accumBagState = checkNotNull(accumBagState, "accumBagState");
-        this.combineFn = checkNotNull(combineFn, "combineFn");
-    }
-
-    /** Merges every accumulator currently in the bag; the bag itself is left untouched. */
-    @Override
-    public AccumT getAccum() {
-        // TODO: replacing the accumBagState with the merged accum.
-        return combineFn.mergeAccumulators(accumBagState.read());
-    }
-
-    @Override
-    public void addAccum(AccumT accumT) {
-        accumBagState.add(accumT);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
-        return combineFn.mergeAccumulators(iterable);
-    }
-
-    /** Wraps {@code input} in a fresh accumulator and appends that accumulator to the bag. */
-    @Override
-    public void add(InputT input) {
-        accumBagState.add(
-                combineFn.addInput(combineFn.createAccumulator(), input));
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return accumBagState.isEmpty();
-    }
-
-    /** Merges every accumulator in the bag and extracts the combined output. */
-    @Override
-    public OutputT read() {
-        return combineFn.extractOutput(
-            combineFn.mergeAccumulators(accumBagState.read()));
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
-
-    @Override
-    public void clear() {
-        accumBagState.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormMapState.java
deleted file mode 100644
index 5d5bbbf..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormMapState.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.IKvStore;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * JStorm implementation of {@link MapState}, backed directly by an {@link IKvStore}.
- *
- * <p>Every {@link IOException} raised by the store is logged and rethrown as a
- * {@link RuntimeException} that carries the original exception as its cause.
- */
-public class JStormMapState<K, V> implements MapState<K, V> {
-    private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
-
-    // key and namespace are stored but not consulted by any method of this class.
-    private final K key;
-    private final StateNamespace namespace;
-    private final IKvStore<K, V> kvStore;
-
-    public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
-        this.key = key;
-        this.namespace = namespace;
-        this.kvStore = kvStore;
-    }
-
-    @Override
-    public void put(K var1, V var2) {
-        try {
-            kvStore.put(var1, var2);
-        } catch (IOException e) {
-            throw reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
-        }
-    }
-
-    /**
-     * Stores {@code var2} under {@code var1} only when the store has no value for that key.
-     *
-     * @return a state whose {@code read()} yields the value that was already present, or
-     *     {@code null} when the key was absent and {@code var2} has been stored
-     */
-    @Override
-    public ReadableState<V> putIfAbsent(K var1, V var2) {
-        try {
-            V existing = kvStore.get(var1);
-            if (existing == null) {
-                kvStore.put(var1, var2);
-            }
-            return new MapReadableState<V>(existing);
-        } catch (IOException e) {
-            throw reportError(
-                    String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
-        }
-    }
-
-    @Override
-    public void remove(K var1) {
-        try {
-            kvStore.remove(var1);
-        } catch (IOException e) {
-            throw reportError(String.format("Failed to remove key=%s", var1), e);
-        }
-    }
-
-    @Override
-    public ReadableState<V> get(K var1) {
-        try {
-            return new MapReadableState<V>(kvStore.get(var1));
-        } catch (IOException e) {
-            throw reportError(String.format("Failed to get value for key=%s", var1), e);
-        }
-    }
-
-    @Override
-    public ReadableState<Iterable<K>> keys() {
-        try {
-            return new MapReadableState<Iterable<K>>(kvStore.keys());
-        } catch (IOException e) {
-            throw reportError("Failed to get keys", e);
-        }
-    }
-
-    @Override
-    public ReadableState<Iterable<V>> values() {
-        try {
-            return new MapReadableState<Iterable<V>>(kvStore.values());
-        } catch (IOException e) {
-            throw reportError("Failed to get values", e);
-        }
-    }
-
-    @Override
-    public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
-        try {
-            return new MapReadableState<Iterable<Map.Entry<K, V>>>(kvStore.entries());
-        } catch (IOException e) {
-            throw reportError("Failed to get entries", e);
-        }
-    }
-
-    /** Removes every key currently reported by the store. */
-    @Override
-    public void clear() {
-        try {
-            Iterable<K> keys = kvStore.keys();
-            kvStore.removeBatch(keys);
-        } catch (IOException e) {
-            throw reportError("Failed to clear map state", e);
-        }
-    }
-
-    /**
-     * Logs a store failure and builds the unchecked exception the caller must throw.
-     *
-     * <p>Returning the exception (rather than throwing here) lets the compiler see that the
-     * calling {@code catch} block never completes normally.
-     *
-     * @param errorInfo description of the failed operation
-     * @param e the store failure, preserved as the cause
-     * @return a {@link RuntimeException} with {@code errorInfo} as message and {@code e} as cause
-     */
-    private RuntimeException reportError(String errorInfo, IOException e) {
-        LOG.error(errorInfo, e);
-        return new RuntimeException(errorInfo, e);
-    }
-
-    /** {@link ReadableState} that simply returns a value captured at construction time. */
-    private static class MapReadableState<T> implements ReadableState<T> {
-        private final T value;
-
-        public MapReadableState(T value) {
-            this.value = value;
-        }
-
-        @Override
-        public T read() {
-            return value;
-        }
-
-        @Override
-        public ReadableState<T> readLater() {
-            return this;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternals.java
deleted file mode 100644
index 4a202cf..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternals.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import com.alibaba.jstorm.beam.translation.runtime.TimerService;
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.CombiningState;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.StateBinder;
-import org.apache.beam.sdk.state.StateContext;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * JStorm implementation of {@link StateInternals}.
- */
-public class JStormStateInternals<K> implements StateInternals {
-
-    private static final String STATE_INFO = "state-info:";
-
-    @Nullable
-    private final K key;
-    private final IKvStoreManager kvStoreManager;
-    private final TimerService timerService;
-    private final int executorId;
-
-    public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
-                                TimerService timerService, int executorId) {
-        this.key = key;
-        this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
-        this.timerService = checkNotNull(timerService, "timerService");
-        this.executorId = executorId;
-    }
-
-    @Nullable
-    @Override
-    public K getKey() {
-        return key;
-    }
-
-    @Override
-    public <T extends State> T state(
-        StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
-        // throw new UnsupportedOperationException("StateContext is not supported.");
-        /**
-         * TODOļ¼š
-         * Same implementation as state() which is without StateContext. This might be updated after
-         * we figure out if we really need StateContext for JStorm state internals.
-         */
-        return state(namespace, address);
-    }
-
-    @Override
-    public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
-        return address.getSpec().bind(address.getId(), new StateBinder() {
-            @Override
-            public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
-                try {
-                    return new JStormValueState<>(
-                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-
-            @Override
-            public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
-                try {
-                    return new JStormBagState(
-                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-
-            @Override
-            public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-                String id,
-                StateSpec<MapState<KeyT, ValueT>> spec,
-                Coder<KeyT> mapKeyCoder,
-                Coder<ValueT> mapValueCoder) {
-                try {
-                    return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            @Override
-            public <InputT, AccumT, OutputT> CombiningState bindCombining(
-                    String id,
-                    StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
-                    Coder<AccumT> accumCoder,
-                    Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-                try {
-                    BagState<AccumT> accumBagState = new JStormBagState(
-                            getKey(), namespace,
-                            kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-                    return new JStormCombiningState<>(accumBagState, combineFn);
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-
-
-            @Override
-            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
-            bindCombiningWithContext(
-                String id,
-                StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
-                CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public WatermarkHoldState bindWatermark(
-                String id,
-                StateSpec<WatermarkHoldState> spec,
-                final TimestampCombiner timestampCombiner) {
-                try {
-                    BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
-                            getKey(), namespace,
-                            kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
-                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
-
-                    Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
-                            new BinaryCombineFn<Instant>() {
-                                @Override
-                                public Instant apply(Instant left, Instant right) {
-                                  return timestampCombiner.combine(left, right);
-                                }};
-                    return new JStormWatermarkHoldState(
-                            namespace,
-                            new JStormCombiningState<>(
-                                    accumBagState,
-                                    outputTimeCombineFn),
-                            timestampCombiner,
-                            timerService);
-                } catch (IOException e) {
-                    throw new RuntimeException();
-                }
-            }
-        });
-    }
-
-    private String getStoreId(String stateId) {
-        return String.format("%s-%s", stateId, executorId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormValueState.java
deleted file mode 100644
index 3d5c68b..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormValueState.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import com.alibaba.jstorm.cache.ComposedKey;
-import com.alibaba.jstorm.cache.IKvStore;
-
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.ValueState;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-
-/**
- * JStorm implementation of {@link ValueState}.
- */
-public class JStormValueState<K, T> implements ValueState<T> {
-
-    @Nullable
-    private final K key;
-    private final StateNamespace namespace;
-    private final IKvStore<ComposedKey, T> kvState;
-
-    JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
-        this.key = key;
-        this.namespace = namespace;
-        this.kvState = kvState;
-    }
-
-    @Override
-    public void write(T t) {
-        try {
-            kvState.put(getComposedKey(), t);
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
-        }
-    }
-
-    @Override
-    public T read() {
-        try {
-            return kvState.get(getComposedKey());
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to read key: %s, namespace: %s.", key, namespace));
-        }
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
-
-    @Override
-    public void clear() {
-        try {
-            kvState.remove(getComposedKey());
-        } catch (IOException e) {
-            throw new RuntimeException(String.format(
-                    "Failed to clear key: %s, namespace: %s.", key, namespace));
-        }
-    }
-
-    private ComposedKey getComposedKey() {
-        return ComposedKey.of(key, namespace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormWatermarkHoldState.java
deleted file mode 100644
index 7888d85..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormWatermarkHoldState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.alibaba.jstorm.beam.translation.runtime.TimerService;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.GroupingState;
-import org.apache.beam.sdk.state.ReadableState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-
-/**
- * JStorm implementation of {@link WatermarkHoldState}.
- */
-public class JStormWatermarkHoldState implements WatermarkHoldState {
-
-    private final StateNamespace namespace;
-    private final GroupingState<Instant, Instant> watermarkHoldsState;
-    private final TimestampCombiner timestampCombiner;
-    private final TimerService timerService;
-
-    JStormWatermarkHoldState(
-            StateNamespace namespace,
-            GroupingState<Instant, Instant> watermarkHoldsState,
-            TimestampCombiner timestampCombiner,
-            TimerService timerService) {
-        this.namespace = checkNotNull(namespace, "namespace");
-        this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
-        this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
-        this.timerService = checkNotNull(timerService, "timerService");
-    }
-
-    @Override
-    public TimestampCombiner getTimestampCombiner() {
-        return timestampCombiner;
-    }
-
-    @Override
-    public void add(Instant instant) {
-        timerService.addWatermarkHold(namespace.stringKey(), instant);
-        watermarkHoldsState.add(instant);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-        return watermarkHoldsState.isEmpty();
-    }
-
-    @Override
-    public Instant read() {
-        return watermarkHoldsState.read();
-    }
-
-    @Override
-    public WatermarkHoldState readLater() {
-        // TODO: support prefetch.
-        return this;
-    }
-
-    @Override
-    public void clear() {
-        timerService.clearWatermarkHold(namespace.stringKey());
-        watermarkHoldsState.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/timer/JStormTimerInternals.java
deleted file mode 100644
index 9aba566..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/timer/JStormTimerInternals.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.timer;
-
-import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.TimerService;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * JStorm implementation of {@link TimerInternals}.
- */
-public class JStormTimerInternals<K> implements TimerInternals {
-
-    private final K key;
-    private final DoFnExecutor<?, ?> doFnExecutor;
-    private final TimerService timerService;
-
-
-    public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
-        this.key = key;
-        this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
-        this.timerService = checkNotNull(timerService, "timerService");
-    }
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-        setTimer(TimerData.of(timerId, namespace, target, timeDomain));
-    }
-
-    @Override
-    @Deprecated
-    public void setTimer(TimerData timerData) {
-        timerService.setTimer(key, timerData, doFnExecutor);
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    @Deprecated
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    @Deprecated
-    public void deleteTimer(TimerData timerData) {
-        throw new UnsupportedOperationException(
-                "Canceling of a timer is not yet supported.");
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-        return Instant.now();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-        return null;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-        return new Instant(timerService.currentInputWatermark());
-    }
-
-    @Override
-    @Nullable
-    public Instant currentOutputWatermarkTime() {
-        return new Instant(timerService.currentOutputWatermark());
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/BoundedSourceTranslator.java
deleted file mode 100644
index fc494ac..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/BoundedSourceTranslator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.UnboundedSourceSpout;
-
-import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Translates a {@link Read.Bounded} into a Storm spout.
- *
- * @param <T>
- */
-public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-        TupleTag<?> outputTag = userGraphContext.getOutputTag();
-        PValue outputValue = userGraphContext.getOutput();
-        UnboundedSourceSpout spout = new UnboundedSourceSpout(
-                description,
-                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
-                userGraphContext.getOptions(), outputTag);
-
-        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue));
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombineGloballyTranslator.java
deleted file mode 100644
index 03bcaff..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombineGloballyTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombinePerKeyTranslator.java
deleted file mode 100644
index d37bd01..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/CombinePerKeyTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/FlattenTranslator.java
deleted file mode 100644
index 6eba601..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/FlattenTranslator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.google.common.collect.Maps;
-import org.apache.beam.sdk.transforms.Flatten;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.FlattenExecutor;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Map;
-
-public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
-
-    @Override
-    public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-
-        // Since a new tag is created in PCollectionList, retrieve the real tag here.
-        Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
-        for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
-            PCollection<V> pc = (PCollection<V>) entry.getValue();
-            inputs.putAll(pc.expand());
-        }
-        System.out.println("Real inputs: " + inputs);
-        System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
-        String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-        FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-        context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTranslator.java
deleted file mode 100644
index c8ff467..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.runtime.GroupByWindowExecutor;
-import com.google.common.collect.Lists;
-import org.apache.beam.sdk.transforms.GroupByKey;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Collections;
-import java.util.List;
-
-public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
-    // information of transform
-    protected PCollection<KV<K, V>> input;
-    protected PCollection<KV<K, Iterable<V>>> output;
-    protected List<TupleTag<?>> inputTags;
-    protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
-    protected List<TupleTag<?>> sideOutputTags;
-    protected List<PCollectionView<?>> sideInputs;
-    protected WindowingStrategy<?, ?> windowingStrategy;
-
-    @Override
-    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-        input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-        output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
-
-        inputTags = userGraphContext.getInputTags();
-        mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
-        sideOutputTags = Lists.newArrayList();
-
-        sideInputs = Collections.<PCollectionView<?>>emptyList();
-        windowingStrategy = input.getWindowingStrategy();
-
-        GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
-                userGraphContext.getStepName(),
-                description,
-                context,
-                context.getUserGraphContext().getOptions(),
-                windowingStrategy,
-                mainOutputTag,
-                sideOutputTags);
-        context.addTransformExecutor(groupByWindowExecutor);
-    }
-}