You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/11 22:11:29 UTC
[1/2] beam git commit: DataflowRunner: send windowing strategy using
Runner API proto
Repository: beam
Updated Branches:
refs/heads/master bef2d3738 -> 17a41ab10
DataflowRunner: send windowing strategy using Runner API proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69f412dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69f412dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69f412dc
Branch: refs/heads/master
Commit: 69f412dc34f36df40b034c2160b8b0cdad815011
Parents: bef2d37
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 11 13:32:11 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 15:04:38 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 12 +++++++++++-
.../runners/dataflow/DataflowPipelineTranslator.java | 14 ++++++++++++--
2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/69f412dc/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index d0d86e6..a57744c 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170410</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170411</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
@@ -114,6 +114,7 @@
<includes>
<include>com.google.cloud.bigtable:bigtable-client-core</include>
<include>com.google.guava:guava</include>
+ <include>org.apache.beam:beam-runners-core-construction-java</include>
</includes>
</artifactSet>
<filters>
@@ -153,6 +154,10 @@
<exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude>
</excludes>
</relocation>
+ <relocation>
+ <pattern>org.apache.beam.runners.core</pattern>
+ <shadedPattern>org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core</shadedPattern>
+ </relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
@@ -178,6 +183,11 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-runner-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/69f412dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 34da996..abeca4d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.WindowingStrategies;
import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
@@ -111,6 +112,15 @@ public class DataflowPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+ try {
+ return WindowingStrategies.toProto(windowingStrategy).toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);
+ }
+ }
+
/**
* A map from {@link PTransform} subclass to the corresponding
* {@link TransformTranslator} to use to translate that transform.
@@ -813,7 +823,7 @@ public class DataflowPipelineTranslator {
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
- byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+ byteArrayToJsonString(serializeWindowingStrategy(windowingStrategy)));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
@@ -891,7 +901,7 @@ public class DataflowPipelineTranslator {
stepContext.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
- byte[] serializedBytes = serializeToByteArray(strategy);
+ byte[] serializedBytes = serializeWindowingStrategy(strategy);
String serializedJson = byteArrayToJsonString(serializedBytes);
stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
}
[2/2] beam git commit: This closes #2495
Posted by dh...@apache.org.
This closes #2495
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17a41ab1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17a41ab1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17a41ab1
Branch: refs/heads/master
Commit: 17a41ab10cd841e495b8f9f634119f5e10be4ca9
Parents: bef2d37 69f412d
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 11 15:06:29 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 15:06:29 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 12 +++++++++++-
.../runners/dataflow/DataflowPipelineTranslator.java | 14 ++++++++++++--
2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------