You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/02 16:17:08 UTC

[1/5] beam git commit: Unify Java and Python WindowingStrategy representations.

Repository: beam
Updated Branches:
  refs/heads/master 7a075cc34 -> 2f9428c3e


Unify Java and Python WindowingStrategy representations.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de757860
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de757860
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de757860

Branch: refs/heads/master
Commit: de757860945d5966a51173c54d29d0a733e66686
Parents: 7a075cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 24 17:23:31 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:42 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |  6 ++
 runners/core-construction-java/pom.xml          |  5 ++
 .../WindowingStrategyTranslation.java           | 60 ++++++++++++++------
 .../src/main/proto/beam_known_payloads.proto    | 53 +++++++++++++++++
 .../runners/dataflow/dataflow_runner.py         | 39 ++++++++++++-
 .../runners/dataflow/dataflow_runner_test.py    | 11 ++++
 sdks/python/apache_beam/transforms/window.py    | 57 ++++++++++++-------
 sdks/python/apache_beam/utils/proto_utils.py    |  6 ++
 sdks/python/apache_beam/utils/urns.py           | 10 ++--
 sdks/python/run_pylint.sh                       |  2 +
 10 files changed, 206 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e302e7..805a8d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,6 +945,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.google.protobuf</groupId>
+        <artifactId>protobuf-java-util</artifactId>
+        <version>${protobuf.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.api.grpc</groupId>
         <artifactId>grpc-google-common-protos</artifactId>
         <version>${grpc-google-common-protos.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 7eaa6f3..67951e9 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,6 +70,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index e92565f..a226624 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.Timestamps;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -30,6 +30,11 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -153,9 +158,13 @@ public class WindowingStrategyTranslation implements Serializable {
     }
   }
 
-  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+  public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1";
+  public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";
+  public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";
+  public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";
+  // This URN says that the WindowFn is just a UDF blob the Java SDK understands
   // TODO: standardize such things
-  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+  public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
 
   /**
    * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -168,7 +177,7 @@ public class WindowingStrategyTranslation implements Serializable {
         // TODO: Set environment ID
         .setSpec(
             FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_WINDOWFN_URN)
+                .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
                 .setParameter(
                     Any.pack(
                         BytesValue.newBuilder()
@@ -261,18 +270,37 @@ public class WindowingStrategyTranslation implements Serializable {
 
   public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
       throws InvalidProtocolBufferException {
-    checkArgument(
-        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
-        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
-        WindowFn.class.getSimpleName(),
-        CUSTOM_WINDOWFN_URN,
-        windowFnSpec.getSpec().getUrn());
-
-    Object deserializedWindowFn =
-        SerializableUtils.deserializeFromByteArray(
+    switch (windowFnSpec.getSpec().getUrn()) {
+      case GLOBAL_WINDOWS_FN:
+        return new GlobalWindows();
+      case FIXED_WINDOWS_FN:
+        RunnerApiPayloads.FixedWindowsPayload fixedParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.FixedWindowsPayload.class);
+        return FixedWindows.of(
+            Duration.millis(Durations.toMillis(fixedParams.getSize())))
+            .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
+      case SLIDING_WINDOWS_FN:
+        RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.SlidingWindowsPayload.class);
+        return SlidingWindows.of(
+            Duration.millis(Durations.toMillis(slidingParams.getSize())))
+            .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
+            .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
+      case SESSION_WINDOWS_FN:
+        RunnerApiPayloads.SessionsPayload sessionParams =
+            windowFnSpec.getSpec().getParameter().unpack(
+                RunnerApiPayloads.SessionsPayload.class);
+        return Sessions.withGapDuration(
+            Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
+      case SERIALIZED_JAVA_WINDOWFN_URN:
+        return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
             windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
             "WindowFn");
-
-    return (WindowFn<?, ?>) deserializedWindowFn;
+      default:
+        throw new IllegalArgumentException(
+            "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
new file mode 100644
index 0000000..446bd59
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the payloads of the well-known WindowFns,
+ * one message per standard WindowFn URN (see the URN noted above each message).
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "RunnerApiPayloads";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+  google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+  google.protobuf.Duration gap_size = 1;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 046d3d5..3e0e268 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,6 +25,7 @@ import logging
 import threading
 import time
 import traceback
+import urllib
 
 from apache_beam import error
 from apache_beam import coders
@@ -416,7 +417,9 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     windowing = transform_node.transform.get_windowing(
         transform_node.inputs)
-    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
+    step.add_property(
+        PropertyNames.SERIALIZED_FN,
+        self.serialize_windowing_strategy(windowing))
 
   def run_ParDo(self, transform_node):
     transform = transform_node.transform
@@ -697,6 +700,40 @@ class DataflowRunner(PipelineRunner):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
 
+  @classmethod
+  def serialize_windowing_strategy(cls, windowing):
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    context = pipeline_context.PipelineContext()
+    windowing_proto = windowing.to_runner_api(context)
+    return cls.byte_array_to_json_string(
+        beam_runner_api_pb2.MessageWithComponents(
+            components=context.to_runner_api(),
+            windowing_strategy=windowing_proto).SerializeToString())
+
+  @classmethod
+  def deserialize_windowing_strategy(cls, serialized_data):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners import pipeline_context
+    from apache_beam.runners.api import beam_runner_api_pb2
+    from apache_beam.transforms.core import Windowing
+    proto = beam_runner_api_pb2.MessageWithComponents()
+    proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
+    return Windowing.from_runner_api(
+        proto.windowing_strategy,
+        pipeline_context.PipelineContext(proto.components))
+
+  @staticmethod
+  def byte_array_to_json_string(raw_bytes):
+    """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
+    return urllib.quote(raw_bytes)
+
+  @staticmethod
+  def json_string_to_byte_array(encoded_string):
+    """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
+    return urllib.unquote(encoded_string)
+
 
 class DataflowPipelineResult(PipelineResult):
   """Represents the state of a pipeline run on the Dataflow service."""

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ff4b51d..74fd01d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -38,6 +38,8 @@ from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_a
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.transforms.core import _GroupByKeyOnly
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms import window
 from apache_beam.typehints import typehints
 
 # Protect against environments where apitools library is not available.
@@ -240,6 +242,15 @@ class DataflowRunnerTest(unittest.TestCase):
     for _ in range(num_inputs):
       self.assertEqual(inputs[0].element_type, output_type)
 
+  def test_serialize_windowing_strategy(self):
+    # This just tests the basic path; more complete tests
+    # are in window_test.py.
+    strategy = Windowing(window.FixedWindows(10))
+    self.assertEqual(
+        strategy,
+        DataflowRunner.deserialize_windowing_strategy(
+            DataflowRunner.serialize_windowing_strategy(strategy)))
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 94187e0..f74c8a9 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -51,10 +51,12 @@ from __future__ import absolute_import
 
 import abc
 
-from google.protobuf import struct_pb2
+from google.protobuf import duration_pb2
+from google.protobuf import timestamp_pb2
 
 from apache_beam.coders import coders
 from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.runners.api import beam_known_payloads_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
@@ -341,14 +343,18 @@ class FixedWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
-            proto_utils.pack_Struct(size=self.size.micros,
-                                    offset=self.offset.micros))
-
-  @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+            beam_known_payloads_pb2.FixedWindowsPayload(
+                size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.size.micros),
+                offset=proto_utils.from_micros(
+                    timestamp_pb2.Timestamp, self.offset.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return FixedWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']))
+        size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+        offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()))
 
 
 class SlidingWindows(NonMergingWindowFn):
@@ -392,17 +398,22 @@ class SlidingWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
-            proto_utils.pack_Struct(
-                size=self.size.micros,
-                offset=self.offset.micros,
-                period=self.period.micros))
-
-  @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+            beam_known_payloads_pb2.SlidingWindowsPayload(
+                size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.size.micros),
+                offset=proto_utils.from_micros(
+                    timestamp_pb2.Timestamp, self.offset.micros),
+                period=proto_utils.from_micros(
+                    duration_pb2.Duration, self.period.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.SLIDING_WINDOWS_FN,
+      beam_known_payloads_pb2.SlidingWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return SlidingWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']),
-        period=Duration(micros=fn_parameter['period']))
+        size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+        offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()),
+        period=Duration(micros=fn_parameter.period.ToMicroseconds()))
 
 
 class Sessions(WindowFn):
@@ -452,10 +463,14 @@ class Sessions(WindowFn):
     if type(self) == type(other) == Sessions:
       return self.gap_size == other.gap_size
 
-  @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
-
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
-            proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+            beam_known_payloads_pb2.SessionsPayload(
+                gap_size=proto_utils.from_micros(
+                    duration_pb2.Duration, self.gap_size.micros)))
+
+  @urns.RunnerApiFn.register_urn(
+      urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return Sessions(
+        gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index 090a821..af8f218 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -53,3 +53,9 @@ def pack_Struct(**kwargs):
   for key, value in kwargs.items():
     msg[key] = value  # pylint: disable=unsubscriptable-object, unsupported-assignment-operation
   return msg
+
+
+def from_micros(cls, micros):
+  result = cls()
+  result.FromMicroseconds(micros)
+  return result

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 379b5ff..849b8e3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -26,11 +26,11 @@ from apache_beam.internal import pickler
 from apache_beam.utils import proto_utils
 
 
-PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
-GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
-FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
-SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
-SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
 
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 

http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 400c577..4ef3e7f 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -50,6 +50,8 @@ EXCLUDED_GENERATED_FILES=(
 "apache_beam/runners/api/beam_fn_api_pb2_grpc.py"
 "apache_beam/runners/api/beam_runner_api_pb2.py"
 "apache_beam/runners/api/beam_runner_api_pb2_grpc.py"
+"apache_beam/runners/api/beam_known_payloads_pb2.py"
+"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py"
 )
 
 FILES_TO_IGNORE=""


[2/5] beam git commit: Ignore all proto generated files.

Posted by ro...@apache.org.
Ignore all proto generated files.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c20bfa9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c20bfa9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c20bfa9

Branch: refs/heads/master
Commit: 4c20bfa9929664031f3fce67a723d09fc629ad3a
Parents: b490e84
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 31 15:53:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700

----------------------------------------------------------------------
 sdks/python/run_pylint.sh | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4c20bfa9/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 4ef3e7f..7808136 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -46,12 +46,7 @@ EXCLUDED_GENERATED_FILES=(
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
 "apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
 "apache_beam/coders/proto2_coder_test_messages_pb2.py"
-"apache_beam/runners/api/beam_fn_api_pb2.py"
-"apache_beam/runners/api/beam_fn_api_pb2_grpc.py"
-"apache_beam/runners/api/beam_runner_api_pb2.py"
-"apache_beam/runners/api/beam_runner_api_pb2_grpc.py"
-"apache_beam/runners/api/beam_known_payloads_pb2.py"
-"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py"
+apache_beam/runners/api/*pb2*.py
 )
 
 FILES_TO_IGNORE=""


[5/5] beam git commit: Add known window serialization for Java.

Posted by ro...@apache.org.
Add known window serialization for Java.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b490e84e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b490e84e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b490e84e

Branch: refs/heads/master
Commit: b490e84ef0b4e56cabc091cfe2cc42f8f1e69caa
Parents: de75786
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 25 13:26:02 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700

----------------------------------------------------------------------
 .../WindowingStrategyTranslation.java           | 105 +++++++++++++++----
 .../WindowingStrategyTranslationTest.java       |   9 ++
 .../src/main/proto/beam_known_payloads.proto    |  53 ----------
 .../src/main/proto/standard_window_fns.proto    |  53 ++++++++++
 sdks/python/apache_beam/transforms/window.py    |  14 +--
 5 files changed, 154 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index a226624..718efe7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -165,6 +165,9 @@ public class WindowingStrategyTranslation implements Serializable {
   // This URN says that the WindowFn is just a UDF blob the Java SDK understands
   // TODO: standardize such things
   public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
+  public static final String OLD_SERIALIZED_JAVA_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+  // Forces the old URN in toProto. Remove once the Dataflow worker understands the formats above.
+  private static final boolean USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN = true;
 
   /**
    * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -173,19 +176,80 @@ public class WindowingStrategyTranslation implements Serializable {
    */
   public static SdkFunctionSpec toProto(
       WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
-    return SdkFunctionSpec.newBuilder()
-        // TODO: Set environment ID
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(windowFn)))
-                            .build())))
-        .build();
+    // TODO: Set environment IDs
+    if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
+                  .setParameter(
+                      Any.pack(
+                          BytesValue.newBuilder()
+                              .setValue(
+                                  ByteString.copyFrom(
+                                      SerializableUtils.serializeToByteArray(windowFn)))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof GlobalWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
+          .build();
+    } else if (windowFn instanceof FixedWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(FIXED_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.FixedWindowsPayload.newBuilder()
+                              .setSize(Durations.fromMillis(
+                                  ((FixedWindows) windowFn).getSize().getMillis()))
+                              .setOffset(Timestamps.fromMillis(
+                                  ((FixedWindows) windowFn).getOffset().getMillis()))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof SlidingWindows) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SLIDING_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.SlidingWindowsPayload.newBuilder()
+                              .setSize(Durations.fromMillis(
+                                  ((SlidingWindows) windowFn).getSize().getMillis()))
+                              .setOffset(Timestamps.fromMillis(
+                                  ((SlidingWindows) windowFn).getOffset().getMillis()))
+                              .setPeriod(Durations.fromMillis(
+                                  ((SlidingWindows) windowFn).getPeriod().getMillis()))
+                              .build())))
+          .build();
+    } else if (windowFn instanceof Sessions) {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SESSION_WINDOWS_FN)
+                  .setParameter(
+                      Any.pack(
+                          StandardWindowFns.SessionsPayload.newBuilder()
+                              .setGapSize(Durations.fromMillis(
+                                  ((Sessions) windowFn).getGapDuration().getMillis()))
+                              .build())))
+          .build();
+    } else {
+      return SdkFunctionSpec.newBuilder()
+          .setSpec(
+              FunctionSpec.newBuilder()
+                  .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
+                  .setParameter(
+                      Any.pack(
+                          BytesValue.newBuilder()
+                              .setValue(
+                                  ByteString.copyFrom(
+                                      SerializableUtils.serializeToByteArray(windowFn)))
+                              .build())))
+          .build();
+    }
   }
 
   /**
@@ -274,27 +338,28 @@ public class WindowingStrategyTranslation implements Serializable {
       case GLOBAL_WINDOWS_FN:
         return new GlobalWindows();
       case FIXED_WINDOWS_FN:
-        RunnerApiPayloads.FixedWindowsPayload fixedParams =
+        StandardWindowFns.FixedWindowsPayload fixedParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.FixedWindowsPayload.class);
+                StandardWindowFns.FixedWindowsPayload.class);
         return FixedWindows.of(
             Duration.millis(Durations.toMillis(fixedParams.getSize())))
             .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
       case SLIDING_WINDOWS_FN:
-        RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+        StandardWindowFns.SlidingWindowsPayload slidingParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.SlidingWindowsPayload.class);
+                StandardWindowFns.SlidingWindowsPayload.class);
         return SlidingWindows.of(
             Duration.millis(Durations.toMillis(slidingParams.getSize())))
             .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
             .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
       case SESSION_WINDOWS_FN:
-        RunnerApiPayloads.SessionsPayload sessionParams =
+        StandardWindowFns.SessionsPayload sessionParams =
             windowFnSpec.getSpec().getParameter().unpack(
-                RunnerApiPayloads.SessionsPayload.class);
+                StandardWindowFns.SessionsPayload.class);
         return Sessions.withGapDuration(
             Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
       case SERIALIZED_JAVA_WINDOWFN_URN:
+      case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
         return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
             windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
             "WindowFn");

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index 1e52803..e406545 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -62,6 +64,13 @@ public class WindowingStrategyTranslationTest {
   public static Iterable<ToProtoAndBackSpec> data() {
     return ImmutableList.of(
         toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            FixedWindows.of(Duration.millis(11)).withOffset(Duration.millis(3)))),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            SlidingWindows.of(Duration.millis(37)).every(Duration.millis(3))
+                .withOffset(Duration.millis(2)))),
+        toProtoAndBackSpec(WindowingStrategy.of(
+            Sessions.withGapDuration(Duration.millis(389)))),
         toProtoAndBackSpec(
             WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
                 .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
deleted file mode 100644
index 446bd59..0000000
--- a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Runner API, which is the runner-independent,
- * SDK-independent definition of the Beam model.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.runner_api.v1;
-
-option java_package = "org.apache.beam.sdk.common.runner.v1";
-option java_outer_classname = "RunnerApiPayloads";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-
-// beam:windowfn:global_windows:v0.1
-// empty payload
-
-// beam:windowfn:fixed_windows:v0.1
-message FixedWindowsPayload {
-  google.protobuf.Duration size = 1;
-  google.protobuf.Timestamp offset = 2;
-}
-
-// beam:windowfn:sliding_windows:v0.1
-message SlidingWindowsPayload {
-  google.protobuf.Duration size = 1;
-  google.protobuf.Timestamp offset = 2;
-  google.protobuf.Duration period = 3;
-}
-
-// beam:windowfn:session_windows:v0.1
-message SessionsPayload {
-  google.protobuf.Duration gap_size = 1;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/standard_window_fns.proto b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
new file mode 100644
index 0000000..0682044
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the payloads of the standard WindowFns
+ * (fixed, sliding, and session windows) used with the Beam Runner API.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "StandardWindowFns";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+  google.protobuf.Duration size = 1;
+  google.protobuf.Timestamp offset = 2;
+  google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+  google.protobuf.Duration gap_size = 1;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index f74c8a9..e87a007 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -56,7 +56,7 @@ from google.protobuf import timestamp_pb2
 
 from apache_beam.coders import coders
 from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.runners.api import beam_known_payloads_pb2
+from apache_beam.runners.api import standard_window_fns_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
@@ -343,14 +343,14 @@ class FixedWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
-            beam_known_payloads_pb2.FixedWindowsPayload(
+            standard_window_fns_pb2.FixedWindowsPayload(
                 size=proto_utils.from_micros(
                     duration_pb2.Duration, self.size.micros),
                 offset=proto_utils.from_micros(
                     timestamp_pb2.Timestamp, self.offset.micros)))
 
   @urns.RunnerApiFn.register_urn(
-      urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
+      urns.FIXED_WINDOWS_FN, standard_window_fns_pb2.FixedWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return FixedWindows(
         size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -398,7 +398,7 @@ class SlidingWindows(NonMergingWindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
-            beam_known_payloads_pb2.SlidingWindowsPayload(
+            standard_window_fns_pb2.SlidingWindowsPayload(
                 size=proto_utils.from_micros(
                     duration_pb2.Duration, self.size.micros),
                 offset=proto_utils.from_micros(
@@ -408,7 +408,7 @@ class SlidingWindows(NonMergingWindowFn):
 
   @urns.RunnerApiFn.register_urn(
       urns.SLIDING_WINDOWS_FN,
-      beam_known_payloads_pb2.SlidingWindowsPayload)
+      standard_window_fns_pb2.SlidingWindowsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return SlidingWindows(
         size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -465,12 +465,12 @@ class Sessions(WindowFn):
 
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
-            beam_known_payloads_pb2.SessionsPayload(
+            standard_window_fns_pb2.SessionsPayload(
                 gap_size=proto_utils.from_micros(
                     duration_pb2.Duration, self.gap_size.micros)))
 
   @urns.RunnerApiFn.register_urn(
-      urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+      urns.SESSION_WINDOWS_FN, standard_window_fns_pb2.SessionsPayload)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return Sessions(
         gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))


[3/5] beam git commit: Increase dataflow worker version.

Posted by ro...@apache.org.
Increase dataflow worker version.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99907b94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99907b94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99907b94

Branch: refs/heads/master
Commit: 99907b94c074d4dc2ac3911b690da6790056412a
Parents: 4c20bfa
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 1 16:01:56 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow/internal/dependency.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/99907b94/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index e69c8d7..3a0ff46 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -73,7 +73,7 @@ from apache_beam.options.pipeline_options import SetupOptions
 # Update this version to the next version whenever there is a change that will
 # require changes to the execution environment.
 # This should be in the beam-[version]-[date] format, date is optional.
-BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170518'
+BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170601'
 
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'


[4/5] beam git commit: Closes #3222

Posted by ro...@apache.org.
Closes #3222


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f9428c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f9428c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f9428c3

Branch: refs/heads/master
Commit: 2f9428c3e5e648d147fa68afdd13db7f6bfbcb0b
Parents: 7a075cc 99907b9
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jun 2 09:16:43 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 runners/core-construction-java/pom.xml          |   5 +
 .../WindowingStrategyTranslation.java           | 149 +++++++++++++++----
 .../WindowingStrategyTranslationTest.java       |   9 ++
 .../src/main/proto/standard_window_fns.proto    |  53 +++++++
 .../runners/dataflow/dataflow_runner.py         |  39 ++++-
 .../runners/dataflow/dataflow_runner_test.py    |  11 ++
 .../runners/dataflow/internal/dependency.py     |   2 +-
 sdks/python/apache_beam/transforms/window.py    |  57 ++++---
 sdks/python/apache_beam/utils/proto_utils.py    |   6 +
 sdks/python/apache_beam/utils/urns.py           |  10 +-
 sdks/python/run_pylint.sh                       |   5 +-
 12 files changed, 292 insertions(+), 60 deletions(-)
----------------------------------------------------------------------