You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/20 23:40:49 UTC

[1/3] beam git commit: [BEAM-1377] Splittable DoFn in Dataflow streaming runner

Repository: beam
Updated Branches:
  refs/heads/master 698b89e2b -> fd40d4b29


[BEAM-1377] Splittable DoFn in Dataflow streaming runner

Transform expansion and translation for the involved primitive
transforms. Of course, the current PR will only work after the
respective Dataflow worker and backend changes are released.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a06c8bfa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a06c8bfa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a06c8bfa

Branch: refs/heads/master
Commit: a06c8bfae6fb9e35deeb4adfdd7761889b12be89
Parents: 4f6032c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Feb 1 17:26:55 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 16:27:12 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  6 +-
 .../dataflow/DataflowPipelineTranslator.java    | 40 +++++++++
 .../beam/runners/dataflow/DataflowRunner.java   | 14 +++
 .../dataflow/SplittableParDoOverrides.java      | 76 +++++++++++++++++
 .../runners/dataflow/util/PropertyNames.java    |  1 +
 .../DataflowPipelineTranslatorTest.java         | 89 ++++++++++++++++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 22 ++++-
 7 files changed, 246 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index f627f12..d1bce32 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -216,13 +216,17 @@
           <execution>
             <id>validates-runner-tests</id>
             <configuration>
+              <!--
+                UsesSplittableParDoWithWindowedSideInputs is excluded because of
+                https://issues.apache.org/jira/browse/BEAM-2476
+              -->
               <excludedGroups>
                 org.apache.beam.sdk.testing.LargeKeys$Above10MB,
                 org.apache.beam.sdk.testing.UsesDistributionMetrics,
                 org.apache.beam.sdk.testing.UsesGaugeMetrics,
                 org.apache.beam.sdk.testing.UsesSetState,
                 org.apache.beam.sdk.testing.UsesMapState,
-                org.apache.beam.sdk.testing.UsesSplittableParDo,
+                org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs,
                 org.apache.beam.sdk.testing.UsesUnboundedPCollections,
                 org.apache.beam.sdk.testing.UsesTestStream,
               </excludedGroups>

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index afc34e6..bfd9b64 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
@@ -886,6 +887,45 @@ public class DataflowPipelineTranslator {
     // IO Translation.
 
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Splittable DoFn translation.
+
+    registerTransformTranslator(
+        SplittableParDo.ProcessKeyedElements.class,
+        new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
+          @Override
+          public void translate(
+              SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
+            translateTyped(transform, context);
+          }
+
+          private <InputT, OutputT, RestrictionT> void translateTyped(
+              SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform,
+              TranslationContext context) {
+            StepTranslationContext stepContext =
+                context.addStep(transform, "SplittableProcessKeyed");
+
+            translateInputs(
+                stepContext, context.getInput(transform), transform.getSideInputs(), context);
+            BiMap<Long, TupleTag<?>> outputMap =
+                translateOutputs(context.getOutputs(transform), stepContext);
+            stepContext.addInput(
+                PropertyNames.SERIALIZED_FN,
+                byteArrayToJsonString(
+                    serializeToByteArray(
+                        DoFnInfo.forFn(
+                            transform.getFn(),
+                            transform.getInputWindowingStrategy(),
+                            transform.getSideInputs(),
+                            transform.getElementCoder(),
+                            outputMap.inverse().get(transform.getMainOutputTag()),
+                            outputMap))));
+            stepContext.addInput(
+                PropertyNames.RESTRICTION_CODER,
+                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
+          }
+        });
   }
 
   private static void translateInputs(

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ea9db24..c584b31 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -325,6 +325,20 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                 new StreamingFnApiCreateOverrideFactory()));
       }
       overridesBuilder
+          // Support Splittable DoFn for now only in streaming mode.
+          // The order of the following overrides is important because they are applied in order.
+
+          // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override.
+          // However, we want a different expansion for single-output splittable ParDo.
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoSingle(),
+                  new ReflectiveOneToOneOverrideFactory(
+                      SplittableParDoOverrides.ParDoSingleViaMulti.class, this)))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoMulti(),
+                  new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
           .add(
               // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
               // must precede it

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
new file mode 100644
index 0000000..9322878
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Map;
+import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow runner. */
+class SplittableParDoOverrides {
+  static class ParDoSingleViaMulti<InputT, OutputT>
+      extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
+    private final ParDo.SingleOutput<InputT, OutputT> original;
+
+    public ParDoSingleViaMulti(
+        DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) {
+      this.original = original;
+    }
+
+    @Override
+    protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> delegate() {
+      return original;
+    }
+
+    @Override
+    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+      TupleTag<OutputT> mainOutput = new TupleTag<>();
+      return input.apply(original.withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
+    }
+  }
+
+  static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT>
+      implements PTransformOverrideFactory<
+          PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
+    @Override
+    public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>>
+            appliedTransform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(appliedTransform),
+          new SplittableParDo<>(appliedTransform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
+      return ReplacementOutputs.tagged(outputs, newOutput);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index f82f1f1..55e0c4e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -63,4 +63,5 @@ public class PropertyNames {
   public static final String USES_KEYED_STATE = "uses_keyed_state";
   public static final String VALUE = "value";
   public static final String DISPLAY_DATA = "display_data";
+  public static final String RESTRICTION_CODER = "restriction_coder";
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 53215f6..948af1c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -18,11 +18,14 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -66,11 +69,15 @@ import java.util.Set;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
@@ -91,7 +98,13 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -100,6 +113,8 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -896,6 +911,68 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         not(equalTo("true")));
   }
 
+  /**
+   * Smoke test to fail fast if translation of a splittable ParDo
+   * in streaming breaks.
+   */
+  @Test
+  public void testStreamingSplittableParDoTranslation() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowRunner runner = DataflowRunner.fromOptions(options);
+    options.setStreaming(true);
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> windowedInput = pipeline
+        .apply(Create.of("a"))
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+    windowedInput.apply(ParDo.of(new TestSplittableFn()));
+
+    runner.replaceTransforms(pipeline);
+
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                runner,
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    // The job should contain a SplittableParDo.ProcessKeyedElements step, translated as
+    // "SplittableProcessKeyed".
+
+    List<Step> steps = job.getSteps();
+    Step processKeyedStep = null;
+    for (Step step : steps) {
+      if (step.getKind().equals("SplittableProcessKeyed")) {
+        assertNull(processKeyedStep);
+        processKeyedStep = step;
+      }
+    }
+    assertNotNull(processKeyedStep);
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    DoFnInfo<String, Integer> fnInfo =
+        (DoFnInfo<String, Integer>)
+            SerializableUtils.deserializeFromByteArray(
+                jsonStringToByteArray(
+                    Structs.getString(
+                        processKeyedStep.getProperties(), PropertyNames.SERIALIZED_FN)),
+                "DoFnInfo");
+    assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class));
+    assertThat(
+        fnInfo.getWindowingStrategy().getWindowFn(),
+        Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1))));
+    Coder<?> restrictionCoder =
+        CloudObjects.coderFromCloudObject(
+            (CloudObject)
+                Structs.getObject(
+                    processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
+
+    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+  }
+
   @Test
   public void testToSingletonTranslationWithIsmSideInput() throws Exception {
     // A "change detector" test that makes sure the translation
@@ -1090,4 +1167,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertTrue(String.format("Found duplicate output ids %s", outputIds),
         outputIds.size() == 0);
   }
+
+  private static class TestSplittableFn extends DoFn<String, Integer> {
+    @ProcessElement
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
+      // noop
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a06c8bfa/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 646d8d3..0c2bd1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -33,6 +34,8 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -97,8 +100,25 @@ public class SplittableDoFnTest implements Serializable {
     }
   }
 
+  private static PipelineOptions streamingTestPipelineOptions() {
+    // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo
+    // tests in the Dataflow runner because, as of this writing, it can run Splittable DoFn only
+    // in streaming mode.
+    // This is a no-op for other runners currently (Direct runner doesn't care, and other
+    // runners don't implement SDF at all yet).
+    //
+    // This is a workaround until https://issues.apache.org/jira/browse/BEAM-1620
+    // is properly implemented and supports marking tests as streaming-only.
+    //
+    // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the removal of the
+    // current workaround.
+    PipelineOptions options = testingPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    return options;
+  }
+
   @Rule
-  public final transient TestPipeline p = TestPipeline.create();
+  public final transient TestPipeline p = TestPipeline.fromOptions(streamingTestPipelineOptions());
 
   @Test
   @Category({ValidatesRunner.class, UsesSplittableParDo.class})


[3/3] beam git commit: This closes #1898: [BEAM-1377] Splittable DoFn in Dataflow streaming runner

Posted by jk...@apache.org.
This closes #1898: [BEAM-1377] Splittable DoFn in Dataflow streaming runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd40d4b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd40d4b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd40d4b2

Branch: refs/heads/master
Commit: fd40d4b29d3e46dfe25bd7cea65eb7b51dde135f
Parents: 698b89e a06c8bf
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 16:27:26 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 16:27:26 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  8 +-
 .../dataflow/DataflowPipelineTranslator.java    | 40 +++++++++
 .../beam/runners/dataflow/DataflowRunner.java   | 14 +++
 .../dataflow/SplittableParDoOverrides.java      | 76 +++++++++++++++++
 .../runners/dataflow/util/PropertyNames.java    |  1 +
 .../DataflowPipelineTranslatorTest.java         | 89 ++++++++++++++++++++
 .../beam/sdk/transforms/SplittableDoFnTest.java | 22 ++++-
 7 files changed, 247 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Bump Dataflow worker to 20170619

Posted by jk...@apache.org.
Bump Dataflow worker to 20170619


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f6032c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f6032c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f6032c9

Branch: refs/heads/master
Commit: 4f6032c9c1774a9797e3ff25cc2a05fe56453f21
Parents: 698b89e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jun 19 08:34:31 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 16:27:12 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f6032c9/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 92c94a8..f627f12 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170530</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170619</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>