You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:14 UTC
[14/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
index d907fac..6d6f1c6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
@@ -17,15 +17,17 @@
*/
package org.apache.beam.runners.jstorm.translation;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import java.util.List;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
@@ -34,144 +36,151 @@ import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-
-import java.util.List;
-
/**
* Pipleline translator of Storm
*/
public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
- private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
- private TranslationContext context;
- private int depth = 0;
-
- public StormPipelineTranslator(TranslationContext context) {
- this.context = context;
- }
-
- public void translate(Pipeline pipeline) {
- List<PTransformOverride> transformOverrides =
- ImmutableList.<PTransformOverride>builder()
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
- .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class))))
- .build();
- pipeline.replaceAll(transformOverrides);
- pipeline.traverseTopologically(this);
- }
-
- @Override
- public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
- this.depth++;
-
- // check if current composite transforms need to be translated.
- // If not, all sub transforms will be translated in visitPrimitiveTransform.
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null) {
- TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-
- if (translator != null && applyCanTranslate(transform, node, translator)) {
- applyStreamingTransform(transform, node, translator);
- LOG.info(genSpaces(this.depth) + "translated-" + node);
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
- }
- }
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- this.depth--;
- LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+ private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+ private TranslationContext context;
+ private int depth = 0;
+
+ public StormPipelineTranslator(TranslationContext context) {
+ this.context = context;
+ }
+
+ public void translate(Pipeline pipeline) {
+ List<PTransformOverride> transformOverrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+ .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new ReflectiveOneToOneOverrideFactory(
+ (ViewTranslator.CombineGloballyAsSingletonView.class))))
+ .build();
+ pipeline.replaceAll(transformOverrides);
+ pipeline.traverseTopologically(this);
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
+ this.depth++;
+
+ // check if current composite transforms need to be translated.
+ // If not, all sub transforms will be translated in visitPrimitiveTransform.
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null) {
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+ if (translator != null && applyCanTranslate(transform, node, translator)) {
+ applyStreamingTransform(transform, node, translator);
+ LOG.info(genSpaces(this.depth) + "translated-" + node);
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
}
-
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
-
- if (!node.isRootNode()) {
- PTransform<?, ?> transform = node.getTransform();
- TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
- if (translator == null || !applyCanTranslate(transform, node, translator)) {
- LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
- }
- applyStreamingTransform(transform, node, translator);
- }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {
+ this.depth--;
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+ }
+
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
+
+ if (!node.isRootNode()) {
+ PTransform<?, ?> transform = node.getTransform();
+ TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+ if (translator == null || !applyCanTranslate(transform, node, translator)) {
+ LOG.info(node.getTransform().getClass().toString());
+ throw new UnsupportedOperationException(
+ "The transform " + transform + " is currently not supported.");
+ }
+ applyStreamingTransform(transform, node, translator);
}
-
- public void visitValue(PValue value, TransformHierarchy.Node node) {
- LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+ }
+
+ public void visitValue(PValue value, TransformHierarchy.Node node) {
+ LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+ }
+
+ private <T extends PTransform<?, ?>> void applyStreamingTransform(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+ context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+ typedTranslator.translateNode(typedTransform, context);
+
+ // Maintain PValue to TupleTag map for side inputs translation.
+ context.getUserGraphContext().recordOutputTaggedPValue();
+ }
+
+ private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+ PTransform<?, ?> transform,
+ TransformHierarchy.Node node,
+ TransformTranslator<?> translator) {
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+ @SuppressWarnings("unchecked")
+ TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+ context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+ return typedTranslator.canTranslate(typedTransform, context);
+ }
+
+ /**
+ * Utility formatting method.
+ *
+ * @param n number of indentation levels to generate
+ * @return String made of n repetitions of "|" followed by padding spaces
+ */
+ protected static String genSpaces(int n) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < n; i++) {
+ builder.append("| ");
}
-
- private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node,
- TransformTranslator<?> translator) {
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
- @SuppressWarnings("unchecked")
- TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
- context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
- typedTranslator.translateNode(typedTransform, context);
-
- // Maintain PValue to TupleTag map for side inputs translation.
- context.getUserGraphContext().recordOutputTaggedPValue();
+ return builder.toString();
+ }
+
+ private static class ReflectiveOneToOneOverrideFactory<
+ InputT extends PValue,
+ OutputT extends PValue,
+ TransformT extends PTransform<InputT, OutputT>>
+ extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+ private final Class<PTransform<InputT, OutputT>> replacement;
+
+ private ReflectiveOneToOneOverrideFactory(
+ Class<PTransform<InputT, OutputT>> replacement) {
+ this.replacement = replacement;
}
- private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) {
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
- @SuppressWarnings("unchecked")
- TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
- context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-
- return typedTranslator.canTranslate(typedTransform, context);
- }
-
- /**
- * Utility formatting method.
- *
- * @param n number of spaces to generate
- * @return String with "|" followed by n spaces
- */
- protected static String genSpaces(int n) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < n; i++) {
- builder.append("| ");
- }
- return builder.toString();
- }
-
- private static class ReflectiveOneToOneOverrideFactory<
- InputT extends PValue,
- OutputT extends PValue,
- TransformT extends PTransform<InputT, OutputT>>
- extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
- private final Class<PTransform<InputT, OutputT>> replacement;
-
- private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<InputT, OutputT>> replacement) {
- this.replacement = replacement;
- }
-
- @Override
- public PTransformReplacement<InputT, OutputT> getReplacementTransform(AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
- PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
- PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
- .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), originalPTransform)
- .build();
- InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
- return PTransformReplacement.of(inputT, replacedPTransform);
- }
+ @Override
+ public PTransformReplacement<InputT, OutputT> getReplacementTransform(
+ AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+ PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+ PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+ .withArg(
+ (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
+ originalPTransform)
+ .build();
+ InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+ return PTransformReplacement.of(inputT, replacedPTransform);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 707202b..526352a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -17,19 +17,29 @@
*/
package org.apache.beam.runners.jstorm.translation;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
import avro.shaded.com.google.common.collect.Lists;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
-import com.google.common.base.Strings;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValueBase;
@@ -38,387 +48,392 @@ import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-
-import java.util.*;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Maintains the state necessary during Pipeline translation to build a Storm topology.
*/
public class TranslationContext {
- private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-
- private final UserGraphContext userGraphContext;
- private final ExecutionGraphContext executionGraphContext;
-
- public TranslationContext(JStormPipelineOptions options) {
- this.userGraphContext = new UserGraphContext(options);
- this.executionGraphContext = new ExecutionGraphContext();
+ private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+ private final UserGraphContext userGraphContext;
+ private final ExecutionGraphContext executionGraphContext;
+
+ public TranslationContext(JStormPipelineOptions options) {
+ this.userGraphContext = new UserGraphContext(options);
+ this.executionGraphContext = new ExecutionGraphContext();
+ }
+
+ public ExecutionGraphContext getExecutionGraphContext() {
+ return executionGraphContext;
+ }
+
+ public UserGraphContext getUserGraphContext() {
+ return userGraphContext;
+ }
+
+ private void addStormStreamDef(
+ TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
+ Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
+ if (!producer.getComponentId().equals(destComponentName)) {
+ Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
+ executionGraphContext.registerStreamConsumer(consumer, producer);
+
+ ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
+ if (executorsBolt != null) {
+ executorsBolt.addExternalOutputTag(input.getTag());
+ }
}
-
- public ExecutionGraphContext getExecutionGraphContext() {
- return executionGraphContext;
+ }
+
+ private String getUpstreamExecutorsBolt() {
+ for (PValue value : userGraphContext.getInputs().values()) {
+ String componentId = executionGraphContext.getProducerComponentId(value);
+ if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
+ return componentId;
+ }
}
-
- public UserGraphContext getUserGraphContext() {
- return userGraphContext;
+ // When the upstream component is a spout, null is returned.
+ return null;
+ }
+
+ /**
+ * Checks whether the current transform is applied to a source collection.
+ *
+ * @return
+ */
+ private boolean connectedToSource() {
+ for (PValue value : userGraphContext.getInputs().values()) {
+ if (executionGraphContext.producedBySpout(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param upstreamExecutorsBolt
+ * @return true if there are multiple input streams, or the upstream executor outputs the same
+ * stream to different executors
+ */
+ private boolean isMultipleInputOrOutput(
+ ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
+ if (inputs.size() > 1) {
+ return true;
+ } else {
+ final Sets.SetView<TupleTag> intersection =
+ Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
+ if (!intersection.isEmpty()) {
+ // there is already a different executor consuming the same input
+ return true;
+ } else {
+ return false;
+ }
}
+ }
- private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
- Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
- if (!producer.getComponentId().equals(destComponentName)) {
- Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
- executionGraphContext.registerStreamConsumer(consumer, producer);
+ public void addTransformExecutor(Executor executor) {
+ addTransformExecutor(executor, Collections.EMPTY_LIST);
+ }
- ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
- if (executorsBolt != null) {
- executorsBolt.addExternalOutputTag(input.getTag());
- }
- }
- }
+ public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
+ addTransformExecutor(
+ executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
+ }
- private String getUpstreamExecutorsBolt() {
- for (PValue value : userGraphContext.getInputs().values()) {
- String componentId = executionGraphContext.getProducerComponentId(value);
- if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
- return componentId;
- }
- }
- // When upstream component is spout, "null" will be return.
- return null;
- }
+ public void addTransformExecutor(
+ Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
+ addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
+ }
- /**
- * check if the current transform is applied to source collection.
- * @return
- */
- private boolean connectedToSource() {
- for (PValue value : userGraphContext.getInputs().values()) {
- if (executionGraphContext.producedBySpout(value)) {
- return true;
- }
- }
- return false;
- }
+ public void addTransformExecutor(
+ Executor executor,
+ Map<TupleTag<?>, PValue> inputs,
+ Map<TupleTag<?>, PValue> outputs,
+ List<PValue> sideInputs) {
+ String name = null;
+ ExecutorsBolt bolt = null;
+
+ boolean isGBK = false;
/**
- * @param upstreamExecutorsBolt
- * @return true if there is multiple input streams, or upstream executor output the same stream
- * to different executors
+ * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
+ * For the following cases, a new bolt is created for the specified executor; otherwise the
+ * executor will be added into the bolt that contains the corresponding upstream executor.
+ * a) it is a GroupByKey executor
+ * b) it is connected to source directly
+ * c) no existing upstream bolt was found
+ * d) For the purpose of performance, to reduce the side effects between multiple streams which
+ * are output to the same executor, a new bolt will be created.
*/
- private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
- if (inputs.size() > 1) {
- return true;
- } else {
- final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
- if (!intersection.isEmpty()) {
- // there is already a different executor consume the same input
- return true;
- } else {
- return false;
- }
- }
+ if (RunnerUtils.isGroupByKeyExecutor(executor)) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ isGBK = true;
+ } else if (connectedToSource()) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ } else {
+ name = getUpstreamExecutorsBolt();
+ if (name == null) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ } else {
+ bolt = executionGraphContext.getBolt(name);
+ if (isMultipleInputOrOutput(bolt, inputs)) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ }
+ }
}
- public void addTransformExecutor(Executor executor) {
- addTransformExecutor(executor, Collections.EMPTY_LIST);
- }
-
- public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
- addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
- }
-
- public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
- addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
- }
-
- public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
- String name = null;
-
- ExecutorsBolt bolt = null;
-
- boolean isGBK = false;
- /**
- * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
- * For following cases, a new bolt is created for the specified executor, otherwise the executor
- * will be added into the bolt contains corresponding upstream executor.
- * a) it is a GroupByKey executor
- * b) it is connected to source directly
- * c) None existing upstream bolt was found
- * d) For the purpose of performance to reduce the side effects between multiple streams which
- * is output to same executor, a new bolt will be created.
- */
- if (RunnerUtils.isGroupByKeyExecutor(executor)) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- isGBK = true;
- } else if (connectedToSource()) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- } else {
- name = getUpstreamExecutorsBolt();
- if (name == null) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- } else {
- bolt = executionGraphContext.getBolt(name);
- if (isMultipleInputOrOutput(bolt, inputs)) {
- bolt = new ExecutorsBolt();
- name = executionGraphContext.registerBolt(bolt);
- }
- }
- }
-
- // update the output tags of current transform into ExecutorsBolt
- for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
- TupleTag tag = entry.getKey();
- PValue value = entry.getValue();
-
- // use tag of PValueBase
- if (value instanceof PValueBase) {
- tag = ((PValueBase) value).expand().keySet().iterator().next();
- }
- executionGraphContext.registerStreamProducer(
- TaggedPValue.of(tag, value),
- Stream.Producer.of(name, tag.getId(), value.getName()));
- //bolt.addOutputTags(tag);
- }
+ // update the output tags of current transform into ExecutorsBolt
+ for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+ TupleTag tag = entry.getKey();
+ PValue value = entry.getValue();
+
+ // use tag of PValueBase
+ if (value instanceof PValueBase) {
+ tag = ((PValueBase) value).expand().keySet().iterator().next();
+ }
+ executionGraphContext.registerStreamProducer(
+ TaggedPValue.of(tag, value),
+ Stream.Producer.of(name, tag.getId(), value.getName()));
+ //bolt.addOutputTags(tag);
+ }
- // add the transform executor into the chain of ExecutorsBolt
- for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
- TupleTag tag = entry.getKey();
- PValue value = entry.getValue();
- bolt.addExecutor(tag, executor);
-
- // filter all connections inside bolt
- //if (!bolt.getOutputTags().contains(tag)) {
- Stream.Grouping grouping;
- if (isGBK) {
- grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
- } else {
- grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
- }
- addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
- //}
- }
+ // add the transform executor into the chain of ExecutorsBolt
+ for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
+ TupleTag tag = entry.getKey();
+ PValue value = entry.getValue();
+ bolt.addExecutor(tag, executor);
+
+ // filter all connections inside bolt
+ //if (!bolt.getOutputTags().contains(tag)) {
+ Stream.Grouping grouping;
+ if (isGBK) {
+ grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
+ } else {
+ grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
+ }
+ addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
+ //}
+ }
- for (PValue sideInput : sideInputs) {
- TupleTag tag = userGraphContext.findTupleTag(sideInput);
- bolt.addExecutor(tag, executor);
- checkState(!bolt.getOutputTags().contains(tag));
- addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
- }
+ for (PValue sideInput : sideInputs) {
+ TupleTag tag = userGraphContext.findTupleTag(sideInput);
+ bolt.addExecutor(tag, executor);
+ checkState(!bolt.getOutputTags().contains(tag));
+ addStormStreamDef(
+ TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
+ }
- bolt.registerExecutor(executor);
+ bolt.registerExecutor(executor);
- // set parallelismNumber
- String pTransformfullName = userGraphContext.currentTransform.getFullName();
- String compositeName = pTransformfullName.split("/")[0];
- Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
- if (parallelismNumMap.containsKey(compositeName)) {
- int configNum = (Integer) parallelismNumMap.get(compositeName);
- int currNum = bolt.getParallelismNum();
- bolt.setParallelismNum(Math.max(configNum, currNum));
- }
+ // set parallelismNumber
+ String pTransformfullName = userGraphContext.currentTransform.getFullName();
+ String compositeName = pTransformfullName.split("/")[0];
+ Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+ if (parallelismNumMap.containsKey(compositeName)) {
+ int configNum = (Integer) parallelismNumMap.get(compositeName);
+ int currNum = bolt.getParallelismNum();
+ bolt.setParallelismNum(Math.max(configNum, currNum));
}
+ }
- // TODO: add getSideInputs() and getSideOutputs().
- public static class UserGraphContext {
- private final JStormPipelineOptions options;
- private final Map<PValue, TupleTag> pValueToTupleTag;
- private AppliedPTransform<?, ?, ?> currentTransform = null;
+ // TODO: add getSideInputs() and getSideOutputs().
+ public static class UserGraphContext {
+ private final JStormPipelineOptions options;
+ private final Map<PValue, TupleTag> pValueToTupleTag;
+ private AppliedPTransform<?, ?, ?> currentTransform = null;
- private boolean isWindowed = false;
+ private boolean isWindowed = false;
- public UserGraphContext(JStormPipelineOptions options) {
- this.options = checkNotNull(options, "options");
- this.pValueToTupleTag = Maps.newHashMap();
- }
+ public UserGraphContext(JStormPipelineOptions options) {
+ this.options = checkNotNull(options, "options");
+ this.pValueToTupleTag = Maps.newHashMap();
+ }
- public JStormPipelineOptions getOptions() {
- return this.options;
- }
+ public JStormPipelineOptions getOptions() {
+ return this.options;
+ }
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
- this.currentTransform = transform;
- }
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+ this.currentTransform = transform;
+ }
- public String getStepName() {
- return currentTransform.getFullName();
- }
+ public String getStepName() {
+ return currentTransform.getFullName();
+ }
- public <T extends PValue> T getInput() {
- return (T) currentTransform.getInputs().values().iterator().next();
- }
+ public <T extends PValue> T getInput() {
+ return (T) currentTransform.getInputs().values().iterator().next();
+ }
- public Map<TupleTag<?>, PValue> getInputs() {
- return currentTransform.getInputs();
- }
+ public Map<TupleTag<?>, PValue> getInputs() {
+ return currentTransform.getInputs();
+ }
- public TupleTag<?> getInputTag() {
- return currentTransform.getInputs().keySet().iterator().next();
- }
+ public TupleTag<?> getInputTag() {
+ return currentTransform.getInputs().keySet().iterator().next();
+ }
- public List<TupleTag<?>> getInputTags() {
- return Lists.newArrayList(currentTransform.getInputs().keySet());
- }
+ public List<TupleTag<?>> getInputTags() {
+ return Lists.newArrayList(currentTransform.getInputs().keySet());
+ }
- public <T extends PValue> T getOutput() {
- return (T) currentTransform.getOutputs().values().iterator().next();
- }
+ public <T extends PValue> T getOutput() {
+ return (T) currentTransform.getOutputs().values().iterator().next();
+ }
- public Map<TupleTag<?>, PValue> getOutputs() {
- return currentTransform.getOutputs();
- }
+ public Map<TupleTag<?>, PValue> getOutputs() {
+ return currentTransform.getOutputs();
+ }
- public TupleTag<?> getOutputTag() {
- return currentTransform.getOutputs().keySet().iterator().next();
- }
+ public TupleTag<?> getOutputTag() {
+ return currentTransform.getOutputs().keySet().iterator().next();
+ }
- public List<TupleTag<?>> getOutputTags() {
- return Lists.newArrayList(currentTransform.getOutputs().keySet());
- }
+ public List<TupleTag<?>> getOutputTags() {
+ return Lists.newArrayList(currentTransform.getOutputs().keySet());
+ }
- public void recordOutputTaggedPValue() {
- for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
- pValueToTupleTag.put(entry.getValue(), entry.getKey());
- }
- }
+ public void recordOutputTaggedPValue() {
+ for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
+ pValueToTupleTag.put(entry.getValue(), entry.getKey());
+ }
+ }
- public <T> TupleTag<T> findTupleTag(PValue pValue) {
- return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
- }
+ public <T> TupleTag<T> findTupleTag(PValue pValue) {
+ return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
+ }
- public void setWindowed() {
- this.isWindowed = true;
- }
+ public void setWindowed() {
+ this.isWindowed = true;
+ }
- public boolean isWindowed() {
- return this.isWindowed;
- }
+ public boolean isWindowed() {
+ return this.isWindowed;
+ }
- @Override
- public String toString() {
- return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
- .transform(new Function<Map.Entry<PValue,TupleTag>, String>() {
- @Override
- public String apply(Map.Entry<PValue, TupleTag> entry) {
- return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
- }}));
- }
+ @Override
+ public String toString() {
+ return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
+ .transform(new Function<Map.Entry<PValue, TupleTag>, String>() {
+ @Override
+ public String apply(Map.Entry<PValue, TupleTag> entry) {
+ return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
+ }
+ }));
}
+ }
- public static class ExecutionGraphContext {
+ public static class ExecutionGraphContext {
- private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
- private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
+ private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+ private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
- // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
- private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
- private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
+ // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
+ private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
+ private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
- private final List<Stream> streams = new ArrayList<>();
+ private final List<Stream> streams = new ArrayList<>();
- private int id = 1;
+ private int id = 1;
- public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
- checkNotNull(spout, "spout");
- checkNotNull(output, "output");
- String name = "spout" + genId();
- this.spoutMap.put(name, spout);
- registerStreamProducer(
- output,
- Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
- }
+ public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+ checkNotNull(spout, "spout");
+ checkNotNull(output, "output");
+ String name = "spout" + genId();
+ this.spoutMap.put(name, spout);
+ registerStreamProducer(
+ output,
+ Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
+ }
- public AdaptorBasicSpout getSpout(String id) {
- if (Strings.isNullOrEmpty(id)) {
- return null;
- }
- return this.spoutMap.get(id);
- }
+ public AdaptorBasicSpout getSpout(String id) {
+ if (Strings.isNullOrEmpty(id)) {
+ return null;
+ }
+ return this.spoutMap.get(id);
+ }
- public Map<String, AdaptorBasicSpout> getSpouts() {
- return this.spoutMap;
- }
+ public Map<String, AdaptorBasicSpout> getSpouts() {
+ return this.spoutMap;
+ }
- public String registerBolt(ExecutorsBolt bolt) {
- checkNotNull(bolt, "bolt");
- String name = "bolt" + genId();
- this.boltMap.put(name, bolt);
- return name;
- }
+ public String registerBolt(ExecutorsBolt bolt) {
+ checkNotNull(bolt, "bolt");
+ String name = "bolt" + genId();
+ this.boltMap.put(name, bolt);
+ return name;
+ }
- public ExecutorsBolt getBolt(String id) {
- if (Strings.isNullOrEmpty(id)) {
- return null;
- }
- return this.boltMap.get(id);
- }
+ public ExecutorsBolt getBolt(String id) {
+ if (Strings.isNullOrEmpty(id)) {
+ return null;
+ }
+ return this.boltMap.get(id);
+ }
- public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
- checkNotNull(taggedPValue, "taggedPValue");
- checkNotNull(producer, "producer");
- pValueToProducer.put(taggedPValue.getValue(), producer);
- producerToTaggedPValue.put(producer, taggedPValue);
- }
+ public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
+ checkNotNull(taggedPValue, "taggedPValue");
+ checkNotNull(producer, "producer");
+ pValueToProducer.put(taggedPValue.getValue(), producer);
+ producerToTaggedPValue.put(producer, taggedPValue);
+ }
- public Stream.Producer getProducer(PValue pValue) {
- return pValueToProducer.get(checkNotNull(pValue, "pValue"));
- }
+ public Stream.Producer getProducer(PValue pValue) {
+ return pValueToProducer.get(checkNotNull(pValue, "pValue"));
+ }
- public String getProducerComponentId(PValue pValue) {
- Stream.Producer producer = getProducer(pValue);
- return producer == null ? null : producer.getComponentId();
- }
+ public String getProducerComponentId(PValue pValue) {
+ Stream.Producer producer = getProducer(pValue);
+ return producer == null ? null : producer.getComponentId();
+ }
- public boolean producedBySpout(PValue pValue) {
- String componentId = getProducerComponentId(pValue);
- return getSpout(componentId) != null;
- }
+ public boolean producedBySpout(PValue pValue) {
+ String componentId = getProducerComponentId(pValue);
+ return getSpout(componentId) != null;
+ }
- public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
- streams.add(Stream.of(
- checkNotNull(producer, "producer"),
- checkNotNull(consumer, "consumer")));
- }
+ public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
+ streams.add(Stream.of(
+ checkNotNull(producer, "producer"),
+ checkNotNull(consumer, "consumer")));
+ }
- public Map<PValue, Stream.Producer> getPValueToProducers() {
- return pValueToProducer;
- }
+ public Map<PValue, Stream.Producer> getPValueToProducers() {
+ return pValueToProducer;
+ }
- public Iterable<Stream> getStreams() {
- return streams;
- }
+ public Iterable<Stream> getStreams() {
+ return streams;
+ }
- @Override
- public String toString() {
- List<String> ret = new ArrayList<>();
- ret.add("SPOUT");
- for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
- ret.add(entry.getKey() + ": " + entry.getValue().toString());
- }
- ret.add("BOLT");
- for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
- ret.add(entry.getKey() + ": " + entry.getValue().toString());
- }
- ret.add("STREAM");
- for (Stream stream : streams) {
- ret.add(String.format(
- "%s@@%s ---> %s@@%s",
- stream.getProducer().getStreamId(),
- stream.getProducer().getComponentId(),
- stream.getConsumer().getGrouping(),
- stream.getConsumer().getComponentId()));
- }
- return Joiner.on("\n").join(ret);
- }
+ @Override
+ public String toString() {
+ List<String> ret = new ArrayList<>();
+ ret.add("SPOUT");
+ for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+ ret.add(entry.getKey() + ": " + entry.getValue().toString());
+ }
+ ret.add("BOLT");
+ for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
+ ret.add(entry.getKey() + ": " + entry.getValue().toString());
+ }
+ ret.add("STREAM");
+ for (Stream stream : streams) {
+ ret.add(String.format(
+ "%s@@%s ---> %s@@%s",
+ stream.getProducer().getStreamId(),
+ stream.getProducer().getComponentId(),
+ stream.getConsumer().getGrouping(),
+ stream.getConsumer().getComponentId()));
+ }
+ return Joiner.on("\n").join(ret);
+ }
- private synchronized int genId() {
- return id++;
- }
+ private synchronized int genId() {
+ return id++;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index a33f07b..bce5b3e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.jstorm.translation;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
@@ -35,50 +37,49 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Lookup table mapping PTransform types to associated TransformTranslator implementations.
*/
public class TranslatorRegistry {
- private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
- private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
+ private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
+ new HashMap<>();
- static {
- TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
- TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
- // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
- // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+ static {
+ TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
+ TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+ // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+ // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
- TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
- TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
+ TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
+ TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
- //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
- TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+ //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+ TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
- TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
- TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
+ TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
- /**
- * Currently, an empty translation is required for Combine and Reshuffle, because these transforms
- * are eventually mapped to GroupByKey and ParDo. So we only need to translate the final transforms.
- * If any improvement is required, the composite transforms will be translated in the future.
- */
- // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
- // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
- // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
- }
+ /**
+ * Currently, an empty translation is required for Combine and Reshuffle,
+ * because these transforms are eventually mapped to GroupByKey and ParDo.
+ * So we only need to translate the final transforms.
+ * If any improvement is required, the composite transforms will be translated in the future.
+ */
+ // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+ // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
+ // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
+ }
- public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
- TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
- if (translator == null) {
- LOG.warn("Unsupported operator={}", transform.getClass().getName());
- }
- return translator;
+ public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+ if (translator == null) {
+ LOG.warn("Unsupported operator={}", transform.getClass().getName());
}
+ return translator;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
index b07b426..68e9e17 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -17,54 +17,52 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-
import backtype.storm.topology.IComponent;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
/*
* Enables users to add output stream definitions through the API rather than hard-coding them.
*/
public abstract class AbstractComponent implements IComponent {
- private Map<String, Fields> streamToFields = new HashMap<>();
- private Map<String, Boolean> keyStreams = new HashMap<>();
- private int parallelismNum = 0;
+ private Map<String, Fields> streamToFields = new HashMap<>();
+ private Map<String, Boolean> keyStreams = new HashMap<>();
+ private int parallelismNum = 0;
- public void addOutputField(String streamId) {
- addOutputField(streamId, new Fields(CommonInstance.VALUE));
- }
+ public void addOutputField(String streamId) {
+ addOutputField(streamId, new Fields(CommonInstance.VALUE));
+ }
- public void addOutputField(String streamId, Fields fields) {
- streamToFields.put(streamId, fields);
- keyStreams.put(streamId, false);
- }
+ public void addOutputField(String streamId, Fields fields) {
+ streamToFields.put(streamId, fields);
+ keyStreams.put(streamId, false);
+ }
- public void addKVOutputField(String streamId) {
- streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
- keyStreams.put(streamId, true);
- }
+ public void addKVOutputField(String streamId) {
+ streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
+ keyStreams.put(streamId, true);
+ }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
- declarer.declareStream(entry.getKey(), entry.getValue());
- }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
+ declarer.declareStream(entry.getKey(), entry.getValue());
}
+ }
- public boolean keyedEmit(String streamId) {
- Boolean isKeyedStream = keyStreams.get(streamId);
- return isKeyedStream == null ? false : isKeyedStream;
- }
+ public boolean keyedEmit(String streamId) {
+ Boolean isKeyedStream = keyStreams.get(streamId);
+ return isKeyedStream == null ? false : isKeyedStream;
+ }
- public int getParallelismNum() {
- return parallelismNum;
- }
+ public int getParallelismNum() {
+ return parallelismNum;
+ }
- public void setParallelismNum(int num) {
- parallelismNum = num;
- }
+ public void setParallelismNum(int num) {
+ parallelismNum = num;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
index 91881f2..5e9b056 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import backtype.storm.topology.IRichBatchBolt;
public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
index 5a0c6ec..0480518 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime;
import backtype.storm.topology.IRichSpout;
public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
index c73a3b8..9507948 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -17,312 +17,319 @@
*/
package org.apache.beam.runners.jstorm.translation.runtime;
-import java.io.Serializable;
-import java.util.*;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import avro.shaded.com.google.common.collect.Iterables;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-
import com.alibaba.jstorm.cache.IKvStoreManager;
import com.alibaba.jstorm.metric.MetricClient;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class DoFnExecutor<InputT, OutputT> implements Executor {
- private static final long serialVersionUID = 5297603063991078668L;
+ private static final long serialVersionUID = 5297603063991078668L;
- private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
- public class DoFnExecutorOutputManager implements OutputManager, Serializable {
- private static final long serialVersionUID = -661113364735206170L;
+ public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+ private static final long serialVersionUID = -661113364735206170L;
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
- protected transient DoFnRunner<InputT, OutputT> runner = null;
- protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
- protected final String stepName;
-
- protected int internalDoFnExecutorId;
-
- protected final String description;
-
- protected final TupleTag<OutputT> mainTupleTag;
- protected final List<TupleTag<?>> sideOutputTags;
-
- protected SerializedPipelineOptions serializedOptions;
- protected transient JStormPipelineOptions pipelineOptions;
-
- protected DoFn<InputT, OutputT> doFn;
- protected final Coder<WindowedValue<InputT>> inputCoder;
- protected DoFnInvoker<InputT, OutputT> doFnInvoker;
- protected OutputManager outputManager;
- protected WindowingStrategy<?, ?> windowingStrategy;
- protected final TupleTag<InputT> mainInputTag;
- protected Collection<PCollectionView<?>> sideInputs;
- protected SideInputHandler sideInputHandler;
- protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
- // Initialize during runtime
- protected ExecutorContext executorContext;
- protected ExecutorsBolt executorsBolt;
- protected TimerInternals timerInternals;
- protected transient StateInternals pushbackStateInternals;
- protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
- protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
- protected transient IKvStoreManager kvStoreManager;
- protected DefaultStepContext stepContext;
- protected transient MetricClient metricClient;
-
- public DoFnExecutor(
- String stepName,
- String description,
- JStormPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- Coder<WindowedValue<InputT>> inputCoder,
- WindowingStrategy<?, ?> windowingStrategy,
- TupleTag<InputT> mainInputTag,
- Collection<PCollectionView<?>> sideInputs,
- Map<TupleTag, PCollectionView<?>> sideInputTagToView,
- TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags) {
- this.stepName = checkNotNull(stepName, "stepName");
- this.description = checkNotNull(description, "description");
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- this.doFn = doFn;
- this.inputCoder = inputCoder;
- this.outputManager = new DoFnExecutorOutputManager();
- this.windowingStrategy = windowingStrategy;
- this.mainInputTag = mainInputTag;
- this.sideInputs = sideInputs;
- this.mainTupleTag = mainTupleTag;
- this.sideOutputTags = sideOutputTags;
- this.sideInputTagToView = sideInputTagToView;
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ executorsBolt.processExecutorElem(tag, output);
}
-
- protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
- return new DoFnRunnerWithMetrics<>(
- stepName,
- DoFnRunners.simpleRunner(
- this.pipelineOptions,
- this.doFn,
- this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
- this.outputManager,
- this.mainTupleTag,
- this.sideOutputTags,
- this.stepContext,
- this.windowingStrategy),
- MetricsReporter.create(metricClient));
+ }
+
+ protected transient DoFnRunner<InputT, OutputT> runner = null;
+ protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+ protected final String stepName;
+
+ protected int internalDoFnExecutorId;
+
+ protected final String description;
+
+ protected final TupleTag<OutputT> mainTupleTag;
+ protected final List<TupleTag<?>> sideOutputTags;
+
+ protected SerializedPipelineOptions serializedOptions;
+ protected transient JStormPipelineOptions pipelineOptions;
+
+ protected DoFn<InputT, OutputT> doFn;
+ protected final Coder<WindowedValue<InputT>> inputCoder;
+ protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+ protected OutputManager outputManager;
+ protected WindowingStrategy<?, ?> windowingStrategy;
+ protected final TupleTag<InputT> mainInputTag;
+ protected Collection<PCollectionView<?>> sideInputs;
+ protected SideInputHandler sideInputHandler;
+ protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+ // Initialize during runtime
+ protected ExecutorContext executorContext;
+ protected ExecutorsBolt executorsBolt;
+ protected TimerInternals timerInternals;
+ protected transient StateInternals pushbackStateInternals;
+ protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+ protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+ protected transient IKvStoreManager kvStoreManager;
+ protected DefaultStepContext stepContext;
+ protected transient MetricClient metricClient;
+
+ public DoFnExecutor(
+ String stepName,
+ String description,
+ JStormPipelineOptions pipelineOptions,
+ DoFn<InputT, OutputT> doFn,
+ Coder<WindowedValue<InputT>> inputCoder,
+ WindowingStrategy<?, ?> windowingStrategy,
+ TupleTag<InputT> mainInputTag,
+ Collection<PCollectionView<?>> sideInputs,
+ Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+ TupleTag<OutputT> mainTupleTag,
+ List<TupleTag<?>> sideOutputTags) {
+ this.stepName = checkNotNull(stepName, "stepName");
+ this.description = checkNotNull(description, "description");
+ this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.doFn = doFn;
+ this.inputCoder = inputCoder;
+ this.outputManager = new DoFnExecutorOutputManager();
+ this.windowingStrategy = windowingStrategy;
+ this.mainInputTag = mainInputTag;
+ this.sideInputs = sideInputs;
+ this.mainTupleTag = mainTupleTag;
+ this.sideOutputTags = sideOutputTags;
+ this.sideInputTagToView = sideInputTagToView;
+ }
+
+ protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+ return new DoFnRunnerWithMetrics<>(
+ stepName,
+ DoFnRunners.simpleRunner(
+ this.pipelineOptions,
+ this.doFn,
+ this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+ this.outputManager,
+ this.mainTupleTag,
+ this.sideOutputTags,
+ this.stepContext,
+ this.windowingStrategy),
+ MetricsReporter.create(metricClient));
+ }
+
+ protected void initService(ExecutorContext context) {
+ // TODO: what should be set for key in here?
+ timerInternals = new JStormTimerInternals(
+ null /* key */, this, context.getExecutorsBolt().timerService());
+ kvStoreManager = context.getKvStoreManager();
+ stepContext = new DefaultStepContext(timerInternals,
+ new JStormStateInternals(
+ null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+ metricClient = new MetricClient(executorContext.getTopologyContext());
+ }
+
+ @Override
+ public void init(ExecutorContext context) {
+ this.executorContext = context;
+ this.executorsBolt = context.getExecutorsBolt();
+ this.pipelineOptions =
+ this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+ initService(context);
+
+ // Side inputs setup
+ if (sideInputs != null && sideInputs.isEmpty() == false) {
+ pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+ watermarkHoldTag =
+ StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+ pushbackStateInternals = new JStormStateInternals(
+ null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+ sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+ runner = getDoFnRunner();
+ pushbackRunner =
+ SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+ } else {
+ runner = getDoFnRunner();
}
- protected void initService(ExecutorContext context) {
- // TODO: what should be set for key in here?
- timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService());
- kvStoreManager = context.getKvStoreManager();
- stepContext = new DefaultStepContext(timerInternals,
- new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
- metricClient = new MetricClient(executorContext.getTopologyContext());
+ // Process user's setup
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker.invokeSetup();
+ }
+
+ @Override
+ public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+ LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
+ tag, mainInputTag, sideInputs, elem.getValue()));
+ if (mainInputTag.equals(tag)) {
+ processMainInput(elem);
+ } else {
+ processSideInput(tag, elem);
}
-
- @Override
- public void init(ExecutorContext context) {
- this.executorContext = context;
- this.executorsBolt = context.getExecutorsBolt();
- this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
- initService(context);
-
- // Side inputs setup
- if (sideInputs != null && sideInputs.isEmpty() == false) {
- pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
- watermarkHoldTag =
- StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
- pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
- sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
- runner = getDoFnRunner();
- pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
- } else {
- runner = getDoFnRunner();
+ }
+
+ protected <T> void processMainInput(WindowedValue<T> elem) {
+ if (sideInputs.isEmpty()) {
+ runner.processElement((WindowedValue<InputT>) elem);
+ } else {
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+ if (pushedBackValue.getTimestamp().isBefore(min)) {
+ min = pushedBackValue.getTimestamp();
}
-
- // Process user's setup
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
- doFnInvoker.invokeSetup();
+ min = earlier(min, pushedBackValue.getTimestamp());
+ pushedBack.add(pushedBackValue);
+ }
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
}
+ }
- @Override
- public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
- tag, mainInputTag, sideInputs, elem.getValue()));
- if (mainInputTag.equals(tag)) {
- processMainInput(elem);
- } else {
- processSideInput(tag, elem);
- }
- }
-
- protected <T> void processMainInput(WindowedValue<T> elem) {
- if (sideInputs.isEmpty()) {
- runner.processElement((WindowedValue<InputT>) elem);
- } else {
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
- Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
- if (pushedBackValue.getTimestamp().isBefore(min)) {
- min = pushedBackValue.getTimestamp();
- }
- min = earlier(min, pushedBackValue.getTimestamp());
- pushedBack.add(pushedBackValue);
- }
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
- }
- }
-
- protected void processSideInput(TupleTag tag, WindowedValue elem) {
- LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
-
- PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
- sideInputHandler.addSideInputValue(sideInputView, elem);
+ protected void processSideInput(TupleTag tag, WindowedValue elem) {
+ LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
+ sideInputHandler.addSideInputValue(sideInputView, elem);
- List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
- Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
- if (pushedBackInputs != null) {
- for (WindowedValue<InputT> input : pushedBackInputs) {
+ List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
- Iterable<WindowedValue<InputT>> justPushedBack =
- pushbackRunner.processElementInReadyWindows(input);
- Iterables.addAll(newPushedBack, justPushedBack);
- }
- }
- pushedBack.clear();
-
- Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
- min = earlier(min, pushedBackValue.getTimestamp());
- pushedBack.add(pushedBackValue);
- }
+ Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+ if (pushedBackInputs != null) {
+ for (WindowedValue<InputT> input : pushedBackInputs) {
- WatermarkHoldState watermarkHold =
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
- // TODO: clear-then-add is not thread-safe.
- watermarkHold.clear();
- watermarkHold.add(min);
+ Iterable<WindowedValue<InputT>> justPushedBack =
+ pushbackRunner.processElementInReadyWindows(input);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
}
+ pushedBack.clear();
- /**
- * Process all pushed back elements when receiving watermark with max timestamp
- */
- public void processAllPushBackElements() {
- if (sideInputs != null && sideInputs.isEmpty() == false) {
- BagState<WindowedValue<InputT>> pushedBackElements =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
- if (pushedBackElements != null) {
- for (WindowedValue<InputT> elem : pushedBackElements.read()) {
- LOG.info("Process pushback elem={}", elem);
- runner.processElement(elem);
- }
- pushedBackElements.clear();
- }
-
- WatermarkHoldState watermarkHold =
- pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
- watermarkHold.clear();
- watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
- }
+ Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+ for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+ min = earlier(min, pushedBackValue.getTimestamp());
+ pushedBack.add(pushedBackValue);
}
- public void onTimer(Object key, TimerInternals.TimerData timerData) {
- StateNamespace namespace = timerData.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
- BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- if (pushbackRunner != null) {
- pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
- } else {
- runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ WatermarkHoldState watermarkHold =
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+ // TODO: clear-then-add is not thread-safe.
+ watermarkHold.clear();
+ watermarkHold.add(min);
+ }
+
+ /**
+ * Process all pushed back elements when receiving a watermark with the max timestamp.
+ */
+ public void processAllPushBackElements() {
+ if (sideInputs != null && sideInputs.isEmpty() == false) {
+ BagState<WindowedValue<InputT>> pushedBackElements =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ if (pushedBackElements != null) {
+ for (WindowedValue<InputT> elem : pushedBackElements.read()) {
+ LOG.info("Process pushback elem={}", elem);
+ runner.processElement(elem);
}
- }
+ pushedBackElements.clear();
+ }
- @Override
- public void cleanup() {
- doFnInvoker.invokeTeardown();
+ WatermarkHoldState watermarkHold =
+ pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+ watermarkHold.clear();
+ watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
-
- @Override
- public String toString() {
- return description;
+ }
+
+ public void onTimer(Object key, TimerInternals.TimerData timerData) {
+ StateNamespace namespace = timerData.getNamespace();
+ checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+ BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+ if (pushbackRunner != null) {
+ pushbackRunner.onTimer(
+ timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ } else {
+ runner.onTimer(
+ timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
}
-
- private Instant earlier(Instant left, Instant right) {
- return left.isBefore(right) ? left : right;
+ }
+
+ @Override
+ public void cleanup() {
+ doFnInvoker.invokeTeardown();
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ private Instant earlier(Instant left, Instant right) {
+ return left.isBefore(right) ? left : right;
+ }
+
+ public void startBundle() {
+ if (pushbackRunner != null) {
+ pushbackRunner.startBundle();
+ } else {
+ runner.startBundle();
}
+ }
- public void startBundle() {
- if (pushbackRunner != null) {
- pushbackRunner.startBundle();
- } else {
- runner.startBundle();
- }
+ public void finishBundle() {
+ if (pushbackRunner != null) {
+ pushbackRunner.finishBundle();
+ } else {
+ runner.finishBundle();
}
+ }
- public void finishBundle() {
- if (pushbackRunner != null) {
- pushbackRunner.finishBundle();
- } else {
- runner.finishBundle();
- }
- }
+ public void setInternalDoFnExecutorId(int id) {
+ this.internalDoFnExecutorId = id;
+ }
- public void setInternalDoFnExecutorId(int id) {
- this.internalDoFnExecutorId = id;
- }
-
- public int getInternalDoFnExecutorId() {
- return internalDoFnExecutorId;
- }
+ public int getInternalDoFnExecutorId() {
+ return internalDoFnExecutorId;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
index 98dbcc5..1610a8a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
@@ -68,7 +68,8 @@ public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT
}
@Override
- public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ public void onTimer(
+ String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
metricsReporter.getMetricsContainer(stepName))) {
delegate.onTimer(timerId, window, timestamp, timeDomain);
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
index d7214db..1a03cb8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -18,17 +18,16 @@
package org.apache.beam.runners.jstorm.translation.runtime;
import java.io.Serializable;
-
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
public interface Executor extends Serializable {
- /**
- * Initialization during runtime
- */
- void init(ExecutorContext context);
+ /**
+ * Initialization during runtime.
+ */
+ void init(ExecutorContext context);
- <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
+ <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
- void cleanup();
+ void cleanup();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
index 1de881f..1f65921 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -23,13 +23,16 @@ import com.google.auto.value.AutoValue;
@AutoValue
public abstract class ExecutorContext {
- public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) {
- return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
- }
+ public static ExecutorContext of(
+ TopologyContext topologyContext,
+ ExecutorsBolt bolt,
+ IKvStoreManager kvStoreManager) {
+ return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+ }
- public abstract TopologyContext getTopologyContext();
+ public abstract TopologyContext getTopologyContext();
- public abstract ExecutorsBolt getExecutorsBolt();
+ public abstract ExecutorsBolt getExecutorsBolt();
- public abstract IKvStoreManager getKvStoreManager();
+ public abstract IKvStoreManager getKvStoreManager();
}