You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:06 UTC
[06/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 1870681..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import avro.shaded.com.google.common.collect.Lists;
-import avro.shaded.com.google.common.collect.Maps;
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiOutputDoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Translates a ParDo.BoundMulti to a Storm {@link com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
- Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
- Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
- Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
- localToExternalTupleTagMap.put(entry.getKey(), itr.next());
- }
-
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
- sideOutputTags.remove(mainOutputTag);
-
- Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- allOutputs);
-
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
-
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new MultiStatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- } else {
- executor = new MultiOutputDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- }
-
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
deleted file mode 100644
index a8d8186..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import java.util.List;
-import java.util.Map;
-
-import avro.shaded.com.google.common.collect.Lists;
-import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.DoFnExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
- */
-public class ParDoBoundTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
- @Override
- public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<?> inputTag = userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
- Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- userGraphContext.getOutputs());
-
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
-
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new StatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- } else {
- executor = new DoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<InputT>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- }
-
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
deleted file mode 100644
index 26a9b22..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ReshuffleTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.Reshuffle;
-
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
deleted file mode 100644
index f80a39d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/Stream.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.google.auto.value.AutoValue;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Class that defines the stream connection between upstream and downstream components.
- */
-@AutoValue
-public abstract class Stream {
-
- public abstract Producer getProducer();
- public abstract Consumer getConsumer();
-
- public static Stream of(Producer producer, Consumer consumer) {
- return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream(producer, consumer);
- }
-
- @AutoValue
- public abstract static class Producer {
- public abstract String getComponentId();
- public abstract String getStreamId();
- public abstract String getStreamName();
-
- public static Producer of(String componentId, String streamId, String streamName) {
- return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Producer(
- componentId, streamId, streamName);
- }
- }
-
- @AutoValue
- public abstract static class Consumer {
- public abstract String getComponentId();
- public abstract Grouping getGrouping();
-
- public static Consumer of(String componentId, Grouping grouping) {
- return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Consumer(
- componentId, grouping);
- }
- }
-
- @AutoValue
- public abstract static class Grouping {
- public abstract Type getType();
-
- @Nullable
- public abstract List<String> getFields();
-
- public static Grouping of(Type type) {
- checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
- return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
- type, null /* fields */);
- }
-
- public static Grouping byFields(List<String> fields) {
- checkNotNull(fields, "fields");
- checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
- return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
- Type.FIELDS, fields);
- }
-
- /**
- * Types of stream groupings Storm allows
- */
- public enum Type {
- ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
deleted file mode 100644
index e1c35f6..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/TransformTranslator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.FluentIterable;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> {
-
- void translateNode(T transform, TranslationContext context);
-
- /**
- * Returns true if this translator can translate the given transform.
- */
- boolean canTranslate(T transform, TranslationContext context);
-
- class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
- @Override
- public void translateNode(T1 transform, TranslationContext context) {
-
- }
-
- @Override
- public boolean canTranslate(T1 transform, TranslationContext context) {
- return true;
- }
-
- static String describeTransform(
- PTransform<?, ?> transform,
- Map<TupleTag<?>, PValue> inputs,
- Map<TupleTag<?>, PValue> outputs) {
- return String.format("%s --> %s --> %s",
- Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
- return taggedPValue.getKey().getId();
- // return taggedPValue.getValue().getName();
- }})),
- transform.getName(),
- Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
- return taggedPvalue.getKey().getId();
- //return taggedPValue.getValue().getName();
- }})));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
deleted file mode 100644
index 0677e92..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/UnboundedSourceTranslator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.UnboundedSourceSpout;
-
-/**
- * Translates a Read.Unbounded into a Storm spout.
- *
- * @param <T>
- */
-public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
- public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
-
- TupleTag<?> tag = userGraphContext.getOutputTag();
- PValue output = userGraphContext.getOutput();
-
- UnboundedSourceSpout spout = new UnboundedSourceSpout(
- description,
- transform.getSource(), userGraphContext.getOptions(), tag);
- context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
deleted file mode 100644
index 3069955..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/ViewTranslator.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import com.alibaba.jstorm.beam.translation.runtime.ViewExecutor;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
- */
-public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
- @Override
- public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
- context.addTransformExecutor(viewExecutor);
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the JStorm runner in streaming mode.
- */
- public static class ViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsMap(View.AsMap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * View.AsMultimap View.AsMultimap} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsMultimap(View.AsMultimap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsList View.AsList} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsList(View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link View.AsIterable View.AsIterable} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsIterable(View.AsIterable<T> transform) { }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized expansion for
- * {@link View.AsSingleton View.AsSingleton} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private View.AsSingleton<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
- }
-
- public static class CombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public CombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined,
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- * Requires that the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateJStormPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private PCollectionView<ViewT> view;
-
- private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateJStormPCollectionView<>(view);
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
deleted file mode 100644
index 7fe8ddd..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowAssignTranslator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.translation.runtime.WindowAssignExecutor;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-
-public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- context.getUserGraphContext().setWindowed();
- WindowAssignExecutor executor = new WindowAssignExecutor(
- description,
- transform.getWindowFn(),
- userGraphContext.getOutputTag());
- context.addTransformExecutor(executor);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
deleted file mode 100644
index 0b35052..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/translator/WindowBoundTranslator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-
-import com.alibaba.jstorm.beam.translation.TranslationContext;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a Window.Assign node: marks the user graph as windowed for FixedWindows and SlidingWindows, and rejects any other WindowFn.
- *
- * @param <T>
- */
-public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
-
- // No executor is created here currently. The assignment of the window strategy is handled in AssignTranslator.
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- if (transform.getWindowFn() instanceof FixedWindows) {
- context.getUserGraphContext().setWindowed();
- } else if (transform.getWindowFn() instanceof SlidingWindows) {
- context.getUserGraphContext().setWindowed();
- } else {
- throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
deleted file mode 100644
index a75efa9..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/CommonInstance.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-public class CommonInstance {
- public static final String KEY = "Key";
- public static final String VALUE = "Value";
-
- public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
deleted file mode 100644
index 8bf49d8..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultSideInputReader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
-/**
- * No-op SideInputReader implementation.
- */
-public class DefaultSideInputReader implements SideInputReader, Serializable {
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
- return null;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> pCollectionView) {
- return false;
- }
-
- @Override
- public boolean isEmpty() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
deleted file mode 100644
index 08d1f2d..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/translation/util/DefaultStepContext.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.util;
-
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Default StepContext for running a DoFn. It exposes the state and timer internals it was given, but does not support writing side-input data.
- */
-public class DefaultStepContext implements ExecutionContext.StepContext {
-
- private TimerInternals timerInternals;
-
- private StateInternals stateInternals;
-
- public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
- this.timerInternals = checkNotNull(timerInternals, "timerInternals");
- this.stateInternals = checkNotNull(stateInternals, "stateInternals");
- }
-
- @Override
- public String getStepName() {
- return null;
- }
-
- @Override
- public String getTransformName() {
- return null;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> windowedValue) {
-
- }
-
- @Override
- public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
-
- }
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
- throw new UnsupportedOperationException("Writing side-input data is not supported.");
- }
-
- @Override
- public StateInternals stateInternals() {
- return stateInternals;
- }
-
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
-
- public void setStateInternals(StateInternals stateInternals) {
- this.stateInternals = stateInternals;
- }
-
- public void setTimerInternals(TimerInternals timerInternals) {
- this.timerInternals = timerInternals;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
deleted file mode 100644
index 6cf3ae5..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/RunnerUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.util;
-
-import com.alibaba.jstorm.beam.translation.runtime.Executor;
-
-import com.alibaba.jstorm.beam.translation.runtime.GroupByWindowExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.MultiStatefulDoFnExecutor;
-import com.alibaba.jstorm.beam.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-public class RunnerUtils {
- /**
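- * Converts a {@code WindowedValue<KV<K, V>>} into a {@code KeyedWorkItem<K, V>} holding that single element.
- * @param elem the windowed key/value pair to convert
- * @return a work item keyed by the pair's key whose only element is the pair's value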
- */
- public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
- WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
- SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
- kvElem.getValue().getKey(),
- kvElem.withValue(kvElem.getValue().getValue()));
- return workItem;
- }
-
- public static boolean isGroupByKeyExecutor (Executor executor) {
- if (executor instanceof GroupByWindowExecutor) {
- return true;
- } else if (executor instanceof StatefulDoFnExecutor ||
- executor instanceof MultiStatefulDoFnExecutor) {
- return true;
- } else {
- return false;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
deleted file mode 100644
index 543db1c..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.jstorm.beam.util;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
- private final byte[] serializedOptions;
-
- /** Lazily initialized copy of deserialized options */
- private transient PipelineOptions pipelineOptions;
-
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- new ObjectMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
-
- }
-
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
-
- return pipelineOptions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
deleted file mode 100644
index 2f9b224..0000000
--- a/runners/jstorm/src/main/java/com/alibaba/jstorm/beam/util/SingletonKeyedWorkItem.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.util;
-
-import java.util.Collections;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed work item.
- * @param <K> the type of the key
- * @param <ElemT> the type of the single element carried by this work item
- */
-public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
-
- final K key;
- final WindowedValue<ElemT> value;
-
- private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
- this.key = key;
- this.value = value;
- }
-
- public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
- return new SingletonKeyedWorkItem<K, ElemT>(key, value);
- }
-
- @Override
- public K key() {
- return key;
- }
-
- public WindowedValue<ElemT> value() {
- return value;
- }
-
- @Override
- public Iterable<TimerInternals.TimerData> timersIterable() {
- return Collections.EMPTY_LIST;
- }
-
- @Override
- public Iterable<WindowedValue<ElemT>> elementsIterable() {
- return Collections.singletonList(value);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
new file mode 100644
index 0000000..457beb6
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormPipelineOptions.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.sdk.options.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Options which can be used to configure a JStorm PipelineRunner.
+ */
+public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+ @Description("Indicate if the topology is running on local machine or distributed cluster")
+ @Default.Boolean(false)
+ Boolean getLocalMode();
+ void setLocalMode(Boolean isLocal);
+
+ @Description("Executing time(sec) of topology on local mode. Default is 1min.")
+ @Default.Long(60)
+ Long getLocalModeExecuteTime();
+ void setLocalModeExecuteTime(Long time);
+
+ @Description("Worker number of topology")
+ @Default.Integer(1)
+ Integer getWorkerNumber();
+ void setWorkerNumber(Integer number);
+
+ @Description("Global parallelism number of a component")
+ @Default.Integer(1)
+ Integer getParallelismNumber();
+ void setParallelismNumber(Integer number);
+
+ @Description("System topology config of JStorm")
+ @Default.InstanceFactory(DefaultMapValueFactory.class)
+ Map getTopologyConfig();
+ void setTopologyConfig(Map conf);
+
+ @Description("Indicate if it is an exactly once topology")
+ @Default.Boolean(false)
+ Boolean getExactlyOnceTopology();
+ void setExactlyOnceTopology(Boolean isExactlyOnce);
+
+ @Description("Parallelism number of a specified composite PTransform")
+ @Default.InstanceFactory(DefaultMapValueFactory.class)
+ Map getParallelismNumMap();
+ void setParallelismNumMap(Map parallelismNumMap);
+
+ class DefaultMapValueFactory implements DefaultValueFactory<Map> {
+ @Override
+ public Map create(PipelineOptions pipelineOptions) {
+ return Maps.newHashMap();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
new file mode 100644
index 0000000..12b3c18
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRegistrar.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+public class StormRegistrar {
+ private StormRegistrar() {
+ }
+
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>> of(StormRunner.class);
+ }
+ }
+
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>> of(StormPipelineOptions.class);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
new file mode 100644
index 0000000..8bee49f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/StormRunner.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import com.alibaba.jstorm.beam.serialization.*;
+import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
+import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer;
+import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
+import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
+import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
+import org.apache.beam.runners.jstorm.translation.StormPipelineTranslator;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import java.util.Map;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.joda.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point into the Storm Runner.
+ *
+ * After reading the user defined pipeline, Beam will invoke the run() method with a representation of the pipeline.
+ */
+public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+ private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class);
+
+ private StormPipelineOptions options;
+
+ /**
+ * Creates a runner bound to the given options.
+ *
+ * @param options the Storm pipeline options used for translation and submission; must not be null
+ * @throws NullPointerException if {@code options} is null
+ */
+ public StormRunner(StormPipelineOptions options) {
+ // Fail fast here instead of with an unlabelled NullPointerException later in run().
+ this.options = checkNotNull(options, "options");
+ }
+
+ /**
+ * Creates a {@link StormRunner} from generic pipeline options.
+ *
+ * @param options the options to validate as {@link StormPipelineOptions}
+ * @return a runner configured with the validated options
+ */
+ public static StormRunner fromOptions(PipelineOptions options) {
+ return new StormRunner(PipelineOptionsValidator.validate(StormPipelineOptions.class, options));
+ }
+
+ /**
+ * Converts the pipeline options into a Storm configuration.
+ *
+ * <p>The cluster mode and worker number are written first, then the user supplied topology
+ * config is copied in (so it may overwrite those entries), and finally the runtime settings
+ * and serializer registrations are applied.
+ *
+ * @param options the pipeline options to read from
+ * @return a new Storm {@link Config} populated from {@code options}
+ */
+ private Config convertPipelineOptionsToConfig(StormPipelineOptions options) {
+ Config config = new Config();
+ String clusterMode = options.getLocalMode() ? "local" : "distributed";
+ config.put(Config.STORM_CLUSTER_MODE, clusterMode);
+
+ Config.setNumWorkers(config, options.getWorkerNumber());
+
+ // User supplied entries are copied after the defaults above and before the runtime settings below.
+ config.putAll(options.getTopologyConfig());
+
+ // Settings for the runtime environment.
+ config.put("worker.external", "beam");
+ config.put("topology.acker.executors", 0);
+
+ UnmodifiableCollectionsSerializer.registerSerializers(config);
+ // Serializers for the Guava immutable collections and their SDK-repackaged variants.
+ ImmutableListSerializer.registerSerializers(config);
+ SdkRepackImmuListSerializer.registerSerializers(config);
+ ImmutableSetSerializer.registerSerializers(config);
+ SdkRepackImmuSetSerializer.registerSerializers(config);
+ ImmutableMapSerializer.registerSerializers(config);
+
+ config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+ return config;
+ }
+
+ /**
+ * Translates the pipeline into a Storm topology and submits it.
+ *
+ * @param pipeline the user defined pipeline
+ * @return the result of the submission; {@code null} when the topology is not run in local mode
+ */
+ @Override
+ public StormPipelineResult run(Pipeline pipeline) {
+ LOG.info("Running pipeline...");
+ TranslationContext context = new TranslationContext(this.options);
+ new StormPipelineTranslator(context).translate(pipeline);
+ LOG.info("UserGraphContext=\n{}", context.getUserGraphContext());
+ TranslationContext.ExecutionGraphContext executionGraph = context.getExecutionGraphContext();
+ LOG.info("ExecutionGraphContext=\n{}", executionGraph);
+
+ // Log every producer/consumer edge of the translated graph.
+ for (Stream stream : executionGraph.getStreams()) {
+ String producerId = stream.getProducer().getComponentId();
+ String consumerId = stream.getConsumer().getComponentId();
+ LOG.info(producerId + " --> " + consumerId);
+ }
+
+ String topologyName = options.getJobName();
+ // The config is built before the topology, matching the original evaluation order.
+ Config config = convertPipelineOptionsToConfig(options);
+ StormTopology topology = getTopology(options, executionGraph);
+
+ return runTopology(topologyName, topology, config);
+ }
+
+ /**
+ * Submits the topology either to the cluster or to the in-process local cluster.
+ *
+ * @param topologyName name to submit the topology under
+ * @param topology the topology to submit
+ * @param config the Storm configuration; it also decides between local and cluster submission
+ * @return a {@link LocalStormPipelineResult} in local mode, otherwise {@code null}
+ * @throws RuntimeException wrapping any exception raised during submission
+ */
+ private StormPipelineResult runTopology(String topologyName, StormTopology topology, Config config) {
+ try {
+ if (!StormConfig.local_mode(config)) {
+ StormSubmitter.submitTopology(topologyName, config, topology);
+ // NOTE(review): no result object exists for cluster submissions, so callers receive null.
+ return null;
+ }
+
+ LocalCluster cluster = LocalCluster.getInstance();
+ cluster.submitTopology(topologyName, config, topology);
+ long executeTimeSecs = options.getLocalModeExecuteTime();
+ return new LocalStormPipelineResult(topologyName, config, cluster, executeTimeSecs);
+ } catch (Exception e) {
+ LOG.warn("Fail to submit topology", e);
+ throw new RuntimeException("Fail to submit topology", e);
+ }
+ }
+
+ /**
+ * Base {@link PipelineResult} for pipelines submitted by this runner. It records the topology
+ * name and the Storm configuration the topology was submitted with.
+ */
+ public abstract static class StormPipelineResult implements PipelineResult {
+
+ private final Config config;
+ private final String topologyName;
+
+ StormPipelineResult(String topologyName, Config config) {
+ // config is validated before topologyName, so a null config is reported first.
+ this.config = checkNotNull(config, "config");
+ this.topologyName = checkNotNull(topologyName, "topologyName");
+ }
+
+ /**
+ * State tracking is not implemented; this always returns {@code null}.
+ */
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ /**
+ * @return the Storm configuration given at construction
+ */
+ public Config getConfig() {
+ return this.config;
+ }
+
+ /**
+ * @return the topology name given at construction
+ */
+ public String getTopologyName() {
+ return this.topologyName;
+ }
+ }
+
+ /**
+ * Result of a pipeline submitted to the in-process {@link LocalCluster}.
+ *
+ * <p>A local topology is stopped by {@link #cancel()}; {@link #waitUntilFinish()} lets it run
+ * for the configured number of seconds and then cancels it.
+ */
+ public static class LocalStormPipelineResult extends StormPipelineResult {
+
+ private final LocalCluster localCluster;
+ private final long localModeExecuteTimeSecs;
+
+ /**
+ * @param topologyName name the topology was submitted under
+ * @param config the Storm configuration used for the submission
+ * @param localCluster the local cluster running the topology; must not be null
+ * @param localModeExecuteTimeSecs seconds {@link #waitUntilFinish()} waits before cancelling
+ */
+ LocalStormPipelineResult(
+ String topologyName,
+ Config config,
+ LocalCluster localCluster,
+ long localModeExecuteTimeSecs) {
+ super(topologyName, config);
+ this.localCluster = checkNotNull(localCluster, "localCluster");
+ // Previously this argument was dropped, leaving the field at 0 so that
+ // waitUntilFinish() cancelled the topology without waiting.
+ this.localModeExecuteTimeSecs = localModeExecuteTimeSecs;
+ }
+
+ /**
+ * Kills the topology, shuts the local cluster down, pauses for one second and reports
+ * {@link State#CANCELLED}.
+ */
+ @Override
+ public State cancel() throws IOException {
+ localCluster.killTopology(getTopologyName());
+ localCluster.shutdown();
+ JStormUtils.sleepMs(1000);
+ return State.CANCELLED;
+ }
+
+ /**
+ * Delegates to {@link #waitUntilFinish()}; the {@code duration} argument is not used.
+ */
+ @Override
+ public State waitUntilFinish(Duration duration) {
+ return waitUntilFinish();
+ }
+
+ /**
+ * Sleeps for the configured local execution time and then cancels the topology.
+ *
+ * @return the state returned by {@link #cancel()}
+ * @throws RuntimeException wrapping an {@link IOException} raised while cancelling
+ */
+ @Override
+ public State waitUntilFinish() {
+ JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+ try {
+ return cancel();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Metrics are not implemented; this always returns {@code null}.
+ */
+ @Override
+ public MetricResults metrics() {
+ return null;
+ }
+ }
+
+ /**
+ * Looks up a component by id, trying the spouts first and the bolts second.
+ *
+ * @param id the component id
+ * @param context the execution graph to search
+ * @return the matching spout or bolt, or {@code null} if neither lookup finds one
+ */
+ private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) {
+ AdaptorBasicSpout spout = context.getSpout(id);
+ if (spout != null) {
+ return spout;
+ }
+ // Not a spout: fall back to the bolts; the lookup result (possibly null) is returned as is.
+ AdaptorBasicBolt bolt = context.getBolt(id);
+ return bolt;
+ }
+
+ private StormTopology getTopology(StormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
+ boolean isExactlyOnce = options.getExactlyOnceTopology();
+ TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
+
+ int parallelismNumber = options.getParallelismNumber();
+ Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+ for (String id : spouts.keySet()) {
+ IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
+ builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
+ }
+
+ HashMap<String, BoltDeclarer> declarers = new HashMap<>();
+ Iterable<Stream> streams = context.getStreams();
+ LOG.info("streams=" + streams);
+ for (Stream stream : streams) {
+ String destBoltId = stream.getConsumer().getComponentId();
+ IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
+ BoltDeclarer declarer = declarers.get(destBoltId);
+ if (declarer == null) {
+ declarer = builder.setBolt(destBoltId, bolt,
+ getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
+ declarers.put(destBoltId, declarer);
+ }
+
+ Stream.Grouping grouping = stream.getConsumer().getGrouping();
+ String streamId = stream.getProducer().getStreamId();
+ String srcBoltId = stream.getProducer().getComponentId();
+
+ // add stream output declare for "from" component
+ AbstractComponent component = getComponent(srcBoltId, context);
+ if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+ component.addKVOutputField(streamId);
+ else
+ component.addOutputField(streamId);
+
+ // "to" component declares grouping to "from" component
+ switch (grouping.getType()) {
+ case SHUFFLE:
+ declarer.shuffleGrouping(srcBoltId, streamId);
+ break;
+ case FIELDS:
+ declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
+ break;
+ case ALL:
+ declarer.allGrouping(srcBoltId, streamId);
+ break;
+ case DIRECT:
+ declarer.directGrouping(srcBoltId, streamId);
+ break;
+ case GLOBAL:
+ declarer.globalGrouping(srcBoltId, streamId);
+ break;
+ case LOCAL_OR_SHUFFLE:
+ declarer.localOrShuffleGrouping(srcBoltId, streamId);
+ break;
+ case NONE:
+ declarer.noneGrouping(srcBoltId, streamId);
+ break;
+ default:
+ throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+ }
+
+ // Subscribe grouping of water mark stream
+ component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
+ declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
+ }
+
+ if (isExactlyOnce) {
+ ((TransactionTopologyBuilder) builder).enableHdfs();
+ }
+ return builder.createTopology();
+ }
+
+ /**
+ * Chooses the spout instance to register with the topology builder.
+ *
+ * @param isExactlyOnce whether the topology runs in exactly-once mode
+ * @param spout the translated spout
+ * @return {@code spout} itself when not in exactly-once mode, otherwise a
+ * {@link TxUnboundedSourceSpout} wrapping it
+ * @throws RuntimeException in exactly-once mode when {@code spout} is not an
+ * {@link UnboundedSourceSpout}
+ */
+ private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
+ if (!isExactlyOnce) {
+ return spout;
+ }
+ if (spout instanceof UnboundedSourceSpout) {
+ return new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
+ }
+ String spoutType = spout.getClass().toString();
+ throw new RuntimeException(
+ String.format("The specified type(%s) is not supported in exactly once mode yet!", spoutType));
+ }
+
+ /**
+ * Chooses the bolt instance to register with the topology builder.
+ *
+ * @param isExactlyOnce whether the topology runs in exactly-once mode
+ * @param bolt the translated bolt
+ * @return a {@link TxExecutorsBolt} wrapping {@code bolt} in exactly-once mode, otherwise
+ * {@code bolt} itself
+ */
+ private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
+ if (isExactlyOnce) {
+ return new TxExecutorsBolt(bolt);
+ }
+ return bolt;
+ }
+
+ /**
+ * Resolves the parallelism to use for a component.
+ *
+ * @param component the component whose own parallelism setting is consulted first
+ * @param globalParallelismNum fallback used when the component value is not positive
+ * @return the component parallelism if it is greater than zero, otherwise
+ * {@code globalParallelismNum}
+ */
+ private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
+ int componentParallelism = component.getParallelismNum();
+ if (componentParallelism > 0) {
+ return componentParallelism;
+ }
+ return globalParallelismNum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
new file mode 100644
index 0000000..fa7bdf3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -0,0 +1,120 @@
+package org.apache.beam.runners.jstorm;
+
+import avro.shaded.com.google.common.collect.Maps;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.google.common.base.Optional;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Test JStorm runner.
+ */
+public class TestJStormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
+
+ public static TestJStormRunner fromOptions(PipelineOptions options) {
+ return new TestJStormRunner(options.as(StormPipelineOptions.class));
+ }
+
+ private final StormRunner stormRunner;
+ private final StormPipelineOptions options;
+
+ /**
+ * Creates a test runner that always executes in local mode with an empty topology config.
+ *
+ * @param options the Storm pipeline options; they are modified in place (topology config and
+ * local mode) before being handed to {@link StormRunner#fromOptions}
+ * @throws NullPointerException if {@code options} is null
+ */
+ private TestJStormRunner(StormPipelineOptions options) {
+ // Validate before the first dereference; the check used to run only after the setters
+ // below had already been called on options, so it could never report anything.
+ this.options = checkNotNull(options, "options");
+ Map conf = Maps.newHashMap();
+ //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+ options.setTopologyConfig(conf);
+ options.setLocalMode(true);
+ stormRunner = StormRunner.fromOptions(options);
+ }
+
+ /**
+ * Runs the pipeline on the wrapped {@link StormRunner} and waits for its PAssert outcome.
+ *
+ * <p>With no assertions in the pipeline the method waits five seconds and returns. Otherwise
+ * it polls the assertion counters up to 40 times, sleeping 500 ms between polls. In every
+ * case the PAssert counters are cleared and the topology is cancelled before returning or
+ * throwing.
+ *
+ * @param pipeline the pipeline to run
+ * @return the result produced by the wrapped runner
+ * @throws AssertionError if an assertion failed or the polling timed out
+ */
+ @Override
+ public StormRunner.StormPipelineResult run(Pipeline pipeline) {
+ StormRunner.StormPipelineResult result = stormRunner.run(pipeline);
+
+ try {
+ int numberOfAssertions = PAssert.countAsserts(pipeline);
+
+ LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions);
+ if (numberOfAssertions == 0) {
+ // Nothing to verify: give the topology five seconds to run.
+ JStormUtils.sleepMs(5000);
+ return result;
+ }
+
+ for (int attempt = 0; attempt < 40; ++attempt) {
+ Optional<Boolean> outcome = checkForPAssertSuccess(numberOfAssertions);
+ if (!outcome.isPresent()) {
+ // Undecided so far: wait and poll again.
+ JStormUtils.sleepMs(500);
+ continue;
+ }
+ if (outcome.get()) {
+ return result;
+ }
+ throw new AssertionError("Failed assertion checks.");
+ }
+ LOG.info("Assertion checks timed out.");
+ throw new AssertionError("Assertion checks timed out.");
+ } finally {
+ clearPAssertCount();
+ cancel(result);
+ }
+ }
+
+ /**
+ * Sums the PAssert success and failure task counters and derives the current verdict.
+ *
+ * @param expectedNumberOfAssertions number of assertions that have to succeed
+ * @return {@code Optional.of(false)} if any failure was counted, {@code Optional.of(true)} if
+ * the success count has reached the expected number, otherwise an absent value
+ */
+ private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
+ int successes = 0;
+ for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+ successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
+ }
+ int failures = 0;
+ for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+ failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
+ }
+
+ // The same summary is logged exactly once per call, whatever the verdict.
+ LOG.info("Found {} success, {} failures out of {} expected assertions.",
+ successes, failures, expectedNumberOfAssertions);
+
+ // A single failure decides the outcome, even if enough successes were counted.
+ if (failures > 0) {
+ return Optional.of(false);
+ }
+ if (successes >= expectedNumberOfAssertions) {
+ return Optional.of(true);
+ }
+ return Optional.absent();
+ }
+
+ /**
+ * Removes every task metric whose key contains the job name of this runner.
+ */
+ private void clearPAssertCount() {
+ String topologyName = options.getJobName();
+ AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
+ // Removal goes through the iterator so the map can be modified while it is traversed.
+ for (Iterator<Map.Entry<String, AsmMetric>> entries = taskMetrics.getMetrics().entrySet().iterator();
+ entries.hasNext();) {
+ String metricName = entries.next().getKey();
+ if (metricName.contains(topologyName)) {
+ entries.remove();
+ }
+ }
+ }
+
+ /**
+ * Cancels the given pipeline result.
+ *
+ * @param result the result to cancel
+ * @throws RuntimeException wrapping an {@link IOException} raised by the cancellation
+ */
+ private void cancel(StormRunner.StormPipelineResult result) {
+ try {
+ result.cancel();
+ } catch (IOException cause) {
+ throw new RuntimeException("Failed to cancel.", cause);
+ }
+ }
+}