You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:11 UTC
[11/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
index c487578..77e4381 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -18,94 +18,101 @@
package org.apache.beam.runners.jstorm.translation.translator;
import avro.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TupleTag;
/**
* Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
*/
public class ParDoBoundMultiTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+ extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
- Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
- Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
- Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
- localToExternalTupleTagMap.put(entry.getKey(), itr.next());
- }
+ @Override
+ public void translateNode(
+ ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+ final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
+ PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
- sideOutputTags.remove(mainOutputTag);
+ Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
+ Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
+ Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
+ localToExternalTupleTagMap.put(entry.getKey(), itr.next());
+ }
- Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- allOutputs);
+ TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+ List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
+ sideOutputTags.remove(mainOutputTag);
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
+ Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+ String description = describeTransform(
+ transform,
+ allInputs,
+ allOutputs);
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new MultiStatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- } else {
- executor = new MultiOutputDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
- }
+ ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ DoFnExecutor executor;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+ executor = new MultiStatefulDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ (DoFn<KV, OutputT>) transform.getFn(),
+ (Coder) WindowedValue.getFullCoder(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<KV>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags,
+ localToExternalTupleTagMap);
+ } else {
+ executor = new MultiOutputDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ transform.getFn(),
+ WindowedValue.getFullCoder(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags,
+ localToExternalTupleTagMap);
}
+
+ context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
index 3a952a9..7b998d9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -17,24 +17,25 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
-import java.util.List;
-import java.util.Map;
-
import avro.shaded.com.google.common.collect.Lists;
-import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.*;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,65 +43,68 @@ import org.slf4j.LoggerFactory;
* Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
*/
public class ParDoBoundTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+ extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
- @Override
- public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<?> inputTag = userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+ private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+ @Override
+ public void translateNode(
+ ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+ final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ final TupleTag<?> inputTag = userGraphContext.getInputTag();
+ PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
- Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- userGraphContext.getOutputs());
+ TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+ List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
+ Map<TupleTag<?>, PValue> allInputs =
+ avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
+ String description = describeTransform(
+ transform,
+ allInputs,
+ userGraphContext.getOutputs());
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new StatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- } else {
- executor = new DoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<InputT>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- }
+ ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+ for (PCollectionView pCollectionView : transform.getSideInputs()) {
+ sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+ }
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ DoFnExecutor executor;
+ DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+ executor = new StatefulDoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ (DoFn<KV, OutputT>) transform.getFn(),
+ (Coder) WindowedValue.getFullCoder(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<KV>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags);
+ } else {
+ executor = new DoFnExecutor<>(
+ userGraphContext.getStepName(),
+ description,
+ userGraphContext.getOptions(),
+ transform.getFn(),
+ WindowedValue.getFullCoder(
+ input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+ input.getWindowingStrategy(),
+ (TupleTag<InputT>) inputTag,
+ transform.getSideInputs(),
+ sideInputTagToView.build(),
+ mainOutputTag,
+ sideOutputTags);
}
+
+ context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
index 1ef1ec3..c450a22 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
@@ -19,6 +19,6 @@ package org.apache.beam.runners.jstorm.translation.translator;
import org.apache.beam.sdk.transforms.Reshuffle;
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
-
+public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> {
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
index 5b5a8e2..a15a8ba 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -17,76 +17,79 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
-import com.google.auto.value.AutoValue;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.auto.value.AutoValue;
+import java.util.List;
+import javax.annotation.Nullable;
+
/**
* Class that defines the stream connection between upstream and downstream components.
*/
@AutoValue
public abstract class Stream {
- public abstract Producer getProducer();
- public abstract Consumer getConsumer();
+ public abstract Producer getProducer();
- public static Stream of(Producer producer, Consumer consumer) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
- producer, consumer);
+ public abstract Consumer getConsumer();
+
+ public static Stream of(Producer producer, Consumer consumer) {
+ return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream(
+ producer, consumer);
+ }
+
+ @AutoValue
+ public abstract static class Producer {
+ public abstract String getComponentId();
+
+ public abstract String getStreamId();
+
+ public abstract String getStreamName();
+
+ public static Producer of(String componentId, String streamId, String streamName) {
+ return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
+ componentId, streamId, streamName);
}
+ }
- @AutoValue
- public abstract static class Producer {
- public abstract String getComponentId();
- public abstract String getStreamId();
- public abstract String getStreamName();
+ @AutoValue
+ public abstract static class Consumer {
+ public abstract String getComponentId();
- public static Producer of(String componentId, String streamId, String streamName) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Producer(
- componentId, streamId, streamName);
- }
+ public abstract Grouping getGrouping();
+
+ public static Consumer of(String componentId, Grouping grouping) {
+ return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
+ componentId, grouping);
}
+ }
+
+ @AutoValue
+ public abstract static class Grouping {
+ public abstract Type getType();
- @AutoValue
- public abstract static class Consumer {
- public abstract String getComponentId();
- public abstract Grouping getGrouping();
+ @Nullable
+ public abstract List<String> getFields();
+
+ public static Grouping of(Type type) {
+ checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
+ return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
+ type, null /* fields */);
+ }
- public static Consumer of(String componentId, Grouping grouping) {
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Consumer(
- componentId, grouping);
- }
+ public static Grouping byFields(List<String> fields) {
+ checkNotNull(fields, "fields");
+ checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+ return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
+ Type.FIELDS, fields);
}
- @AutoValue
- public abstract static class Grouping {
- public abstract Type getType();
-
- @Nullable
- public abstract List<String> getFields();
-
- public static Grouping of(Type type) {
- checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
- type, null /* fields */);
- }
-
- public static Grouping byFields(List<String> fields) {
- checkNotNull(fields, "fields");
- checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
- return new org.apache.beam.runners.jstorm.translation.translator.AutoValue_Stream_Grouping(
- Type.FIELDS, fields);
- }
-
- /**
- * Types of stream groupings Storm allows
- */
- public enum Type {
- ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
- }
+ /**
+     * Types of stream groupings Storm allows.
+ */
+ public enum Type {
+ ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
index bebdf7b..487cac0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -20,57 +20,57 @@ package org.apache.beam.runners.jstorm.translation.translator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.FluentIterable;
+import java.util.Map;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.sdk.transforms.PTransform;
-
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import java.util.Map;
-
/**
 * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
*/
public interface TransformTranslator<T extends PTransform<?, ?>> {
- void translateNode(T transform, TranslationContext context);
+ void translateNode(T transform, TranslationContext context);
- /**
- * Returns true if this translator can translate the given transform.
- */
- boolean canTranslate(T transform, TranslationContext context);
+ /**
+ * Returns true if this translator can translate the given transform.
+ */
+ boolean canTranslate(T transform, TranslationContext context);
- class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
- @Override
- public void translateNode(T1 transform, TranslationContext context) {
+ class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+ @Override
+ public void translateNode(T1 transform, TranslationContext context) {
- }
+ }
- @Override
- public boolean canTranslate(T1 transform, TranslationContext context) {
- return true;
- }
+ @Override
+ public boolean canTranslate(T1 transform, TranslationContext context) {
+ return true;
+ }
- static String describeTransform(
- PTransform<?, ?> transform,
- Map<TupleTag<?>, PValue> inputs,
- Map<TupleTag<?>, PValue> outputs) {
- return String.format("%s --> %s --> %s",
- Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
- return taggedPValue.getKey().getId();
- // return taggedPValue.getValue().getName();
- }})),
- transform.getName(),
- Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
- .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
- @Override
- public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
- return taggedPvalue.getKey().getId();
- //return taggedPValue.getValue().getName();
- }})));
- }
+ static String describeTransform(
+ PTransform<?, ?> transform,
+ Map<TupleTag<?>, PValue> inputs,
+ Map<TupleTag<?>, PValue> outputs) {
+ return String.format("%s --> %s --> %s",
+ Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
+ .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+ @Override
+ public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
+ return taggedPValue.getKey().getId();
+ // return taggedPValue.getValue().getName();
+ }
+ })),
+ transform.getName(),
+ Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
+ .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+ @Override
+ public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
+ return taggedPvalue.getKey().getId();
+ //return taggedPValue.getValue().getName();
+ }
+ })));
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
index ac7d7bd..33ac024 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
@@ -17,30 +17,30 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-
/**
* Translates a Read.Unbounded into a Storm spout.
- *
+ *
 * @param <T> the type of elements produced by the unbounded source (presumably the
 *            element type of the resulting PCollection — confirm against Read.Unbounded)
*/
public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
- public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+ public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- TupleTag<?> tag = userGraphContext.getOutputTag();
- PValue output = userGraphContext.getOutput();
+ TupleTag<?> tag = userGraphContext.getOutputTag();
+ PValue output = userGraphContext.getOutput();
- UnboundedSourceSpout spout = new UnboundedSourceSpout(
- description,
- transform.getSource(), userGraphContext.getOptions(), tag);
- context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
- }
+ UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ description,
+ transform.getSource(), userGraphContext.getOptions(), tag);
+ context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
index 0ebf837..c55c8d6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -17,6 +17,10 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
import org.apache.beam.sdk.coders.Coder;
@@ -33,342 +37,342 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
*/
-public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
- @Override
- public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
- context.addTransformExecutor(viewExecutor);
+public class ViewTranslator
+ extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+ @Override
+ public void translateNode(
+ CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description = describeTransform(
+ transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+ ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
+ context.addTransformExecutor(viewExecutor);
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the JStorm runner in streaming mode.
+ */
+ public static class ViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsMap(View.AsMap<K, V> transform) {
}
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Flink runner in streaming mode.
- */
- public static class ViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- @SuppressWarnings("unused") // used via reflection in JstormRunner#apply()
- public ViewAsMap(View.AsMap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
+ @Override
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // TODO: log warning as other runners.
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
}
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * View.AsMultimap View.AsMultimap} for the
+ * JStorm runner in streaming mode.
+ */
+ public static class ViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
/**
- * Specialized expansion for {@link
- * View.AsMultimap View.AsMultimap} for the
- * Flink runner in streaming mode.
+ * Builds an instance of this class from the overridden transform.
*/
- public static class ViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsMultimap(View.AsMultimap<K, V> transform) {
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // TODO: log warning as other runners.
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+ }
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // TODO: log warning as other runners.
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
}
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsList View.AsList} for the
+ * JStorm runner in streaming mode.
+ */
+ public static class ViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
/**
- * Specialized implementation for
- * {@link View.AsList View.AsList} for the
- * JStorm runner in streaming mode.
+ * Builds an instance of this class from the overridden transform.
*/
- public static class ViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsList(View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
- }
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsList(View.AsList<T> transform) {
+ }
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
}
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsIterable View.AsIterable} for the
+ * JStorm runner in streaming mode.
+ */
+ public static class ViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
/**
- * Specialized implementation for
- * {@link View.AsIterable View.AsIterable} for the
- * JStorm runner in streaming mode.
+ * Builds an instance of this class from the overridden transform.
*/
- public static class ViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsIterable(View.AsIterable<T> transform) { }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input,
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsIterable(View.AsIterable<T> transform) {
}
- /**
- * Specialized expansion for
- * {@link View.AsSingleton View.AsSingleton} for the
- * JStorm runner in streaming mode.
- */
- public static class ViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private View.AsSingleton<T> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
- public ViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+ }
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
+ /**
+ * Specialized expansion for
+ * {@link View.AsSingleton View.AsSingleton} for the
+ * JStorm runner in streaming mode.
+ */
+ public static class ViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+ private View.AsSingleton<T> transform;
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public ViewAsSingleton(View.AsSingleton<T> transform) {
+ this.transform = transform;
}
- public static class CombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public CombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined,
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
- }
+ @Override
+ public PCollectionView<T> expand(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
}
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
}
+ }
}
+ }
+
+ public static class CombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform;
/**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
+ * Builds an instance of this class from the overridden transform.
*/
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- private static final long serialVersionUID = 1L;
+ @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+ public CombineGloballyAsSingletonView(
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
+ @Override
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined,
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
+ }
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(Collections.singletonList(c.element()));
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ * It requires that the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ private static final long serialVersionUID = 1L;
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
}
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateJStormPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private PCollectionView<ViewT> view;
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
- private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
- public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateJStormPCollectionView<>(view);
- }
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateJStormPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
+ public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateJStormPCollectionView<>(view);
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+ return view;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
index 0bf9a49..6de34dd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -17,22 +17,22 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-
public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
- context.getUserGraphContext().setWindowed();
- WindowAssignExecutor executor = new WindowAssignExecutor(
- description,
- transform.getWindowFn(),
- userGraphContext.getOutputTag());
- context.addTransformExecutor(executor);
- }
+ @Override
+ public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+ TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+ String description =
+ describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+ context.getUserGraphContext().setWindowed();
+ WindowAssignExecutor executor = new WindowAssignExecutor(
+ description,
+ transform.getWindowFn(),
+ userGraphContext.getOutputTag());
+ context.addTransformExecutor(executor);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
index b67aff9..c863c9e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
@@ -21,27 +21,27 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Translates a Window.Bound node into a Storm WindowedBolt
- *
+ *
* @param <T>
*/
public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
- // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- if (transform.getWindowFn() instanceof FixedWindows) {
- context.getUserGraphContext().setWindowed();
- } else if (transform.getWindowFn() instanceof SlidingWindows) {
- context.getUserGraphContext().setWindowed();
- } else {
- throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn());
- }
+ // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
+ @Override
+ public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+ if (transform.getWindowFn() instanceof FixedWindows) {
+ context.getUserGraphContext().setWindowed();
+ } else if (transform.getWindowFn() instanceof SlidingWindows) {
+ context.getUserGraphContext().setWindowed();
+ } else {
+ throw new UnsupportedOperationException(
+ "Not supported window type currently: " + transform.getWindowFn());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
index 07a3ad5..596d8b4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.jstorm.translation.util;
public class CommonInstance {
- public static final String KEY = "Key";
- public static final String VALUE = "Value";
+ public static final String KEY = "Key";
+ public static final String VALUE = "Value";
- public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+ public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
index 87562fd..750095e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
@@ -17,30 +17,29 @@
*/
package org.apache.beam.runners.jstorm.translation.util;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import java.io.Serializable;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
/**
* No-op SideInputReader implementation.
*/
public class DefaultSideInputReader implements SideInputReader, Serializable {
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
- return null;
- }
+ @Nullable
+ @Override
+ public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+ return null;
+ }
- @Override
- public <T> boolean contains(PCollectionView<T> pCollectionView) {
- return false;
- }
+ @Override
+ public <T> boolean contains(PCollectionView<T> pCollectionView) {
+ return false;
+ }
- @Override
- public boolean isEmpty() {
- return true;
- }
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
index 481b7fb..4eb1d8f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
@@ -17,73 +17,74 @@
*/
package org.apache.beam.runners.jstorm.translation.util;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Default StepContext for running DoFn This does not allow accessing state or timer internals.
*/
public class DefaultStepContext implements ExecutionContext.StepContext {
- private TimerInternals timerInternals;
+ private TimerInternals timerInternals;
- private StateInternals stateInternals;
+ private StateInternals stateInternals;
- public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
- this.timerInternals = checkNotNull(timerInternals, "timerInternals");
- this.stateInternals = checkNotNull(stateInternals, "stateInternals");
- }
+ public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+ this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+ this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+ }
- @Override
- public String getStepName() {
- return null;
- }
+ @Override
+ public String getStepName() {
+ return null;
+ }
- @Override
- public String getTransformName() {
- return null;
- }
+ @Override
+ public String getTransformName() {
+ return null;
+ }
- @Override
- public void noteOutput(WindowedValue<?> windowedValue) {
+ @Override
+ public void noteOutput(WindowedValue<?> windowedValue) {
- }
+ }
- @Override
- public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+ @Override
+ public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
- }
+ }
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
- throw new UnsupportedOperationException("Writing side-input data is not supported.");
- }
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(
+ TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+ Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder)
+ throws IOException {
+ throw new UnsupportedOperationException("Writing side-input data is not supported.");
+ }
- @Override
- public StateInternals stateInternals() {
- return stateInternals;
- }
+ @Override
+ public StateInternals stateInternals() {
+ return stateInternals;
+ }
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
+ @Override
+ public TimerInternals timerInternals() {
+ return timerInternals;
+ }
- public void setStateInternals(StateInternals stateInternals) {
- this.stateInternals = stateInternals;
- }
+ public void setStateInternals(StateInternals stateInternals) {
+ this.stateInternals = stateInternals;
+ }
- public void setTimerInternals(TimerInternals timerInternals) {
- this.timerInternals = timerInternals;
- }
+ public void setTimerInternals(TimerInternals timerInternals) {
+ this.timerInternals = timerInternals;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
index cbf815a..9fd62e4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -17,37 +17,37 @@
*/
package org.apache.beam.runners.jstorm.util;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.jstorm.translation.runtime.Executor;
-
import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
-import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
public class RunnerUtils {
- /**
- * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
- * @param elem
- * @return
- */
- public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
- WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
- SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
- kvElem.getValue().getKey(),
- kvElem.withValue(kvElem.getValue().getValue()));
- return workItem;
- }
+ /**
+ * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
+ *
+ * @param elem
+ * @return
+ */
+ public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+ WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
+ SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
+ kvElem.getValue().getKey(),
+ kvElem.withValue(kvElem.getValue().getValue()));
+ return workItem;
+ }
- public static boolean isGroupByKeyExecutor (Executor executor) {
- if (executor instanceof GroupByWindowExecutor) {
- return true;
- } else if (executor instanceof StatefulDoFnExecutor ||
- executor instanceof MultiStatefulDoFnExecutor) {
- return true;
- } else {
- return false;
- }
+ public static boolean isGroupByKeyExecutor(Executor executor) {
+ if (executor instanceof GroupByWindowExecutor) {
+ return true;
+ } else if (executor instanceof StatefulDoFnExecutor ||
+ executor instanceof MultiStatefulDoFnExecutor) {
+ return true;
+ } else {
+ return false;
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
index 391699b..182794f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -18,47 +18,48 @@
package org.apache.beam.runners.jstorm.util;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.beam.sdk.options.PipelineOptions;
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.sdk.options.PipelineOptions;
/**
* Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
*/
public class SerializedPipelineOptions implements Serializable {
- private final byte[] serializedOptions;
+ private final byte[] serializedOptions;
- /** Lazily initialized copy of deserialized options */
- private transient PipelineOptions pipelineOptions;
+ /**
+ * Lazily initialized copy of deserialized options
+ */
+ private transient PipelineOptions pipelineOptions;
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- new ObjectMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
+ public SerializedPipelineOptions(PipelineOptions options) {
+ checkNotNull(options, "PipelineOptions must not be null.");
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ new ObjectMapper().writeValue(baos, options);
+ this.serializedOptions = baos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
}
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
+ }
- return pipelineOptions;
+ public PipelineOptions getPipelineOptions() {
+ if (pipelineOptions == null) {
+ try {
+ pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+ }
}
+ return pipelineOptions;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
index dee5f1a..cce21b3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.util.WindowedValue;
/**
* Singleton keyed word item.
+ *
* @param <K>
* @param <ElemT>
*/
@@ -38,7 +39,7 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
}
public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
- return new SingletonKeyedWorkItem<K, ElemT>(key, value);
+ return new SingletonKeyedWorkItem<K, ElemT>(key, value);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
index 344d3c7..0d6fc23 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrarTest.java
@@ -34,7 +34,7 @@ public class JStormRunnerRegistrarTest {
@Test
public void testFullName() {
String[] args =
- new String[] {String.format("--runner=%s", JStormRunner.class.getName())};
+ new String[]{String.format("--runner=%s", JStormRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), JStormRunner.class);
}
@@ -42,7 +42,7 @@ public class JStormRunnerRegistrarTest {
@Test
public void testClassName() {
String[] args =
- new String[] {String.format("--runner=%s", JStormRunner.class.getSimpleName())};
+ new String[]{String.format("--runner=%s", JStormRunner.class.getSimpleName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
assertEquals(opts.getRunner(), JStormRunner.class);
}