You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/01 21:07:17 UTC
[2/2] incubator-beam git commit: Makes PTransformOverrideFactory
type-safe
Makes PTransformOverrideFactory type-safe
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb661437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb661437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb661437
Branch: refs/heads/master
Commit: bb66143754cc405435db734fb7629423b9ee292c
Parents: 1283308
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Oct 26 16:14:51 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Nov 1 14:06:35 2016 -0700
----------------------------------------------------------------------
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 14 +++---
.../direct/DirectGroupByKeyOverrideFactory.java | 25 ++++------
.../GroupAlsoByWindowEvaluatorFactory.java | 1 -
.../direct/PTransformOverrideFactory.java | 8 ++--
.../beam/runners/direct/ParDoEvaluator.java | 5 --
.../direct/ParDoMultiEvaluatorFactory.java | 1 -
.../runners/direct/ParDoOverrideFactory.java | 14 +++---
.../direct/ParDoSingleEvaluatorFactory.java | 1 -
.../direct/TestStreamEvaluatorFactory.java | 14 ++----
.../direct/UncommittedBundleOutputManager.java | 50 --------------------
.../runners/direct/ViewEvaluatorFactory.java | 19 +++-----
.../direct/WriteWithShardingFactory.java | 15 ++----
.../beam/runners/direct/ParDoEvaluatorTest.java | 1 -
.../direct/WriteWithShardingFactoryTest.java | 26 ++--------
14 files changed, 47 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index e232552..b63e23b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -28,16 +28,16 @@ import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
-class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory {
+class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
+ implements PTransformOverrideFactory<
+ PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+ GBKIntoKeyedWorkItems<KeyT, InputT>> {
@Override
- @SuppressWarnings("unchecked")
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- return new DirectGBKIntoKeyedWorkItems(transform.getName());
+ public PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
+ override(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
+ return new DirectGBKIntoKeyedWorkItems<>(transform.getName());
}
/** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index c64f3f0..9acf5e9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -19,23 +19,16 @@ package org.apache.beam.runners.direct;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
-/**
- * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
- */
-final class DirectGroupByKeyOverrideFactory
- implements PTransformOverrideFactory {
+/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
+final class DirectGroupByKeyOverrideFactory<K, V>
+ implements PTransformOverrideFactory<
+ PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
@Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof GroupByKey) {
- @SuppressWarnings({"rawtypes", "unchecked"})
- PTransform<InputT, OutputT> override =
- (PTransform) new DirectGroupByKey((GroupByKey) transform);
- return override;
- }
- return transform;
+ public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> override(
+ GroupByKey<K, V> transform) {
+ return new DirectGroupByKey<>(transform);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0c2aa1b..4115bb7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -124,7 +124,6 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
ParDoEvaluator.create(
evaluationContext,
stepContext,
- inputBundle,
application,
gabwDoFn,
Collections.<PCollectionView<?>>emptyList(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
index 81e4863..8db6e9b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
@@ -21,13 +21,15 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
-interface PTransformOverrideFactory {
+interface PTransformOverrideFactory<
+ InputT extends PInput,
+ OutputT extends POutput,
+ TransformT extends PTransform<InputT, OutputT>> {
/**
* Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
* Otherwise, return the input {@link PTransform}.
*
* <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
*/
- <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform);
+ PTransform<InputT, OutputT> override(TransformT transform);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index b524dfa..ff49b60 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -29,7 +29,6 @@ import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
@@ -44,16 +43,12 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
DirectStepContext stepContext,
- CommittedBundle<InputT> inputBundle,
AppliedPTransform<PCollection<InputT>, ?, ?> application,
Serializable fn, // may be OldDoFn or DoFn
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
Map<TupleTag<?>, PCollection<?>> outputs) {
- DirectExecutionContext executionContext =
- evaluationContext.getExecutionContext(application, inputBundle.getKey());
-
AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();
Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 02469ff..ccda0e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -85,7 +85,6 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
ParDoEvaluator.create(
evaluationContext,
stepContext,
- inputBundle,
application,
(DoFn) fnLocal.get(),
application.getTransform().getSideInputs(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
index 0881868..6052a41 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
@@ -24,22 +24,20 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PCollection;
/**
* A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
* in the direct runner. Currently overrides applications of <a
* href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
*/
-class ParDoOverrideFactory implements PTransformOverrideFactory {
+class ParDoOverrideFactory<InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> {
@Override
@SuppressWarnings("unchecked")
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (!(transform instanceof ParDo.Bound)) {
- return transform;
- }
+ public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override(
+ ParDo.Bound<InputT, OutputT> transform) {
ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform;
DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn());
if (fn == null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 0584e41..d2a678d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -89,7 +89,6 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
ParDoEvaluator.create(
evaluationContext,
stepContext,
- inputBundle,
application,
fnLocal.get(),
application.getTransform().getSideInputs(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 97e6b4d..58f2fa9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -45,8 +45,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -157,15 +155,11 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
}
- static class DirectTestStreamFactory implements PTransformOverrideFactory {
+ static class DirectTestStreamFactory<T>
+ implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
@Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof TestStream) {
- return (PTransform<InputT, OutputT>)
- new DirectTestStream<OutputT>((TestStream<OutputT>) transform);
- }
- return transform;
+ public PTransform<PBegin, PCollection<T>> override(TestStream<T> transform) {
+ return new DirectTestStream<>(transform);
}
private static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
deleted file mode 100644
index 6c7dec8..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link DirectRunner}.
- */
-class UncommittedBundleOutputManager implements OutputManager {
- private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-
- public static UncommittedBundleOutputManager create(
- Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
- return new UncommittedBundleOutputManager(outputBundles);
- }
-
- UncommittedBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
- this.bundles = bundles;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- @SuppressWarnings("rawtypes")
- UncommittedBundle bundle = bundles.get(tag);
- bundle.add(output);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 43a1225..2dd280a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -33,8 +33,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
/**
* The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
@@ -95,18 +93,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
};
}
- public static class ViewOverrideFactory implements PTransformOverrideFactory {
+ public static class ViewOverrideFactory<ElemT, ViewT>
+ implements PTransformOverrideFactory<
+ PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
@Override
- public <InputT extends PInput, OutputT extends POutput>
- PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
- if (transform instanceof CreatePCollectionView) {
-
- }
- @SuppressWarnings({"rawtypes", "unchecked"})
- PTransform<InputT, OutputT> createView =
- (PTransform<InputT, OutputT>)
- new DirectCreatePCollectionView<>((CreatePCollectionView) transform);
- return createView;
+ public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> override(
+ CreatePCollectionView<ElemT, ViewT> transform) {
+ return new DirectCreatePCollectionView<>(transform);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 8727cb5..cf535cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -39,8 +39,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;
/**
@@ -48,17 +46,14 @@ import org.joda.time.Duration;
* with an unspecified number of shards with a write with a specified number of shards. The number
* of shards is the log base 10 of the number of input records, with up to 2 additional shards.
*/
-class WriteWithShardingFactory implements PTransformOverrideFactory {
+class WriteWithShardingFactory<InputT>
+ implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write.Bound<InputT>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof Write.Bound) {
- Write.Bound<InputT> that = (Write.Bound<InputT>) transform;
- if (that.getNumShards() == 0) {
- return (PTransform<InputT, OutputT>) new DynamicallyReshardedWrite<InputT>(that);
- }
+ public PTransform<PCollection<InputT>, PDone> override(Write.Bound<InputT> transform) {
+ if (transform.getNumShards() == 0) {
+ return new DynamicallyReshardedWrite<>(transform);
}
return transform;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 6d00aa1..89f9bfb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -159,7 +159,6 @@ public class ParDoEvaluatorTest {
return ParDoEvaluator.create(
evaluationContext,
stepContext,
- inputBundle,
(AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
fn,
ImmutableList.<PCollectionView<?>>of(singletonView),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb661437/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 2dd477d..1ff5de2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -49,15 +49,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -71,7 +68,7 @@ import org.junit.runners.JUnit4;
public class WriteWithShardingFactoryTest {
public static final int INPUT_SIZE = 10000;
@Rule public TemporaryFolder tmp = new TemporaryFolder();
- private WriteWithShardingFactory factory = new WriteWithShardingFactory();
+ private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
@Test
public void dynamicallyReshardedWrite() throws Exception {
@@ -122,28 +119,15 @@ public class WriteWithShardingFactoryTest {
@Test
public void withShardingSpecifiesOriginalTransform() {
- PTransform<PCollection<Object>, PDone> original = Write.to(new TestSink()).withNumShards(3);
+ Write.Bound<Object> original = Write.to(new TestSink()).withNumShards(3);
- assertThat(factory.override(original), equalTo(original));
- }
-
- @Test
- public void withNonWriteReturnsOriginalTransform() {
- PTransform<PCollection<Object>, PDone> original =
- new PTransform<PCollection<Object>, PDone>() {
- @Override
- public PDone apply(PCollection<Object> input) {
- return PDone.in(input.getPipeline());
- }
- };
-
- assertThat(factory.override(original), equalTo(original));
+ assertThat(factory.override(original), equalTo((Object) original));
}
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
- PTransform<PCollection<Object>, PDone> original = Write.to(new TestSink());
- assertThat(factory.override(original), not(equalTo(original)));
+ Write.Bound<Object> original = Write.to(new TestSink());
+ assertThat(factory.override(original), not(equalTo((Object) original)));
}
@Test