You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:08 UTC

[08/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
deleted file mode 100644
index c3e9805..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslationContext.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation;
-
-import avro.shaded.com.google.common.collect.Lists;
-import com.alibaba.jstorm.beam.translation.translator.Stream;
-import com.alibaba.jstorm.beam.util.RunnerUtils;
-import com.google.common.base.Strings;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.runtime.AdaptorBasicSpout;
-import com.alibaba.jstorm.beam.translation.runtime.Executor;
-import com.alibaba.jstorm.beam.translation.runtime.ExecutorsBolt;
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-
-import java.util.*;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Maintains the state necessary during Pipeline translation to build a Storm topology.
- */
-public class TranslationContext {
-    private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-
-    private final UserGraphContext userGraphContext;
-    private final ExecutionGraphContext executionGraphContext;
-
-    public TranslationContext(StormPipelineOptions options) {
-        this.userGraphContext = new UserGraphContext(options);
-        this.executionGraphContext = new ExecutionGraphContext();
-    }
-
-    public ExecutionGraphContext getExecutionGraphContext() {
-        return executionGraphContext;
-    }
-
-    public UserGraphContext getUserGraphContext() {
-        return userGraphContext;
-    }
-
-    private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
-        Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
-        if (!producer.getComponentId().equals(destComponentName)) {
-            Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
-            executionGraphContext.registerStreamConsumer(consumer, producer);
-
-            ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
-            if (executorsBolt != null) {
-                executorsBolt.addExternalOutputTag(input.getTag());
-            }
-        }
-    }
-
-    private String getUpstreamExecutorsBolt() {
-        for (PValue value : userGraphContext.getInputs().values()) {
-            String componentId = executionGraphContext.getProducerComponentId(value);
-            if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
-                return componentId;
-            }
-        }
-        // When upstream component is spout, "null" will be return.
-        return null;
-    }
-
-    /**
-     * check if the current transform is applied to source collection.
-     * @return
-     */
-    private boolean connectedToSource() {
-        for (PValue value : userGraphContext.getInputs().values()) {
-            if (executionGraphContext.producedBySpout(value)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * @param upstreamExecutorsBolt
-     * @return true if there is multiple input streams, or upstream executor output the same stream
-     *          to different executors
-     */
-    private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
-        if (inputs.size() > 1) {
-            return true;
-        } else {
-            final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
-            if (!intersection.isEmpty()) {
-                // there is already a different executor consume the same input
-                return true;
-            } else {
-                return false;
-            }
-        }
-    }
-
-    public void addTransformExecutor(Executor executor) {
-        addTransformExecutor(executor, Collections.EMPTY_LIST);
-    }
-
-    public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
-        addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
-    }
-
-    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
-        addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
-    }
-
-    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
-        String name = null;
-
-        ExecutorsBolt bolt = null;
-
-        boolean isGBK = false;
-        /**
-         * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
-         * For following cases, a new bolt is created for the specified executor, otherwise the executor
-         * will be added into the bolt contains corresponding upstream executor.
-         * a) it is a GroupByKey executor
-         * b) it is connected to source directly
-         * c) None existing upstream bolt was found
-         * d) For the purpose of performance to reduce the side effects between multiple streams which
-         *    is output to same executor, a new bolt will be created.
-         */
-        if (RunnerUtils.isGroupByKeyExecutor(executor)) {
-            bolt = new ExecutorsBolt();
-            name = executionGraphContext.registerBolt(bolt);
-            isGBK = true;
-        } else if (connectedToSource()) {
-            bolt = new ExecutorsBolt();
-            name = executionGraphContext.registerBolt(bolt);
-        } else {
-            name = getUpstreamExecutorsBolt();
-            if (name == null) {
-                bolt = new ExecutorsBolt();
-                name = executionGraphContext.registerBolt(bolt);
-            } else {
-                bolt = executionGraphContext.getBolt(name);
-                if (isMultipleInputOrOutput(bolt, inputs)) {
-                    bolt = new ExecutorsBolt();
-                    name = executionGraphContext.registerBolt(bolt);
-                }
-            }
-        }
-
-        // update the output tags of current transform into ExecutorsBolt
-        for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
-            TupleTag tag = entry.getKey();
-            PValue value = entry.getValue();
-
-            // use tag of PValueBase
-            if (value instanceof PValueBase) {
-                tag = ((PValueBase) value).expand().keySet().iterator().next();
-            }
-            executionGraphContext.registerStreamProducer(
-                    TaggedPValue.of(tag, value),
-                    Stream.Producer.of(name, tag.getId(), value.getName()));
-            //bolt.addOutputTags(tag);
-        }
-
-        // add the transform executor into the chain of ExecutorsBolt
-        for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
-            TupleTag tag = entry.getKey();
-            PValue value = entry.getValue();
-            bolt.addExecutor(tag, executor);
-
-            // filter all connections inside bolt
-            //if (!bolt.getOutputTags().contains(tag)) {
-                Stream.Grouping grouping;
-                if (isGBK) {
-                    grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
-                } else {
-                    grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
-                }
-                addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
-            //}
-        }
-
-        for (PValue sideInput : sideInputs) {
-            TupleTag tag = userGraphContext.findTupleTag(sideInput);
-            bolt.addExecutor(tag, executor);
-            checkState(!bolt.getOutputTags().contains(tag));
-            addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
-        }
-
-        bolt.registerExecutor(executor);
-
-        // set parallelismNumber
-        String pTransformfullName = userGraphContext.currentTransform.getFullName();
-        String compositeName = pTransformfullName.split("/")[0];
-        Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
-        if (parallelismNumMap.containsKey(compositeName)) {
-            int configNum = (Integer) parallelismNumMap.get(compositeName);
-            int currNum = bolt.getParallelismNum();
-            bolt.setParallelismNum(Math.max(configNum, currNum));
-        }
-    }
-
-    // TODO: add getSideInputs() and getSideOutputs().
-    public static class UserGraphContext {
-        private final StormPipelineOptions options;
-        private final Map<PValue, TupleTag> pValueToTupleTag;
-        private AppliedPTransform<?, ?, ?> currentTransform = null;
-
-        private boolean isWindowed = false;
-
-        public UserGraphContext(StormPipelineOptions options) {
-            this.options = checkNotNull(options, "options");
-            this.pValueToTupleTag = Maps.newHashMap();
-        }
-
-        public StormPipelineOptions getOptions() {
-            return this.options;
-        }
-
-        public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-            this.currentTransform = transform;
-        }
-
-        public String getStepName() {
-            return currentTransform.getFullName();
-        }
-
-        public <T extends PValue> T getInput() {
-            return (T) currentTransform.getInputs().values().iterator().next();
-        }
-
-        public Map<TupleTag<?>, PValue> getInputs() {
-            return currentTransform.getInputs();
-        }
-
-        public TupleTag<?> getInputTag() {
-            return currentTransform.getInputs().keySet().iterator().next();
-        }
-
-        public List<TupleTag<?>> getInputTags() {
-            return Lists.newArrayList(currentTransform.getInputs().keySet());
-        }
-
-        public <T extends PValue> T getOutput() {
-            return (T) currentTransform.getOutputs().values().iterator().next();
-        }
-
-        public Map<TupleTag<?>, PValue> getOutputs() {
-            return currentTransform.getOutputs();
-        }
-
-        public TupleTag<?> getOutputTag() {
-            return currentTransform.getOutputs().keySet().iterator().next();
-        }
-
-        public List<TupleTag<?>> getOutputTags() {
-            return Lists.newArrayList(currentTransform.getOutputs().keySet());
-        }
-
-        public void recordOutputTaggedPValue() {
-            for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
-                pValueToTupleTag.put(entry.getValue(), entry.getKey());
-            }
-        }
-
-        public <T> TupleTag<T> findTupleTag(PValue pValue) {
-            return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
-        }
-
-        public void setWindowed() {
-            this.isWindowed = true;
-        }
-
-        public boolean isWindowed() {
-            return this.isWindowed;
-        }
-
-        @Override
-        public String toString() {
-            return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
-                    .transform(new Function<Map.Entry<PValue,TupleTag>, String>() {
-                        @Override
-                        public String apply(Map.Entry<PValue, TupleTag> entry) {
-                            return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
-                        }}));
-        }
-    }
-
-    public static class ExecutionGraphContext {
-
-        private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
-        private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
-
-        // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
-        private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
-        private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
-
-        private final List<Stream> streams = new ArrayList<>();
-
-        private int id = 1;
-
-        public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
-            checkNotNull(spout, "spout");
-            checkNotNull(output, "output");
-            String name = "spout" + genId();
-            this.spoutMap.put(name, spout);
-            registerStreamProducer(
-                    output,
-                    Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
-        }
-
-        public AdaptorBasicSpout getSpout(String id) {
-            if (Strings.isNullOrEmpty(id)) {
-                return null;
-            }
-            return this.spoutMap.get(id);
-        }
-
-        public Map<String, AdaptorBasicSpout> getSpouts() {
-            return this.spoutMap;
-        }
-
-        public String registerBolt(ExecutorsBolt bolt) {
-            checkNotNull(bolt, "bolt");
-            String name = "bolt" + genId();
-            this.boltMap.put(name, bolt);
-            return name;
-        }
-
-        public ExecutorsBolt getBolt(String id) {
-            if (Strings.isNullOrEmpty(id)) {
-                return null;
-            }
-            return this.boltMap.get(id);
-        }
-
-        public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
-            checkNotNull(taggedPValue, "taggedPValue");
-            checkNotNull(producer, "producer");
-            pValueToProducer.put(taggedPValue.getValue(), producer);
-            producerToTaggedPValue.put(producer, taggedPValue);
-        }
-
-        public Stream.Producer getProducer(PValue pValue) {
-            return pValueToProducer.get(checkNotNull(pValue, "pValue"));
-        }
-
-        public String getProducerComponentId(PValue pValue) {
-            Stream.Producer producer = getProducer(pValue);
-            return producer == null ? null : producer.getComponentId();
-        }
-
-        public boolean producedBySpout(PValue pValue) {
-            String componentId = getProducerComponentId(pValue);
-            return getSpout(componentId) != null;
-        }
-
-        public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
-            streams.add(Stream.of(
-                    checkNotNull(producer, "producer"),
-                    checkNotNull(consumer, "consumer")));
-        }
-
-        public Map<PValue, Stream.Producer> getPValueToProducers() {
-            return pValueToProducer;
-        }
-
-        public Iterable<Stream> getStreams() {
-            return streams;
-        }
-
-        @Override
-        public String toString() {
-            List<String> ret = new ArrayList<>();
-            ret.add("SPOUT");
-            for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
-                ret.add(entry.getKey() + ": " + entry.getValue().toString());
-            }
-            ret.add("BOLT");
-            for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
-                ret.add(entry.getKey() + ": " + entry.getValue().toString());
-            }
-            ret.add("STREAM");
-            for (Stream stream : streams) {
-                ret.add(String.format(
-                        "%s@@%s ---> %s@@%s",
-                        stream.getProducer().getStreamId(),
-                        stream.getProducer().getComponentId(),
-                        stream.getConsumer().getGrouping(),
-                        stream.getConsumer().getComponentId()));
-            }
-            return Joiner.on("\n").join(ret);
-        }
-
-        private synchronized int genId() {
-            return id++;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
deleted file mode 100644
index 5e92eea..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/TranslatorRegistry.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation;
-
-import com.alibaba.jstorm.beam.translation.translator.*;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Lookup table mapping PTransform types to associated TransformTranslator implementations.
- */
-public class TranslatorRegistry {
-    private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
-
-    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
-
-    static {
-        TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
-        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
-        // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
-        // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
-        TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
-        TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
-
-        //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
-        TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
-
-        TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
-
-        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-
-        TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
-
-        /**
-         * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be 
-         * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms.
-         * If any improvement is required, the composite transforms will be translated in the future.
-         */
-        // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-        // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
-        // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
-    }
-
-    public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
-        TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
-        if (translator == null) {
-            LOG.warn("Unsupported operator={}", transform.getClass().getName());
-        }
-        return translator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
deleted file mode 100644
index 876546d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AbstractComponent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/*
- * Enable user to add output stream definitions by API, rather than hard-code.
- */
-public abstract class AbstractComponent implements IComponent {
-    private Map<String, Fields> streamToFields = new HashMap<>();
-    private Map<String, Boolean> keyStreams = new HashMap<>();
-    private int parallelismNum = 0;
-
-    public void addOutputField(String streamId) {
-        addOutputField(streamId, new Fields(CommonInstance.VALUE));
-    }
-
-    public void addOutputField(String streamId, Fields fields) {
-        streamToFields.put(streamId, fields);
-        keyStreams.put(streamId, false);
-    }
-
-    public void addKVOutputField(String streamId) {
-        streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
-        keyStreams.put(streamId, true);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
-            declarer.declareStream(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public boolean keyedEmit(String streamId) {
-        Boolean isKeyedStream = keyStreams.get(streamId);
-        return isKeyedStream == null ? false : isKeyedStream;
-    }
-
-    public int getParallelismNum() {
-        return parallelismNum;
-    }
-
-    public void setParallelismNum(int num) {
-        parallelismNum = num;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
deleted file mode 100644
index d1308af..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicBolt.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.topology.IRichBatchBolt;
-
-public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
deleted file mode 100644
index 2f77bfb..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/AdaptorBasicSpout.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.topology.IRichSpout;
-
-public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
deleted file mode 100644
index 9d88c4d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnExecutor.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.*;
-
-import avro.shaded.com.google.common.collect.Iterables;
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.util.DefaultStepContext;
-import com.alibaba.jstorm.beam.util.SerializedPipelineOptions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class DoFnExecutor<InputT, OutputT> implements Executor {
-    private static final long serialVersionUID = 5297603063991078668L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
-
-    public class DoFnExecutorOutputManager implements OutputManager, Serializable {
-        private static final long serialVersionUID = -661113364735206170L;
-
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            executorsBolt.processExecutorElem(tag, output);
-        }
-    }
-
-    protected transient DoFnRunner<InputT, OutputT> runner = null;
-    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
-    protected final String stepName;
-
-    protected int internalDoFnExecutorId;
-
-    protected final String description;
-
-    protected final TupleTag<OutputT> mainTupleTag;
-    protected final List<TupleTag<?>> sideOutputTags;
-
-    protected SerializedPipelineOptions serializedOptions;
-    protected transient StormPipelineOptions pipelineOptions;
-
-    protected DoFn<InputT, OutputT> doFn;
-    protected final Coder<WindowedValue<InputT>> inputCoder;
-    protected DoFnInvoker<InputT, OutputT> doFnInvoker;
-    protected OutputManager outputManager;
-    protected WindowingStrategy<?, ?> windowingStrategy;
-    protected final TupleTag<InputT> mainInputTag;
-    protected Collection<PCollectionView<?>> sideInputs;
-    protected SideInputHandler sideInputHandler;
-    protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
-    // Initialize during runtime
-    protected ExecutorContext executorContext;
-    protected ExecutorsBolt executorsBolt;
-    protected TimerInternals timerInternals;
-    protected transient StateInternals pushbackStateInternals;
-    protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
-    protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
-    protected transient IKvStoreManager kvStoreManager;
-    protected DefaultStepContext stepContext;
-    protected transient MetricClient metricClient;
-
-    public DoFnExecutor(
-            String stepName,
-            String description,
-            StormPipelineOptions pipelineOptions,
-            DoFn<InputT, OutputT> doFn,
-            Coder<WindowedValue<InputT>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<InputT> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs,
-            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-            TupleTag<OutputT> mainTupleTag,
-            List<TupleTag<?>> sideOutputTags) {
-        this.stepName = checkNotNull(stepName, "stepName");
-        this.description = checkNotNull(description, "description");
-        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-        this.doFn = doFn;
-        this.inputCoder = inputCoder;
-        this.outputManager = new DoFnExecutorOutputManager();
-        this.windowingStrategy = windowingStrategy;
-        this.mainInputTag = mainInputTag;
-        this.sideInputs = sideInputs;
-        this.mainTupleTag = mainTupleTag;
-        this.sideOutputTags = sideOutputTags;
-        this.sideInputTagToView = sideInputTagToView;
-    }
-
-    protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
-        return new DoFnRunnerWithMetrics<>(
-            stepName,
-            DoFnRunners.simpleRunner(
-                this.pipelineOptions,
-                this.doFn,
-                this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
-                this.outputManager,
-                this.mainTupleTag,
-                this.sideOutputTags,
-                this.stepContext,
-                this.windowingStrategy),
-            MetricsReporter.create(metricClient));
-    }
-
-    protected void initService(ExecutorContext context) {
-        // TODO: what should be set for key in here?
-        timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService());
-        kvStoreManager = context.getKvStoreManager();
-        stepContext = new DefaultStepContext(timerInternals,
-                new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        metricClient = new MetricClient(executorContext.getTopologyContext());
-    }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.executorContext = context;
-        this.executorsBolt = context.getExecutorsBolt();
-        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
-
-        initService(context);
-
-        // Side inputs setup
-        if (sideInputs != null && sideInputs.isEmpty() == false) {
-            pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-            watermarkHoldTag =
-                    StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
-            pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-            sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
-            runner = getDoFnRunner();
-            pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
-        } else {
-            runner = getDoFnRunner();
-        }
-
-        // Process user's setup
-        doFnInvoker = DoFnInvokers.invokerFor(doFn);
-        doFnInvoker.invokeSetup();
-    }
-
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
-                tag, mainInputTag, sideInputs, elem.getValue()));
-        if (mainInputTag.equals(tag)) {
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
-    }
-
-    protected <T> void processMainInput(WindowedValue<T> elem) {
-       if (sideInputs.isEmpty()) {
-           runner.processElement((WindowedValue<InputT>) elem);
-       } else {
-           Iterable<WindowedValue<InputT>> justPushedBack =
-               pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
-           BagState<WindowedValue<InputT>> pushedBack =
-                   pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-           Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-           for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-               if (pushedBackValue.getTimestamp().isBefore(min)) {
-                   min = pushedBackValue.getTimestamp();
-               }
-               min = earlier(min, pushedBackValue.getTimestamp());
-               pushedBack.add(pushedBackValue);
-           }
-           pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
-       }
-    }
-
-    protected void processSideInput(TupleTag tag, WindowedValue elem) {
-        LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
-
-        PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
-        sideInputHandler.addSideInputValue(sideInputView, elem);
-
-        BagState<WindowedValue<InputT>> pushedBack =
-                pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-        List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-
-        Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
-        if (pushedBackInputs != null) {
-            for (WindowedValue<InputT> input : pushedBackInputs) {
-
-                Iterable<WindowedValue<InputT>> justPushedBack =
-                        pushbackRunner.processElementInReadyWindows(input);
-                Iterables.addAll(newPushedBack, justPushedBack);
-            }
-        }
-        pushedBack.clear();
-
-        Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-        for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-            min = earlier(min, pushedBackValue.getTimestamp());
-            pushedBack.add(pushedBackValue);
-        }
-
-        WatermarkHoldState watermarkHold =
-                pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-        // TODO: clear-then-add is not thread-safe.
-        watermarkHold.clear();
-        watermarkHold.add(min);
-    }
-
-    /**
-     * Process all pushed back elements when receiving watermark with max timestamp
-     */
-    public void processAllPushBackElements() {
-        if (sideInputs != null && sideInputs.isEmpty() == false) {
-            BagState<WindowedValue<InputT>> pushedBackElements =
-                    pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-            if (pushedBackElements != null) {
-                for (WindowedValue<InputT> elem : pushedBackElements.read()) {
-                    LOG.info("Process pushback elem={}", elem);
-                    runner.processElement(elem);
-                }
-                pushedBackElements.clear();
-            }
-
-            WatermarkHoldState watermarkHold =
-                    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-            watermarkHold.clear();
-            watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
-        }
-    }
-
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        StateNamespace namespace = timerData.getNamespace();
-        checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-        BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-        if (pushbackRunner != null) {
-            pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-        } else {
-            runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        doFnInvoker.invokeTeardown();
-    }
-
-    @Override
-    public String toString() {
-        return description;
-    }
-
-    private Instant earlier(Instant left, Instant right) {
-        return left.isBefore(right) ? left : right;
-    }
-
-    public void startBundle() {
-        if (pushbackRunner != null) {
-            pushbackRunner.startBundle();
-        } else {
-            runner.startBundle();
-        }
-    }
-
-    public void finishBundle() {
-        if (pushbackRunner != null) {
-            pushbackRunner.finishBundle();
-        } else {
-            runner.finishBundle();
-        }
-    }
-
-    public void setInternalDoFnExecutorId(int id) {
-        this.internalDoFnExecutorId = id;
-    }
-
-    public int getInternalDoFnExecutorId() {
-        return internalDoFnExecutorId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
deleted file mode 100644
index 105dffb..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/DoFnRunnerWithMetrics.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Closeable;
-import java.io.IOException;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * DoFnRunner decorator which registers {@link MetricsContainer}.
- */
-public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
-
-  private final String stepName;
-  private final DoFnRunner<InputT, OutputT> delegate;
-  private final MetricsReporter metricsReporter;
-
-  DoFnRunnerWithMetrics(
-      String stepName,
-      DoFnRunner<InputT, OutputT> delegate,
-      MetricsReporter metricsReporter) {
-    this.stepName = checkNotNull(stepName, "stepName");
-    this.delegate = checkNotNull(delegate, "delegate");
-    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
-  }
-
-  @Override
-  public void startBundle() {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.startBundle();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.processElement(elem);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void finishBundle() {
-    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
-        metricsReporter.getMetricsContainer(stepName))) {
-      delegate.finishBundle();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    metricsReporter.updateMetrics();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
deleted file mode 100644
index 30348b2..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/Executor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.utils.Pair;
-
-public interface Executor extends Serializable {
-    /**
-     * Initialization during runtime
-     */
-    void init(ExecutorContext context);
-
-    <T> void  process(TupleTag<T> tag, WindowedValue<T> elem);
-
-    void cleanup();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
deleted file mode 100644
index 7f9aa77..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import backtype.storm.task.TopologyContext;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-public abstract class ExecutorContext {
-    public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) {
-        return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
-    }
-
-    public abstract TopologyContext getTopologyContext();
-
-    public abstract ExecutorsBolt getExecutorsBolt();
-
-    public abstract IKvStoreManager getKvStoreManager();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
deleted file mode 100644
index ebd9456..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/ExecutorsBolt.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.IOException;
-import java.util.*;
-
-import avro.shaded.com.google.common.base.Joiner;
-import avro.shaded.com.google.common.collect.Sets;
-import backtype.storm.tuple.ITupleExt;
-import backtype.storm.tuple.TupleImplExt;
-import com.alibaba.jstorm.beam.translation.util.CommonInstance;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import com.alibaba.jstorm.cache.KvStoreManagerFactory;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.utils.KryoSerializer;
-import com.alibaba.jstorm.window.Watermark;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class ExecutorsBolt extends AdaptorBasicBolt {
-    private static final long serialVersionUID = -7751043327801735211L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
-
-    protected ExecutorContext executorContext;
-
-    protected TimerService timerService;
-
-    // map from input tag to executor inside bolt
-    protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
-    // set of all output tags that will be emit outside bolt
-    protected final Set<TupleTag> outputTags = Sets.newHashSet();
-    protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
-    protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
-    protected int internalDoFnExecutorId = 1;
-    protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
-
-    protected OutputCollector collector;
-
-    protected boolean isStatefulBolt = false;
-
-    protected KryoSerializer<WindowedValue> serializer;
-
-    public ExecutorsBolt() {
-
-    }
-
-    public void setStatefulBolt(boolean isStateful) {
-        isStatefulBolt = isStateful;
-    }
-
-    public void addExecutor(TupleTag inputTag, Executor executor) {
-        inputTagToExecutor.put(
-                checkNotNull(inputTag, "inputTag"),
-                checkNotNull(executor, "executor"));
-    }
-
-    public Map<TupleTag, Executor> getExecutors() {
-        return inputTagToExecutor;
-    }
-
-    public void registerExecutor(Executor executor) {
-        if (executor instanceof DoFnExecutor) {
-            DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
-            idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
-            doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
-            internalDoFnExecutorId++;
-        }
-    }
-
-    public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
-        return idToDoFnExecutor;
-    }
-
-    public void addOutputTags(TupleTag tag) {
-        outputTags.add(tag);
-    }
-
-    public void addExternalOutputTag(TupleTag<?> tag) {
-        externalOutputTags.add(tag);
-    }
-
-    public Set<TupleTag> getOutputTags() {
-        return outputTags;
-    }
-
-    public ExecutorContext getExecutorContext() {
-        return executorContext;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        LOG.info("Start to prepare for task-{}", context.getThisTaskId());
-        try {
-            this.collector = collector;
-
-            // init kv store manager
-            String storeName = String.format("task-%d", context.getThisTaskId());
-            String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-            IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) :
-                    KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt);
-            this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
-            // init time service
-            timerService = initTimerService();
-
-            // init all internal executors
-            for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-                executor.init(executorContext);
-                if (executor instanceof DoFnExecutor) {
-                    doFnExecutors.add((DoFnExecutor) executor);
-                }
-            }
-
-            this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
-            LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
-            LOG.info("inputTagToExecutor={}", inputTagToExecutor);
-            LOG.info("outputTags={}", outputTags);
-            LOG.info("externalOutputTags={}", externalOutputTags);
-            LOG.info("doFnExecutors={}", doFnExecutors);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to prepare executors bolt", e);
-        }
-    }
-
-    public TimerService initTimerService() {
-        TopologyContext context = executorContext.getTopologyContext();
-        List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
-                .transformAndConcat(
-                        new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
-                            @Override
-                            public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
-                                if (Common.isSystemComponent(value.getKey())) {
-                                    return Collections.EMPTY_LIST;
-                                } else {
-                                    return value.getValue();
-                                }
-                            }
-                        })
-                .toList();
-        TimerService ret = new TimerServiceImpl(executorContext);
-        ret.init(tasks);
-        return ret;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        // process a batch
-        String streamId = input.getSourceStreamId();
-        ITupleExt tuple = (ITupleExt) input;
-        Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
-        if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
-            while (valueIterator.hasNext()) {
-                processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
-            }
-        } else {
-            doFnStartBundle();
-            while (valueIterator.hasNext()) {
-                processElement(valueIterator.next(), streamId);
-            }
-            doFnFinishBundle();
-        }
-    }
-
-    private void processWatermark(long watermarkTs, int sourceTask) {
-        long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
-        LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
-                (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime());
-        if (newWaterMark != 0) {
-            // Some buffer windows are going to be triggered.
-            doFnStartBundle();
-            timerService.fireTimers(newWaterMark);
-
-            // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
-            // to be received from now on. So we are going to process all push back data.
-            if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-                for (DoFnExecutor doFnExecutor : doFnExecutors) {
-                    doFnExecutor.processAllPushBackElements();
-                }
-            }
-
-            doFnFinishBundle();
-        }
-
-        long currentWaterMark = timerService.currentOutputWatermark();
-        if (!externalOutputTags.isEmpty()) {
-            collector.flush();
-            collector.emit(
-                    CommonInstance.BEAM_WATERMARK_STREAM_ID,
-                    new Values(currentWaterMark));
-            LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
-        }
-    }
-
-    private void processElement(List<Object> values, String streamId) {
-        TupleTag inputTag = new TupleTag(streamId);
-        WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
-        processExecutorElem(inputTag, windowedValue);
-    }
-
-    public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-        LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
-        if (elem != null) {
-            Executor executor = inputTagToExecutor.get(inputTag);
-            if (executor != null) {
-                executor.process(inputTag, elem);
-            }
-            if (externalOutputTags.contains(inputTag)) {
-                emitOutsideBolt(inputTag, elem);
-            }
-        } else {
-            LOG.info("Received null elem for tag={}", inputTag);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-            executor.cleanup();
-        }
-        executorContext.getKvStoreManager().close();
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    public TimerService timerService() {
-        return timerService;
-    }
-
-    public void setTimerService(TimerService service) {
-        timerService = service;
-    }
-
-    private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
-        WindowedValue wv = null;
-        if (values.size() > 1) {
-            Object key = values.get(0);
-            WindowedValue value = serializer.deserialize((byte[]) values.get(1));
-            wv = value.withValue(KV.of(key, value.getValue()));
-        } else {
-            wv = serializer.deserialize((byte[])values.get(0));
-        }
-        return wv;
-    }
-
-    protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
-        LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
-        if (keyedEmit(outputTag.getId())) {
-            KV kv = (KV) outputValue.getValue();
-            byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
-            // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-            if (kv.getKey() == null) {
-                // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
-                collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
-            } else {
-                collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
-            }
-        } else {
-            byte[] immutableOutputValue = serializer.serialize(outputValue);
-            collector.emit(outputTag.getId(), new Values(immutableOutputValue));
-        }
-    }
-
-    private void doFnStartBundle() {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-            doFnExecutor.startBundle();
-        }
-    }
-
-    private void doFnFinishBundle() {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-            doFnExecutor.finishBundle();
-        }
-    }
-
-    @Override
-    public String toString() {
-        // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
-        List<String> ret = new ArrayList<>();
-        /*ret.add("inputTags");
-        for (TupleTag inputTag : inputTagToExecutor.keySet()) {
-            ret.add(inputTag.getId());
-        }*/
-        ret.add("internalExecutors");
-        for (Executor executor : inputTagToExecutor.values()) {
-            ret.add(executor.toString());
-        }
-        ret.add("externalOutputTags");
-        for (TupleTag output : externalOutputTags) {
-            ret.add(output.getId());
-        }
-        return Joiner.on('\n').join(ret).concat("\n");
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
deleted file mode 100644
index 7158b2f..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/FlattenExecutor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class FlattenExecutor<InputT> implements Executor {
-
-    private final String description;
-    private TupleTag mainOutputTag;
-    private ExecutorContext context;
-    private ExecutorsBolt executorsBolt;
-
-    public FlattenExecutor(String description, TupleTag mainTupleTag) {
-        this.description = checkNotNull(description, "description");
-        this.mainOutputTag = mainTupleTag;
-    }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.context = context;
-        this.executorsBolt = context.getExecutorsBolt();
-    }
-
-    @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        executorsBolt.processExecutorElem(mainOutputTag, elem);
-    }
-
-    @Override
-    public void cleanup() {}
-
-    @Override
-    public String toString() {
-        return description;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
deleted file mode 100644
index 1958c77..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/GroupByWindowExecutor.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import java.io.Serializable;
-import java.util.List;
-
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.TranslationContext.UserGraphContext;
-import com.alibaba.jstorm.beam.translation.util.DefaultStepContext;
-import com.alibaba.jstorm.beam.util.RunnerUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
-    private static final long serialVersionUID = -7563050475488610553L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
-    private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            executorsBolt.processExecutorElem(tag, output);
-        }
-    }
-
-    private KvCoder<K, V> inputKvCoder;
-    private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
-    public GroupByWindowExecutor(
-            String stepName,
-            String description,
-            TranslationContext context,
-            StormPipelineOptions pipelineOptions,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-        // The doFn will be created when runtime. Just pass "null" here
-        super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
-
-        this.outputManager = new GroupByWindowOutputManager();
-        UserGraphContext userGraphContext = context.getUserGraphContext();
-        PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-        this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
-    }
-
-    private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
-        final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
-            @Override
-            public StateInternals stateInternalsForKey(K key) {
-                return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-            }
-        };
-        TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
-            @Override
-            public TimerInternals timerInternalsForKey(K key) {
-                return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
-            }
-        };
-
-        reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
-              GroupAlsoByWindowViaWindowSetNewDoFn.create(
-                  windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
-                      (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
-        return doFn;
-    }
-
-    @Override
-    protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
-        doFn = getGroupByWindowDoFn();
-
-        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
-                this.pipelineOptions,
-                this.doFn,
-                NullSideInputReader.empty(),
-                this.outputManager,
-                this.mainTupleTag,
-                this.sideOutputTags,
-                this.stepContext,
-                this.windowingStrategy);
-
-        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner(
-                simpleRunner,
-                this.stepContext,
-                this.windowingStrategy);
-        return new DoFnRunnerWithMetrics<>(
-            stepName, doFnRunner, MetricsReporter.create(metricClient));
-    }
-
-    @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        /**
-         *  For GroupByKey, KV type elem is received. We need to convert the KV elem
-         *  into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
-         */
-        KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
-        runner.processElement(elem.withValue(keyedWorkItem));
-    }
-
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        StateNamespace namespace = timerData.getNamespace();
-        checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-
-        runner.processElement(
-                WindowedValue.valueInGlobalWindow(
-                        KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
-    }
-
-    @Override
-    public String toString() {
-        return super.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
deleted file mode 100644
index 33095b1..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MetricsReporter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-
-import com.alibaba.jstorm.common.metric.AsmCounter;
-import com.alibaba.jstorm.common.metric.AsmGauge;
-import com.alibaba.jstorm.common.metric.AsmHistogram;
-import com.alibaba.jstorm.common.metric.AsmMeter;
-import com.alibaba.jstorm.common.metric.AsmMetric;
-import com.alibaba.jstorm.metric.MetricClient;
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-
-/**
- * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
- */
-public class MetricsReporter {
-
-  private static final String METRIC_KEY_SEPARATOR = "__";
-  private static final String COUNTER_PREFIX = "__counter";
-
-  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
-  private final Map<String, Long> reportedCounters = Maps.newHashMap();
-  private final MetricClient metricClient;
-
-  public static MetricsReporter create(MetricClient metricClient) {
-    return new MetricsReporter(metricClient);
-  }
-
-  private MetricsReporter(MetricClient metricClient) {
-    this.metricClient = checkNotNull(metricClient, "metricClient");
-  }
-
-  public MetricsContainer getMetricsContainer(String stepName) {
-    return metricsContainers.getContainer(stepName);
-  }
-
-  public void updateMetrics() {
-    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
-    MetricQueryResults metricQueryResults =
-        metricResults.queryMetrics(MetricsFilter.builder().build());
-    updateCounters(metricQueryResults.counters());
-  }
-
-  private void updateCounters(Iterable<MetricResult<Long>> counters) {
-    System.out.print("updateCounters");
-    for (MetricResult<Long> metricResult : counters) {
-      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
-      System.out.print("metricName: " + metricName);
-      Long updateValue = metricResult.attempted();
-      Long oldValue = reportedCounters.get(metricName);
-
-      if (oldValue == null || oldValue < updateValue) {
-        AsmCounter counter = metricClient.registerCounter(metricName);
-        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
-        counter.update(incValue);
-      }
-    }
-  }
-
-  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
-    return prefix
-        + METRIC_KEY_SEPARATOR + metricResult.step()
-        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
-        + METRIC_KEY_SEPARATOR + metricResult.name().name();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
deleted file mode 100644
index bd3dfb3..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiOutputDoFnExecutor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
-    private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
-
-    /**
-     * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag
-     * is used in downstream consumer. So before output, we need to map this "local" tag to "external"
-     * tag. See PCollectionTuple for details.
-     */
-    public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            if (localTupleTagMap.containsKey(tag)) {
-                executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
-            } else {
-                executorsBolt.processExecutorElem(tag, output);
-            }
-        }
-    }
-
-    protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
-    public MultiOutputDoFnExecutor(
-            String stepName,
-            String description,
-            StormPipelineOptions pipelineOptions,
-            DoFn<InputT, OutputT> doFn,
-            Coder<WindowedValue<InputT>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<InputT> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs,
-            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-            TupleTag<OutputT> mainTupleTag,
-            List<TupleTag<?>> sideOutputTags,
-            Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
-            ) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
-                sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-        this.localTupleTagMap = localTupleTagMap;
-        this.outputManager = new MultiOutputDoFnExecutorOutputManager();
-        LOG.info("localTupleTagMap: {}", localTupleTagMap);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
deleted file mode 100644
index 51aa960..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/runtime/MultiStatefulDoFnExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.translation.runtime.state.JStormStateInternals;
-import com.alibaba.jstorm.beam.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
-
-    public MultiStatefulDoFnExecutor(
-            String stepName, String description,
-            StormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
-            Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
-            Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
-            List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
-    }
-
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        if (mainInputTag.equals(tag)) {
-            WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-            stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-                    executorContext.getExecutorsBolt().timerService()));
-            stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-                    kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
-    }
-
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        stepContext.setStateInternals(new JStormStateInternals<>(key,
-                kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        super.onTimer(key, timerData);
-    }
-}