You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/16 02:08:00 UTC

[jira] [Commented] (BEAM-1982) Refactor PTransformOverrideFactories to remove code duplication

    [ https://issues.apache.org/jira/browse/BEAM-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293563#comment-16293563 ] 

ASF GitHub Bot commented on BEAM-1982:
--------------------------------------

jkff closed pull request #2949: [BEAM-1982] - adding common method to extract singleton main input
URL: https://github.com/apache/beam/pull/2949
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 366308ec664..4ee91c1e9b7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -23,6 +23,7 @@
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,13 +34,13 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PrimitiveCreate;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
@@ -272,7 +273,7 @@ protected String getKindString() {
                       GloballyAsSingletonView<InputT, OutputT>>
                   transform) {
         return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
+            getSingletonMainInput(transform),
             new StreamingCombineGloballyAsSingletonView<>(transform.getTransform()));
       }
     }
@@ -337,7 +338,7 @@ public T identity() {
       public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform(
           AppliedPTransform<PCollection<T>, PCollectionView<T>, AsSingleton<T>> transform) {
         return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
+            getSingletonMainInput(transform),
             new StreamingViewAsSingleton<>(transform.getTransform()));
       }
     }
@@ -372,7 +373,7 @@ protected String getKindString() {
               AppliedPTransform<PCollection<T>, PCollectionView<Iterable<T>>, AsIterable<T>>
                   transform) {
         return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
+            getSingletonMainInput(transform),
             new StreamingViewAsIterable<T>());
       }
     }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
index 7a59c1c1c80..79e7b6bd1c7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
@@ -19,8 +19,11 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.Map;
+
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -39,4 +42,10 @@
       Map<TupleTag<?>, PValue> outputs, OutputT newOutput) {
     return ReplacementOutputs.singleton(outputs, newOutput);
   }
+
+  protected final <T> PCollection<T> getSingletonMainInput(
+      AppliedPTransform<? extends PCollection<? extends T>, ?, ?> transform) {
+    return PTransformReplacements.getSingletonMainInput(transform);
+  }
+
 }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
index 80bb0a7a6f2..d53e177ff36 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -22,6 +22,7 @@
 
 import java.io.Serializable;
 import java.util.Map;
+
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -62,7 +63,7 @@
                             MapElements<Integer, Integer>>
                         transform) {
               return PTransformReplacement.of(
-                  PTransformReplacements.getSingletonMainInput(transform),
+              getSingletonMainInput(transform),
                   transform.getTransform());
             }
           };
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 64eecc852af..dcc7354c1f9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -19,7 +19,6 @@
 
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.KV;
@@ -42,7 +41,7 @@
                   GBKIntoKeyedWorkItems<KeyT, InputT>>
               transform) {
     return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
+        getSingletonMainInput(transform),
         new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>());
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c2eb5e72b84..55f72999e3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -36,7 +35,9 @@
                   PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
               transform) {
     return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
+        getSingletonMainInput(transform),
         new DirectGroupByKey<>(transform.getTransform()));
   }
+
+
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 35d1bcd19f6..f98bcb2c656 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -18,8 +18,10 @@
 package org.apache.beam.runners.flink;
 
 import com.google.common.collect.ImmutableList;
+
 import java.util.List;
 import java.util.Map;
+
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
@@ -241,7 +243,7 @@ private ReflectiveOneToOneOverrideFactory(
     public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
         AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
       return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
+          getSingletonMainInput(transform),
           InstanceBuilder.ofType(replacement)
               .withArg(FlinkRunner.class, runner)
               .withArg(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ecd0365ed88..06cf897715c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,7 +40,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
+
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
@@ -1397,7 +1398,7 @@ public void verifyDeterministic()
                 transform) {
       GloballyAsSingletonView<ElemT, ViewT> combine = transform.getTransform();
       return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
+          getSingletonMainInput(transform),
           new BatchCombineGloballyAsSingletonView<>(
               runner, combine.getCombineFn(), combine.getFanout(), combine.getInsertDefault()));
     }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5278a4a576d..ddf987a4fb6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -40,6 +40,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -57,6 +58,7 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
@@ -416,7 +418,7 @@ private ReflectiveOneToOneOverrideFactory(
               .withArg(
                   (Class<TransformT>) transform.getTransform().getClass(), transform.getTransform())
               .build();
-      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), rep);
+      return PTransformReplacement.of(getSingletonMainInput(transform), rep);
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 8611d3cad94..504549c1e17 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -19,10 +19,9 @@
 package org.apache.beam.runners.dataflow;
 
 import java.util.List;
+
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,7 +47,7 @@
                   SingleOutput<InputT, OutputT>>
               transform) {
     return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
+        getSingletonMainInput(transform),
         new ParDoSingle<>(transform.getTransform()));
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
index cd9378c6082..4ab9e063a1b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.dataflow;
 
-import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -50,7 +49,7 @@
           AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>>
               transform) {
     return PTransformReplacement.of(
-        PTransformReplacements.getSingletonMainInput(transform),
+        getSingletonMainInput(transform),
         new ReshuffleWithOnlyTrigger<K, V>());
   }
 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 6c385d74085..9f44cbe2d91 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -20,7 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
+
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn;
 import org.apache.beam.sdk.coders.Coder;
@@ -52,7 +52,7 @@
       StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
           new StreamingCreatePCollectionView<>(transform.getTransform().getView());
       return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform), streamingView);
+          getSingletonMainInput(transform), streamingView);
     }
 
     private static class StreamingCreatePCollectionView<ElemT, ViewT>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Refactor PTransformOverrideFactories to remove code duplication
> ---------------------------------------------------------------
>
>                 Key: BEAM-1982
>                 URL: https://issues.apache.org/jira/browse/BEAM-1982
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Borisa Zivkovic
>            Priority: Minor
>              Labels: newbie, starter
>
> The majority of the factories take a single PCollection as input and produce a single PCollection as output; the code to extract the single input can be hidden inside the class hierarchy (for example, in a shared base class) rather than being called explicitly in every override.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)