You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:20 UTC
[20/53] [abbrv] beam git commit: jstorm-runner: Fix checkstyle error
jstorm-runner: Fix checkstyle error
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5a15d548
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5a15d548
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5a15d548
Branch: refs/heads/jstorm-runner
Commit: 5a15d5488f9438695948e72af08ada4c263471d7
Parents: 78a5076
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Fri Jul 14 14:14:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800
----------------------------------------------------------------------
.../runners/jstorm/JStormPipelineOptions.java | 3 +
.../beam/runners/jstorm/JStormRunner.java | 16 +-
.../beam/runners/jstorm/TestJStormRunner.java | 2 +-
.../serialization/ImmutableListSerializer.java | 4 +-
.../serialization/ImmutableMapSerializer.java | 3 +
.../serialization/ImmutableSetSerializer.java | 3 +
.../KvStoreIterableSerializer.java | 3 +
.../SdkRepackImmuListSerializer.java | 3 +
.../SdkRepackImmuSetSerializer.java | 3 +
.../UnmodifiableCollectionsSerializer.java | 5 +-
.../translation/JStormPipelineTranslator.java | 186 +++++++++++++++++++
.../translation/StormPipelineTranslator.java | 186 -------------------
.../jstorm/translation/TranslationContext.java | 9 +-
.../translation/runtime/AbstractComponent.java | 4 +-
.../translation/runtime/AdaptorBasicBolt.java | 5 +-
.../translation/runtime/AdaptorBasicSpout.java | 5 +-
.../translation/runtime/DoFnExecutor.java | 16 +-
.../jstorm/translation/runtime/Executor.java | 7 +-
.../translation/runtime/ExecutorContext.java | 3 +
.../translation/runtime/ExecutorsBolt.java | 15 +-
.../translation/runtime/FlattenExecutor.java | 6 +-
.../runtime/GroupByWindowExecutor.java | 5 +
.../runtime/MultiOutputDoFnExecutor.java | 7 +-
.../runtime/MultiStatefulDoFnExecutor.java | 4 +
.../runtime/StatefulDoFnExecutor.java | 4 +
.../translation/runtime/TimerServiceImpl.java | 8 +-
.../translation/runtime/TxExecutorsBolt.java | 5 +-
.../runtime/TxUnboundedSourceSpout.java | 5 +-
.../runtime/UnboundedSourceSpout.java | 5 +-
.../runtime/WindowAssignExecutor.java | 7 +-
.../runtime/state/JStormBagState.java | 5 +-
.../runtime/state/JStormMapState.java | 7 +-
.../translator/FlattenTranslator.java | 6 +-
.../translator/GroupByKeyTranslator.java | 5 +
.../translator/ParDoBoundMultiTranslator.java | 2 +-
.../translator/ParDoBoundTranslator.java | 4 +-
.../jstorm/translation/translator/Stream.java | 11 +-
.../translator/TransformTranslator.java | 4 +
.../translation/translator/ViewTranslator.java | 18 +-
.../translator/WindowAssignTranslator.java | 7 +-
.../jstorm/translation/util/CommonInstance.java | 5 +-
.../beam/runners/jstorm/util/RunnerUtils.java | 12 +-
.../jstorm/util/SerializedPipelineOptions.java | 2 +-
.../jstorm/util/SingletonKeyedWorkItem.java | 3 +-
.../runtime/state/JStormStateInternalsTest.java | 14 +-
45 files changed, 384 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
index 2a87756..114877a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
@@ -64,6 +64,9 @@ public interface JStormPipelineOptions extends PipelineOptions {
Map getParallelismNumMap();
void setParallelismNumMap(Map parallelismNumMap);
+ /**
+ * Default value factory for topology configuration of JStorm.
+ */
class DefaultMapValueFactory implements DefaultValueFactory<Map> {
@Override
public Map create(PipelineOptions pipelineOptions) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 5375d6e..00ec7f6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -78,17 +78,17 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
}
/**
- * convert pipeline options to storm configuration format
- *
+ * Convert pipeline options to Storm configuration format.
* @param options
* @return
*/
private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
Config config = new Config();
- if (options.getLocalMode())
+ if (options.getLocalMode()) {
config.put(Config.STORM_CLUSTER_MODE, "local");
- else
+ } else {
config.put(Config.STORM_CLUSTER_MODE, "distributed");
+ }
Config.setNumWorkers(config, options.getWorkerNumber());
@@ -161,8 +161,9 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
component = spout;
} else {
AdaptorBasicBolt bolt = context.getBolt(id);
- if (bolt != null)
+ if (bolt != null) {
component = bolt;
+ }
}
return component;
@@ -202,10 +203,11 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
// add stream output declare for "from" component
AbstractComponent component = getComponent(srcBoltId, context);
- if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+ if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) {
component.addKVOutputField(streamId);
- else
+ } else {
component.addOutputField(streamId);
+ }
// "to" component declares grouping to "from" component
switch (grouping.getType()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index e27efc0..b1b0379 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -2,7 +2,6 @@ package org.apache.beam.runners.jstorm;
import static com.google.common.base.Preconditions.checkNotNull;
-import avro.shaded.com.google.common.collect.Maps;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.metric.AsmMetricRegistry;
import com.alibaba.jstorm.metric.AsmWindow;
@@ -11,6 +10,7 @@ import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.utils.JStormUtils;
import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
index c479f26..268774c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -10,8 +10,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
+/**
+ * Specific serializer of {@link Kryo} for ImmutableList.
+ */
public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
private static final boolean DOES_NOT_ACCEPT_NULL = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
index 77eede3..6b998fc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -11,6 +11,9 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Specific serializer of {@link Kryo} for ImmutableMap.
+ */
public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
private static final boolean DOES_NOT_ACCEPT_NULL = true;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
index 3a43b2b..edc7b09 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+/**
+ * Specific serializer of {@link Kryo} for ImmutableSet.
+ */
public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
private static final boolean DOES_NOT_ACCEPT_NULL = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
index b47f3b7..3835816 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -9,6 +9,9 @@ import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
+/**
+ * Specific serializer of {@link Kryo} for KvStoreIterable.
+ */
public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
public KvStoreIterableSerializer() {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
index dd4272c..f1ed644 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -12,6 +12,9 @@ import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
+/**
+ * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableList.
+ */
public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
private static final boolean DOES_NOT_ACCEPT_NULL = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
index 6973c82..d1ed046 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
+/**
+ * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableSet.
+ */
public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
private static final boolean DOES_NOT_ACCEPT_NULL = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
index bcee778..33343fc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -21,6 +21,9 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+/**
+ * Specific serializer of {@link Kryo} for Unmodifiable Collection.
+ */
public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
private static final Field SOURCE_COLLECTION_FIELD;
@@ -83,7 +86,7 @@ public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
}
}
- private static enum UnmodifiableCollection {
+ private enum UnmodifiableCollection {
COLLECTION(
Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
SOURCE_COLLECTION_FIELD) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
new file mode 100644
index 0000000..1449a43
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline translator of JStorm.
+ */
+public class JStormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+ private static final Logger LOG = LoggerFactory.getLogger(JStormPipelineTranslator.class);
+ private TranslationContext context;
+ private int depth = 0;
+
+ public JStormPipelineTranslator(TranslationContext context) {
+ this.context = context;
+ }
+
+ public void translate(Pipeline pipeline) {
+ List<PTransformOverride> transformOverrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new ReflectiveOneToOneOverrideFactory(
+ (ViewTranslator.CombineGloballyAsSingletonView.class))))
+ .build();
+ pipeline.replaceAll(transformOverrides);
+ pipeline.traverseTopologically(this);
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
+ this.depth++;
+
+ // check if current composite transforms need to be translated.
+ // If not, all sub transforms will be translated in visitPrimitiveTransform.
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null) {
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+ if (translator != null && applyCanTranslate(transform, node, translator)) {
+ applyStreamingTransform(transform, node, translator);
+ LOG.info(genSpaces(this.depth) + "translated-" + node);
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ this.depth--;
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+ }
+
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
+
+ if (!node.isRootNode()) {
+ PTransform<?, ?> transform = node.getTransform();
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+ if (translator == null || !applyCanTranslate(transform, node, translator)) {
+ LOG.info(node.getTransform().getClass().toString());
+ throw new UnsupportedOperationException(
+ "The transform " + transform + " is currently not supported.");
+ }
+ applyStreamingTransform(transform, node, translator);
+ }
+ }
+
+ public void visitValue(PValue value, TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+ }
+
+ private <T extends PTransform<?, ?>> void applyStreamingTransform(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+ context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+ typedTranslator.translateNode(typedTransform, context);
+
+ // Maintain PValue to TupleTag map for side inputs translation.
+ context.getUserGraphContext().recordOutputTaggedPValue();
+ }
+
+ private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+ context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+ return typedTranslator.canTranslate(typedTransform, context);
+ }
+
+ /**
+ * Utility formatting method.
+ *
+ * @param n number of indentation levels to generate
+ * @return String consisting of the "|" indent marker repeated n times
+ */
+ protected static String genSpaces(int n) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ builder.append("| ");
+ }
+ return builder.toString();
+ }
+
+ private static class ReflectiveOneToOneOverrideFactory<
+ InputT extends PValue,
+ OutputT extends PValue,
+ TransformT extends PTransform<InputT, OutputT>>
+ extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+ private final Class<PTransform<InputT, OutputT>> replacement;
+
+ private ReflectiveOneToOneOverrideFactory(
+ Class<PTransform<InputT, OutputT>> replacement) {
+ this.replacement = replacement;
+ }
+
+ @Override
+ public PTransformReplacement<InputT, OutputT> getReplacementTransform(
+ AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+ PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+ PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+ .withArg(
+ (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
+ originalPTransform)
+ .build();
+ InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+ return PTransformReplacement.of(inputT, replacedPTransform);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
deleted file mode 100644
index 6d6f1c6..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.List;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pipleline translator of Storm
- */
-public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
- private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
- private TranslationContext context;
- private int depth = 0;
-
- public StormPipelineTranslator(TranslationContext context) {
- this.context = context;
- }
-
- public void translate(Pipeline pipeline) {
- List<PTransformOverride> transformOverrides =
- ImmutableList.<PTransformOverride>builder()
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
- .add(PTransformOverride.of(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new ReflectiveOneToOneOverrideFactory(
- (ViewTranslator.CombineGloballyAsSingletonView.class))))
- .build();
- pipeline.replaceAll(transformOverrides);
- pipeline.traverseTopologically(this);
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
- this.depth++;
-
- // check if current composite transforms need to be translated.
- // If not, all sub transforms will be translated in visitPrimitiveTransform.
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null) {
- TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-
- if (translator != null && applyCanTranslate(transform, node, translator)) {
- applyStreamingTransform(transform, node, translator);
- LOG.info(genSpaces(this.depth) + "translated-" + node);
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
- }
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- this.depth--;
- LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
- }
-
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
-
- if (!node.isRootNode()) {
- PTransform<?, ?> transform = node.getTransform();
- TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
- if (translator == null || !applyCanTranslate(transform, node, translator)) {
- LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException(
- "The transform " + transform + " is currently not supported.");
- }
- applyStreamingTransform(transform, node, translator);
- }
- }
-
- public void visitValue(PValue value, TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "visiting value {}", value);
- }
-
- private <T extends PTransform<?, ?>> void applyStreamingTransform(
- PTransform<?, ?> transform,
- TransformHierarchy.Node node,
- TransformTranslator<?> translator) {
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
- @SuppressWarnings("unchecked")
- TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
- context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
- typedTranslator.translateNode(typedTransform, context);
-
- // Maintain PValue to TupleTag map for side inputs translation.
- context.getUserGraphContext().recordOutputTaggedPValue();
- }
-
- private <T extends PTransform<?, ?>> boolean applyCanTranslate(
- PTransform<?, ?> transform,
- TransformHierarchy.Node node,
- TransformTranslator<?> translator) {
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
- @SuppressWarnings("unchecked")
- TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
- context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-
- return typedTranslator.canTranslate(typedTransform, context);
- }
-
- /**
- * Utility formatting method.
- *
- * @param n number of spaces to generate
- * @return String with "|" followed by n spaces
- */
- protected static String genSpaces(int n) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < n; i++) {
- builder.append("| ");
- }
- return builder.toString();
- }
-
- private static class ReflectiveOneToOneOverrideFactory<
- InputT extends PValue,
- OutputT extends PValue,
- TransformT extends PTransform<InputT, OutputT>>
- extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
- private final Class<PTransform<InputT, OutputT>> replacement;
-
- private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<InputT, OutputT>> replacement) {
- this.replacement = replacement;
- }
-
- @Override
- public PTransformReplacement<InputT, OutputT> getReplacementTransform(
- AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
- PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
- PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
- .withArg(
- (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
- originalPTransform)
- .build();
- InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
- return PTransformReplacement.of(inputT, replacedPTransform);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 526352a..1230a31 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.jstorm.translation;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import avro.shaded.com.google.common.collect.Lists;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
@@ -239,7 +239,9 @@ public class TranslationContext {
}
}
- // TODO: add getSideInputs() and getSideOutputs().
+ /**
+ * Context of user graph.
+ */
public static class UserGraphContext {
private final JStormPipelineOptions options;
private final Map<PValue, TupleTag> pValueToTupleTag;
@@ -326,6 +328,9 @@ public class TranslationContext {
}
}
+ /**
+ * Context of execution graph.
+ */
public static class ExecutionGraphContext {
private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
index 68e9e17..3d7fab8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-/*
+/**
* Enable user to add output stream definitions by API, rather than hard-code.
*/
public abstract class AbstractComponent implements IComponent {
@@ -65,4 +65,4 @@ public abstract class AbstractComponent implements IComponent {
public void setParallelismNum(int num) {
parallelismNum = num;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
index 5e9b056..d8d4d46 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import backtype.storm.topology.IRichBatchBolt;
+/**
+ * Adaptor bolt of JStorm extends {@link AbstractComponent}.
+ */
public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
index 0480518..814d416 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import backtype.storm.topology.IRichSpout;
+/**
+ * Adaptor spout of JStorm extends {@link AbstractComponent}.
+ */
public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
index 9507948..e07d890 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import avro.shaded.com.google.common.collect.Iterables;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,11 +62,19 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * JStorm {@link Executor} for {@link DoFn}.
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
public class DoFnExecutor<InputT, OutputT> implements Executor {
private static final long serialVersionUID = 5297603063991078668L;
private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+ /**
+ * Implements {@link OutputManager} in a DoFn executor.
+ */
public class DoFnExecutorOutputManager implements OutputManager, Serializable {
private static final long serialVersionUID = -661113364735206170L;
@@ -174,7 +182,7 @@ public class DoFnExecutor<InputT, OutputT> implements Executor {
initService(context);
// Side inputs setup
- if (sideInputs != null && sideInputs.isEmpty() == false) {
+ if (sideInputs != null && !sideInputs.isEmpty()) {
pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
watermarkHoldTag =
StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
@@ -261,10 +269,10 @@ public class DoFnExecutor<InputT, OutputT> implements Executor {
}
/**
- * Process all pushed back elements when receiving watermark with max timestamp
+ * Process all pushed back elements when receiving watermark with max timestamp.
*/
public void processAllPushBackElements() {
- if (sideInputs != null && sideInputs.isEmpty() == false) {
+ if (sideInputs != null && !sideInputs.isEmpty()) {
BagState<WindowedValue<InputT>> pushedBackElements =
pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
if (pushedBackElements != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
index 1a03cb8..0ec4fdd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -21,13 +21,16 @@ import java.io.Serializable;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
+/**
+ * An executor is a basic executable unit in a JStorm task.
+ */
public interface Executor extends Serializable {
/**
- * Initialization during runtime
+ * Initialization during runtime.
*/
void init(ExecutorContext context);
<T> void process(TupleTag<T> tag, WindowedValue<T> elem);
void cleanup();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
index 1f65921..55ca171 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -21,6 +21,9 @@ import backtype.storm.task.TopologyContext;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.google.auto.value.AutoValue;
+/**
+ * Context of an executors bolt at runtime.
+ */
@AutoValue
public abstract class ExecutorContext {
public static ExecutorContext of(
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index e80fb48..d33c17a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -19,8 +19,6 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import static com.google.common.base.Preconditions.checkNotNull;
-import avro.shaded.com.google.common.base.Joiner;
-import avro.shaded.com.google.common.collect.Sets;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.ITupleExt;
@@ -31,8 +29,10 @@ import com.alibaba.jstorm.cache.KvStoreManagerFactory;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.utils.KryoSerializer;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,6 +49,9 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * ExecutorsBolt is a JStorm Bolt composed of several executors chained in a sub-DAG.
+ */
public class ExecutorsBolt extends AdaptorBasicBolt {
private static final long serialVersionUID = -7751043327801735211L;
@@ -129,10 +132,10 @@ public class ExecutorsBolt extends AdaptorBasicBolt {
// init kv store manager
String storeName = String.format("task-%d", context.getThisTaskId());
String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
- IKvStoreManager kvStoreManager = isStatefulBolt ?
- KvStoreManagerFactory.getKvStoreManagerWithMonitor(
- context, storeName, stateStorePath, isStatefulBolt) :
- KvStoreManagerFactory.getKvStoreManager(
+ IKvStoreManager kvStoreManager = isStatefulBolt
+ ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+ context, storeName, stateStorePath, isStatefulBolt)
+ : KvStoreManagerFactory.getKvStoreManager(
stormConf, storeName, stateStorePath, isStatefulBolt);
this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
index 5a07243..caf1e47 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -22,6 +22,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
+ * @param <InputT>
+ */
public class FlattenExecutor<InputT> implements Executor {
private final String description;
@@ -53,4 +57,4 @@ public class FlattenExecutor<InputT> implements Executor {
public String toString() {
return description;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
index 625726d..0dd1af9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -52,6 +52,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
+ * @param <K>
+ * @param <V>
+ */
public class GroupByWindowExecutor<K, V>
extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
private static final long serialVersionUID = -7563050475488610553L;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
index d36d9a6..a26472c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -30,6 +30,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * JStorm {@link Executor} for {@link DoFn} with multi-output.
+ * @param <InputT>
+ * @param <OutputT>
+ */
public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
@@ -71,4 +76,4 @@ public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<Input
this.outputManager = new MultiOutputDoFnExecutorOutputManager();
LOG.info("localTupleTagMap: {}", localTupleTagMap);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
index 45ac62a..5e87cff 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
+ * @param <OutputT>
+ */
public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
public MultiStatefulDoFnExecutor(
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
index ba0c052..77ae844 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn}.
+ * @param <OutputT>
+ */
public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
public StatefulDoFnExecutor(
String stepName, String description, JStormPipelineOptions pipelineOptions,
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
index d2514f1..0103095 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import avro.shaded.com.google.common.collect.Maps;
-import avro.shaded.com.google.common.collect.Sets;
import com.alibaba.jstorm.utils.Pair;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,7 +47,7 @@ public class TimerServiceImpl implements TimerService {
private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
- private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+ private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
new PriorityQueue<>();
private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
timerDataToKeyedExecutors = Maps.newHashMap();
@@ -152,4 +152,4 @@ public class TimerServiceImpl implements TimerService {
keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
timerDataToKeyedExecutors.put(timerData, keyedExecutors);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
index 2bd5f7d..8dc51b5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -29,6 +29,9 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Transactional executors bolt handles the checkpoint and restore of state and timer.
+ */
public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
@@ -127,4 +130,4 @@ public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
throw new RuntimeException(e.getMessage());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
index 16f7d99..48b410f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -29,6 +29,9 @@ import java.util.Map;
import org.apache.beam.sdk.io.UnboundedSource;
import org.slf4j.LoggerFactory;
+/**
+ * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
+ */
public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
@@ -150,4 +153,4 @@ public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
public void fail(Object msgId) {
throw new UnsupportedOperationException();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 7f98c61..006cd47 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -40,8 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Spout implementation that wraps a Beam UnboundedSource
- * <p>
+ * Spout implementation that wraps a Beam UnboundedSource.
* TODO: add wrapper to support metrics in UnboundedSource.
*/
public class UnboundedSourceSpout extends AdaptorBasicSpout {
@@ -61,7 +60,7 @@ public class UnboundedSourceSpout extends AdaptorBasicSpout {
private KryoSerializer<WindowedValue> serializer;
- private long lastWaterMark = 0l;
+ private long lastWaterMark = 0L;
public UnboundedSourceSpout(
String description,
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
index 7f21d26..3cd0aa9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -29,6 +29,11 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ * @param <T>
+ * @param <W>
+ */
public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
@@ -104,4 +109,4 @@ public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executo
public String toString() {
return description;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
index 1466f35..df54383 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * JStorm implementation of {@link BagState}.
+ * Implementation of {@link BagState} in JStorm runner.
*/
class JStormBagState<K, T> implements BagState<T> {
private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
@@ -115,6 +115,9 @@ class JStormBagState<K, T> implements BagState<T> {
return ComposedKey.of(key, namespace, elemIndex);
}
+ /**
+ * Implementation of Bag state Iterable.
+ */
private class BagStateIterable implements KvStoreIterable<T> {
private class BagStateIterator implements Iterator<T> {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
index f1c1ed0..ac3f91f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -26,6 +26,11 @@ import org.apache.beam.sdk.state.ReadableState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Implementation of {@link MapState} in JStorm runner.
+ * @param <K>
+ * @param <V>
+ */
public class JStormMapState<K, V> implements MapState<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
@@ -150,4 +155,4 @@ public class JStormMapState<K, V> implements MapState<K, V> {
return this;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
index bf8d472..44ce8d8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -26,6 +26,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+/**
+ * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
+ * @param <V>
+ */
public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
@Override
@@ -44,4 +48,4 @@ public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PC
FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
index 85f96ce..85cb85d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -29,6 +29,11 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+/**
+ * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
+ * @param <K>
+ * @param <V>
+ */
public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
// information of transform
protected PCollection<KV<K, V>> input;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
index 77e4381..6e3392c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
-import avro.shaded.com.google.common.collect.Maps;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
index 7b998d9..ad8f85f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.runners.jstorm.translation.translator;
-import avro.shaded.com.google.common.collect.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.jstorm.translation.TranslationContext;
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
+ * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
*/
public class ParDoBoundTranslator<InputT, OutputT>
extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
index a15a8ba..71243b9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -39,6 +39,9 @@ public abstract class Stream {
producer, consumer);
}
+ /**
+ * JStorm producer.
+ */
@AutoValue
public abstract static class Producer {
public abstract String getComponentId();
@@ -53,6 +56,9 @@ public abstract class Stream {
}
}
+ /**
+ * JStorm consumer.
+ */
@AutoValue
public abstract static class Consumer {
public abstract String getComponentId();
@@ -65,6 +71,9 @@ public abstract class Stream {
}
}
+ /**
+ * JStorm grouping, which defines how to transfer messages between two nodes.
+ */
@AutoValue
public abstract static class Grouping {
public abstract Type getType();
@@ -86,7 +95,7 @@ public abstract class Stream {
}
/**
- * Types of stream groupings Storm allows
+ * Types of stream groupings Storm allows.
*/
public enum Type {
ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
index 487cac0..bfa94a0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -38,6 +38,10 @@ public interface TransformTranslator<T extends PTransform<?, ?>> {
*/
boolean canTranslate(T transform, TranslationContext context);
+ /**
+ * Default translator.
+ * @param <T1>
+ */
class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
@Override
public void translateNode(T1 transform, TranslationContext context) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
index c55c8d6..f71ee9c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -54,8 +54,7 @@ public class ViewTranslator
/**
* Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Flink runner in streaming mode.
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}.
*/
public static class ViewAsMap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
@@ -93,8 +92,7 @@ public class ViewTranslator
/**
* Specialized expansion for {@link
- * View.AsMultimap View.AsMultimap} for the
- * Flink runner in streaming mode.
+ * View.AsMultimap View.AsMultimap}.
*/
public static class ViewAsMultimap<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
@@ -135,8 +133,7 @@ public class ViewTranslator
/**
* Specialized implementation for
- * {@link View.AsList View.AsList} for the
- * JStorm runner in streaming mode.
+ * {@link View.AsList View.AsList}.
*/
public static class ViewAsList<T>
extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
@@ -258,6 +255,12 @@ public class ViewTranslator
}
}
+ /**
+ * Specialized expansion for
+ * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
+ * @param <InputT>
+ * @param <OutputT>
+ */
public static class CombineGloballyAsSingletonView<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
Combine.GloballyAsSingletonView<InputT, OutputT> transform;
@@ -351,8 +354,7 @@ public class ViewTranslator
/**
* Creates a primitive {@link PCollectionView}.
- * <p>
- * <p>For internal use only by runner implementors.
+ * For internal use only by runner implementors.
*
* @param <ElemT> The type of the elements of the input PCollection
* @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
index 6de34dd..2ccb8d7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -21,6 +21,11 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext;
import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
import org.apache.beam.sdk.transforms.windowing.Window;
+/**
+ * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
+ * JStorm {@link WindowAssignExecutor}.
+ * @param <T>
+ */
public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
@Override
@@ -35,4 +40,4 @@ public class WindowAssignTranslator<T> extends TransformTranslator.Default<Windo
userGraphContext.getOutputTag());
context.addTransformExecutor(executor);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
index 596d8b4..4b92a4c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.runners.jstorm.translation.util;
+/**
+ * Common definition of JStorm runner.
+ */
public class CommonInstance {
public static final String KEY = "Key";
public static final String VALUE = "Value";
public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
index 9fd62e4..ad83c2b 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -25,10 +25,12 @@ import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+/**
+ * Utils for JStorm runner.
+ */
public class RunnerUtils {
/**
- * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
- *
+ * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
* @param elem
* @return
*/
@@ -43,11 +45,11 @@ public class RunnerUtils {
public static boolean isGroupByKeyExecutor(Executor executor) {
if (executor instanceof GroupByWindowExecutor) {
return true;
- } else if (executor instanceof StatefulDoFnExecutor ||
- executor instanceof MultiStatefulDoFnExecutor) {
+ } else if (executor instanceof StatefulDoFnExecutor
+ || executor instanceof MultiStatefulDoFnExecutor) {
return true;
} else {
return false;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
index 182794f..479afdc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -34,7 +34,7 @@ public class SerializedPipelineOptions implements Serializable {
private final byte[] serializedOptions;
/**
- * Lazily initialized copy of deserialized options
+ * Lazily initialized copy of deserialized options.
*/
private transient PipelineOptions pipelineOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
index cce21b3..46a12b9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.util.WindowedValue;
/**
* Singleton keyed word item.
- *
* @param <K>
* @param <ElemT>
*/
@@ -60,4 +59,4 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
public Iterable<WindowedValue<ElemT>> elementsIterable() {
return Collections.singletonList(value);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
index 2a8160c..66f33a7 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
@@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import avro.shaded.com.google.common.collect.Maps;
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.StateNamespaces;
@@ -175,11 +175,11 @@ public class JStormStateInternalsTest {
Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
Map.Entry<Integer, Integer> entry = itr.next();
- assertEquals((long) entry.getKey(), 1l);
- assertEquals((long) entry.getValue(), 12l);
+ assertEquals((long) entry.getKey(), 1L);
+ assertEquals((long) entry.getValue(), 12L);
entry = itr.next();
- assertEquals((long) entry.getKey(), 2l);
- assertEquals((long) entry.getValue(), 22l);
+ assertEquals((long) entry.getKey(), 2L);
+ assertEquals((long) entry.getValue(), 22L);
assertEquals(false, itr.hasNext());
mapStateA.remove(1);
@@ -191,8 +191,8 @@ public class JStormStateInternalsTest {
entries = mapStateA.entries().read();
itr = entries.iterator();
entry = itr.next();
- assertEquals((long) entry.getKey(), 2l);
- assertEquals((long) entry.getValue(), 22l);
+ assertEquals((long) entry.getKey(), 2L);
+ assertEquals((long) entry.getValue(), 22L);
assertEquals(false, itr.hasNext());
}