You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:05 UTC

[10/50] [abbrv] beam git commit: Update Signature of PTransformOverrideFactory

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 684dc14..4eec6b8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -61,6 +61,7 @@ import java.util.TreeSet;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
@@ -96,6 +97,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -390,25 +392,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   private static class ReflectiveOneToOneOverrideFactory<
-          InputT extends PValue,
-          OutputT extends PValue,
-          TransformT extends PTransform<InputT, OutputT>>
-      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
-    private final Class<PTransform<InputT, OutputT>> replacement;
+          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<InputT>, PCollection<OutputT>, TransformT> {
+    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
     private final DataflowRunner runner;
 
     private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<InputT, OutputT>> replacement, DataflowRunner runner) {
+        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
+        DataflowRunner runner) {
       this.replacement = replacement;
       this.runner = runner;
     }
 
     @Override
-    public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
-      return InstanceBuilder.ofType(replacement)
-          .withArg(DataflowRunner.class, runner)
-          .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
-          .build();
+    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+      PTransform<PCollection<InputT>, PCollection<OutputT>> rep =
+          InstanceBuilder.ofType(replacement)
+              .withArg(DataflowRunner.class, runner)
+              .withArg(
+                  (Class<TransformT>) transform.getTransform().getClass(), transform.getTransform())
+              .build();
+      return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), rep);
     }
   }
 
@@ -423,19 +429,18 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       this.replacement = replacement;
       this.runner = runner;
     }
-    @Override
-    public PTransform<PBegin, PCollection<T>> getReplacementTransform(
-        PTransform<PInput, PCollection<T>> transform) {
-      return InstanceBuilder.ofType(replacement)
-          .withArg(DataflowRunner.class, runner)
-          .withArg(
-              (Class<? super PTransform<PInput, PCollection<T>>>) transform.getClass(), transform)
-          .build();
-    }
 
     @Override
-    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return p.begin();
+    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) {
+      PTransform<PInput, PCollection<T>> original = transform.getTransform();
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(),
+          InstanceBuilder.ofType(replacement)
+              .withArg(DataflowRunner.class, runner)
+              .withArg(
+                  (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original)
+              .build());
     }
 
     @Override
@@ -805,13 +810,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PTransform<PCollection<T>, PDone> getReplacementTransform(Write<T> transform) {
-      return new BatchWrite<>(runner, transform);
-    }
-
-    @Override
-    public PCollection<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return (PCollection<T>) Iterables.getOnlyElement(inputs.values());
+    public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
+        AppliedPTransform<PCollection<T>, PDone, Write<T>> transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new BatchWrite<>(runner, transform.getTransform()));
     }
 
     @Override
@@ -1295,15 +1298,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
           Combine.GroupedValues<K, InputT, OutputT>> {
     @Override
-    public PTransform<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>>
-        getReplacementTransform(GroupedValues<K, InputT, OutputT> transform) {
-      return new CombineGroupedValues<>(transform);
-    }
-
-    @Override
-    public PCollection<KV<K, Iterable<InputT>>> getInput(
-        Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return (PCollection<KV<K, Iterable<InputT>>>) Iterables.getOnlyElement(inputs.values());
+    public PTransformReplacement<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
+                    GroupedValues<K, InputT, OutputT>>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new CombineGroupedValues<>(transform.getTransform()));
     }
 
     @Override
@@ -1322,14 +1325,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PTransform<PCollection<T>, PDone> getReplacementTransform(
-        PubsubUnboundedSink<T> transform) {
-      return new StreamingPubsubIOWrite<>(runner, transform);
-    }
-
-    @Override
-    public PCollection<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return (PCollection<T>) Iterables.getOnlyElement(inputs.values());
+    public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
+        AppliedPTransform<PCollection<T>, PDone, PubsubUnboundedSink<T>> transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new StreamingPubsubIOWrite<>(runner, transform.getTransform()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index db50cc2..2e50cb5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -20,12 +20,15 @@ package org.apache.beam.runners.dataflow;
 
 import java.util.List;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.DisplayData;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -38,9 +41,15 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
     extends SingleInputOutputOverrideFactory<
         PCollection<? extends InputT>, PCollection<OutputT>, ParDo.SingleOutput<InputT, OutputT>> {
   @Override
-  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> getReplacementTransform(
-      ParDo.SingleOutput<InputT, OutputT> transform) {
-    return new ParDoSingle<>(transform);
+  public PTransformReplacement<PCollection<? extends InputT>, PCollection<OutputT>>
+      getReplacementTransform(
+          AppliedPTransform<
+                  PCollection<? extends InputT>, PCollection<OutputT>,
+                  SingleOutput<InputT, OutputT>>
+              transform) {
+    return PTransformReplacement.of(
+        PTransformReplacements.getSingletonMainInput(transform),
+        new ParDoSingle<>(transform.getTransform()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
index 2e6455d..aa9d9f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java
@@ -18,8 +18,10 @@
 
 package org.apache.beam.runners.dataflow;
 
+import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -43,9 +45,13 @@ class ReshuffleOverrideFactory<K, V>
     extends SingleInputOutputOverrideFactory<
         PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>> {
   @Override
-  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> getReplacementTransform(
-      Reshuffle<K, V> transform) {
-    return new ReshuffleWithOnlyTrigger<>();
+  public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, V>>>
+      getReplacementTransform(
+          AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Reshuffle<K, V>>
+              transform) {
+    return PTransformReplacement.of(
+        PTransformReplacements.getSingletonMainInput(transform),
+        new ReshuffleWithOnlyTrigger<K, V>());
   }
 
   private static class ReshuffleWithOnlyTrigger<K, V>

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index c407517..eb385de 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -20,11 +20,13 @@ package org.apache.beam.runners.dataflow;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -42,9 +44,15 @@ class StreamingViewOverrides {
       extends SingleInputOutputOverrideFactory<
           PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
-        final CreatePCollectionView<ElemT, ViewT> transform) {
-      return new StreamingCreatePCollectionView<>(transform.getView());
+    public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
+                transform) {
+      StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
+          new StreamingCreatePCollectionView<>(transform.getTransform().getView());
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform), streamingView);
     }
 
     private static class StreamingCreatePCollectionView<ElemT, ViewT>

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java
index bff46ea..e320036 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java
@@ -27,10 +27,11 @@ import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -64,17 +65,27 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable {
   public void getReplacementTransformPopulateDisplayData() {
     ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(new ToLongFn());
     DisplayData originalDisplayData = DisplayData.from(originalTransform);
-
-    PTransform<PCollection<? extends Integer>, PCollection<Long>> replacement =
-        factory.getReplacementTransform(originalTransform);
-    DisplayData replacementDisplayData = DisplayData.from(replacement);
+    PCollection<? extends Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    AppliedPTransform<
+        PCollection<? extends Integer>, PCollection<Long>, ParDo.SingleOutput<Integer, Long>>
+        application =
+        AppliedPTransform.of(
+            "original",
+            input.expand(),
+            input.apply(originalTransform).expand(),
+            originalTransform,
+            pipeline);
+
+    PTransformReplacement<PCollection<? extends Integer>, PCollection<Long>> replacement =
+        factory.getReplacementTransform(application);
+    DisplayData replacementDisplayData = DisplayData.from(replacement.getTransform());
 
     assertThat(replacementDisplayData, equalTo(originalDisplayData));
 
     DisplayData primitiveDisplayData =
         Iterables.getOnlyElement(
             DisplayDataEvaluator.create()
-                .displayDataForPrimitiveTransforms(replacement, VarIntCoder.of()));
+                .displayDataForPrimitiveTransforms(replacement.getTransform(), VarIntCoder.of()));
     assertThat(primitiveDisplayData, equalTo(replacementDisplayData));
   }
 
@@ -91,9 +102,21 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable {
     ParDo.SingleOutput<Integer, Long> originalTransform =
         ParDo.of(new ToLongFn()).withSideInputs(sideLong, sideStrings);
 
-    PTransform<PCollection<? extends Integer>, PCollection<Long>> replacementTransform =
-        factory.getReplacementTransform(originalTransform);
-    ParDoSingle<Integer, Long> parDoSingle = (ParDoSingle<Integer, Long>) replacementTransform;
+    PCollection<? extends Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    AppliedPTransform<
+        PCollection<? extends Integer>, PCollection<Long>, ParDo.SingleOutput<Integer, Long>>
+        application =
+        AppliedPTransform.of(
+            "original",
+            input.expand(),
+            input.apply(originalTransform).expand(),
+            originalTransform,
+            pipeline);
+
+    PTransformReplacement<PCollection<? extends Integer>, PCollection<Long>> replacementTransform =
+        factory.getReplacementTransform(application);
+    ParDoSingle<Integer, Long> parDoSingle =
+        (ParDoSingle<Integer, Long>) replacementTransform.getTransform();
     assertThat(parDoSingle.getSideInputs(), containsInAnyOrder(sideStrings, sideLong));
   }
 
@@ -101,9 +124,21 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable {
   public void getReplacementTransformGetFn() {
     DoFn<Integer, Long> originalFn = new ToLongFn();
     ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(originalFn);
-    PTransform<PCollection<? extends Integer>, PCollection<Long>> replacementTransform =
-        factory.getReplacementTransform(originalTransform);
-    ParDoSingle<Integer, Long> parDoSingle = (ParDoSingle<Integer, Long>) replacementTransform;
+    PCollection<? extends Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    AppliedPTransform<
+            PCollection<? extends Integer>, PCollection<Long>, ParDo.SingleOutput<Integer, Long>>
+        application =
+            AppliedPTransform.of(
+                "original",
+                input.expand(),
+                input.apply(originalTransform).expand(),
+                originalTransform,
+                pipeline);
+
+    PTransformReplacement<PCollection<? extends Integer>, PCollection<Long>> replacementTransform =
+        factory.getReplacementTransform(application);
+    ParDoSingle<Integer, Long> parDoSingle =
+        (ParDoSingle<Integer, Long>) replacementTransform.getTransform();
 
     assertThat(parDoSingle.getFn(), equalTo(originalTransform.getFn()));
     assertThat(parDoSingle.getFn(), equalTo(originalFn));

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index aacb942..61fcaa9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.ValueWithRecordId;
@@ -244,14 +245,11 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
         implements PTransformOverrideFactory<
             PBegin, PCollection<T>, BoundedReadFromUnboundedSource<T>> {
       @Override
-      public PTransform<PBegin, PCollection<T>> getReplacementTransform(
-          BoundedReadFromUnboundedSource<T> transform) {
-        return new AdaptedBoundedAsUnbounded<>(transform);
-      }
-
-      @Override
-      public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-        return p.begin();
+      public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
+          AppliedPTransform<PBegin, PCollection<T>, BoundedReadFromUnboundedSource<T>> transform) {
+        return PTransformReplacement.of(
+            transform.getPipeline().begin(),
+            new AdaptedBoundedAsUnbounded<T>(transform.getTransform()));
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 791166e..1ff4c30 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -33,11 +33,13 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -497,17 +499,18 @@ public class Pipeline {
       void applyReplacement(
           Node original,
           PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory) {
-    PTransform<InputT, OutputT> replacement =
-        replacementFactory.getReplacementTransform((TransformT) original.getTransform());
-    if (replacement == original.getTransform()) {
+    PTransformReplacement<InputT, OutputT> replacement =
+        replacementFactory.getReplacementTransform(
+            (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform());
+    if (replacement.getTransform() == original.getTransform()) {
       return;
     }
-    InputT originalInput = replacementFactory.getInput(original.getInputs(), this);
+    InputT originalInput = replacement.getInput();
 
     LOG.debug("Replacing {} with {}", original, replacement);
-    transforms.replaceNode(original, originalInput, replacement);
+    transforms.replaceNode(original, originalInput, replacement.getTransform());
     try {
-      OutputT newOutput = replacement.expand(originalInput);
+      OutputT newOutput = replacement.getTransform().expand(originalInput);
       Map<PValue, ReplacementOutput> originalToReplacement =
           replacementFactory.mapOutputs(original.getOutputs(), newOutput);
       // Ensure the internal TransformHierarchy data structures are consistent.

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
index 57cba50..786c61c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
@@ -21,9 +21,9 @@ package org.apache.beam.sdk.runners;
 
 import com.google.auto.value.AutoValue;
 import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -41,14 +41,11 @@ public interface PTransformOverrideFactory<
     OutputT extends POutput,
     TransformT extends PTransform<? super InputT, OutputT>> {
   /**
-   * Returns a {@link PTransform} that produces equivalent output to the provided transform.
+   * Returns a {@link PTransformReplacement} whose transform produces equivalent output to the
+   * provided {@link AppliedPTransform transform}, paired with the input to apply it to.
    */
-  PTransform<InputT, OutputT> getReplacementTransform(TransformT transform);
-
-  /**
-   * Returns the composite type that replacement transforms consumed from an equivalent expansion.
-   */
-  InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p);
+  PTransformReplacement<InputT, OutputT> getReplacementTransform(
+      AppliedPTransform<InputT, OutputT, TransformT> transform);
 
   /**
    * Returns a {@link Map} from the expanded values in {@code newOutput} to the values produced by
@@ -56,7 +53,25 @@ public interface PTransformOverrideFactory<
    */
   Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, OutputT newOutput);
 
-  /** A mapping between original {@link TaggedPValue} outputs and their replacements. */
+  /**
+   * A {@link PTransform} that replaces an {@link AppliedPTransform}, and the input required to
+   * do so. The input must be constructed from the expanded form, as the transform may not have
+   * originally been applied within this process or from within a Java SDK.
+   */
+  @AutoValue
+  abstract class PTransformReplacement<InputT extends PInput, OutputT extends POutput> {
+    public static <InputT extends PInput, OutputT extends POutput>
+        PTransformReplacement<InputT, OutputT> of(
+            InputT input, PTransform<InputT, OutputT> transform) {
+      return new AutoValue_PTransformOverrideFactory_PTransformReplacement(input, transform);
+    }
+    public abstract InputT getInput();
+    public abstract PTransform<InputT, OutputT> getTransform();
+  }
+
+  /**
+   * A mapping between original {@link TaggedPValue} outputs and their replacements.
+   */
   @AutoValue
   abstract class ReplacementOutput {
     public static ReplacementOutput of(TaggedPValue original, TaggedPValue replacement) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index 8d99a62..bdb61b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -31,6 +31,11 @@ import org.apache.beam.sdk.values.TupleTag;
  *
  * <p>For internal use.
  *
+ * <p>Inputs and outputs are stored in their expanded forms, as the condensed form of a composite
+ * {@link PInput} or {@link POutput} is a language-specific concept, and {@link AppliedPTransform}
+ * represents a possibly cross-language transform for which no appropriate composite type exists
+ * in the Java SDK.
+ *
  * @param <InputT>     transform input type
  * @param <OutputT>    transform output type
  * @param <TransformT> transform type

http://git-wip-us.apache.org/repos/asf/beam/blob/f3b49605/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 6ce016d..75cabf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -406,16 +406,10 @@ public class PipelineTest {
     class ReplacementOverrideFactory
         implements PTransformOverrideFactory<
             PCollection<String>, PCollection<Long>, OriginalTransform> {
-
       @Override
-      public PTransform<PCollection<String>, PCollection<Long>> getReplacementTransform(
-          OriginalTransform transform) {
-        return new ReplacementTransform();
-      }
-
-      @Override
-      public PCollection<String> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-        return originalInput;
+      public PTransformReplacement<PCollection<String>, PCollection<Long>> getReplacementTransform(
+          AppliedPTransform<PCollection<String>, PCollection<Long>, OriginalTransform> transform) {
+        return PTransformReplacement.of(originalInput, new ReplacementTransform());
       }
 
       @Override
@@ -464,14 +458,9 @@ public class PipelineTest {
   static class BoundedCountingInputOverride
       implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> {
     @Override
-    public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
-        BoundedCountingInput transform) {
-      return Create.of(0L);
-    }
-
-    @Override
-    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return p.begin();
+    public PTransformReplacement<PBegin, PCollection<Long>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<Long>, BoundedCountingInput> transform) {
+      return PTransformReplacement.of(transform.getPipeline().begin(), Create.of(0L));
     }
 
     @Override
@@ -489,15 +478,11 @@ public class PipelineTest {
   }
   static class UnboundedCountingInputOverride
       implements PTransformOverrideFactory<PBegin, PCollection<Long>, UnboundedCountingInput> {
-    @Override
-    public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
-        UnboundedCountingInput transform) {
-      return CountingInput.upTo(100L);
-    }
 
     @Override
-    public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
-      return p.begin();
+    public PTransformReplacement<PBegin, PCollection<Long>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<Long>, UnboundedCountingInput> transform) {
+      return PTransformReplacement.of(transform.getPipeline().begin(), CountingInput.upTo(100L));
     }
 
     @Override