You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text export.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 21:07:17 UTC

[2/2] incubator-beam git commit: Makes PTransformOverrideFactory type-safe

Makes PTransformOverrideFactory type-safe


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb661437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb661437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb661437

Branch: refs/heads/master
Commit: bb66143754cc405435db734fb7629423b9ee292c
Parents: 1283308
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Oct 26 16:14:51 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 14:06:35 2016 -0700

----------------------------------------------------------------------
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 14 +++---
 .../direct/DirectGroupByKeyOverrideFactory.java | 25 ++++------
 .../GroupAlsoByWindowEvaluatorFactory.java      |  1 -
 .../direct/PTransformOverrideFactory.java       |  8 ++--
 .../beam/runners/direct/ParDoEvaluator.java     |  5 --
 .../direct/ParDoMultiEvaluatorFactory.java      |  1 -
 .../runners/direct/ParDoOverrideFactory.java    | 14 +++---
 .../direct/ParDoSingleEvaluatorFactory.java     |  1 -
 .../direct/TestStreamEvaluatorFactory.java      | 14 ++----
 .../direct/UncommittedBundleOutputManager.java  | 50 --------------------
 .../runners/direct/ViewEvaluatorFactory.java    | 19 +++-----
 .../direct/WriteWithShardingFactory.java        | 15 ++----
 .../beam/runners/direct/ParDoEvaluatorTest.java |  1 -
 .../direct/WriteWithShardingFactoryTest.java    | 26 ++--------
 14 files changed, 47 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index e232552..b63e23b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -28,16 +28,16 @@ import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 
 /** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
-class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory {
+class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
+    implements PTransformOverrideFactory<
+        PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+        GBKIntoKeyedWorkItems<KeyT, InputT>> {
   @Override
-  @SuppressWarnings("unchecked")
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    return new DirectGBKIntoKeyedWorkItems(transform.getName());
+  public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
+      override(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
+    return new DirectGBKIntoKeyedWorkItems<>(transform.getName());
   }
 
   /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c64f3f0..9acf5e9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -19,23 +19,16 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 
-/**
- * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
- */
-final class DirectGroupByKeyOverrideFactory
-    implements PTransformOverrideFactory {
+/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
+final class DirectGroupByKeyOverrideFactory<K, V>
+    implements PTransformOverrideFactory<
+        PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
   @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof GroupByKey) {
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      PTransform<InputT, OutputT> override =
-          (PTransform) new DirectGroupByKey((GroupByKey) transform);
-      return override;
-    }
-    return transform;
+  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> override(
+      GroupByKey<K, V> transform) {
+    return new DirectGroupByKey<>(transform);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0c2aa1b..4115bb7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -124,7 +124,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
           ParDoEvaluator.create(
               evaluationContext,
               stepContext,
-              inputBundle,
               application,
               gabwDoFn,
               Collections.<PCollectionView<?>>emptyList(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
index 81e4863..8db6e9b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
@@ -21,13 +21,15 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
-interface PTransformOverrideFactory {
+interface PTransformOverrideFactory<
+    InputT extends PInput,
+    OutputT extends POutput,
+    TransformT extends PTransform<InputT, OutputT>> {
   /**
    * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
    * Otherwise, return the input {@link PTransform}.
    *
    * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
    */
-  <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform);
+  PTransform<InputT, OutputT> override(TransformT transform);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index b524dfa..ff49b60 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -29,7 +29,6 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
@@ -44,16 +43,12 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
   public static <InputT, OutputT> ParDoEvaluator<InputT> create(
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
-      CommittedBundle<InputT> inputBundle,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
       Serializable fn, // may be OldDoFn or DoFn
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       Map<TupleTag<?>, PCollection<?>> outputs) {
-    DirectExecutionContext executionContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey());
-
     AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
 
     Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 02469ff..ccda0e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -85,7 +85,6 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
           ParDoEvaluator.create(
               evaluationContext,
               stepContext,
-              inputBundle,
               application,
               (DoFn) fnLocal.get(),
               application.getTransform().getSideInputs(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
index 0881868..6052a41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
@@ -24,22 +24,20 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
  * in the direct runner. Currently overrides applications of <a
  * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
  */
-class ParDoOverrideFactory implements PTransformOverrideFactory {
+class ParDoOverrideFactory<InputT, OutputT>
+    implements PTransformOverrideFactory<
+        PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
   @Override
   @SuppressWarnings("unchecked")
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (!(transform instanceof ParDo.Bound)) {
-      return transform;
-    }
+  public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
+      ParDo.Bound<InputT, OutputT> transform) {
     ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
     DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
     if (fn == null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 0584e41..d2a678d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -89,7 +89,6 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
           ParDoEvaluator.create(
               evaluationContext,
               stepContext,
-              inputBundle,
               application,
               fnLocal.get(),
               application.getTransform().getSideInputs(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 97e6b4d..58f2fa9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -45,8 +45,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -157,15 +155,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     }
   }
 
-  static class DirectTestStreamFactory implements PTransformOverrideFactory {
+  static class DirectTestStreamFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
     @Override
-    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-        PTransform<InputT, OutputT> transform) {
-      if (transform instanceof TestStream) {
-        return (PTransform<InputT, OutputT>)
-            new DirectTestStream<OutputT>((TestStream<OutputT>) transform);
-      }
-      return transform;
+    public PTransform<PBegin, PCollection<T>> override(TestStream<T> transform) {
+      return new DirectTestStream<>(transform);
     }
 
     private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
deleted file mode 100644
index 6c7dec8..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link DirectRunner}.
- */
-class UncommittedBundleOutputManager implements OutputManager {
-  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-
-  public static UncommittedBundleOutputManager create(
-      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
-    return new UncommittedBundleOutputManager(outputBundles);
-  }
-
-  UncommittedBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
-    this.bundles = bundles;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-    @SuppressWarnings("rawtypes")
-    UncommittedBundle bundle = bundles.get(tag);
-    bundle.add(output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 43a1225..2dd280a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -33,8 +33,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
@@ -95,18 +93,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
     };
   }
 
-  public static class ViewOverrideFactory implements PTransformOverrideFactory {
+  public static class ViewOverrideFactory<ElemT, ViewT>
+      implements PTransformOverrideFactory<
+          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public <InputT extends PInput, OutputT extends POutput>
-        PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
-      if (transform instanceof CreatePCollectionView) {
-
-      }
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      PTransform<InputT, OutputT> createView =
-          (PTransform<InputT, OutputT>)
-              new DirectCreatePCollectionView<>((CreatePCollectionView) transform);
-      return createView;
+    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> override(
+        CreatePCollectionView<ElemT, ViewT> transform) {
+      return new DirectCreatePCollectionView<>(transform);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 8727cb5..cf535cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -39,8 +39,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.joda.time.Duration;
 
 /**
@@ -48,17 +46,14 @@ import org.joda.time.Duration;
  * with an unspecified number of shards with a write with a specified number of shards. The number
  * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
  */
-class WriteWithShardingFactory implements PTransformOverrideFactory {
+class WriteWithShardingFactory<InputT>
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write.Bound<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
 
   @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (transform instanceof Write.Bound) {
-      Write.Bound<InputT> that = (Write.Bound<InputT>) transform;
-      if (that.getNumShards() == 0) {
-        return (PTransform<InputT, OutputT>) new DynamicallyReshardedWrite<InputT>(that);
-      }
+  public PTransform<PCollection<InputT>, PDone> override(Write.Bound<InputT> transform) {
+    if (transform.getNumShards() == 0) {
+      return new DynamicallyReshardedWrite<>(transform);
     }
     return transform;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 6d00aa1..89f9bfb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -159,7 +159,6 @@ public class ParDoEvaluatorTest {
     return ParDoEvaluator.create(
         evaluationContext,
         stepContext,
-        inputBundle,
         (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
         fn,
         ImmutableList.<PCollectionView<?>>of(singletonView),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 2dd477d..1ff5de2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -49,15 +49,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -71,7 +68,7 @@ import org.junit.runners.JUnit4;
 public class WriteWithShardingFactoryTest {
   public static final int INPUT_SIZE = 10000;
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
-  private WriteWithShardingFactory factory = new WriteWithShardingFactory();
+  private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
 
   @Test
   public void dynamicallyReshardedWrite() throws Exception {
@@ -122,28 +119,15 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withShardingSpecifiesOriginalTransform() {
-    PTransform<PCollection<Object>, PDone> original = Write.to(new TestSink()).withNumShards(3);
+    Write.Bound<Object> original = Write.to(new TestSink()).withNumShards(3);
 
-    assertThat(factory.override(original), equalTo(original));
-  }
-
-  @Test
-  public void withNonWriteReturnsOriginalTransform() {
-    PTransform<PCollection<Object>, PDone> original =
-        new PTransform<PCollection<Object>, PDone>() {
-          @Override
-          public PDone apply(PCollection<Object> input) {
-            return PDone.in(input.getPipeline());
-          }
-        };
-
-    assertThat(factory.override(original), equalTo(original));
+    assertThat(factory.override(original), equalTo((Object) original));
   }
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    PTransform<PCollection<Object>, PDone> original = Write.to(new TestSink());
-    assertThat(factory.override(original), not(equalTo(original)));
+    Write.Bound<Object> original = Write.to(new TestSink());
+    assertThat(factory.override(original), not(equalTo((Object) original)));
   }
 
   @Test