You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/02 16:17:08 UTC
[1/5] beam git commit: Unify Java and Python WindowingStrategy
representations.
Repository: beam
Updated Branches:
refs/heads/master 7a075cc34 -> 2f9428c3e
Unify Java and Python WindowingStrategy representations.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de757860
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de757860
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de757860
Branch: refs/heads/master
Commit: de757860945d5966a51173c54d29d0a733e66686
Parents: 7a075cc
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 24 17:23:31 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:42 2017 -0700
----------------------------------------------------------------------
pom.xml | 6 ++
runners/core-construction-java/pom.xml | 5 ++
.../WindowingStrategyTranslation.java | 60 ++++++++++++++------
.../src/main/proto/beam_known_payloads.proto | 53 +++++++++++++++++
.../runners/dataflow/dataflow_runner.py | 39 ++++++++++++-
.../runners/dataflow/dataflow_runner_test.py | 11 ++++
sdks/python/apache_beam/transforms/window.py | 57 ++++++++++++-------
sdks/python/apache_beam/utils/proto_utils.py | 6 ++
sdks/python/apache_beam/utils/urns.py | 10 ++--
sdks/python/run_pylint.sh | 2 +
10 files changed, 206 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3e302e7..805a8d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -945,6 +945,12 @@
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-common-protos</artifactId>
<version>${grpc-google-common-protos.version}</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 7eaa6f3..67951e9 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,6 +70,11 @@
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index e92565f..a226624 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.core.construction;
-import static com.google.common.base.Preconditions.checkArgument;
-
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -30,6 +30,11 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -153,9 +158,13 @@ public class WindowingStrategyTranslation implements Serializable {
}
}
- // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+ public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1";
+ public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";
+ public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";
+ public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";
+ // This URN says that the WindowFn is just a UDF blob the Java SDK understands
// TODO: standardize such things
- public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+ public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -168,7 +177,7 @@ public class WindowingStrategyTranslation implements Serializable {
// TODO: Set environment ID
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(CUSTOM_WINDOWFN_URN)
+ .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
@@ -261,18 +270,37 @@ public class WindowingStrategyTranslation implements Serializable {
public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
throws InvalidProtocolBufferException {
- checkArgument(
- windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
- "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
- WindowFn.class.getSimpleName(),
- CUSTOM_WINDOWFN_URN,
- windowFnSpec.getSpec().getUrn());
-
- Object deserializedWindowFn =
- SerializableUtils.deserializeFromByteArray(
+ switch (windowFnSpec.getSpec().getUrn()) {
+ case GLOBAL_WINDOWS_FN:
+ return new GlobalWindows();
+ case FIXED_WINDOWS_FN:
+ RunnerApiPayloads.FixedWindowsPayload fixedParams =
+ windowFnSpec.getSpec().getParameter().unpack(
+ RunnerApiPayloads.FixedWindowsPayload.class);
+ return FixedWindows.of(
+ Duration.millis(Durations.toMillis(fixedParams.getSize())))
+ .withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
+ case SLIDING_WINDOWS_FN:
+ RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+ windowFnSpec.getSpec().getParameter().unpack(
+ RunnerApiPayloads.SlidingWindowsPayload.class);
+ return SlidingWindows.of(
+ Duration.millis(Durations.toMillis(slidingParams.getSize())))
+ .every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
+ .withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
+ case SESSION_WINDOWS_FN:
+ RunnerApiPayloads.SessionsPayload sessionParams =
+ windowFnSpec.getSpec().getParameter().unpack(
+ RunnerApiPayloads.SessionsPayload.class);
+ return Sessions.withGapDuration(
+ Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
+ case SERIALIZED_JAVA_WINDOWFN_URN:
+ return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
"WindowFn");
-
- return (WindowFn<?, ?>) deserializedWindowFn;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
new file mode 100644
index 0000000..446bd59
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Runner API, which is the runner-independent,
+ * SDK-independent definition of the Beam model.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "RunnerApiPayloads";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+ google.protobuf.Duration size = 1;
+ google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+ google.protobuf.Duration size = 1;
+ google.protobuf.Timestamp offset = 2;
+ google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+ google.protobuf.Duration gap_size = 1;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 046d3d5..3e0e268 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -25,6 +25,7 @@ import logging
import threading
import time
import traceback
+import urllib
from apache_beam import error
from apache_beam import coders
@@ -416,7 +417,9 @@ class DataflowRunner(PipelineRunner):
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
windowing = transform_node.transform.get_windowing(
transform_node.inputs)
- step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
+ step.add_property(
+ PropertyNames.SERIALIZED_FN,
+ self.serialize_windowing_strategy(windowing))
def run_ParDo(self, transform_node):
transform = transform_node.transform
@@ -697,6 +700,40 @@ class DataflowRunner(PipelineRunner):
PropertyNames.STEP_NAME: input_step.proto.name,
PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+ @classmethod
+ def serialize_windowing_strategy(cls, windowing):
+ from apache_beam.runners import pipeline_context
+ from apache_beam.runners.api import beam_runner_api_pb2
+ context = pipeline_context.PipelineContext()
+ windowing_proto = windowing.to_runner_api(context)
+ return cls.byte_array_to_json_string(
+ beam_runner_api_pb2.MessageWithComponents(
+ components=context.to_runner_api(),
+ windowing_strategy=windowing_proto).SerializeToString())
+
+ @classmethod
+ def deserialize_windowing_strategy(cls, serialized_data):
+ # Imported here to avoid circular dependencies.
+ # pylint: disable=wrong-import-order, wrong-import-position
+ from apache_beam.runners import pipeline_context
+ from apache_beam.runners.api import beam_runner_api_pb2
+ from apache_beam.transforms.core import Windowing
+ proto = beam_runner_api_pb2.MessageWithComponents()
+ proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
+ return Windowing.from_runner_api(
+ proto.windowing_strategy,
+ pipeline_context.PipelineContext(proto.components))
+
+ @staticmethod
+ def byte_array_to_json_string(raw_bytes):
+ """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
+ return urllib.quote(raw_bytes)
+
+ @staticmethod
+ def json_string_to_byte_array(encoded_string):
+ """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
+ return urllib.unquote(encoded_string)
+
class DataflowPipelineResult(PipelineResult):
"""Represents the state of a pipeline run on the Dataflow service."""
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ff4b51d..74fd01d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -38,6 +38,8 @@ from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_a
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.core import _GroupByKeyOnly
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms import window
from apache_beam.typehints import typehints
# Protect against environments where apitools library is not available.
@@ -240,6 +242,15 @@ class DataflowRunnerTest(unittest.TestCase):
for _ in range(num_inputs):
self.assertEqual(inputs[0].element_type, output_type)
+ def test_serialize_windowing_strategy(self):
+ # This just tests the basic path; more complete tests
+ # are in window_test.py.
+ strategy = Windowing(window.FixedWindows(10))
+ self.assertEqual(
+ strategy,
+ DataflowRunner.deserialize_windowing_strategy(
+ DataflowRunner.serialize_windowing_strategy(strategy)))
+
if __name__ == '__main__':
unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 94187e0..f74c8a9 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -51,10 +51,12 @@ from __future__ import absolute_import
import abc
-from google.protobuf import struct_pb2
+from google.protobuf import duration_pb2
+from google.protobuf import timestamp_pb2
from apache_beam.coders import coders
from apache_beam.runners.api import beam_runner_api_pb2
+from apache_beam.runners.api import beam_known_payloads_pb2
from apache_beam.transforms import timeutil
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
@@ -341,14 +343,18 @@ class FixedWindows(NonMergingWindowFn):
def to_runner_api_parameter(self, context):
return (urns.FIXED_WINDOWS_FN,
- proto_utils.pack_Struct(size=self.size.micros,
- offset=self.offset.micros))
-
- @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+ beam_known_payloads_pb2.FixedWindowsPayload(
+ size=proto_utils.from_micros(
+ duration_pb2.Duration, self.size.micros),
+ offset=proto_utils.from_micros(
+ timestamp_pb2.Timestamp, self.offset.micros)))
+
+ @urns.RunnerApiFn.register_urn(
+ urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context):
return FixedWindows(
- size=Duration(micros=fn_parameter['size']),
- offset=Timestamp(micros=fn_parameter['offset']))
+ size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+ offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()))
class SlidingWindows(NonMergingWindowFn):
@@ -392,17 +398,22 @@ class SlidingWindows(NonMergingWindowFn):
def to_runner_api_parameter(self, context):
return (urns.SLIDING_WINDOWS_FN,
- proto_utils.pack_Struct(
- size=self.size.micros,
- offset=self.offset.micros,
- period=self.period.micros))
-
- @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+ beam_known_payloads_pb2.SlidingWindowsPayload(
+ size=proto_utils.from_micros(
+ duration_pb2.Duration, self.size.micros),
+ offset=proto_utils.from_micros(
+ timestamp_pb2.Timestamp, self.offset.micros),
+ period=proto_utils.from_micros(
+ duration_pb2.Duration, self.period.micros)))
+
+ @urns.RunnerApiFn.register_urn(
+ urns.SLIDING_WINDOWS_FN,
+ beam_known_payloads_pb2.SlidingWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context):
return SlidingWindows(
- size=Duration(micros=fn_parameter['size']),
- offset=Timestamp(micros=fn_parameter['offset']),
- period=Duration(micros=fn_parameter['period']))
+ size=Duration(micros=fn_parameter.size.ToMicroseconds()),
+ offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()),
+ period=Duration(micros=fn_parameter.period.ToMicroseconds()))
class Sessions(WindowFn):
@@ -452,10 +463,14 @@ class Sessions(WindowFn):
if type(self) == type(other) == Sessions:
return self.gap_size == other.gap_size
- @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
- def from_runner_api_parameter(fn_parameter, unused_context):
- return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
-
def to_runner_api_parameter(self, context):
return (urns.SESSION_WINDOWS_FN,
- proto_utils.pack_Struct(gap_size=self.gap_size.micros))
+ beam_known_payloads_pb2.SessionsPayload(
+ gap_size=proto_utils.from_micros(
+ duration_pb2.Duration, self.gap_size.micros)))
+
+ @urns.RunnerApiFn.register_urn(
+ urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+ def from_runner_api_parameter(fn_parameter, unused_context):
+ return Sessions(
+ gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/proto_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py
index 090a821..af8f218 100644
--- a/sdks/python/apache_beam/utils/proto_utils.py
+++ b/sdks/python/apache_beam/utils/proto_utils.py
@@ -53,3 +53,9 @@ def pack_Struct(**kwargs):
for key, value in kwargs.items():
msg[key] = value # pylint: disable=unsubscriptable-object, unsupported-assignment-operation
return msg
+
+
+def from_micros(cls, micros):
+ result = cls()
+ result.FromMicroseconds(micros)
+ return result
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 379b5ff..849b8e3 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -26,11 +26,11 @@ from apache_beam.internal import pickler
from apache_beam.utils import proto_utils
-PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
-GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
-FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
-SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1"
-SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1"
+PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1"
+GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"
+FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
+SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
+SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"
http://git-wip-us.apache.org/repos/asf/beam/blob/de757860/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 400c577..4ef3e7f 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -50,6 +50,8 @@ EXCLUDED_GENERATED_FILES=(
"apache_beam/runners/api/beam_fn_api_pb2_grpc.py"
"apache_beam/runners/api/beam_runner_api_pb2.py"
"apache_beam/runners/api/beam_runner_api_pb2_grpc.py"
+"apache_beam/runners/api/beam_known_payloads_pb2.py"
+"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py"
)
FILES_TO_IGNORE=""
[2/5] beam git commit: Ignore all proto generated files.
Posted by ro...@apache.org.
Ignore all proto generated files.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c20bfa9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c20bfa9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c20bfa9
Branch: refs/heads/master
Commit: 4c20bfa9929664031f3fce67a723d09fc629ad3a
Parents: b490e84
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed May 31 15:53:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700
----------------------------------------------------------------------
sdks/python/run_pylint.sh | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4c20bfa9/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 4ef3e7f..7808136 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -46,12 +46,7 @@ EXCLUDED_GENERATED_FILES=(
"apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py"
"apache_beam/io/gcp/internal/clients/storage/storage_v1_messages.py"
"apache_beam/coders/proto2_coder_test_messages_pb2.py"
-"apache_beam/runners/api/beam_fn_api_pb2.py"
-"apache_beam/runners/api/beam_fn_api_pb2_grpc.py"
-"apache_beam/runners/api/beam_runner_api_pb2.py"
-"apache_beam/runners/api/beam_runner_api_pb2_grpc.py"
-"apache_beam/runners/api/beam_known_payloads_pb2.py"
-"apache_beam/runners/api/beam_known_payloads_pb2_grpc.py"
+apache_beam/runners/api/*pb2*.py
)
FILES_TO_IGNORE=""
[5/5] beam git commit: Add known window serialization for Java.
Posted by ro...@apache.org.
Add known window serialization for Java.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b490e84e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b490e84e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b490e84e
Branch: refs/heads/master
Commit: b490e84ef0b4e56cabc091cfe2cc42f8f1e69caa
Parents: de75786
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu May 25 13:26:02 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700
----------------------------------------------------------------------
.../WindowingStrategyTranslation.java | 105 +++++++++++++++----
.../WindowingStrategyTranslationTest.java | 9 ++
.../src/main/proto/beam_known_payloads.proto | 53 ----------
.../src/main/proto/standard_window_fns.proto | 53 ++++++++++
sdks/python/apache_beam/transforms/window.py | 14 +--
5 files changed, 154 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index a226624..718efe7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApiPayloads;
+import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -165,6 +165,9 @@ public class WindowingStrategyTranslation implements Serializable {
// This URN says that the WindowFn is just a UDF blob the Java SDK understands
// TODO: standardize such things
public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
+ public static final String OLD_SERIALIZED_JAVA_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+ // Remove this once the dataflow worker understands all the above formats.
+ private static final boolean USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN = true;
/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -173,19 +176,80 @@ public class WindowingStrategyTranslation implements Serializable {
*/
public static SdkFunctionSpec toProto(
WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
- return SdkFunctionSpec.newBuilder()
- // TODO: Set environment ID
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
- .build();
+ // TODO: Set environment IDs
+ if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowFn)))
+ .build())))
+ .build();
+ } else if (windowFn instanceof GlobalWindows) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
+ .build();
+ } else if (windowFn instanceof FixedWindows) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(FIXED_WINDOWS_FN)
+ .setParameter(
+ Any.pack(
+ StandardWindowFns.FixedWindowsPayload.newBuilder()
+ .setSize(Durations.fromMillis(
+ ((FixedWindows) windowFn).getSize().getMillis()))
+ .setOffset(Timestamps.fromMillis(
+ ((FixedWindows) windowFn).getOffset().getMillis()))
+ .build())))
+ .build();
+ } else if (windowFn instanceof SlidingWindows) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(SLIDING_WINDOWS_FN)
+ .setParameter(
+ Any.pack(
+ StandardWindowFns.SlidingWindowsPayload.newBuilder()
+ .setSize(Durations.fromMillis(
+ ((SlidingWindows) windowFn).getSize().getMillis()))
+ .setOffset(Timestamps.fromMillis(
+ ((SlidingWindows) windowFn).getOffset().getMillis()))
+ .setPeriod(Durations.fromMillis(
+ ((SlidingWindows) windowFn).getPeriod().getMillis()))
+ .build())))
+ .build();
+ } else if (windowFn instanceof Sessions) {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(SESSION_WINDOWS_FN)
+ .setParameter(
+ Any.pack(
+ StandardWindowFns.SessionsPayload.newBuilder()
+ .setGapSize(Durations.fromMillis(
+ ((Sessions) windowFn).getGapDuration().getMillis()))
+ .build())))
+ .build();
+ } else {
+ return SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowFn)))
+ .build())))
+ .build();
+ }
}
/**
@@ -274,27 +338,28 @@ public class WindowingStrategyTranslation implements Serializable {
case GLOBAL_WINDOWS_FN:
return new GlobalWindows();
case FIXED_WINDOWS_FN:
- RunnerApiPayloads.FixedWindowsPayload fixedParams =
+ StandardWindowFns.FixedWindowsPayload fixedParams =
windowFnSpec.getSpec().getParameter().unpack(
- RunnerApiPayloads.FixedWindowsPayload.class);
+ StandardWindowFns.FixedWindowsPayload.class);
return FixedWindows.of(
Duration.millis(Durations.toMillis(fixedParams.getSize())))
.withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
case SLIDING_WINDOWS_FN:
- RunnerApiPayloads.SlidingWindowsPayload slidingParams =
+ StandardWindowFns.SlidingWindowsPayload slidingParams =
windowFnSpec.getSpec().getParameter().unpack(
- RunnerApiPayloads.SlidingWindowsPayload.class);
+ StandardWindowFns.SlidingWindowsPayload.class);
return SlidingWindows.of(
Duration.millis(Durations.toMillis(slidingParams.getSize())))
.every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
.withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
case SESSION_WINDOWS_FN:
- RunnerApiPayloads.SessionsPayload sessionParams =
+ StandardWindowFns.SessionsPayload sessionParams =
windowFnSpec.getSpec().getParameter().unpack(
- RunnerApiPayloads.SessionsPayload.class);
+ StandardWindowFns.SessionsPayload.class);
return Sessions.withGapDuration(
Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
case SERIALIZED_JAVA_WINDOWFN_URN:
+ case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
"WindowFn");
http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index 1e52803..e406545 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
@@ -62,6 +64,13 @@ public class WindowingStrategyTranslationTest {
public static Iterable<ToProtoAndBackSpec> data() {
return ImmutableList.of(
toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+ toProtoAndBackSpec(WindowingStrategy.of(
+ FixedWindows.of(Duration.millis(11)).withOffset(Duration.millis(3)))),
+ toProtoAndBackSpec(WindowingStrategy.of(
+ SlidingWindows.of(Duration.millis(37)).every(Duration.millis(3))
+ .withOffset(Duration.millis(2)))),
+ toProtoAndBackSpec(WindowingStrategy.of(
+ Sessions.withGapDuration(Duration.millis(389)))),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto b/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
deleted file mode 100644
index 446bd59..0000000
--- a/sdks/common/runner-api/src/main/proto/beam_known_payloads.proto
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Runner API, which is the runner-independent,
- * SDK-independent definition of the Beam model.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.runner_api.v1;
-
-option java_package = "org.apache.beam.sdk.common.runner.v1";
-option java_outer_classname = "RunnerApiPayloads";
-
-import "google/protobuf/duration.proto";
-import "google/protobuf/timestamp.proto";
-
-// beam:windowfn:global_windows:v0.1
-// empty payload
-
-// beam:windowfn:fixed_windows:v0.1
-message FixedWindowsPayload {
- google.protobuf.Duration size = 1;
- google.protobuf.Timestamp offset = 2;
-}
-
-// beam:windowfn:sliding_windows:v0.1
-message SlidingWindowsPayload {
- google.protobuf.Duration size = 1;
- google.protobuf.Timestamp offset = 2;
- google.protobuf.Duration period = 3;
-}
-
-// beam:windowfn:session_windows:v0.1
-message SessionsPayload {
- google.protobuf.Duration gap_size = 1;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/standard_window_fns.proto b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
new file mode 100644
index 0000000..0682044
--- /dev/null
+++ b/sdks/common/runner-api/src/main/proto/standard_window_fns.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Protocol Buffers describing the Runner API, which is the runner-independent,
+ * SDK-independent definition of the Beam model.
+ */
+
+syntax = "proto3";
+
+package org.apache.beam.runner_api.v1;
+
+option java_package = "org.apache.beam.sdk.common.runner.v1";
+option java_outer_classname = "StandardWindowFns";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+// beam:windowfn:global_windows:v0.1
+// empty payload
+
+// beam:windowfn:fixed_windows:v0.1
+message FixedWindowsPayload {
+ google.protobuf.Duration size = 1;
+ google.protobuf.Timestamp offset = 2;
+}
+
+// beam:windowfn:sliding_windows:v0.1
+message SlidingWindowsPayload {
+ google.protobuf.Duration size = 1;
+ google.protobuf.Timestamp offset = 2;
+ google.protobuf.Duration period = 3;
+}
+
+// beam:windowfn:session_windows:v0.1
+message SessionsPayload {
+ google.protobuf.Duration gap_size = 1;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b490e84e/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index f74c8a9..e87a007 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -56,7 +56,7 @@ from google.protobuf import timestamp_pb2
from apache_beam.coders import coders
from apache_beam.runners.api import beam_runner_api_pb2
-from apache_beam.runners.api import beam_known_payloads_pb2
+from apache_beam.runners.api import standard_window_fns_pb2
from apache_beam.transforms import timeutil
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
@@ -343,14 +343,14 @@ class FixedWindows(NonMergingWindowFn):
def to_runner_api_parameter(self, context):
return (urns.FIXED_WINDOWS_FN,
- beam_known_payloads_pb2.FixedWindowsPayload(
+ standard_window_fns_pb2.FixedWindowsPayload(
size=proto_utils.from_micros(
duration_pb2.Duration, self.size.micros),
offset=proto_utils.from_micros(
timestamp_pb2.Timestamp, self.offset.micros)))
@urns.RunnerApiFn.register_urn(
- urns.FIXED_WINDOWS_FN, beam_known_payloads_pb2.FixedWindowsPayload)
+ urns.FIXED_WINDOWS_FN, standard_window_fns_pb2.FixedWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context):
return FixedWindows(
size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -398,7 +398,7 @@ class SlidingWindows(NonMergingWindowFn):
def to_runner_api_parameter(self, context):
return (urns.SLIDING_WINDOWS_FN,
- beam_known_payloads_pb2.SlidingWindowsPayload(
+ standard_window_fns_pb2.SlidingWindowsPayload(
size=proto_utils.from_micros(
duration_pb2.Duration, self.size.micros),
offset=proto_utils.from_micros(
@@ -408,7 +408,7 @@ class SlidingWindows(NonMergingWindowFn):
@urns.RunnerApiFn.register_urn(
urns.SLIDING_WINDOWS_FN,
- beam_known_payloads_pb2.SlidingWindowsPayload)
+ standard_window_fns_pb2.SlidingWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context):
return SlidingWindows(
size=Duration(micros=fn_parameter.size.ToMicroseconds()),
@@ -465,12 +465,12 @@ class Sessions(WindowFn):
def to_runner_api_parameter(self, context):
return (urns.SESSION_WINDOWS_FN,
- beam_known_payloads_pb2.SessionsPayload(
+ standard_window_fns_pb2.SessionsPayload(
gap_size=proto_utils.from_micros(
duration_pb2.Duration, self.gap_size.micros)))
@urns.RunnerApiFn.register_urn(
- urns.SESSION_WINDOWS_FN, beam_known_payloads_pb2.SessionsPayload)
+ urns.SESSION_WINDOWS_FN, standard_window_fns_pb2.SessionsPayload)
def from_runner_api_parameter(fn_parameter, unused_context):
return Sessions(
gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))
[3/5] beam git commit: Increase dataflow worker version.
Posted by ro...@apache.org.
Increase dataflow worker version.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99907b94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99907b94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99907b94
Branch: refs/heads/master
Commit: 99907b94c074d4dc2ac3911b690da6790056412a
Parents: 4c20bfa
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 1 16:01:56 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/dataflow/internal/dependency.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/99907b94/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index e69c8d7..3a0ff46 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -73,7 +73,7 @@ from apache_beam.options.pipeline_options import SetupOptions
# Update this version to the next version whenever there is a change that will
# require changes to the execution environment.
# This should be in the beam-[version]-[date] format, date is optional.
-BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170518'
+BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170601'
# Standard file names used for staging files.
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
[4/5] beam git commit: Closes #3222
Posted by ro...@apache.org.
Closes #3222
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f9428c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f9428c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f9428c3
Branch: refs/heads/master
Commit: 2f9428c3e5e648d147fa68afdd13db7f6bfbcb0b
Parents: 7a075cc 99907b9
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jun 2 09:16:43 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 2 09:16:43 2017 -0700
----------------------------------------------------------------------
pom.xml | 6 +
runners/core-construction-java/pom.xml | 5 +
.../WindowingStrategyTranslation.java | 149 +++++++++++++++----
.../WindowingStrategyTranslationTest.java | 9 ++
.../src/main/proto/standard_window_fns.proto | 53 +++++++
.../runners/dataflow/dataflow_runner.py | 39 ++++-
.../runners/dataflow/dataflow_runner_test.py | 11 ++
.../runners/dataflow/internal/dependency.py | 2 +-
sdks/python/apache_beam/transforms/window.py | 57 ++++---
sdks/python/apache_beam/utils/proto_utils.py | 6 +
sdks/python/apache_beam/utils/urns.py | 10 +-
sdks/python/run_pylint.sh | 5 +-
12 files changed, 292 insertions(+), 60 deletions(-)
----------------------------------------------------------------------