You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:06 UTC

[06/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 1870681..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import avro.shaded.com.google.common.collect.Lists;
-import avro.shaded.com.google.common.collect.Maps;
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiOutputDoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Translates a ParDo.MultiOutput to a Storm {@link com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
-        extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
-    @Override
-    public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
-        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
-        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
-        Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
-        Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
-        for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
-            Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
-            localToExternalTupleTagMap.put(entry.getKey(), itr.next());
-        }
-
-        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-        List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
-        sideOutputTags.remove(mainOutputTag);
-
-        Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-        String description = describeTransform(
-                transform,
-                allInputs,
-                allOutputs);
-
-        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-
-        DoFnExecutor executor;
-        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-        if (signature.stateDeclarations().size() > 0
-                || signature.timerDeclarations().size() > 0) {
-            executor = new MultiStatefulDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    (DoFn<KV, OutputT>) transform.getFn(),
-                    (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<KV>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags,
-                    localToExternalTupleTagMap);
-        } else {
-            executor = new MultiOutputDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    transform.getFn(),
-                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags,
-                    localToExternalTupleTagMap);
-        }
-
-        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
deleted file mode 100644
index a8d8186..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import java.util.List;
-import java.util.Map;
-
-import avro.shaded.com.google.common.collect.Lists;
-import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.SingleOutput to a Storm {@link DoFnExecutor}.
- */
-public class ParDoBoundTranslator<InputT, OutputT>
-        extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
-    @Override
-    public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
-        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        final TupleTag<?> inputTag = userGraphContext.getInputTag();
-        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
-        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
-        List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
-        Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-        String description = describeTransform(
-                transform,
-                allInputs,
-                userGraphContext.getOutputs());
-
-        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
-        for (PCollectionView pCollectionView : transform.getSideInputs()) {
-            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
-        }
-
-        DoFnExecutor executor;
-        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-        if (signature.stateDeclarations().size() > 0
-                || signature.timerDeclarations().size() > 0) {
-            executor = new StatefulDoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    (DoFn<KV, OutputT>) transform.getFn(),
-                    (Coder) WindowedValue.getFullCoder(
-                            input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<KV>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags);
-        } else {
-            executor = new DoFnExecutor<>(
-                    userGraphContext.getStepName(),
-                    description,
-                    userGraphContext.getOptions(),
-                    transform.getFn(),
-                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
-                    input.getWindowingStrategy(),
-                    (TupleTag<InputT>) inputTag,
-                    transform.getSideInputs(),
-                    sideInputTagToView.build(),
-                    mainOutputTag,
-                    sideOutputTags);
-        }
-
-        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
deleted file mode 100644
index 26a9b22..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.Reshuffle;
-
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
deleted file mode 100644
index f80a39d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.google.auto.value.AutoValue;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Class that defines the stream connection between upstream and downstream components.
- */
-@AutoValue
-public abstract class Stream {
-
-    public abstract Producer getProducer();
-    public abstract Consumer getConsumer();
-
-    public static Stream of(Producer producer, Consumer consumer) {
-        return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream(producer, consumer);
-    }
-
-    @AutoValue
-    public abstract static class Producer {
-        public abstract String getComponentId();
-        public abstract String getStreamId();
-        public abstract String getStreamName();
-
-        public static Producer of(String componentId, String streamId, String streamName) {
-            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Producer(
-                    componentId, streamId, streamName);
-        }
-    }
-
-    @AutoValue
-    public abstract static class Consumer {
-        public abstract String getComponentId();
-        public abstract Grouping getGrouping();
-
-        public static Consumer of(String componentId, Grouping grouping) {
-            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Consumer(
-                    componentId, grouping);
-        }
-    }
-
-    @AutoValue
-    public abstract static class Grouping {
-        public abstract Type getType();
-
-        @Nullable
-        public abstract List<String> getFields();
-
-        public static Grouping of(Type type) {
-            checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
-            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
-                    type, null /* fields */);
-        }
-
-        public static Grouping byFields(List<String> fields) {
-            checkNotNull(fields, "fields");
-            checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
-            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
-                    Type.FIELDS, fields);
-        }
-
-        /**
-         * Types of stream groupings Storm allows
-         */
-        public enum Type {
-            ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
deleted file mode 100644
index e1c35f6..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> {
-
-    void translateNode(T transform, TranslationContext context);
-
-    /**
-     * Returns true if this translator can translate the given transform.
-     */
-    boolean canTranslate(T transform, TranslationContext context);
-
-    class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
-        @Override
-        public void translateNode(T1 transform, TranslationContext context) {
-
-        }
-
-        @Override
-        public boolean canTranslate(T1 transform, TranslationContext context) {
-            return true;
-        }
-
-        static String describeTransform(
-                PTransform<?, ?> transform,
-                Map<TupleTag<?>, PValue> inputs,
-                Map<TupleTag<?>, PValue> outputs) {
-            return String.format("%s --> %s --> %s",
-                    Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
-                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                                @Override
-                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
-                                    return taggedPValue.getKey().getId();
-                                    // return taggedPValue.getValue().getName();
-                                }})),
-                    transform.getName(),
-                    Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
-                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
-                                @Override
-                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
-                                    return taggedPvalue.getKey().getId();
-                                    //return taggedPValue.getValue().getName();
-                                }})));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
deleted file mode 100644
index 0677e92..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.UnboundedSourceSpout;
-
-/**
- * Translates a Read.Unbounded into a Storm spout.
- * 
- * @param <T>
- */
-public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
-    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
-        TupleTag<?> tag = userGraphContext.getOutputTag();
-        PValue output = userGraphContext.getOutput();
-
-        UnboundedSourceSpout spout = new UnboundedSourceSpout(
-                description,
-                transform.getSource(), userGraphContext.getOptions(), tag);
-        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
deleted file mode 100644
index 3069955..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.ViewExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
- */
-public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
-    @Override
-    public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-        ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
-        context.addTransformExecutor(viewExecutor);
-    }
-
-    /**
-     * Specialized implementation for
-     * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-     * for the JStorm runner in streaming mode.
-     */
-    public static class ViewAsMap<K, V>
-            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsMap(View.AsMap<K, V> transform) {
-        }
-
-        @Override
-        public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-            PCollectionView<Map<K, V>> view =
-                    PCollectionViews.mapView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-            try {
-                inputCoder.getKeyCoder().verifyDeterministic();
-            } catch (Coder.NonDeterministicException e) {
-                // TODO: log warning as other runners.
-            }
-
-            return input
-                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsMap";
-        }
-    }
-
-    /**
-     * Specialized expansion for {@link
-     * View.AsMultimap View.AsMultimap} for the
-     * JStorm runner in streaming mode.
-     */
-    public static class ViewAsMultimap<K, V>
-            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsMultimap(View.AsMultimap<K, V> transform) {
-        }
-
-        @Override
-        public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-            PCollectionView<Map<K, Iterable<V>>> view =
-                    PCollectionViews.multimapView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            @SuppressWarnings({"rawtypes", "unchecked"})
-            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-            try {
-                inputCoder.getKeyCoder().verifyDeterministic();
-            } catch (Coder.NonDeterministicException e) {
-                // TODO: log warning as other runners.
-            }
-
-            return input
-                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsMultimap";
-        }
-    }
-
-    /**
-     * Specialized implementation for
-     * {@link View.AsList View.AsList} for the
-     * JStorm runner in streaming mode.
-     */
-    public static class ViewAsList<T>
-            extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsList(View.AsList<T> transform) {}
-
-        @Override
-        public PCollectionView<List<T>> expand(PCollection<T> input) {
-            PCollectionView<List<T>> view =
-                    PCollectionViews.listView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsList";
-        }
-    }
-
-    /**
-     * Specialized implementation for
-     * {@link View.AsIterable View.AsIterable} for the
-     * JStorm runner in streaming mode.
-     */
-    public static class ViewAsIterable<T>
-            extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsIterable(View.AsIterable<T> transform) { }
-
-        @Override
-        public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-            PCollectionView<Iterable<T>> view =
-                    PCollectionViews.iterableView(
-                            input,
-                            input.getWindowingStrategy(),
-                            input.getCoder());
-
-            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-                    .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsIterable";
-        }
-    }
-
-    /**
-     * Specialized expansion for
-     * {@link View.AsSingleton View.AsSingleton} for the
-     * JStorm runner in streaming mode.
-     */
-    public static class ViewAsSingleton<T>
-            extends PTransform<PCollection<T>, PCollectionView<T>> {
-        private View.AsSingleton<T> transform;
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
-        public ViewAsSingleton(View.AsSingleton<T> transform) {
-            this.transform = transform;
-        }
-
-        @Override
-        public PCollectionView<T> expand(PCollection<T> input) {
-            Combine.Globally<T, T> combine = Combine.globally(
-                    new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-            if (!transform.hasDefaultValue()) {
-                combine = combine.withoutDefaults();
-            }
-            return input.apply(combine.asSingletonView());
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingViewAsSingleton";
-        }
-
-        private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-            private boolean hasDefaultValue;
-            private T defaultValue;
-
-            SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-                this.hasDefaultValue = hasDefaultValue;
-                this.defaultValue = defaultValue;
-            }
-
-            @Override
-            public T apply(T left, T right) {
-                throw new IllegalArgumentException("PCollection with more than one element "
-                        + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-                        + "combine the PCollection into a single value");
-            }
-
-            @Override
-            public T identity() {
-                if (hasDefaultValue) {
-                    return defaultValue;
-                } else {
-                    throw new IllegalArgumentException(
-                            "Empty PCollection accessed as a singleton view. "
-                                    + "Consider setting withDefault to provide a default value");
-                }
-            }
-        }
-    }
-
-    public static class CombineGloballyAsSingletonView<InputT, OutputT>
-            extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-        /**
-         * Builds an instance of this class from the overridden transform.
-         */
-        @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-        public CombineGloballyAsSingletonView(
-                Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-            this.transform = transform;
-        }
-
-        @Override
-        public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-            PCollection<OutputT> combined =
-                    input.apply(Combine.globally(transform.getCombineFn())
-                            .withoutDefaults()
-                            .withFanout(transform.getFanout()));
-
-            PCollectionView<OutputT> view = PCollectionViews.singletonView(
-                    combined,
-                    combined.getWindowingStrategy(),
-                    transform.getInsertDefault(),
-                    transform.getInsertDefault()
-                            ? transform.getCombineFn().defaultValue() : null,
-                    combined.getCoder());
-            return combined
-                    .apply(ParDo.of(new WrapAsList<OutputT>()))
-                    .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
-        }
-
-        @Override
-        protected String getKindString() {
-            return "StreamingCombineGloballyAsSingletonView";
-        }
-    }
-
-    private static class WrapAsList<T> extends DoFn<T, List<T>> {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-            c.output(Collections.singletonList(c.element()));
-        }
-    }
-
-    /**
-     * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-     * They require the input {@link PCollection} fits in memory.
-     * For a large {@link PCollection} this is expected to crash!
-     *
-     * @param <T> the type of elements to concatenate.
-     */
-    private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public List<T> createAccumulator() {
-            return new ArrayList<>();
-        }
-
-        @Override
-        public List<T> addInput(List<T> accumulator, T input) {
-            accumulator.add(input);
-            return accumulator;
-        }
-
-        @Override
-        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-            List<T> result = createAccumulator();
-            for (List<T> accumulator : accumulators) {
-                result.addAll(accumulator);
-            }
-            return result;
-        }
-
-        @Override
-        public List<T> extractOutput(List<T> accumulator) {
-            return accumulator;
-        }
-
-        @Override
-        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-            return ListCoder.of(inputCoder);
-        }
-
-        @Override
-        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-            return ListCoder.of(inputCoder);
-        }
-    }
-
-    /**
-     * Creates a primitive {@link PCollectionView}.
-     *
-     * <p>For internal use only by runner implementors.
-     *
-     * @param <ElemT> The type of the elements of the input PCollection
-     * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-     */
-    public static class CreateJStormPCollectionView<ElemT, ViewT>
-            extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-        private PCollectionView<ViewT> view;
-
-        private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
-            this.view = view;
-        }
-
-        public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
-                PCollectionView<ViewT> view) {
-            return new CreateJStormPCollectionView<>(view);
-        }
-
-        @Override
-        public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-            return view;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
deleted file mode 100644
index 7fe8ddd..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.runtime.WindowAssignExecutor;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-
-public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-
-    @Override
-    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-        context.getUserGraphContext().setWindowed();
-        WindowAssignExecutor executor = new WindowAssignExecutor(
-                description,
-                transform.getWindowFn(),
-                userGraphContext.getOutputTag());
-        context.addTransformExecutor(executor);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
deleted file mode 100644
index 0b35052..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a Window.Bound node into a Storm WindowedBolt
- * 
- * @param <T>
- */
-public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
-
-    // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
-    @Override
-    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-        if (transform.getWindowFn() instanceof FixedWindows) {
-            context.getUserGraphContext().setWindowed();
-        } else if (transform.getWindowFn() instanceof SlidingWindows) {
-            context.getUserGraphContext().setWindowed();
-        } else {
-            throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
deleted file mode 100644
index a75efa9..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-public class CommonInstance {
-    public static final String KEY = "Key";
-    public static final String VALUE = "Value";
-
-    public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
deleted file mode 100644
index 8bf49d8..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
-/**
- * No-op SideInputReader implementation.
- */
-public class DefaultSideInputReader implements SideInputReader, Serializable {
-    @Nullable
-    @Override
-    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
-        return null;
-    }
-
-    @Override
-    public <T> boolean contains(PCollectionView<T> pCollectionView) {
-        return false;
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
deleted file mode 100644
index 08d1f2d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Default StepContext for running DoFn This does not allow accessing state or timer internals.
- */
-public class DefaultStepContext implements ExecutionContext.StepContext {
-
-    private TimerInternals timerInternals;
-
-    private StateInternals stateInternals;
-
-    public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
-        this.timerInternals = checkNotNull(timerInternals, "timerInternals");
-        this.stateInternals = checkNotNull(stateInternals, "stateInternals");
-    }
-
-    @Override
-    public String getStepName() {
-        return null;
-    }
-
-    @Override
-    public String getTransformName() {
-        return null;
-    }
-
-    @Override
-    public void noteOutput(WindowedValue<?> windowedValue) {
-
-    }
-
-    @Override
-    public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
-
-    }
-
-    @Override
-    public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-            Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
-        throw new UnsupportedOperationException("Writing side-input data is not supported.");
-    }
-
-    @Override
-    public StateInternals stateInternals() {
-        return stateInternals;
-    }
-
-    @Override
-    public TimerInternals timerInternals() {
-        return timerInternals;
-    }
-
-    public void setStateInternals(StateInternals stateInternals) {
-        this.stateInternals = stateInternals;
-    }
-
-    public void setTimerInternals(TimerInternals timerInternals) {
-        this.timerInternals = timerInternals;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
deleted file mode 100644
index 6cf3ae5..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.util;
-
-import com.alibaba.jstorm.beam.translation.runtime.Executor;
-
-import com.alibaba.jstorm.beam.translation.runtime.GroupByWindowExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-public class RunnerUtils {
-    /**
-     * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
-     * @param elem
-     * @return
-     */
-    public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
-        WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
-        SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
-                kvElem.getValue().getKey(),
-                kvElem.withValue(kvElem.getValue().getValue()));
-        return workItem;
-    }
-
-    public static boolean isGroupByKeyExecutor (Executor executor) {
-        if (executor instanceof GroupByWindowExecutor) {
-            return true;
-        } else if (executor instanceof StatefulDoFnExecutor ||
-                executor instanceof MultiStatefulDoFnExecutor) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
deleted file mode 100644
index 543db1c..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.jstorm.beam.util;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-    private final byte[] serializedOptions;
-
-    /** Lazily initialized copy of deserialized options */
-    private transient PipelineOptions pipelineOptions;
-
-    public SerializedPipelineOptions(PipelineOptions options) {
-        checkNotNull(options, "PipelineOptions must not be null.");
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            new ObjectMapper().writeValue(baos, options);
-            this.serializedOptions = baos.toByteArray();
-        } catch (Exception e) {
-            throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-        }
-
-    }
-
-    public PipelineOptions getPipelineOptions() {
-        if (pipelineOptions == null) {
-            try {
-                pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
-            } catch (IOException e) {
-                throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-            }
-        }
-
-        return pipelineOptions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
deleted file mode 100644
index 2f9b224..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.util;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed word item.
- * @param <K>
- * @param <ElemT>
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
-  final K key;
-  final WindowedValue<ElemT> value;
-
-  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
-      return new SingletonKeyedWorkItem<K, ElemT>(key, value);
-  }
-
-  @Override
-  public K key() {
-    return key;
-  }
-
-  public WindowedValue<ElemT> value() {
-    return value;
-  }
-
-  @Override
-  public Iterable<TimerInternals.TimerData> timersIterable() {
-    return Collections.EMPTY_LIST;
-  }
-
-  @Override
-  public Iterable<WindowedValue<ElemT>> elementsIterable() {
-    return Collections.singletonList(value);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
new file mode 100644
index 0000000..457beb6
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.sdk.options.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Options which can be used to configure a JStorm PipelineRunner.
+ */
+public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+    @Description("Indicate if the topology is running on local machine or distributed cluster")
+    @Default.Boolean(false)
+    Boolean getLocalMode();
+    void setLocalMode(Boolean isLocal);
+
+    @Description("Executing time(sec) of topology on local mode. Default is 1min.")
+    @Default.Long(60)
+    Long getLocalModeExecuteTime();
+    void setLocalModeExecuteTime(Long time);
+
+    @Description("Worker number of topology")
+    @Default.Integer(1)
+    Integer getWorkerNumber();
+    void setWorkerNumber(Integer number);
+
+    @Description("Global parallelism number of a component")
+    @Default.Integer(1)
+    Integer getParallelismNumber();
+    void setParallelismNumber(Integer number);
+
+    @Description("System topology config of JStorm")
+    @Default.InstanceFactory(DefaultMapValueFactory.class)
+    Map getTopologyConfig();
+    void setTopologyConfig(Map conf);
+
+    @Description("Indicate if it is an exactly once topology")
+    @Default.Boolean(false)
+    Boolean getExactlyOnceTopology();
+    void setExactlyOnceTopology(Boolean isExactlyOnce);
+
+    @Description("Parallelism number of a specified composite PTransform")
+    @Default.InstanceFactory(DefaultMapValueFactory.class)
+    Map getParallelismNumMap();
+    void setParallelismNumMap(Map parallelismNumMap);
+
+    class DefaultMapValueFactory implements DefaultValueFactory<Map> {
+        @Override
+        public Map create(PipelineOptions pipelineOptions) {
+            return Maps.newHashMap();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
new file mode 100644
index 0000000..12b3c18
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+public class StormRegistrar {
+    private StormRegistrar() {
+    }
+
+    @AutoService(PipelineRunnerRegistrar.class)
+    public static class Runner implements PipelineRunnerRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+            return ImmutableList.<Class<? extends PipelineRunner<?>>> of(StormRunner.class);
+        }
+    }
+
+    @AutoService(PipelineOptionsRegistrar.class)
+    public static class Options implements PipelineOptionsRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+            return ImmutableList.<Class<? extends PipelineOptions>> of(StormPipelineOptions.class);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
new file mode 100644
index 0000000..8bee49f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import com.alibaba.jstorm.beam.serialization.*;
+import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
+import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer;
+import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
+import org.apache.beam.runners.jstorm.translation.StormPipelineTranslator;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import java.util.Map;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.joda.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point into the Storm Runner.
+ * 
+ * After reading the user defined pipeline, Beam will invoke the run() method with a representation of the pipeline.
+ */
+public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class);
+
+    private StormPipelineOptions options;
+
+    /**
+     * Creates a runner bound to the given options.
+     *
+     * @param options options used for translation and topology submission; must not be null
+     * @throws NullPointerException if {@code options} is null
+     */
+    public StormRunner(StormPipelineOptions options) {
+        // Fail at construction time instead of with an unexplained NPE later inside run().
+        this.options = checkNotNull(options, "options");
+    }
+
+    /**
+     * Creates a runner from generic pipeline options.
+     *
+     * @param options options to validate as {@link StormPipelineOptions}
+     * @return a runner holding the validated options
+     */
+    public static StormRunner fromOptions(PipelineOptions options) {
+        return new StormRunner(
+                PipelineOptionsValidator.validate(StormPipelineOptions.class, options));
+    }
+
+    /**
+     * Builds the JStorm {@link Config} for a topology from the pipeline options.
+     *
+     * <p>The user supplied topology config is merged after the cluster mode and worker number
+     * have been written, and before the runner's own runtime keys and serializers are applied.
+     *
+     * @param options pipeline options to convert
+     * @return the populated storm configuration
+     */
+    private Config convertPipelineOptionsToConfig(StormPipelineOptions options) {
+        Config stormConf = new Config();
+
+        String clusterMode = options.getLocalMode() ? "local" : "distributed";
+        stormConf.put(Config.STORM_CLUSTER_MODE, clusterMode);
+        Config.setNumWorkers(stormConf, options.getWorkerNumber());
+
+        stormConf.putAll(options.getTopologyConfig());
+
+        // Runtime environment settings written by the runner itself.
+        stormConf.put("worker.external", "beam");
+        stormConf.put("topology.acker.executors", 0);
+
+        // Serializers for unmodifiable collections and the guava immutable
+        // list / set / map types (plain and SDK-repackaged variants).
+        UnmodifiableCollectionsSerializer.registerSerializers(stormConf);
+        ImmutableListSerializer.registerSerializers(stormConf);
+        SdkRepackImmuListSerializer.registerSerializers(stormConf);
+        ImmutableSetSerializer.registerSerializers(stormConf);
+        SdkRepackImmuSetSerializer.registerSerializers(stormConf);
+        ImmutableMapSerializer.registerSerializers(stormConf);
+
+        stormConf.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+        return stormConf;
+    }
+
+    /**
+     * Translates the pipeline, logs the resulting graphs and stream edges, then builds and
+     * submits the topology.
+     *
+     * @param pipeline the pipeline to execute
+     * @return whatever {@code runTopology} returns for the submitted topology
+     */
+    @Override
+    public StormPipelineResult run(Pipeline pipeline) {
+        LOG.info("Running pipeline...");
+        TranslationContext translationContext = new TranslationContext(this.options);
+        new StormPipelineTranslator(translationContext).translate(pipeline);
+        LOG.info("UserGraphContext=\n{}", translationContext.getUserGraphContext());
+        LOG.info("ExecutionGraphContext=\n{}", translationContext.getExecutionGraphContext());
+
+        // One log line per producer/consumer edge of the execution graph.
+        for (Stream edge : translationContext.getExecutionGraphContext().getStreams()) {
+            String producerId = edge.getProducer().getComponentId();
+            String consumerId = edge.getConsumer().getComponentId();
+            LOG.info(producerId + " --> " + consumerId);
+        }
+
+        String topologyName = options.getJobName();
+        Config config = convertPipelineOptionsToConfig(options);
+        StormTopology topology = getTopology(options, translationContext.getExecutionGraphContext());
+
+        return runTopology(topologyName, topology, config);
+    }
+
+    /**
+     * Submits the topology either to the cluster or to the in-process local cluster,
+     * depending on {@code StormConfig.local_mode(config)}.
+     *
+     * <p>NOTE(review): the non-local branch returns {@code null}, so callers of
+     * {@code run()} receive no result object in that mode.
+     *
+     * @param topologyName name to submit the topology under
+     * @param topology the topology to submit
+     * @param config storm configuration used for submission
+     * @return a {@link LocalStormPipelineResult} in local mode, otherwise {@code null}
+     * @throws RuntimeException wrapping any exception raised during submission
+     */
+    private StormPipelineResult runTopology(String topologyName, StormTopology topology, Config config) {
+        try {
+            if (!StormConfig.local_mode(config)) {
+                StormSubmitter.submitTopology(topologyName, config, topology);
+                return null;
+            }
+
+            LocalCluster cluster = LocalCluster.getInstance();
+            cluster.submitTopology(topologyName, config, topology);
+            long executeTimeSecs = options.getLocalModeExecuteTime();
+            return new LocalStormPipelineResult(topologyName, config, cluster, executeTimeSecs);
+        } catch (Exception e) {
+            LOG.warn("Fail to submit topology", e);
+            throw new RuntimeException("Fail to submit topology", e);
+        }
+    }
+
+    /**
+     * Base {@link PipelineResult} of this runner; it carries the topology name and the
+     * storm configuration the topology was submitted with.
+     */
+    public abstract static class StormPipelineResult implements PipelineResult {
+
+        private final String topologyName;
+        private final Config config;
+
+        /**
+         * @param topologyName name of the submitted topology, must not be null
+         * @param config configuration used for submission, must not be null
+         */
+        StormPipelineResult(String topologyName, Config config) {
+            // config is validated before topologyName.
+            this.config = checkNotNull(config, "config");
+            this.topologyName = checkNotNull(topologyName, "topologyName");
+        }
+
+        /** Always returns {@code null}; no state is tracked here. */
+        @Override
+        public State getState() {
+            return null;
+        }
+
+        /** Returns the configuration passed to the constructor. */
+        public Config getConfig() {
+            return this.config;
+        }
+
+        /** Returns the topology name passed to the constructor. */
+        public String getTopologyName() {
+            return this.topologyName;
+        }
+    }
+
+    /**
+     * Result of a topology submitted to an in-process {@link LocalCluster}.
+     *
+     * <p>Waiting for completion means sleeping for the configured local-mode execute time and
+     * then cancelling the topology.
+     */
+    public static class LocalStormPipelineResult extends StormPipelineResult {
+
+        private final LocalCluster localCluster;
+        private final long localModeExecuteTimeSecs;
+
+        /**
+         * @param topologyName name of the submitted topology
+         * @param config configuration used for submission
+         * @param localCluster local cluster that runs the topology, must not be null
+         * @param localModeExecuteTimeSecs seconds {@link #waitUntilFinish()} sleeps before cancelling
+         */
+        LocalStormPipelineResult(
+                String topologyName,
+                Config config,
+                LocalCluster localCluster,
+                long localModeExecuteTimeSecs) {
+            super(topologyName, config);
+            this.localCluster = checkNotNull(localCluster, "localCluster");
+            // This field used to be left unassigned (0), which made waitUntilFinish()
+            // cancel the topology without waiting at all.
+            this.localModeExecuteTimeSecs = localModeExecuteTimeSecs;
+        }
+
+        /**
+         * Kills the topology, shuts the local cluster down and then sleeps for one second.
+         *
+         * @return {@code State.CANCELLED}
+         */
+        @Override
+        public State cancel() throws IOException {
+            localCluster.killTopology(getTopologyName());
+            localCluster.shutdown();
+            JStormUtils.sleepMs(1000);
+            return State.CANCELLED;
+        }
+
+        /**
+         * Delegates to {@link #waitUntilFinish()}; the {@code duration} argument is not used.
+         */
+        @Override
+        public State waitUntilFinish(Duration duration) {
+            return waitUntilFinish();
+        }
+
+        /**
+         * Sleeps for the configured execute time (seconds) and then cancels the topology.
+         *
+         * @return the state returned by {@link #cancel()}
+         * @throws RuntimeException wrapping an {@link IOException} raised by {@link #cancel()}
+         */
+        @Override
+        public State waitUntilFinish() {
+            JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+            try {
+                return cancel();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** Always returns {@code null}; no metrics are exposed by this result. */
+        @Override
+        public MetricResults metrics() {
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a component by id, checking spouts first and bolts second.
+     *
+     * @param id component id
+     * @param context execution graph holding the spouts and bolts
+     * @return the spout or bolt registered under {@code id}, or {@code null} if neither exists
+     */
+    private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) {
+        AdaptorBasicSpout spout = context.getSpout(id);
+        if (spout != null) {
+            return spout;
+        }
+
+        // May itself be null, in which case the caller gets null.
+        AdaptorBasicBolt bolt = context.getBolt(id);
+        return bolt;
+    }
+
+    private StormTopology getTopology(StormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
+        boolean isExactlyOnce = options.getExactlyOnceTopology();
+        TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
+
+        int parallelismNumber = options.getParallelismNumber();
+        Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+        for (String id : spouts.keySet()) {
+            IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
+            builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
+        }
+
+        HashMap<String, BoltDeclarer> declarers = new HashMap<>();
+        Iterable<Stream> streams = context.getStreams();
+        LOG.info("streams=" + streams);
+        for (Stream stream : streams) {
+            String destBoltId = stream.getConsumer().getComponentId();
+            IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
+            BoltDeclarer declarer = declarers.get(destBoltId);
+            if (declarer == null) {
+                declarer = builder.setBolt(destBoltId, bolt,
+                        getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
+                declarers.put(destBoltId, declarer);
+            }
+
+            Stream.Grouping grouping = stream.getConsumer().getGrouping();
+            String streamId = stream.getProducer().getStreamId();
+            String srcBoltId = stream.getProducer().getComponentId();
+
+            // add stream output declare for "from" component
+            AbstractComponent component = getComponent(srcBoltId, context);
+            if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+                component.addKVOutputField(streamId);
+            else
+                component.addOutputField(streamId);
+
+            // "to" component declares grouping to "from" component 
+            switch (grouping.getType()) {
+            case SHUFFLE:
+                declarer.shuffleGrouping(srcBoltId, streamId);
+                break;
+            case FIELDS:
+                declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
+                break;
+            case ALL:
+                declarer.allGrouping(srcBoltId, streamId);
+                break;
+            case DIRECT:
+                declarer.directGrouping(srcBoltId, streamId);
+                break;
+            case GLOBAL:
+                declarer.globalGrouping(srcBoltId, streamId);
+                break;
+            case LOCAL_OR_SHUFFLE:
+                declarer.localOrShuffleGrouping(srcBoltId, streamId);
+                break;
+            case NONE:
+                declarer.noneGrouping(srcBoltId, streamId);
+                break;
+            default:
+                throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+            }
+
+            // Subscribe grouping of water mark stream
+            component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
+            declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
+        }
+
+        if (isExactlyOnce) {
+            ((TransactionTopologyBuilder) builder).enableHdfs();
+        }
+        return builder.createTopology();
+    }
+
+    /**
+     * Chooses the spout instance to register with the topology builder.
+     *
+     * @param isExactlyOnce whether the topology runs in exactly-once mode
+     * @param spout the translated spout
+     * @return {@code spout} itself when not exactly-once, otherwise a
+     *         {@link TxUnboundedSourceSpout} wrapping it
+     * @throws RuntimeException in exactly-once mode when {@code spout} is not an
+     *         {@link UnboundedSourceSpout}
+     */
+    private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
+        if (!isExactlyOnce) {
+            return spout;
+        }
+        if (!(spout instanceof UnboundedSourceSpout)) {
+            String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString());
+            throw new RuntimeException(error);
+        }
+        return new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
+    }
+
+    /**
+     * Chooses the bolt instance to register with the topology builder.
+     *
+     * @param isExactlyOnce whether the topology runs in exactly-once mode
+     * @param bolt the translated bolt
+     * @return a {@link TxExecutorsBolt} wrapping {@code bolt} in exactly-once mode,
+     *         otherwise {@code bolt} itself
+     */
+    private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
+        if (isExactlyOnce) {
+            return new TxExecutorsBolt(bolt);
+        }
+        return bolt;
+    }
+
+    /**
+     * Resolves the parallelism to use for a component.
+     *
+     * @param component component whose own parallelism setting is consulted first
+     * @param globalParallelismNum value used when the component's setting is not positive
+     * @return the component's parallelism if it is greater than zero, otherwise
+     *         {@code globalParallelismNum}
+     */
+    private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
+        int componentParallelism = component.getParallelismNum();
+        if (componentParallelism > 0) {
+            return componentParallelism;
+        }
+        return globalParallelismNum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
new file mode 100644
index 0000000..fa7bdf3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -0,0 +1,120 @@
+package org.apache.beam.runners.jstorm;
+
+import avro.shaded.com.google.common.collect.Maps;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.google.common.base.Optional;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Test JStorm runner.
+ */
+public class TestJStormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
+
+    public static TestJStormRunner fromOptions(PipelineOptions options) {
+        return new TestJStormRunner(options.as(StormPipelineOptions.class));
+    }
+
+    private final StormRunner stormRunner;
+    private final StormPipelineOptions options;
+
+    /**
+     * Forces local mode with an empty topology config on the given options and creates the
+     * delegate {@link StormRunner} from them.
+     *
+     * @param options options to run with; must not be null
+     * @throws NullPointerException if {@code options} is null
+     */
+    private TestJStormRunner(StormPipelineOptions options) {
+        // Check before the first dereference. The previous check sat after options had
+        // already been used, so it could never report a null.
+        this.options = checkNotNull(options, "options");
+        Map conf = Maps.newHashMap();
+        //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+        options.setTopologyConfig(conf);
+        options.setLocalMode(true);
+        stormRunner = StormRunner.fromOptions(options);
+    }
+
+    /**
+     * Runs the pipeline on the delegate runner and waits for its {@link PAssert}s.
+     *
+     * <p>With no assertions the method sleeps five seconds and returns. Otherwise it polls the
+     * assertion counters up to 40 times, sleeping 500 ms between undecided polls. In every
+     * case the assertion metrics are cleared and the result is cancelled before leaving.
+     *
+     * @param pipeline the pipeline to execute
+     * @return the result of the delegate runner
+     * @throws AssertionError if an assertion failed or the polling timed out
+     */
+    @Override
+    public StormRunner.StormPipelineResult run(Pipeline pipeline) {
+        StormRunner.StormPipelineResult result = stormRunner.run(pipeline);
+
+        try {
+            int expectedAsserts = PAssert.countAsserts(pipeline);
+            LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), expectedAsserts);
+
+            if (expectedAsserts == 0) {
+                // Nothing to verify: give the job five seconds and return.
+                JStormUtils.sleepMs(5000);
+                return result;
+            }
+
+            int attempt = 0;
+            while (attempt < 40) {
+                Optional<Boolean> outcome = checkForPAssertSuccess(expectedAsserts);
+                if (outcome.isPresent()) {
+                    if (outcome.get()) {
+                        return result;
+                    }
+                    throw new AssertionError("Failed assertion checks.");
+                }
+                // Undecided so far; wait before polling again.
+                JStormUtils.sleepMs(500);
+                attempt++;
+            }
+
+            LOG.info("Assertion checks timed out.");
+            throw new AssertionError("Assertion checks timed out.");
+        } finally {
+            clearPAssertCount();
+            cancel(result);
+        }
+    }
+
+    /**
+     * Sums the task-level PAssert success and failure counters and derives a verdict.
+     *
+     * @param expectedNumberOfAssertions number of assertions that must succeed
+     * @return {@code Optional.of(false)} if any failure was counted, {@code Optional.of(true)}
+     *         if the successes reach the expected number, otherwise {@code Optional.absent()}
+     */
+    private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
+        int succeeded = 0;
+        for (AsmMetric counter : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+            Long value = (Long) counter.getValue(AsmWindow.M1_WINDOW);
+            succeeded += value.intValue();
+        }
+
+        int failed = 0;
+        for (AsmMetric counter : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+            Long value = (Long) counter.getValue(AsmWindow.M1_WINDOW);
+            failed += value.intValue();
+        }
+
+        // The same summary is logged whatever the verdict turns out to be.
+        LOG.info("Found {} success, {} failures out of {} expected assertions.",
+                succeeded, failed, expectedNumberOfAssertions);
+
+        if (failed > 0) {
+            return Optional.of(false);
+        }
+        if (succeeded >= expectedNumberOfAssertions) {
+            return Optional.of(true);
+        }
+        return Optional.absent();
+    }
+
+    /**
+     * Removes every task metric whose key contains this job's name.
+     */
+    private void clearPAssertCount() {
+        String topologyName = options.getJobName();
+        AsmMetricRegistry registry = JStormMetrics.getTaskMetrics();
+        for (Iterator<Map.Entry<String, AsmMetric>> it = registry.getMetrics().entrySet().iterator();
+                it.hasNext();) {
+            String metricKey = it.next().getKey();
+            if (metricKey.contains(topologyName)) {
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Cancels the given result.
+     *
+     * @param result the pipeline result to cancel
+     * @throws RuntimeException wrapping an {@link IOException} raised by the cancellation
+     */
+    private void cancel(StormRunner.StormPipelineResult result) {
+        try {
+            result.cancel();
+        } catch (IOException cause) {
+            throw new RuntimeException("Failed to cancel.", cause);
+        }
+    }
+}