You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/01 04:10:50 UTC
[3/9] beam git commit: BEAM-1419 Flatten should comply with
PTransform style guide
BEAM-1419 Flatten should comply with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5056efc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5056efc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5056efc
Branch: refs/heads/master
Commit: f5056efc80bf4f240ec1eeea4e2b50bf567a2d6c
Parents: b87621e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Feb 7 16:55:19 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800
----------------------------------------------------------------------
.../apex/translation/ApexPipelineTranslator.java | 2 +-
.../translation/FlattenPCollectionTranslator.java | 6 +++---
.../translation/operators/ApexFlattenOperator.java | 4 ++--
.../construction/EmptyFlattenAsCreateFactory.java | 7 +++----
.../core/construction/PTransformMatchers.java | 4 ++--
.../core/construction/PTransformMatchersTest.java | 6 +++---
.../beam/runners/direct/EmptyInputProvider.java | 4 ++--
.../beam/runners/direct/FlattenEvaluatorFactory.java | 4 ++--
.../beam/runners/direct/RootProviderRegistry.java | 4 ++--
.../runners/direct/TransformEvaluatorRegistry.java | 4 ++--
.../beam/runners/direct/DirectGraphVisitorTest.java | 4 ++--
.../flink/FlinkBatchTransformTranslators.java | 6 +++---
.../flink/FlinkStreamingTransformTranslators.java | 6 +++---
.../runners/dataflow/DataflowPipelineTranslator.java | 8 ++++----
.../spark/translation/TransformTranslator.java | 8 ++++----
.../streaming/StreamingTransformTranslator.java | 8 ++++----
.../java/org/apache/beam/sdk/transforms/Flatten.java | 15 ++++++++-------
.../org/apache/beam/sdk/transforms/FlattenTest.java | 12 ++++++------
18 files changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 36d679a..e9d6571 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -64,7 +64,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+ registerTransformTranslator(Flatten.PCollections.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
registerTransformTranslator(CreateApexPCollectionView.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 2e31dfc..080c5e9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -35,14 +35,14 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
/**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
+ * {@link Flatten.PCollections} translation to Apex operator.
*/
class FlattenPCollectionTranslator<T> implements
- TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ TransformTranslator<Flatten.PCollections<T>> {
private static final long serialVersionUID = 1L;
@Override
- public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());
if (inputCollections.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
index 3d9db51..4594765 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -21,15 +21,15 @@ import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
-
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ * Apex operator for Beam {@link PCollections}.
*/
public class ApexFlattenOperator<InputT> extends BaseOperator {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 3b29c0a..0168039 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -36,11 +35,11 @@ import org.apache.beam.sdk.values.TaggedPValue;
/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
- * Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
+ * Flatten.PCollections} that takes no input {@link PCollection PCollections}.
*/
public class EmptyFlattenAsCreateFactory<T>
implements PTransformOverrideFactory<
- PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
+ PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
new EmptyFlattenAsCreateFactory<>();
@@ -52,7 +51,7 @@ public class EmptyFlattenAsCreateFactory<T>
@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
- FlattenPCollectionList<T> transform) {
+ Flatten.PCollections<T> transform) {
return (PTransform) Create.empty(VoidCoder.of());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 7f8d467..efcc455 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -165,14 +165,14 @@ public class PTransformMatchers {
}
/**
- * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
+ * A {@link PTransformMatcher} which matches a {@link Flatten.PCollections} which
* consumes no input {@link PCollection PCollections}.
*/
public static PTransformMatcher emptyFlatten() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
+ return (application.getTransform() instanceof Flatten.PCollections)
&& application.getInputs().isEmpty();
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index be3ed6b..491c14f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -327,7 +327,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
@@ -346,7 +346,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
of(
"Flatten",
Collections.singletonList(
@@ -369,7 +369,7 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
+ .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
of(
"EmptyFlatten",
Collections.<TaggedPValue>emptyList(),
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 1185130..98d4a64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollectionList;
/** A {@link RootInputProvider} that provides a singleton empty bundle. */
class EmptyInputProvider<T>
- implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.FlattenPCollectionList<T>> {
+ implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> {
EmptyInputProvider() {}
/**
@@ -37,7 +37,7 @@ class EmptyInputProvider<T>
*/
@Override
public Collection<CommittedBundle<Void>> getInitialInputs(
- AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>>
+ AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>>
transform,
int targetParallelism) {
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 66862ea..8528905 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +53,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
final AppliedPTransform<
- PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+ PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index e8a7665..eb9492c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
/**
@@ -42,7 +42,7 @@ class RootProviderRegistry {
.put(
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
new TestStreamEvaluatorFactory.InputProvider(context))
- .put(FlattenPCollectionList.class, new EmptyInputProvider());
+ .put(PCollections.class, new EmptyInputProvider());
return new RootProviderRegistry(defaultProviders.build());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 1ddf9f4..9fdefc3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
.put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
.put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
- .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
+ .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
.put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
// Runner-specific primitives used in expansion of GroupByKey
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index df49796..8b4573f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -110,7 +110,7 @@ public class DirectGraphVisitorTest implements Serializable {
@Test
public void getRootTransformsContainsEmptyFlatten() {
- FlattenPCollectionList<String> flatten = Flatten.pCollections();
+ PCollections<String> flatten = Flatten.pCollections();
PCollectionList<String> emptyList = PCollectionList.empty(p);
PCollection<String> empty = emptyList.apply(flatten);
empty.setCoder(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 30e9d68..acc204d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -108,7 +108,7 @@ class FlinkBatchTransformTranslators {
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
@@ -706,12 +706,12 @@ class FlinkBatchTransformTranslators {
private static class FlattenPCollectionTranslatorBatch<T>
implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
- Flatten.FlattenPCollectionList<T>> {
+ Flatten.PCollections<T>> {
@Override
@SuppressWarnings("unchecked")
public void translateNode(
- Flatten.FlattenPCollectionList<T> transform,
+ Flatten.PCollections<T> transform,
FlinkBatchTranslationContext context) {
List<TaggedPValue> allInputs = context.getInputs(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index a3cceb2..03f567d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -125,7 +125,7 @@ class FlinkStreamingTransformTranslators {
TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(
FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
new CreateViewStreamingTranslator());
@@ -999,11 +999,11 @@ class FlinkStreamingTransformTranslators {
private static class FlattenPCollectionTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- Flatten.FlattenPCollectionList<T>> {
+ Flatten.PCollections<T>> {
@Override
public void translateNode(
- Flatten.FlattenPCollectionList<T> transform,
+ Flatten.PCollections<T> transform,
FlinkStreamingTranslationContext context) {
List<TaggedPValue> allInputs = context.getInputs(transform);
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index c672e99..fe5db5a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -744,16 +744,16 @@ public class DataflowPipelineTranslator {
});
registerTransformTranslator(
- Flatten.FlattenPCollectionList.class,
- new TransformTranslator<Flatten.FlattenPCollectionList>() {
+ Flatten.PCollections.class,
+ new TransformTranslator<Flatten.PCollections>() {
@Override
public void translate(
- Flatten.FlattenPCollectionList transform, TranslationContext context) {
+ Flatten.PCollections transform, TranslationContext context) {
flattenHelper(transform, context);
}
private <T> void flattenHelper(
- Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+ Flatten.PCollections<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Flatten");
List<OutputReference> inputs = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a643651..7fc09ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -97,11 +97,11 @@ public final class TransformTranslator {
private TransformTranslator() {
}
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.PCollections<T>>() {
@SuppressWarnings("unchecked")
@Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
List<TaggedPValue> pcs = context.getInputs(transform);
JavaRDD<WindowedValue<T>> unionRDD;
if (pcs.size() == 0) {
@@ -729,7 +729,7 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+ EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
EVALUATORS.put(Create.Values.class, create());
EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
EVALUATORS.put(View.AsIterable.class, viewAsIter());
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 4a07741..a856897 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -170,11 +170,11 @@ final class StreamingTransformTranslator {
};
}
- private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
- return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.PCollections<T>>() {
@SuppressWarnings("unchecked")
@Override
- public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
List<TaggedPValue> pcs = context.getInputs(transform);
// since this is a streaming pipeline, at least one of the PCollections to "flatten" are
// unbounded, meaning it represents a DStream.
@@ -445,7 +445,7 @@ final class StreamingTransformTranslator {
EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
EVALUATORS.put(CreateStream.class, createFromQueue());
EVALUATORS.put(Window.Bound.class, window());
- EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+ EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 3ef2e55..7b282b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -64,8 +64,8 @@ public class Flatten {
* @param <T> the type of the elements in the input and output
* {@code PCollection}s.
*/
- public static <T> FlattenPCollectionList<T> pCollections() {
- return new FlattenPCollectionList<>();
+ public static <T> PCollections<T> pCollections() {
+ return new PCollections<>();
}
/**
@@ -86,8 +86,8 @@ public class Flatten {
* @param <T> the type of the elements of the input {@code Iterable} and
* the output {@code PCollection}
*/
- public static <T> FlattenIterables<T> iterables() {
- return new FlattenIterables<>();
+ public static <T> Iterables<T> iterables() {
+ return new Iterables<>();
}
/**
@@ -99,10 +99,10 @@ public class Flatten {
* @param <T> the type of the elements in the input and output
* {@code PCollection}s.
*/
- public static class FlattenPCollectionList<T>
+ public static class PCollections<T>
extends PTransform<PCollectionList<T>, PCollection<T>> {
- private FlattenPCollectionList() { }
+ private PCollections() { }
@Override
public PCollection<T> expand(PCollectionList<T> inputs) {
@@ -159,8 +159,9 @@ public class Flatten {
* @param <T> the type of the elements of the input {@code Iterable}s and
* the output {@code PCollection}
*/
- public static class FlattenIterables<T>
+ public static class Iterables<T>
extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
+ private Iterables() {}
@Override
public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 3b5011b..bc3e322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -81,7 +81,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionList() {
+ public void testFlattenPCollections() {
List<List<String>> inputs = Arrays.asList(
LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -95,7 +95,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListThenParDo() {
+ public void testFlattenPCollectionsThenParDo() {
List<List<String>> inputs = Arrays.asList(
LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
@@ -110,7 +110,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListEmpty() {
+ public void testFlattenPCollectionsEmpty() {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of());
@@ -198,7 +198,7 @@ public class FlattenTest implements Serializable {
@Test
@Category(RunnableOnService.class)
- public void testFlattenPCollectionListEmptyThenParDo() {
+ public void testFlattenPCollectionsEmptyThenParDo() {
PCollection<String> output =
PCollectionList.<String>empty(p)
.apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -366,8 +366,8 @@ public class FlattenTest implements Serializable {
@Test
public void testFlattenGetName() {
- Assert.assertEquals("Flatten.FlattenIterables", Flatten.<String>iterables().getName());
- Assert.assertEquals("Flatten.FlattenPCollectionList", Flatten.<String>pCollections().getName());
+ Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
+ Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
}
/////////////////////////////////////////////////////////////////////////////