You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/08 19:28:51 UTC
[1/2] incubator-beam git commit: Add PTransformOverrideFactory
Repository: incubator-beam
Updated Branches:
refs/heads/master 245cffa5e -> 6348a1fe2
Add PTransformOverrideFactory
This constructs an appropriate custom PTransform override for the runner
it is meant to be used in, which is applied in place of the original
PTransform.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d1486c66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d1486c66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d1486c66
Branch: refs/heads/master
Commit: d1486c66619891903ed4c0c57c354af06ffbd23c
Parents: 1aee017
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 10:03:25 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 10:27:50 2016 -0700
----------------------------------------------------------------------
.../inprocess/GroupByKeyEvaluatorFactory.java | 21 +++++++++-
.../sdk/runners/inprocess/InProcessCreate.java | 18 +++++++++
.../inprocess/InProcessPipelineRunner.java | 41 +++++++-------------
.../inprocess/PTransformOverrideFactory.java | 33 ++++++++++++++++
.../runners/inprocess/ViewEvaluatorFactory.java | 28 +++++++++++--
5 files changed, 109 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 0518eec..4b478ad 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -42,6 +42,8 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
@@ -179,10 +181,27 @@ class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
}
/**
+ * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+ *
+ * <p>A {@link GroupByKey} is wrapped in an {@link InProcessGroupByKey}; any other transform is
+ * returned unchanged.
+ */
+ public static final class InProcessGroupByKeyOverrideFactory
+ implements PTransformOverrideFactory {
+ @Override
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof GroupByKey) {
+ // The instanceof check above guards the raw cast to GroupByKey below.
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform);
+ return override;
+ }
+ // Not a GroupByKey: nothing to override, so hand the original transform back.
+ return transform;
+ }
+ }
+
+ /**
* An in-memory implementation of the {@link GroupByKey} primitive as a composite
* {@link PTransform}.
*/
- public static final class InProcessGroupByKey<K, V>
+ private static final class InProcessGroupByKey<K, V>
extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
private final GroupByKey<K, V> original;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
index efda8fc..00587ea 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
@@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -53,6 +54,23 @@ import javax.annotation.Nullable;
class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
private final Create.Values<T> original;
+ /**
+ * A {@link PTransformOverrideFactory} that replaces {@link Create.Values} with
+ * {@link InProcessCreate}.
+ *
+ * <p>Transforms that are not {@link Create.Values} are returned unchanged.
+ */
+ public static class InProcessCreateOverrideFactory implements PTransformOverrideFactory {
+ @Override
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof Create.Values) {
+ // The instanceof check above guards the unchecked casts below.
+ @SuppressWarnings("unchecked")
+ PTransform<InputT, OutputT> override =
+ (PTransform<InputT, OutputT>) from((Create.Values<OutputT>) transform);
+ return override;
+ }
+ // Not a Create.Values: nothing to override, so hand the original transform back.
+ return transform;
+ }
+ }
+
public static <T> InProcessCreate<T> from(Create.Values<T> original) {
return new InProcessCreate<>(original);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..9ce4430 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -26,15 +26,16 @@ import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessCreate.InProcessCreateOverrideFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
@@ -75,14 +76,12 @@ public class InProcessPipelineRunner
* type of transform it is overriding.
*/
@SuppressWarnings("rawtypes")
- private static Map<Class<? extends PTransform>, Class<? extends PTransform>>
+ private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
defaultTransformOverrides =
- ImmutableMap.<Class<? extends PTransform>, Class<? extends PTransform>>builder()
- .put(Create.Values.class, InProcessCreate.class)
- .put(GroupByKey.class, InProcessGroupByKey.class)
- .put(
- CreatePCollectionView.class,
- ViewEvaluatorFactory.InProcessCreatePCollectionView.class)
+ ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+ .put(Create.Values.class, new InProcessCreateOverrideFactory())
+ .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+ .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
.build();
/**
@@ -191,28 +190,14 @@ public class InProcessPipelineRunner
@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
- Class<?> overrideClass = defaultTransformOverrides.get(transform.getClass());
- if (overrideClass != null) {
- // It is the responsibility of whoever constructs overrides to ensure this is type safe.
- @SuppressWarnings("unchecked")
- Class<PTransform<InputT, OutputT>> transformClass =
- (Class<PTransform<InputT, OutputT>>) transform.getClass();
+ PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
+ if (overrideFactory != null) {
+ PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
- @SuppressWarnings("unchecked")
- Class<PTransform<InputT, OutputT>> customTransformClass =
- (Class<PTransform<InputT, OutputT>>) overrideClass;
-
- PTransform<InputT, OutputT> customTransform =
- InstanceBuilder.ofType(customTransformClass)
- .withArg(transformClass, transform)
- .build();
-
- // This overrides the contents of the apply method without changing the TransformTreeNode that
- // is generated by the PCollection application.
return super.apply(customTransform, input);
- } else {
- return super.apply(transform, input);
}
+ // If there is no override, or we should not apply the override, apply the original transform
+ return super.apply(transform, input);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
new file mode 100644
index 0000000..77fa585
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PTransformOverrideFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+/**
+ * A factory that produces the {@link PTransform} to apply in place of a given
+ * {@link PTransform}.
+ */
+interface PTransformOverrideFactory {
+ /**
+ * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
+ * Otherwise, return the input {@link PTransform}.
+ *
+ * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
+ *
+ * @param transform the transform that is about to be applied
+ * @return the override to apply in its place, or {@code transform} itself if no override applies
+ */
+ <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d1486c66/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index f78fc49..112ea0c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -29,6 +29,8 @@ import com.google.cloud.dataflow.sdk.transforms.WithKeys;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
import java.util.ArrayList;
import java.util.List;
@@ -50,7 +52,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
InProcessPipelineRunner.CommittedBundle<?> inputBundle,
InProcessEvaluationContext evaluationContext) {
@SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createEvaluator(
+ TransformEvaluator<T> evaluator = createEvaluator(
(AppliedPTransform) application, evaluationContext);
return evaluator;
}
@@ -80,11 +82,26 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
};
}
+ /**
+  * A {@link PTransformOverrideFactory} that replaces {@link CreatePCollectionView} with
+  * {@link InProcessCreatePCollectionView}.
+  */
+ public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
+   /**
+    * Returns an {@link InProcessCreatePCollectionView} wrapping {@code transform} when it is a
+    * {@link CreatePCollectionView}; otherwise returns {@code transform} unchanged.
+    */
+   @Override
+   public <InputT extends PInput, OutputT extends POutput>
+       PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
+     if (transform instanceof CreatePCollectionView) {
+       // The instanceof check above guards the raw cast to CreatePCollectionView below.
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       PTransform<InputT, OutputT> createView =
+           (PTransform<InputT, OutputT>)
+               new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
+       return createView;
+     }
+     // Not a view creation: return the input untouched rather than failing on the cast.
+     return transform;
+   }
+ }
+
/**
* An in-process override for {@link CreatePCollectionView}.
*/
- public static class InProcessCreatePCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ private static class InProcessCreatePCollectionView<ElemT, ViewT>
+ extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
private final CreatePCollectionView<ElemT, ViewT> og;
private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
@@ -99,6 +116,11 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
.apply(Values.<Iterable<ElemT>>create())
.apply(new WriteView<ElemT, ViewT>(og));
}
+
+ @Override
+ protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
+ return og;
+ }
}
/**
[2/2] incubator-beam git commit: This closes #131
Posted by ke...@apache.org.
This closes #131
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6348a1fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6348a1fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6348a1fe
Branch: refs/heads/master
Commit: 6348a1fe24683d531e053d5be1e56e38635d2957
Parents: 245cffa d1486c6
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Apr 8 10:28:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Apr 8 10:28:32 2016 -0700
----------------------------------------------------------------------
.../inprocess/GroupByKeyEvaluatorFactory.java | 21 +++++++++-
.../sdk/runners/inprocess/InProcessCreate.java | 18 ++++++++
.../inprocess/InProcessPipelineRunner.java | 43 +++++++-------------
.../inprocess/PTransformOverrideFactory.java | 33 +++++++++++++++
.../runners/inprocess/ViewEvaluatorFactory.java | 28 +++++++++++--
5 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6348a1fe/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6348a1fe/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --cc sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index f5b7f3c,9ce4430..fa93994
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@@ -33,9 -35,7 +35,8 @@@ import com.google.cloud.dataflow.sdk.tr
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
- import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.UserCodeException;