You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/27 18:44:00 UTC
[1/3] incubator-beam git commit: Make DoFnInfo carry OldDoFn or DoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master ee6ad2fe4 -> 064f18a8f
Make DoFnInfo carry OldDoFn or DoFn
This will allow consumers to prepare to accept
DoFn while still accepting existing jobs that
use OldDoFn. It is a move towards treating
the Fn itself as just a serialized blob.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/73db5608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/73db5608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/73db5608
Branch: refs/heads/master
Commit: 73db5608a58ff64a0b452140736a150f973986b8
Parents: 95bf7a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 24 15:11:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 10:48:34 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/util/DoFnInfo.java | 43 ++++++++++++++++++--
1 file changed, 39 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/73db5608/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index b211c04..bfa12e2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -17,29 +17,38 @@
*/
package org.apache.beam.runners.dataflow.util;
+import static com.google.common.base.Preconditions.checkState;
+
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
- * Wrapper class holding the necessary information to serialize a {@link OldDoFn}.
+ * Wrapper class holding the necessary information to serialize a {@link OldDoFn}
+ * or {@link DoFn}.
*
* @param <InputT> the type of the (main) input elements of the {@link OldDoFn}
* @param <OutputT> the type of the (main) output elements of the {@link OldDoFn}
*/
public class DoFnInfo<InputT, OutputT> implements Serializable {
- private final OldDoFn<InputT, OutputT> doFn;
+ private final Serializable doFn;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Iterable<PCollectionView<?>> sideInputViews;
private final Coder<InputT> inputCoder;
private final long mainOutput;
private final Map<Long, TupleTag<?>> outputMap;
- public DoFnInfo(OldDoFn<InputT, OutputT> doFn,
+ /**
+ * Creates a {@link DoFnInfo} for the given {@link DoFn} or {@link OldDoFn} and auxiliary bits and
+ * pieces.
+ */
+ public DoFnInfo(
+ Serializable doFn,
WindowingStrategy<?, ?> windowingStrategy,
Iterable<PCollectionView<?>> sideInputViews,
Coder<InputT> inputCoder,
@@ -53,10 +62,36 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
this.outputMap = outputMap;
}
- public OldDoFn<InputT, OutputT> getDoFn() {
+ /**
+ * @deprecated call the constructor with a {@link Serializable}
+ */
+ @Deprecated
+ public DoFnInfo(
+ OldDoFn doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Iterable<PCollectionView<?>> sideInputViews,
+ Coder<InputT> inputCoder,
+ long mainOutput,
+ Map<Long, TupleTag<?>> outputMap) {
+ this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+ }
+
+ /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */
+ public Serializable getFn() {
return doFn;
}
+ /** @deprecated use {@link #getFn()} */
+ @Deprecated
+ public OldDoFn getDoFn() {
+ checkState(
+ doFn instanceof OldDoFn,
+ "Deprecated %s.getDoFn() called when the payload was actually a new %s",
+ DoFnInfo.class.getSimpleName(),
+ DoFn.class.getSimpleName());
+ return (OldDoFn) doFn;
+ }
+
public WindowingStrategy<?, ?> getWindowingStrategy() {
return windowingStrategy;
}
[2/3] incubator-beam git commit: Update Dataflow worker image pointer
Posted by ke...@apache.org.
Update Dataflow worker image pointer
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95bf7a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95bf7a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95bf7a83
Branch: refs/heads/master
Commit: 95bf7a835d81a7b62b734362386e0aea64a42532
Parents: ee6ad2f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 26 14:45:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 10:48:34 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95bf7a83/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2324196..2943ab9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -208,9 +208,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Default Docker container images that execute Dataflow worker harness, residing in Google
// Container Registry, separately for Batch and Streaming.
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161024";
+ "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161026";
public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161024";
+ "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161026";
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
[3/3] incubator-beam git commit: This closes #1176
Posted by ke...@apache.org.
This closes #1176
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/064f18a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/064f18a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/064f18a8
Branch: refs/heads/master
Commit: 064f18a8f704ba4713701b7c8432a6f9b401a110
Parents: ee6ad2f 73db560
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 27 10:48:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 27 10:48:35 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 43 ++++++++++++++++++--
2 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------