You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:27 UTC
[09/39] incubator-beam git commit: BEAM-261 Enable checkstyle and
cleanup.
BEAM-261 Enable checkstyle and cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9454b3bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9454b3bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9454b3bd
Branch: refs/heads/master
Commit: 9454b3bdc6f6ff69363dcd339cfb069c2c2f8cc9
Parents: 1ec7cd9
Author: Thomas Weise <th...@apache.org>
Authored: Sun Oct 16 17:36:01 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 17 09:22:49 2016 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 2 -
.../runners/apex/ApexPipelineTranslator.java | 39 +--
.../apache/beam/runners/apex/ApexRunner.java | 314 +++----------------
.../beam/runners/apex/ApexRunnerResult.java | 23 +-
.../beam/runners/apex/TestApexRunner.java | 9 +-
.../apache/beam/runners/apex/package-info.java | 22 ++
.../translators/CreateValuesTranslator.java | 12 +-
.../FlattenPCollectionTranslator.java | 13 +-
.../apex/translators/GroupByKeyTranslator.java | 4 +-
.../translators/ParDoBoundMultiTranslator.java | 47 +--
.../apex/translators/ParDoBoundTranslator.java | 5 +-
.../translators/ReadUnboundedTranslator.java | 4 +-
.../apex/translators/TransformTranslator.java | 8 +-
.../apex/translators/TranslationContext.java | 40 +--
.../functions/ApexFlattenOperator.java | 42 ++-
.../functions/ApexGroupByKeyOperator.java | 155 +++++----
.../functions/ApexParDoOperator.java | 140 ++++-----
.../translators/functions/package-info.java | 22 ++
.../io/ApexReadUnboundedInputOperator.java | 57 ++--
.../apex/translators/io/ValuesSource.java | 23 +-
.../apex/translators/io/package-info.java | 22 ++
.../runners/apex/translators/package-info.java | 22 ++
.../apex/translators/utils/ApexStreamTuple.java | 85 +++--
.../utils/CoderAdapterStreamCodec.java | 24 +-
.../apex/translators/utils/NoOpStepContext.java | 7 +-
.../utils/SerializablePipelineOptions.java | 21 +-
.../utils/ValueAndCoderKryoSerializable.java | 26 +-
.../apex/translators/utils/package-info.java | 22 ++
.../beam/runners/apex/examples/IntTest.java | 133 --------
.../apex/examples/StreamingWordCountTest.java | 15 +-
.../apex/examples/UnboundedTextSource.java | 16 +-
.../runners/apex/examples/package-info.java | 22 ++
.../FlattenPCollectionTranslatorTest.java | 32 +-
.../translators/GroupByKeyTranslatorTest.java | 45 ++-
.../translators/ParDoBoundTranslatorTest.java | 20 +-
.../translators/ReadUnboundTranslatorTest.java | 45 ++-
.../translators/utils/CollectionSource.java | 13 +-
.../translators/utils/PipelineOptionsTest.java | 28 +-
.../apex/src/test/resources/log4j.properties | 8 +-
39 files changed, 662 insertions(+), 925 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 929feb4..8b62410 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -148,12 +148,10 @@
<build>
<plugins>
- <!-- Checkstyle errors for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
- -->
<!-- Integration Tests -->
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a16f551..a6857ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,11 @@
package org.apache.beam.runners.apex;
+import com.datatorrent.api.DAG;
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
@@ -43,18 +48,13 @@ import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* {@link ApexPipelineTranslator} translates {@link Pipeline} objects
* into Apex logical plan {@link DAG}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
-
- private static final Logger LOG = LoggerFactory.getLogger(
- ApexPipelineTranslator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
/**
* A map from {@link PTransform} subclass to the corresponding
@@ -75,8 +75,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
- registerTransformTranslator(CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator());
- registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator());
+ registerTransformTranslator(CreateApexPCollectionView.class,
+ new CreateApexPCollectionViewTranslator());
+ registerTransformTranslator(CreatePCollectionView.class,
+ new CreatePCollectionViewTranslator());
}
public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -134,7 +136,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
* Returns the {@link TransformTranslator} to use for instances of the
* specified PTransform class, or null if none registered.
*/
- private <TransformT extends PTransform<?,?>>
+ private <TransformT extends PTransform<?, ?>>
TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
return transformTranslators.get(transformClass);
}
@@ -145,7 +147,8 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
@Override
public void translate(Read.Bounded<T> transform, TranslationContext context) {
// TODO: adapter is visibleForTesting
- BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(transform.getSource());
+ BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
+ transform.getSource());
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
@@ -153,26 +156,26 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
}
- private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
- {
+ private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
+ implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
private static final long serialVersionUID = 1L;
@Override
- public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context)
- {
+ public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
PCollectionView<ViewT> view = transform.getView();
context.addView(view);
LOG.debug("view {}", view.getName());
}
}
- private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>>
- {
+ private static class CreatePCollectionViewTranslator<ElemT, ViewT>
+ implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
private static final long serialVersionUID = 1L;
@Override
- public void translate(CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context)
- {
+ public void translate(CreatePCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
PCollectionView<ViewT> view = transform.getView();
context.addView(view);
LOG.debug("view {}", view.getName());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 667f1c8..f3c44bb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -19,17 +19,18 @@ package org.apache.beam.runners.apex;
import static com.google.common.base.Preconditions.checkArgument;
-import java.util.ArrayList;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.base.Throwables;
+
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Combine;
@@ -39,31 +40,22 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.Context.DAGContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.google.common.base.Throwables;
-
/**
* A {@link PipelineRunner} that translates the
* pipeline to an Apex DAG and executes it on an Apex cluster.
- * <p>
- * Currently execution is always in embedded mode,
+ *
+ * <p>Currently execution is always in embedded mode,
* launch on Hadoop cluster will be added in subsequent iteration.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,37 +91,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
input.getPipeline(),
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED);
-// TODO: replace this with a mapping
-////
-
} else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
- PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this,
- (Combine.GloballyAsSingletonView)transform);
+ PTransform<InputT, OutputT> customTransform = (PTransform)
+ new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(
+ this, (Combine.GloballyAsSingletonView) transform);
return Pipeline.applyTransform(input, customTransform);
} else if (View.AsSingleton.class.equals(transform.getClass())) {
- // note this assumes presence of above Combine.GloballyAsSingletonView mapping
- PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this,
- (View.AsSingleton)transform);
+ // assumes presence of above Combine.GloballyAsSingletonView mapping
+ PTransform<InputT, OutputT> customTransform = (PTransform)
+ new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) transform);
return Pipeline.applyTransform(input, customTransform);
-/*
- } else if (View.AsIterable.class.equals(transform.getClass())) {
- PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this,
- (View.AsIterable)transform);
- return Pipeline.applyTransform(input, customTransform);
- } else if (View.AsList.class.equals(transform.getClass())) {
- PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this,
- (View.AsList)transform);
- return Pipeline.applyTransform(input, customTransform);
- } else if (View.AsMap.class.equals(transform.getClass())) {
- PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this,
- (View.AsMap)transform);
- return Pipeline.applyTransform(input, customTransform);
- } else if (View.AsMultimap.class.equals(transform.getClass())) {
- PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this,
- (View.AsMultimap)transform);
- return Pipeline.applyTransform(input, customTransform);
-*/
-////
} else {
return super.apply(transform, input);
}
@@ -142,17 +113,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
translator.translate(pipeline);
- StreamingApplication apexApp = new StreamingApplication()
- {
+ StreamingApplication apexApp = new StreamingApplication() {
@Override
- public void populateDAG(DAG dag, Configuration conf)
- {
+ public void populateDAG(DAG dag, Configuration conf) {
dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
translationContext.populateDAG(dag);
}
};
- checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time");
+ checkArgument(options.isEmbeddedExecution(),
+ "only embedded execution is supported at this time");
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
try {
@@ -178,7 +148,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
}
}
@@ -231,13 +202,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
// Adapted from FlinkRunner for View support
/**
- * Records that the {@link PTransform} requires a deterministic key coder.
- */
- private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
- //throw new UnsupportedOperationException();
- }
-
- /**
* Creates a primitive {@link PCollectionView}.
*
* <p>For internal use only by runner implementors.
@@ -247,6 +211,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
*/
public static class CreateApexPCollectionView<ElemT, ViewT>
extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private static final long serialVersionUID = 1L;
private PCollectionView<ViewT> view;
private CreateApexPCollectionView(PCollectionView<ViewT> view) {
@@ -276,52 +241,50 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>>
- {
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ private static final long serialVersionUID = 1L;
Combine.GloballyAsSingletonView<InputT, OutputT> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
public StreamingCombineGloballyAsSingletonView(ApexRunner runner,
- Combine.GloballyAsSingletonView<InputT, OutputT> transform)
- {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
this.transform = transform;
}
@Override
- public PCollectionView<OutputT> apply(PCollection<InputT> input)
- {
+ public PCollectionView<OutputT> apply(PCollection<InputT> input) {
PCollection<OutputT> combined = input
- .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout()));
+ .apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults().withFanout(transform.getFanout()));
PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(),
combined.getWindowingStrategy(), transform.getInsertDefault(),
- transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder());
+ transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
.apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
}
@Override
- protected String getKindString()
- {
+ protected String getKindString() {
return "StreamingCombineGloballyAsSingletonView";
}
}
- private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>>
- {
+ private static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
private static final long serialVersionUID = 1L;
+
private View.AsSingleton<T> transform;
- public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform)
- {
+ public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform) {
this.transform = transform;
}
@Override
- public PCollectionView<T> apply(PCollection<T> input)
- {
+ public PCollectionView<T> apply(PCollection<T> input) {
Combine.Globally<T, T> combine = Combine
.globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
if (!transform.hasDefaultValue()) {
@@ -331,33 +294,28 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
@Override
- protected String getKindString()
- {
+ protected String getKindString() {
return "StreamingViewAsSingleton";
}
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T>
- {
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
private boolean hasDefaultValue;
private T defaultValue;
- SingletonCombine(boolean hasDefaultValue, T defaultValue)
- {
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
this.hasDefaultValue = hasDefaultValue;
this.defaultValue = defaultValue;
}
@Override
- public T apply(T left, T right)
- {
+ public T apply(T left, T right) {
throw new IllegalArgumentException("PCollection with more than one element "
+ "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ "combine the PCollection into a single value");
}
@Override
- public T identity()
- {
+ public T identity() {
if (hasDefaultValue) {
return defaultValue;
} else {
@@ -368,194 +326,4 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
}
- private static class StreamingViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- private final ApexRunner runner;
-
- public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
- * Flink runner in streaming mode.
- */
- private static class StreamingViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- private final ApexRunner runner;
-
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
- * Flink runner in streaming mode.
- */
- private static class StreamingViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> apply(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateApexPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
- * Flink runner in streaming mode.
- */
- private static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
- public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { }
-
- @Override
- public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<T>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 6817684..d5613fe 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -17,20 +17,19 @@
*/
package org.apache.beam.runners.apex;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
import java.io.IOException;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.joda.time.Duration;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-
/**
* Result of executing a {@link Pipeline} with Apex in embedded mode.
*/
@@ -56,28 +55,24 @@ public class ApexRunnerResult implements PipelineResult {
}
@Override
- public State cancel() throws IOException
- {
+ public State cancel() throws IOException {
ctrl.shutdown();
state = State.CANCELLED;
return state;
}
@Override
- public State waitUntilFinish(Duration duration) throws IOException, InterruptedException
- {
+ public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
- public State waitUntilFinish() throws IOException, InterruptedException
- {
+ public State waitUntilFinish() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
- public MetricResults metrics()
- {
+ public MetricResults metrics() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index 45c143e..2e048f0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -25,7 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
-
+/**
+ * Apex {@link PipelineRunner} for testing.
+ */
public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
private ApexRunner delegate;
@@ -38,13 +40,14 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
}
public static TestApexRunner fromOptions(PipelineOptions options) {
- ApexPipelineOptions apexOptions = PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+ ApexPipelineOptions apexOptions = PipelineOptionsValidator
+ .validate(ApexPipelineOptions.class, options);
return new TestApexRunner(apexOptions);
}
@Override
public <OutputT extends POutput, InputT extends PInput>
- OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+ OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
return delegate.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
new file mode 100644
index 0000000..4d2f417
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
index 7a29057..539f311 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
@@ -25,12 +25,10 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PBegin;
-import com.google.common.base.Throwables;
-
/**
* Wraps elements from Create.Values into an {@link UnboundedSource}.
- * mainly used for test
+ * mainly used for testing
*/
public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
private static final long serialVersionUID = 1451000241832745629L;
@@ -39,12 +37,12 @@ public class CreateValuesTranslator<T> implements TransformTranslator<Create.Val
public void translate(Create.Values<T> transform, TranslationContext context) {
try {
UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
- transform.getDefaultOutputCoder((PBegin)context.getInput()));
- ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(unboundedSource,
- context.getPipelineOptions());
+ transform.getDefaultOutputCoder((PBegin) context.getInput()));
+ ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+ unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
} catch (CannotProvideCoderException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index 6737767..a39aacb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.apex.translators;
+import com.google.common.collect.Lists;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -32,8 +34,6 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-import com.google.common.collect.Lists;
-
/**
* {@link Flatten.FlattenPCollectionList} translation to Apex operator.
*/
@@ -72,7 +72,8 @@ public class FlattenPCollectionTranslator<T> implements
* @param finalCollection
* @param context
*/
- static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
+ static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
+ Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
List<PCollection<T>> remainingCollections = Lists.newArrayList();
PCollection<T> firstCollection = null;
while (!collections.isEmpty()) {
@@ -93,7 +94,8 @@ public class FlattenPCollectionTranslator<T> implements
}
if (collections.size() > 2) {
- PCollection<T> intermediateCollection = intermediateCollection(collection, collection.getCoder());
+ PCollection<T> intermediateCollection = intermediateCollection(collection,
+ collection.getCoder());
context.addOperator(operator, operator.out, intermediateCollection);
remainingCollections.add(intermediateCollection);
} else {
@@ -118,7 +120,8 @@ public class FlattenPCollectionTranslator<T> implements
}
static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
- PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ input.getWindowingStrategy(), input.isBounded());
output.setCoder(outputCoder);
return output;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
index 43c82a9..d3e7d2d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
@@ -31,9 +31,9 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-
PCollection<KV<K, V>> input = context.getInput();
- ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), input);
+ ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
+ input);
context.addOperator(group, group.output);
context.addStream(input, group.input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index a229a81..13f07c1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -20,6 +20,10 @@ package org.apache.beam.runners.apex.translators;
import static com.google.common.base.Preconditions.checkArgument;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Maps;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -27,6 +31,7 @@ import java.util.Map;
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,16 +40,16 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * {@link ParDo.BoundMulti} is translated to Apex operator that wraps the {@link DoFn}
+ * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
*/
-public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+public class ParDoBoundMultiTranslator<InputT, OutputT>
+ implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
@@ -56,7 +61,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+ ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+ context.getPipelineOptions(),
doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder);
@@ -73,36 +79,37 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
}
}
- static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, TranslationContext context) {
+ static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+ TranslationContext context) {
Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
if (sideInputs.size() > sideInputPorts.length) {
- // String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
- // transform.toString(), sideInputPorts.length);
- // throw new UnsupportedOperationException(msg);
PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
context.addStream(unionCollection, sideInputPorts[0]);
} else {
- for (int i=0; i<sideInputs.size(); i++) {
- // the number of input ports for side inputs are fixed and each port can only take one input.
- // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced.
+ // the number of ports for side inputs is fixed and each port can only take one input.
+ for (int i = 0; i < sideInputs.size(); i++) {
context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
}
}
}
- private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, TranslationContext context) {
+ private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
+ TranslationContext context) {
checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
// flatten and assign union tag
List<PCollection<Object>> sourceCollections = new ArrayList<>();
Map<PCollection<?>, Integer> unionTags = new HashMap<>();
PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
- for (int i=0; i < sideInputs.size(); i++) {
+ for (int i = 0; i < sideInputs.size(); i++) {
PCollectionView<?> sideInput = sideInputs.get(i);
PCollection<?> sideInputCollection = context.getViewInput(sideInput);
- if (!sideInputCollection.getWindowingStrategy().equals(firstSideInput.getWindowingStrategy())) {
+ if (!sideInputCollection.getWindowingStrategy().equals(
+ firstSideInput.getWindowingStrategy())) {
// TODO: check how to handle this in stream codec
//String msg = "Multiple side inputs with different window strategies.";
//throw new UnsupportedOperationException(msg);
+ LOG.warn("Side inputs union with different windowing strategies {} {}",
+ firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
}
if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
String msg = "Multiple side inputs with different coders.";
@@ -112,8 +119,10 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
unionTags.put(sideInputCollection, i);
}
- PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(firstSideInput, firstSideInput.getCoder());
- FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, context);
+ PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
+ firstSideInput, firstSideInput.getCoder());
+ FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
+ context);
return resultCollection;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index 7749a06..bd7115e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
/**
- * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn}
+ * {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
*/
public class ParDoBoundTranslator<InputT, OutputT> implements
TransformTranslator<ParDo.Bound<InputT, OutputT>> {
@@ -49,7 +49,8 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+ ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+ context.getPipelineOptions(),
doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
output.getWindowingStrategy(), sideInputs, wvInputCoder);
context.addOperator(operator, operator.output);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
index b53e4dd..3097276 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.apex.translators;
+import com.datatorrent.api.InputOperator;
+
import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
-import com.datatorrent.api.InputOperator;
-
/**
* {@link Read.Unbounded} is translated to Apex {@link InputOperator}
* that wraps {@link UnboundedSource}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
index 1a99885..dfd2045 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
@@ -19,13 +19,13 @@
package org.apache.beam.runners.apex.translators;
-import org.apache.beam.sdk.transforms.PTransform;
-
import java.io.Serializable;
+import org.apache.beam.sdk.transforms.PTransform;
+
/**
- * translates {@link PTransform} to Apex functions.
+ * Translates {@link PTransform} to Apex functions.
*/
-public interface TransformTranslator<T extends PTransform<?,?>> extends Serializable {
+public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
void translate(T transform, TranslationContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index bd44a20..ddacc29 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -19,6 +19,17 @@ package org.apache.beam.runners.apex.translators;
import static com.google.common.base.Preconditions.checkArgument;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
@@ -34,17 +45,6 @@ import org.apache.beam.sdk.values.POutput;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Maintains context data for {@link TransformTranslator}s.
*/
@@ -64,7 +64,7 @@ public class TranslationContext {
public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
PInput input = this.viewInputs.get(view);
checkArgument(input != null, "unknown view " + view.getName());
- return (InputT)input;
+ return (InputT) input;
}
public TranslationContext(ApexPipelineOptions pipelineOptions) {
@@ -109,13 +109,14 @@ public class TranslationContext {
addOperator(operator, portEntry.getValue(), portEntry.getKey());
first = false;
} else {
- this.streams.put(portEntry.getKey(), (Pair)new ImmutablePair<>(portEntry.getValue(), new ArrayList<>()));
+ this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
+ new ArrayList<>()));
}
}
}
/**
- * Add intermediate operator for the current transformation.
+ * Add the operator with its output port for the given result {@link PCollection}.
* @param operator
* @param port
* @param output
@@ -124,9 +125,11 @@ public class TranslationContext {
// Apex DAG requires a unique operator name
// use the transform's name and make it unique
String name = getCurrentTransform().getFullName();
- for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++);
+ for (int i = 1; this.operators.containsKey(name); i++) {
+ name = getCurrentTransform().getFullName() + i;
+ }
this.operators.put(name, operator);
- this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
+ this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
}
public void addStream(PInput input, InputPort inputPort) {
@@ -140,11 +143,12 @@ public class TranslationContext {
dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
}
int streamIndex = 0;
- for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.streams.entrySet()) {
+ for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
+ streams.entrySet()) {
List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
if (sinks.length > 0) {
- dag.addStream("stream"+streamIndex++, streamEntry.getValue().getLeft(), sinks);
+ dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
for (InputPort port : sinks) {
PCollection pc = streamEntry.getKey();
Coder coder = pc.getCoder();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index 202f2d3..dd8fcd1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -17,23 +17,22 @@
*/
package org.apache.beam.runners.apex.translators.functions;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
-import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
/**
* Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
*/
-public class ApexFlattenOperator<InputT> extends BaseOperator
-{
+public class ApexFlattenOperator<InputT> extends BaseOperator {
+
private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
private boolean traceTuples = true;
@@ -47,16 +46,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
/**
* Data input port 1.
*/
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
- {
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
/**
* Emits to port "out"
*/
@Override
- public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
- {
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
if (tuple instanceof WatermarkTuple) {
- WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
if (wmTuple.getTimestamp() > inputWM1) {
inputWM1 = wmTuple.getTimestamp();
if (inputWM1 <= inputWM2) {
@@ -75,7 +73,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
}
if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
- ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data1Tag);
+ ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
}
out.emit(tuple);
}
@@ -84,16 +82,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
/**
* Data input port 2.
*/
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
- {
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
/**
* Emits to port "out"
*/
@Override
- public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
- {
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
if (tuple instanceof WatermarkTuple) {
- WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
if (wmTuple.getTimestamp() > inputWM2) {
inputWM2 = wmTuple.getTimestamp();
if (inputWM2 <= inputWM1) {
@@ -112,7 +109,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
}
if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
- ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data2Tag);
+ ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
}
out.emit(tuple);
}
@@ -121,6 +118,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
/**
* Output port.
*/
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
+ new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 5970f36..845618d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -17,6 +17,20 @@
*/
package org.apache.beam.runners.apex.translators.functions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -61,19 +75,6 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
/**
* Apex operator for Beam {@link GroupByKey}.
* This operator expects the input stream already partitioned by K,
@@ -82,8 +83,7 @@ import com.google.common.collect.Multimap;
* @param <K>
* @param <V>
*/
-public class ApexGroupByKeyOperator<K, V> implements Operator
-{
+public class ApexGroupByKeyOperator<K, V> implements Operator {
private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
private boolean traceTuples = true;
@@ -98,7 +98,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
private final SerializablePipelineOptions serializedOptions;
@Bind(JavaSerializer.class)
// TODO: InMemoryStateInternals not serializable
-transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+ private transient Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
private transient ProcessContext context;
@@ -106,19 +106,19 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
private Instant inputWatermark = new Instant(0);
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>()
- {
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
@Override
- public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
- {
+ public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
try {
if (t instanceof ApexStreamTuple.WatermarkTuple) {
- ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
+ ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
processWatermark(mark);
if (traceTuples) {
LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
}
- output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
+ output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
+ mark.getTimestamp()));
return;
}
if (traceTuples) {
@@ -126,53 +126,49 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
}
processElement(t.getValue());
} catch (Exception e) {
- Throwables.propagate(e);
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
}
}
};
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
+ output = new DefaultOutputPort<>();
@SuppressWarnings("unchecked")
- public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input)
- {
- Preconditions.checkNotNull(pipelineOptions);
+ public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input) {
+ checkNotNull(pipelineOptions);
this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
- this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
- this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder();
- this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
+ this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
+ this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
+ this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
}
@SuppressWarnings("unused") // for Kryo
- private ApexGroupByKeyOperator()
- {
+ private ApexGroupByKeyOperator() {
this.serializedOptions = null;
}
@Override
- public void beginWindow(long l)
- {
+ public void beginWindow(long l) {
}
@Override
- public void endWindow()
- {
+ public void endWindow() {
}
@Override
- public void setup(OperatorContext context)
- {
+ public void setup(OperatorContext context) {
this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
- this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
- SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
+ this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
+ stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
this.context = new ProcessContext(fn, this.timerInternals);
}
@Override
- public void teardown()
- {
+ public void teardown() {
}
/**
@@ -181,14 +177,16 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
* We keep these timers in a Set, so that they are deduplicated, as the same
* timer can be registered multiple times.
*/
- private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+ private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+ long currentWatermark) {
// we keep the timers to return in a different list and launch them later
// because we cannot prevent a trigger from registering another trigger,
// which would lead to concurrent modification exception.
Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
- Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+ Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
+ activeTimers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
@@ -223,18 +221,15 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
fn.processElement(context);
}
- private StateInternals<K> getStateInternalsForKey(K key)
- {
+ private StateInternals<K> getStateInternalsForKey(K key) {
final ByteBuffer keyBytes;
try {
keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
if (stateInternals == null) {
- //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
- //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
stateInternals = InMemoryStateInternals.forKey(key);
perKeyStateInternals.put(keyBytes, stateInternals);
}
@@ -246,7 +241,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
try {
keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
if (timersForKey == null) {
@@ -261,7 +256,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
try {
keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
} catch (CoderException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
if (timersForKey != null) {
@@ -276,7 +271,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
this.inputWatermark = new Instant(mark.getTimestamp());
- Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+ Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+ mark.getTimestamp());
if (!timers.isEmpty()) {
for (ByteBuffer keyBytes : timers.keySet()) {
K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
@@ -287,7 +283,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
}
}
- private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext {
+ private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
+ KeyedWorkItem<K, V>>.ProcessContext {
private final ApexTimerInternals timerInternals;
private StateInternals<K> stateInternals;
@@ -296,7 +293,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
ApexTimerInternals timerInternals) {
function.super();
- this.timerInternals = Preconditions.checkNotNull(timerInternals);
+ this.timerInternals = checkNotNull(timerInternals);
}
public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
@@ -311,7 +308,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
@Override
public Instant timestamp() {
- throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+ throw new UnsupportedOperationException(
+ "timestamp() is not available when processing KeyedWorkItems.");
}
@Override
@@ -333,7 +331,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
@Override
public PaneInfo pane() {
- throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+ throw new UnsupportedOperationException(
+ "pane() is not available when processing KeyedWorkItems.");
}
@Override
@@ -352,11 +351,13 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
}
@Override
- public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
if (traceTuples) {
LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
}
- ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+ ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
+ WindowedValue.of(output, timestamp, windows, pane)));
}
@Override
@@ -375,7 +376,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
}
@Override
- public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+ Coder<T> elemCoder) throws IOException {
throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
}
@@ -404,7 +406,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
}
@Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new UnsupportedOperationException();
}
}
@@ -416,52 +419,44 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
public class ApexTimerInternals implements TimerInternals {
@Override
- public void setTimer(TimerData timerKey)
- {
+ public void setTimer(TimerData timerKey) {
registerActiveTimer(context.element().key(), timerKey);
}
@Override
- public void deleteTimer(TimerData timerKey)
- {
+ public void deleteTimer(TimerData timerKey) {
unregisterActiveTimer(context.element().key(), timerKey);
}
@Override
- public Instant currentProcessingTime()
- {
+ public Instant currentProcessingTime() {
return Instant.now();
}
@Override
- public Instant currentSynchronizedProcessingTime()
- {
+ public Instant currentSynchronizedProcessingTime() {
// TODO Auto-generated method stub
return null;
}
@Override
- public Instant currentInputWatermarkTime()
- {
+ public Instant currentInputWatermarkTime() {
return inputWatermark;
}
@Override
- public Instant currentOutputWatermarkTime()
- {
+ public Instant currentOutputWatermarkTime() {
// TODO Auto-generated method stub
return null;
}
-
}
- private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable
- {
+ private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
+ private static final long serialVersionUID = 1L;
+
@Override
- public StateInternals<K> stateInternalsForKey(K key)
- {
+ public StateInternals<K> stateInternalsForKey(K key) {
return getStateInternalsForKey(key);
}
}
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 96be11d..9e8f3dc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -17,6 +17,18 @@
*/
package org.apache.beam.runners.apex.translators.functions;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -29,9 +41,9 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption
import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -52,18 +64,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
/**
* Apex operator for Beam {@link DoFn}.
*/
@@ -85,8 +85,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private final List<PCollectionView<?>> sideInputs;
// TODO: not Kryo serializable, integrate codec
-//@Bind(JavaSerializer.class)
-private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
+ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals
+ .forKey(null);
private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
private LongMin pushedBackWatermark = new LongMin();
private long currentInputWatermark = Long.MIN_VALUE;
@@ -94,7 +94,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
private transient SideInputHandler sideInputHandler;
- private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = Maps.newHashMapWithExpectedSize(5);
+ private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
+ Maps.newHashMapWithExpectedSize(5);
public ApexParDoOperator(
ApexPipelineOptions pipelineOptions,
@@ -104,8 +105,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
WindowingStrategy<?, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<InputT>> inputCoder
- )
- {
+ ) {
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.doFn = doFn;
this.mainOutputTag = mainOutputTag;
@@ -120,7 +120,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
}
Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
- this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), coder);
+ this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
+ coder);
}
@@ -135,13 +136,12 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
this.pushedBack = null;
}
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
- {
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
@Override
- public void process(ApexStreamTuple<WindowedValue<InputT>> t)
- {
+ public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
if (t instanceof ApexStreamTuple.WatermarkTuple) {
- processWatermark((ApexStreamTuple.WatermarkTuple<?>)t);
+ processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
} else {
if (traceTuples) {
LOG.debug("\ninput {}\n", t.getValue());
@@ -155,12 +155,11 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
}
};
- @InputPortFieldAnnotation(optional=true)
- public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>()
- {
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
@Override
- public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t)
- {
+ public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
if (t instanceof ApexStreamTuple.WatermarkTuple) {
// ignore side input watermarks
return;
@@ -168,7 +167,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
int sideInputIndex = 0;
if (t instanceof ApexStreamTuple.DataTuple) {
- sideInputIndex = ((ApexStreamTuple.DataTuple<?>)t).getUnionTag();
+ sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag();
}
if (traceTuples) {
@@ -196,25 +195,30 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
}
};
- @OutputPortFieldAnnotation(optional=true)
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = new DefaultOutputPort<>();
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = new DefaultOutputPort<>();
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = new DefaultOutputPort<>();
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = new DefaultOutputPort<>();
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = new DefaultOutputPort<>();
-
- public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, sideOutput3, sideOutput4, sideOutput5};
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
+ new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
+ new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
+ sideOutput3, sideOutput4, sideOutput5};
@Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
- {
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
if (sideOutputPort != null) {
sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
@@ -229,19 +233,19 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
try {
pushbackDoFnRunner.startBundle();
- Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner.processElementInReadyWindows(elem);
+ Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
+ .processElementInReadyWindows(elem);
pushbackDoFnRunner.finishBundle();
return pushedBack;
} catch (UserCodeException ue) {
if (ue.getCause() instanceof AssertionError) {
- ApexRunner.assertionError = (AssertionError)ue.getCause();
+ ApexRunner.assertionError = (AssertionError) ue.getCause();
}
throw ue;
}
}
- private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark)
- {
+ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
this.currentInputWatermark = mark.getTimestamp();
if (sideInputs.isEmpty()) {
@@ -264,8 +268,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
}
@Override
- public void setup(OperatorContext context)
- {
+ public void setup(OperatorContext context) {
this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
if (!sideInputs.isEmpty()) {
@@ -273,9 +276,10 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
sideInputReader = sideInputHandler;
}
- for (int i=0; i < sideOutputTags.size(); i++) {
+ for (int i = 0; i < sideOutputTags.size(); i++) {
@SuppressWarnings("unchecked")
- DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)sideOutputPorts[i];
+ DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
+ sideOutputPorts[i];
sideOutputPortMapping.put(sideOutputTags.get(i), port);
}
@@ -297,25 +301,18 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
try {
doFn.setup();
} catch (Exception e) {
- Throwables.propagate(e);
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void beginWindow(long windowId)
- {
- /*
- Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
- if (!aggregators.isEmpty()) {
- System.out.println("\n" + Thread.currentThread().getName() + "\n" +AggregatorRetriever.getAggregators(doFn) + "\n");
- }
- */
+ public void beginWindow(long windowId) {
}
@Override
- public void endWindow()
- {
+ public void endWindow() {
}
/**
@@ -334,32 +331,27 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
return new NoOpAggregator<InputT, OutputT>();
}
- private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable
- {
+ private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
+ java.io.Serializable {
private static final long serialVersionUID = 1L;
@Override
- public void addValue(InputT value)
- {
+ public void addValue(InputT value) {
}
@Override
- public String getName()
- {
+ public String getName() {
// TODO Auto-generated method stub
return null;
}
@Override
- public CombineFn<InputT, ?, OutputT> getCombineFn()
- {
+ public CombineFn<InputT, ?, OutputT> getCombineFn() {
// TODO Auto-generated method stub
return null;
}
};
-
-
}
private static class LongMin {