You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:49 UTC

[18/50] [abbrv] beam git commit: DataflowRunner: send windowing strategy using Runner API proto

DataflowRunner: send windowing strategy using Runner API proto


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69f412dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69f412dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69f412dc

Branch: refs/heads/DSL_SQL
Commit: 69f412dc34f36df40b034c2160b8b0cdad815011
Parents: bef2d37
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 11 13:32:11 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 11 15:04:38 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml            | 12 +++++++++++-
 .../runners/dataflow/DataflowPipelineTranslator.java  | 14 ++++++++++++--
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/69f412dc/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index d0d86e6..a57744c 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170410</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170411</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>
@@ -114,6 +114,7 @@
                 <includes>
                   <include>com.google.cloud.bigtable:bigtable-client-core</include>
                   <include>com.google.guava:guava</include>
+                  <include>org.apache.beam:beam-runners-core-construction-java</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -153,6 +154,10 @@
                     <exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude>
                   </excludes>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.beam.runners.core</pattern>
+                  <shadedPattern>org.apache.beam.runners.dataflow.repackaged.org.apache.beam.runners.core</shadedPattern>
+                </relocation>
               </relocations>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
@@ -178,6 +183,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/69f412dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 34da996..abeca4d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.WindowingStrategies;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
 import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
@@ -111,6 +112,15 @@ public class DataflowPipelineTranslator {
   private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+    try {
+      return WindowingStrategies.toProto(windowingStrategy).toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);
+    }
+  }
+
   /**
    * A map from {@link PTransform} subclass to the corresponding
    * {@link TransformTranslator} to use to translate that transform.
@@ -813,7 +823,7 @@ public class DataflowPipelineTranslator {
             stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
             stepContext.addInput(
                 PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
+                byteArrayToJsonString(serializeWindowingStrategy(windowingStrategy)));
             stepContext.addInput(
                 PropertyNames.IS_MERGING_WINDOW_FN,
                 !windowingStrategy.getWindowFn().isNonMerging());
@@ -891,7 +901,7 @@ public class DataflowPipelineTranslator {
             stepContext.addOutput(context.getOutput(transform));
 
             WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
-            byte[] serializedBytes = serializeToByteArray(strategy);
+            byte[] serializedBytes = serializeWindowingStrategy(strategy);
             String serializedJson = byteArrayToJsonString(serializedBytes);
             stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
           }