You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/22 19:35:02 UTC
[1/3] beam git commit: This closes #3190
Repository: beam
Updated Branches:
refs/heads/master e83fcece5 -> 996782b62
This closes #3190
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/996782b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/996782b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/996782b6
Branch: refs/heads/master
Commit: 996782b62fa5683a139fd3396df247dbe890144d
Parents: e83fcec 888a5e6
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 22 12:24:45 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 22 12:24:45 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/ParDos.java | 16 +++++++++++++
.../runners/core/construction/ParDosTest.java | 4 ++++
.../beam/runners/dataflow/util/DoFnInfo.java | 25 --------------------
.../control/ProcessBundleHandlerTest.java | 2 +-
4 files changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
[3/3] beam git commit: Remove Unused DoFnInfo methods
Posted by tg...@apache.org.
Remove Unused DoFnInfo methods
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10357c20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10357c20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10357c20
Branch: refs/heads/master
Commit: 10357c20ba62dab873749f135d23c4dda4033cd7
Parents: e83fcec
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 19 14:06:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 22 12:24:45 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/util/DoFnInfo.java | 25 --------------------
.../control/ProcessBundleHandlerTest.java | 2 +-
2 files changed, 1 insertion(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10357c20/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index bd2742f..4a26795 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -53,25 +53,6 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
}
- /** TODO: remove this when Dataflow worker uses the DoFn overload. */
- @Deprecated
- @SuppressWarnings("unchecked")
- public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
- Serializable doFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Iterable<PCollectionView<?>> sideInputViews,
- Coder<InputT> inputCoder,
- long mainOutput,
- Map<Long, TupleTag<?>> outputMap) {
- return forFn(
- (DoFn<InputT, OutputT>) doFn,
- windowingStrategy,
- sideInputViews,
- inputCoder,
- mainOutput,
- outputMap);
- }
-
public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) {
return DoFnInfo.forFn(newFn,
windowingStrategy,
@@ -96,12 +77,6 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
this.outputMap = outputMap;
}
- /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */
- @Deprecated
- public Serializable getFn() {
- return doFn;
- }
-
/** Returns the embedded function. */
public DoFn<InputT, OutputT> getDoFn() {
return doFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/10357c20/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 748ffea..f405728 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -340,7 +340,7 @@ public class ProcessBundleHandlerTest {
new TestDoFn(),
WindowingStrategy.globalDefault(),
ImmutableList.of(),
- STRING_CODER,
+ StringUtf8Coder.of(),
mainOutputId,
ImmutableMap.of(
mainOutputId, TestDoFn.mainOutput,
[2/3] beam git commit: Extract the Main Input PCollection in ParDos
Posted by tg...@apache.org.
Extract the Main Input PCollection in ParDos
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/888a5e6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/888a5e6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/888a5e6a
Branch: refs/heads/master
Commit: 888a5e6ab9c1c83ee06281de4c906be69c076286
Parents: 10357c2
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 19 14:24:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 22 12:24:45 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/ParDos.java | 16 ++++++++++++++++
.../beam/runners/core/construction/ParDosTest.java | 4 ++++
2 files changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index 4752bd1..2ecc041 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
@@ -161,6 +163,20 @@ public class ParDos {
return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
}
+ public static RunnerApi.PCollection getMainInput(
+ RunnerApi.PTransform ptransform, Components components) throws IOException {
+ checkArgument(
+ ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN),
+ "Unexpected payload type %s",
+ ptransform.getSpec().getUrn());
+ ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class);
+ String mainInputId =
+ Iterables.getOnlyElement(
+ Sets.difference(
+ ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet()));
+ return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+ }
+
// TODO: Implement
private static StateSpec toProto(StateDeclaration state) {
throw new UnsupportedOperationException("Not yet supported");
http://git-wip-us.apache.org/repos/asf/beam/blob/888a5e6a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
index 74edec1..b6f0b7d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java
@@ -149,6 +149,10 @@ public class ParDosTest {
view.getWindowingStrategyInternal().fixDefaults()));
assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
}
+ String mainInputId = components.registerPCollection(mainInput);
+ assertThat(
+ ParDos.getMainInput(protoTransform, protoComponents),
+ equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
}
private static class DropElementsFn extends DoFn<KV<Long, String>, Void> {