You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:24 UTC
[49/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
Move all files to apache_beam folder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b14dfadd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b14dfadd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b14dfadd
Branch: refs/heads/python-sdk
Commit: b14dfadd1f414063eb0710eae8237eb2fa9c8a2f
Parents: e507928
Author: Silviu Calinoiu <si...@google.com>
Authored: Tue Jun 14 08:49:04 2016 -0700
Committer: Silviu Calinoiu <si...@google.com>
Committed: Tue Jun 14 12:07:07 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/__init__.py | 78 +
sdks/python/apache_beam/coders/__init__.py | 16 +
sdks/python/apache_beam/coders/coder_impl.pxd | 109 +
sdks/python/apache_beam/coders/coder_impl.py | 316 ++
sdks/python/apache_beam/coders/coders.py | 511 +++
sdks/python/apache_beam/coders/coders_test.py | 60 +
.../apache_beam/coders/coders_test_common.py | 180 ++
.../apache_beam/coders/fast_coders_test.py | 34 +
sdks/python/apache_beam/coders/observable.py | 33 +
.../apache_beam/coders/observable_test.py | 54 +
.../apache_beam/coders/slow_coders_test.py | 36 +
sdks/python/apache_beam/coders/slow_stream.py | 136 +
sdks/python/apache_beam/coders/stream.pxd | 58 +
sdks/python/apache_beam/coders/stream.pyx | 201 ++
sdks/python/apache_beam/coders/stream_test.py | 168 +
sdks/python/apache_beam/coders/typecoders.py | 154 +
.../apache_beam/coders/typecoders_test.py | 114 +
sdks/python/apache_beam/dataflow_test.py | 405 +++
sdks/python/apache_beam/error.py | 39 +
sdks/python/apache_beam/examples/__init__.py | 0
.../examples/complete/autocomplete.py | 79 +
.../examples/complete/autocomplete_test.py | 78 +
.../examples/complete/estimate_pi.py | 109 +
.../examples/complete/estimate_pi_test.py | 46 +
.../complete/juliaset/juliaset/__init__.py | 0
.../complete/juliaset/juliaset/juliaset.py | 119 +
.../complete/juliaset/juliaset/juliaset_test.py | 83 +
.../examples/complete/juliaset/juliaset_main.py | 55 +
.../examples/complete/juliaset/setup.py | 115 +
.../apache_beam/examples/complete/tfidf.py | 196 ++
.../apache_beam/examples/complete/tfidf_test.py | 88 +
.../examples/complete/top_wikipedia_sessions.py | 170 +
.../complete/top_wikipedia_sessions_test.py | 58 +
.../examples/cookbook/bigquery_schema.py | 127 +
.../examples/cookbook/bigquery_side_input.py | 114 +
.../cookbook/bigquery_side_input_test.py | 59 +
.../examples/cookbook/bigquery_tornadoes.py | 96 +
.../cookbook/bigquery_tornadoes_test.py | 41 +
.../apache_beam/examples/cookbook/bigshuffle.py | 84 +
.../examples/cookbook/bigshuffle_test.py | 61 +
.../apache_beam/examples/cookbook/coders.py | 92 +
.../examples/cookbook/coders_test.py | 56 +
.../examples/cookbook/combiners_test.py | 73 +
.../examples/cookbook/custom_ptransform.py | 132 +
.../examples/cookbook/custom_ptransform_test.py | 64 +
.../apache_beam/examples/cookbook/filters.py | 104 +
.../examples/cookbook/filters_test.py | 65 +
.../examples/cookbook/group_with_coder.py | 111 +
.../examples/cookbook/group_with_coder_test.py | 87 +
.../examples/cookbook/mergecontacts.py | 126 +
.../examples/cookbook/mergecontacts_test.py | 121 +
.../examples/cookbook/multiple_output_pardo.py | 171 +
.../cookbook/multiple_output_pardo_test.py | 69 +
.../apache_beam/examples/snippets/snippets.py | 872 +++++
.../examples/snippets/snippets_test.py | 560 ++++
.../apache_beam/examples/streaming_wordcap.py | 61 +
.../apache_beam/examples/streaming_wordcount.py | 71 +
sdks/python/apache_beam/examples/wordcount.py | 99 +
.../apache_beam/examples/wordcount_debugging.py | 154 +
.../examples/wordcount_debugging_test.py | 56 +
.../apache_beam/examples/wordcount_minimal.py | 111 +
.../examples/wordcount_minimal_test.py | 56 +
.../apache_beam/examples/wordcount_test.py | 55 +
sdks/python/apache_beam/internal/__init__.py | 0
sdks/python/apache_beam/internal/apiclient.py | 935 ++++++
.../apache_beam/internal/apiclient_test.py | 110 +
sdks/python/apache_beam/internal/auth.py | 161 +
.../apache_beam/internal/clients/__init__.py | 0
.../internal/clients/bigquery/__init__.py | 10 +
.../clients/bigquery/bigquery_v2_client.py | 642 ++++
.../clients/bigquery/bigquery_v2_messages.py | 1893 +++++++++++
.../internal/clients/dataflow/__init__.py | 10 +
.../clients/dataflow/dataflow_v1b3_client.py | 316 ++
.../clients/dataflow/dataflow_v1b3_messages.py | 3056 ++++++++++++++++++
.../internal/clients/storage/__init__.py | 10 +
.../clients/storage/storage_v1_client.py | 1021 ++++++
.../clients/storage/storage_v1_messages.py | 1903 +++++++++++
sdks/python/apache_beam/internal/json_value.py | 127 +
.../apache_beam/internal/json_value_test.py | 63 +
sdks/python/apache_beam/internal/module_test.py | 59 +
sdks/python/apache_beam/internal/pickler.py | 205 ++
.../python/apache_beam/internal/pickler_test.py | 78 +
sdks/python/apache_beam/internal/util.py | 90 +
sdks/python/apache_beam/internal/util_test.py | 58 +
.../python/apache_beam/internal/windmill_pb2.py | 2275 +++++++++++++
.../internal/windmill_service_pb2.py | 161 +
sdks/python/apache_beam/io/__init__.py | 25 +
sdks/python/apache_beam/io/bigquery.py | 826 +++++
sdks/python/apache_beam/io/bigquery_test.py | 450 +++
sdks/python/apache_beam/io/fileio.py | 747 +++++
sdks/python/apache_beam/io/fileio_test.py | 522 +++
sdks/python/apache_beam/io/gcsio.py | 602 ++++
sdks/python/apache_beam/io/gcsio_test.py | 503 +++
sdks/python/apache_beam/io/iobase.py | 1073 ++++++
sdks/python/apache_beam/io/pubsub.py | 73 +
sdks/python/apache_beam/io/range_trackers.py | 270 ++
.../apache_beam/io/range_trackers_test.py | 318 ++
sdks/python/apache_beam/io/sources_test.py | 65 +
sdks/python/apache_beam/pipeline.py | 435 +++
sdks/python/apache_beam/pipeline_test.py | 345 ++
sdks/python/apache_beam/pvalue.py | 459 +++
sdks/python/apache_beam/pvalue_test.py | 63 +
sdks/python/apache_beam/python_sdk_releases.py | 53 +
sdks/python/apache_beam/runners/__init__.py | 24 +
sdks/python/apache_beam/runners/common.pxd | 28 +
sdks/python/apache_beam/runners/common.py | 181 ++
.../apache_beam/runners/dataflow_runner.py | 639 ++++
.../python/apache_beam/runners/direct_runner.py | 326 ++
sdks/python/apache_beam/runners/runner.py | 305 ++
sdks/python/apache_beam/runners/runner_test.py | 66 +
sdks/python/apache_beam/transforms/__init__.py | 23 +
.../python/apache_beam/transforms/aggregator.py | 105 +
.../apache_beam/transforms/aggregator_test.py | 73 +
sdks/python/apache_beam/transforms/combiners.py | 523 +++
.../apache_beam/transforms/combiners_test.py | 225 ++
sdks/python/apache_beam/transforms/core.py | 1292 ++++++++
.../apache_beam/transforms/cy_combiners.pxd | 89 +
.../apache_beam/transforms/cy_combiners.py | 250 ++
.../python/apache_beam/transforms/ptransform.py | 703 ++++
.../apache_beam/transforms/ptransform_test.py | 1814 +++++++++++
.../python/apache_beam/transforms/sideinputs.py | 145 +
sdks/python/apache_beam/transforms/timeutil.py | 310 ++
.../apache_beam/transforms/timeutil_test.py | 165 +
sdks/python/apache_beam/transforms/trigger.py | 958 ++++++
.../apache_beam/transforms/trigger_test.py | 566 ++++
.../transforms/trigger_transcripts.yaml | 207 ++
sdks/python/apache_beam/transforms/util.py | 227 ++
sdks/python/apache_beam/transforms/window.py | 383 +++
.../apache_beam/transforms/window_test.py | 201 ++
.../transforms/write_ptransform_test.py | 124 +
sdks/python/apache_beam/typehints/__init__.py | 19 +
sdks/python/apache_beam/typehints/decorators.py | 530 +++
sdks/python/apache_beam/typehints/opcodes.py | 331 ++
.../apache_beam/typehints/trivial_inference.py | 415 +++
.../typehints/trivial_inference_test.py | 148 +
sdks/python/apache_beam/typehints/typecheck.py | 161 +
.../typehints/typed_pipeline_test.py | 248 ++
sdks/python/apache_beam/typehints/typehints.py | 1054 ++++++
.../apache_beam/typehints/typehints_test.py | 1053 ++++++
sdks/python/apache_beam/utils/__init__.py | 19 +
sdks/python/apache_beam/utils/counters.pxd | 27 +
sdks/python/apache_beam/utils/counters.py | 180 ++
sdks/python/apache_beam/utils/dependency.py | 439 +++
.../python/apache_beam/utils/dependency_test.py | 394 +++
sdks/python/apache_beam/utils/names.py | 75 +
sdks/python/apache_beam/utils/options.py | 486 +++
sdks/python/apache_beam/utils/path.py | 44 +
sdks/python/apache_beam/utils/path_test.py | 67 +
.../apache_beam/utils/pipeline_options_test.py | 104 +
.../utils/pipeline_options_validator.py | 166 +
.../utils/pipeline_options_validator_test.py | 234 ++
sdks/python/apache_beam/utils/processes.py | 49 +
sdks/python/apache_beam/utils/processes_test.py | 103 +
sdks/python/apache_beam/utils/profiler.py | 66 +
sdks/python/apache_beam/utils/retry.py | 194 ++
sdks/python/apache_beam/utils/retry_test.py | 165 +
sdks/python/apache_beam/version.py | 17 +
sdks/python/google/cloud/dataflow/__init__.py | 78 -
.../google/cloud/dataflow/coders/__init__.py | 16 -
.../google/cloud/dataflow/coders/coder_impl.pxd | 109 -
.../google/cloud/dataflow/coders/coder_impl.py | 316 --
.../google/cloud/dataflow/coders/coders.py | 511 ---
.../google/cloud/dataflow/coders/coders_test.py | 60 -
.../cloud/dataflow/coders/coders_test_common.py | 180 --
.../cloud/dataflow/coders/fast_coders_test.py | 34 -
.../google/cloud/dataflow/coders/observable.py | 33 -
.../cloud/dataflow/coders/observable_test.py | 54 -
.../cloud/dataflow/coders/slow_coders_test.py | 36 -
.../google/cloud/dataflow/coders/slow_stream.py | 136 -
.../google/cloud/dataflow/coders/stream.pxd | 58 -
.../google/cloud/dataflow/coders/stream.pyx | 201 --
.../google/cloud/dataflow/coders/stream_test.py | 168 -
.../google/cloud/dataflow/coders/typecoders.py | 154 -
.../cloud/dataflow/coders/typecoders_test.py | 114 -
.../google/cloud/dataflow/dataflow_test.py | 405 ---
sdks/python/google/cloud/dataflow/error.py | 39 -
.../google/cloud/dataflow/examples/__init__.py | 0
.../dataflow/examples/complete/autocomplete.py | 79 -
.../examples/complete/autocomplete_test.py | 78 -
.../dataflow/examples/complete/estimate_pi.py | 109 -
.../examples/complete/estimate_pi_test.py | 46 -
.../complete/juliaset/juliaset/__init__.py | 0
.../complete/juliaset/juliaset/juliaset.py | 119 -
.../complete/juliaset/juliaset/juliaset_test.py | 83 -
.../examples/complete/juliaset/juliaset_main.py | 55 -
.../examples/complete/juliaset/setup.py | 115 -
.../cloud/dataflow/examples/complete/tfidf.py | 196 --
.../dataflow/examples/complete/tfidf_test.py | 88 -
.../examples/complete/top_wikipedia_sessions.py | 170 -
.../complete/top_wikipedia_sessions_test.py | 58 -
.../examples/cookbook/bigquery_schema.py | 127 -
.../examples/cookbook/bigquery_side_input.py | 114 -
.../cookbook/bigquery_side_input_test.py | 59 -
.../examples/cookbook/bigquery_tornadoes.py | 96 -
.../cookbook/bigquery_tornadoes_test.py | 41 -
.../dataflow/examples/cookbook/bigshuffle.py | 84 -
.../examples/cookbook/bigshuffle_test.py | 61 -
.../cloud/dataflow/examples/cookbook/coders.py | 92 -
.../dataflow/examples/cookbook/coders_test.py | 56 -
.../examples/cookbook/combiners_test.py | 73 -
.../examples/cookbook/custom_ptransform.py | 132 -
.../examples/cookbook/custom_ptransform_test.py | 64 -
.../cloud/dataflow/examples/cookbook/filters.py | 104 -
.../dataflow/examples/cookbook/filters_test.py | 65 -
.../examples/cookbook/group_with_coder.py | 111 -
.../examples/cookbook/group_with_coder_test.py | 87 -
.../dataflow/examples/cookbook/mergecontacts.py | 126 -
.../examples/cookbook/mergecontacts_test.py | 121 -
.../examples/cookbook/multiple_output_pardo.py | 171 -
.../cookbook/multiple_output_pardo_test.py | 69 -
.../dataflow/examples/snippets/snippets.py | 872 -----
.../dataflow/examples/snippets/snippets_test.py | 560 ----
.../dataflow/examples/streaming_wordcap.py | 61 -
.../dataflow/examples/streaming_wordcount.py | 71 -
.../google/cloud/dataflow/examples/wordcount.py | 99 -
.../dataflow/examples/wordcount_debugging.py | 154 -
.../examples/wordcount_debugging_test.py | 56 -
.../dataflow/examples/wordcount_minimal.py | 111 -
.../dataflow/examples/wordcount_minimal_test.py | 56 -
.../cloud/dataflow/examples/wordcount_test.py | 55 -
.../google/cloud/dataflow/internal/__init__.py | 0
.../google/cloud/dataflow/internal/apiclient.py | 935 ------
.../cloud/dataflow/internal/apiclient_test.py | 110 -
.../google/cloud/dataflow/internal/auth.py | 161 -
.../cloud/dataflow/internal/clients/__init__.py | 0
.../internal/clients/bigquery/__init__.py | 10 -
.../clients/bigquery/bigquery_v2_client.py | 642 ----
.../clients/bigquery/bigquery_v2_messages.py | 1893 -----------
.../internal/clients/dataflow/__init__.py | 10 -
.../clients/dataflow/dataflow_v1b3_client.py | 316 --
.../clients/dataflow/dataflow_v1b3_messages.py | 3056 ------------------
.../internal/clients/storage/__init__.py | 10 -
.../clients/storage/storage_v1_client.py | 1021 ------
.../clients/storage/storage_v1_messages.py | 1903 -----------
.../cloud/dataflow/internal/json_value.py | 127 -
.../cloud/dataflow/internal/json_value_test.py | 63 -
.../cloud/dataflow/internal/module_test.py | 59 -
.../google/cloud/dataflow/internal/pickler.py | 205 --
.../cloud/dataflow/internal/pickler_test.py | 78 -
.../google/cloud/dataflow/internal/util.py | 90 -
.../google/cloud/dataflow/internal/util_test.py | 58 -
.../cloud/dataflow/internal/windmill_pb2.py | 2275 -------------
.../dataflow/internal/windmill_service_pb2.py | 161 -
.../python/google/cloud/dataflow/io/__init__.py | 25 -
.../python/google/cloud/dataflow/io/bigquery.py | 826 -----
.../google/cloud/dataflow/io/bigquery_test.py | 450 ---
sdks/python/google/cloud/dataflow/io/fileio.py | 747 -----
.../google/cloud/dataflow/io/fileio_test.py | 522 ---
sdks/python/google/cloud/dataflow/io/gcsio.py | 602 ----
.../google/cloud/dataflow/io/gcsio_test.py | 503 ---
sdks/python/google/cloud/dataflow/io/iobase.py | 1073 ------
sdks/python/google/cloud/dataflow/io/pubsub.py | 73 -
.../google/cloud/dataflow/io/range_trackers.py | 270 --
.../cloud/dataflow/io/range_trackers_test.py | 318 --
.../google/cloud/dataflow/io/sources_test.py | 65 -
sdks/python/google/cloud/dataflow/pipeline.py | 435 ---
.../google/cloud/dataflow/pipeline_test.py | 345 --
sdks/python/google/cloud/dataflow/pvalue.py | 459 ---
.../python/google/cloud/dataflow/pvalue_test.py | 63 -
.../cloud/dataflow/python_sdk_releases.py | 53 -
.../google/cloud/dataflow/runners/__init__.py | 24 -
.../google/cloud/dataflow/runners/common.pxd | 28 -
.../google/cloud/dataflow/runners/common.py | 181 --
.../cloud/dataflow/runners/dataflow_runner.py | 639 ----
.../cloud/dataflow/runners/direct_runner.py | 326 --
.../google/cloud/dataflow/runners/runner.py | 305 --
.../cloud/dataflow/runners/runner_test.py | 66 -
.../cloud/dataflow/transforms/__init__.py | 23 -
.../cloud/dataflow/transforms/aggregator.py | 105 -
.../dataflow/transforms/aggregator_test.py | 73 -
.../cloud/dataflow/transforms/combiners.py | 523 ---
.../cloud/dataflow/transforms/combiners_test.py | 225 --
.../google/cloud/dataflow/transforms/core.py | 1292 --------
.../cloud/dataflow/transforms/cy_combiners.pxd | 89 -
.../cloud/dataflow/transforms/cy_combiners.py | 250 --
.../cloud/dataflow/transforms/ptransform.py | 703 ----
.../dataflow/transforms/ptransform_test.py | 1814 -----------
.../cloud/dataflow/transforms/sideinputs.py | 145 -
.../cloud/dataflow/transforms/timeutil.py | 310 --
.../cloud/dataflow/transforms/timeutil_test.py | 165 -
.../google/cloud/dataflow/transforms/trigger.py | 958 ------
.../cloud/dataflow/transforms/trigger_test.py | 566 ----
.../transforms/trigger_transcripts.yaml | 207 --
.../google/cloud/dataflow/transforms/util.py | 227 --
.../google/cloud/dataflow/transforms/window.py | 383 ---
.../cloud/dataflow/transforms/window_test.py | 201 --
.../transforms/write_ptransform_test.py | 124 -
.../google/cloud/dataflow/typehints/__init__.py | 19 -
.../cloud/dataflow/typehints/decorators.py | 530 ---
.../google/cloud/dataflow/typehints/opcodes.py | 331 --
.../dataflow/typehints/trivial_inference.py | 415 ---
.../typehints/trivial_inference_test.py | 148 -
.../cloud/dataflow/typehints/typecheck.py | 161 -
.../dataflow/typehints/typed_pipeline_test.py | 248 --
.../cloud/dataflow/typehints/typehints.py | 1054 ------
.../cloud/dataflow/typehints/typehints_test.py | 1053 ------
.../google/cloud/dataflow/utils/__init__.py | 19 -
.../google/cloud/dataflow/utils/counters.pxd | 27 -
.../google/cloud/dataflow/utils/counters.py | 180 --
.../google/cloud/dataflow/utils/dependency.py | 439 ---
.../cloud/dataflow/utils/dependency_test.py | 394 ---
.../python/google/cloud/dataflow/utils/names.py | 75 -
.../google/cloud/dataflow/utils/options.py | 486 ---
sdks/python/google/cloud/dataflow/utils/path.py | 44 -
.../google/cloud/dataflow/utils/path_test.py | 67 -
.../dataflow/utils/pipeline_options_test.py | 104 -
.../utils/pipeline_options_validator.py | 166 -
.../utils/pipeline_options_validator_test.py | 234 --
.../google/cloud/dataflow/utils/processes.py | 49 -
.../cloud/dataflow/utils/processes_test.py | 103 -
.../google/cloud/dataflow/utils/profiler.py | 66 -
.../python/google/cloud/dataflow/utils/retry.py | 194 --
.../google/cloud/dataflow/utils/retry_test.py | 165 -
sdks/python/google/cloud/dataflow/version.py | 17 -
314 files changed, 44598 insertions(+), 44598 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
new file mode 100644
index 0000000..af28d3a
--- /dev/null
+++ b/sdks/python/apache_beam/__init__.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Cloud Dataflow SDK for Python.
+
+Google Cloud Dataflow <http://cloud.google.com/dataflow/>
+provides a simple, powerful programming model for building both batch
+and streaming parallel data processing pipelines.
+
+The Dataflow SDK for Python provides access to Dataflow capabilities
+from the Python programming language.
+
+Status
+------
+The SDK is still early in its development, and significant changes
+should be expected before the first stable version.
+
+Overview
+--------
+The key concepts in this programming model are
+
+* PCollection: represents a collection of data, which could be
+ bounded or unbounded in size.
+* PTransform: represents a computation that transforms input
+ PCollections into output PCollections.
+* Pipeline: manages a directed acyclic graph of PTransforms and
+ PCollections that is ready for execution.
+* Runner: specifies where and how the Pipeline should execute.
+* Reading and Writing Data: your pipeline can read from an external
+ source and write to an external data sink.
+
+Typical usage
+-------------
+At the top of your source file::
+
+ import google.cloud.dataflow as df
+
+After this import statement
+
+* transform classes are available as df.FlatMap, df.GroupByKey, etc.
+* Pipeline class is available as df.Pipeline
+* text source/sink classes are available as df.io.TextFileSource,
+ df.io.TextFileSink
+
+Examples
+--------
+The examples subdirectory has some examples.
+
+"""
+
+
+import sys
+
+
+if sys.version_info.major != 2:
+ raise RuntimeError(
+ 'Dataflow SDK for Python is supported only on Python 2.7. '
+ 'It is not supported on Python [%s].' % sys.version)
+
+
+import google.cloud.dataflow.internal.pickler
+
+from google.cloud.dataflow import coders
+from google.cloud.dataflow import io
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.transforms import *
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
new file mode 100644
index 0000000..610a6ef
--- /dev/null
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -0,0 +1,16 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.cloud.dataflow.coders.coders import *
+from google.cloud.dataflow.coders.typecoders import registry
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
new file mode 100644
index 0000000..663d37d
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -0,0 +1,109 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# cython: profile=True
+
+cimport cython
+
+cimport cpython.ref
+cimport cpython.tuple
+cimport libc.stdint
+cimport libc.stdlib
+cimport libc.string
+
+from .stream cimport InputStream, OutputStream
+
+
+cdef object loads, dumps, create_InputStream, create_OutputStream
+cdef type WindowedValue
+
+
+cdef class CoderImpl(object):
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
+ cpdef bytes encode(self, value)
+ cpdef decode(self, bytes encoded)
+
+
+cdef class SimpleCoderImpl(CoderImpl):
+ pass
+
+
+cdef class StreamCoderImpl(CoderImpl):
+ pass
+
+
+cdef class CallbackCoderImpl(CoderImpl):
+ cdef object _encoder
+ cdef object _decoder
+
+
+cdef class DeterministicPickleCoderImpl(CoderImpl):
+ cdef CoderImpl _pickle_coder
+ cdef object _step_label
+ cdef bint _check_safe(self, value) except -1
+
+
+cdef class BytesCoderImpl(CoderImpl):
+ pass
+
+
+cdef class FloatCoderImpl(StreamCoderImpl):
+ pass
+
+
+cdef class TimestampCoderImpl(StreamCoderImpl):
+ cdef object timestamp_class
+
+
+cdef list small_ints
+cdef class VarIntCoderImpl(StreamCoderImpl):
+ @cython.locals(ivalue=libc.stdint.int64_t)
+ cpdef bytes encode(self, value)
+
+
+cdef class SingletonCoderImpl(CoderImpl):
+ cdef object _value
+
+
+cdef class AbstractComponentCoderImpl(StreamCoderImpl):
+ cdef tuple _coder_impls
+
+ cpdef _extract_components(self, value)
+ cpdef _construct_from_components(self, components)
+
+ @cython.locals(c=CoderImpl)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+ @cython.locals(c=CoderImpl)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+
+cdef class TupleCoderImpl(AbstractComponentCoderImpl):
+ pass
+
+
+cdef class SequenceCoderImpl(StreamCoderImpl):
+ cdef CoderImpl _elem_coder
+ cpdef _construct_from_sequence(self, values)
+
+
+cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
+ pass
+
+
+cdef class WindowedValueCoderImpl(StreamCoderImpl):
+ """A coder for windowed values."""
+ cdef CoderImpl _value_coder
+ cdef CoderImpl _timestamp_coder
+ cdef CoderImpl _windows_coder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
new file mode 100644
index 0000000..0ce4354
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -0,0 +1,316 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Coder implementations.
+
+The actual encode/decode implementations are split off from coders to
+allow conditional (compiled/pure) implementations, which can be used to
+encode many elements with minimal overhead.
+
+This module may be optionally compiled with Cython, using the corresponding
+coder_impl.pxd file for type hints.
+"""
+
+import collections
+from cPickle import loads, dumps
+
+
+# pylint: disable=g-import-not-at-top
+try:
+ # Don't depend on the full dataflow sdk to test coders.
+ from google.cloud.dataflow.transforms.window import WindowedValue
+except ImportError:
+ WindowedValue = collections.namedtuple(
+ 'WindowedValue', ('value', 'timestamp', 'windows'))
+
+try:
+ from stream import InputStream as create_InputStream
+ from stream import OutputStream as create_OutputStream
+except ImportError:
+ from slow_stream import InputStream as create_InputStream
+ from slow_stream import OutputStream as create_OutputStream
+# pylint: enable=g-import-not-at-top
+
+
+class CoderImpl(object):
+
+ def encode_to_stream(self, value, stream, nested):
+ """Writes object to potentially-nested encoding in stream."""
+ raise NotImplementedError
+
+ def decode_from_stream(self, stream, nested):
+ """Reads object from potentially-nested encoding in stream."""
+ raise NotImplementedError
+
+ def encode(self, value):
+ """Encodes an object to an unnested string."""
+ raise NotImplementedError
+
+ def decode(self, encoded):
+ """Decodes an object from an unnested string."""
+ raise NotImplementedError
+
+
+class SimpleCoderImpl(CoderImpl):
+ """Subclass of CoderImpl implementing stream methods using encode/decode."""
+
+ def encode_to_stream(self, value, stream, nested):
+ """Writes object to potentially-nested encoding in stream."""
+ stream.write(self.encode(value), nested)
+
+ def decode_from_stream(self, stream, nested):
+ """Reads object from potentially-nested encoding in stream."""
+ return self.decode(stream.read_all(nested))
+
+
+class StreamCoderImpl(CoderImpl):
+ """Subclass of CoderImpl implementing encode/decode using stream methods."""
+
+ def encode(self, value):
+ out = create_OutputStream()
+ self.encode_to_stream(value, out, False)
+ return out.get()
+
+ def decode(self, encoded):
+ return self.decode_from_stream(create_InputStream(encoded), False)
+
+
+class CallbackCoderImpl(CoderImpl):
+ """A CoderImpl that calls back to the _impl methods on the Coder itself.
+
+ This is the default implementation used if Coder._get_impl()
+ is not overridden.
+ """
+
+ def __init__(self, encoder, decoder):
+ self._encoder = encoder
+ self._decoder = decoder
+
+ def encode_to_stream(self, value, stream, nested):
+ return stream.write(self._encoder(value), nested)
+
+ def decode_from_stream(self, stream, nested):
+ return self._decoder(stream.read_all(nested))
+
+ def encode(self, value):
+ return self._encoder(value)
+
+ def decode(self, encoded):
+ return self._decoder(encoded)
+
+
+class DeterministicPickleCoderImpl(CoderImpl):
+
+ def __init__(self, pickle_coder, step_label):
+ self._pickle_coder = pickle_coder
+ self._step_label = step_label
+
+ def _check_safe(self, value):
+ if isinstance(value, (str, unicode, long, int, float)):
+ pass
+ elif value is None:
+ pass
+ elif isinstance(value, (tuple, list)):
+ for x in value:
+ self._check_safe(x)
+ else:
+ raise TypeError(
+ "Unable to deterministically code '%s' of type '%s', "
+ "please provide a type hint for the input of '%s'" % (
+ value, type(value), self._step_label))
+
+ def encode_to_stream(self, value, stream, nested):
+ self._check_safe(value)
+ return self._pickle_coder.encode_to_stream(value, stream, nested)
+
+ def decode_from_stream(self, stream, nested):
+ return self._pickle_coder.decode_from_stream(stream, nested)
+
+ def encode(self, value):
+ self._check_safe(value)
+ return self._pickle_coder.encode(value)
+
+ def decode(self, encoded):
+ return self._pickle_coder.decode(encoded)
+
+
+class BytesCoderImpl(CoderImpl):
+ """A coder for bytes/str objects."""
+
+ def encode_to_stream(self, value, out, nested):
+ out.write(value, nested)
+
+ def decode_from_stream(self, in_stream, nested):
+ return in_stream.read_all(nested)
+
+ def encode(self, value):
+ assert isinstance(value, bytes), (value, type(value))
+ return value
+
+ def decode(self, encoded):
+ return encoded
+
+
+class FloatCoderImpl(StreamCoderImpl):
+
+ def encode_to_stream(self, value, out, nested):
+ out.write_bigendian_double(value)
+
+ def decode_from_stream(self, in_stream, nested):
+ return in_stream.read_bigendian_double()
+
+
+class TimestampCoderImpl(StreamCoderImpl):
+
+ def __init__(self, timestamp_class):
+ self.timestamp_class = timestamp_class
+
+ def encode_to_stream(self, value, out, nested):
+ out.write_bigendian_int64(value.micros)
+
+ def decode_from_stream(self, in_stream, nested):
+ return self.timestamp_class(micros=in_stream.read_bigendian_int64())
+
+
+small_ints = [chr(_) for _ in range(128)]
+
+
+class VarIntCoderImpl(StreamCoderImpl):
+ """A coder for long/int objects."""
+
+ def encode_to_stream(self, value, out, nested):
+ out.write_var_int64(value)
+
+ def decode_from_stream(self, in_stream, nested):
+ return in_stream.read_var_int64()
+
+ def encode(self, value):
+ ivalue = value # type cast
+ if 0 <= ivalue < len(small_ints):
+ return small_ints[ivalue]
+ else:
+ return StreamCoderImpl.encode(self, value)
+
+ def decode(self, encoded):
+ if len(encoded) == 1:
+ i = ord(encoded)
+ if 0 <= i < 128:
+ return i
+ return StreamCoderImpl.decode(self, encoded)
+
+
+class SingletonCoderImpl(CoderImpl):
+ """A coder that always encodes exactly one value."""
+
+ def __init__(self, value):
+ self._value = value
+
+ def encode_to_stream(self, value, stream, nested):
+ pass
+
+ def decode_from_stream(self, stream, nested):
+ return self._value
+
+ def encode(self, value):
+ b = '' # avoid byte vs str vs unicode error
+ return b
+
+ def decode(self, encoded):
+ return self._value
+
+
+class AbstractComponentCoderImpl(StreamCoderImpl):
+
+ def __init__(self, coder_impls):
+ for c in coder_impls:
+ assert isinstance(c, CoderImpl), c
+ self._coder_impls = tuple(coder_impls)
+
+ def _extract_components(self, value):
+ raise NotImplementedError
+
+ def _construct_from_components(self, components):
+ raise NotImplementedError
+
+ def encode_to_stream(self, value, out, nested):
+ values = self._extract_components(value)
+ if len(self._coder_impls) != len(values):
+ raise ValueError(
+ 'Number of components does not match number of coders.')
+ for i in range(0, len(self._coder_impls)):
+ c = self._coder_impls[i] # type cast
+ c.encode_to_stream(values[i], out, True)
+
+ def decode_from_stream(self, in_stream, nested):
+ return self._construct_from_components(
+ [c.decode_from_stream(in_stream, True) for c in self._coder_impls])
+
+
+class TupleCoderImpl(AbstractComponentCoderImpl):
+ """A coder for tuple objects."""
+
+ def _extract_components(self, value):
+ return value
+
+ def _construct_from_components(self, components):
+ return tuple(components)
+
+
+class SequenceCoderImpl(StreamCoderImpl):
+ """A coder for sequences of known length."""
+
+ def __init__(self, elem_coder):
+ self._elem_coder = elem_coder
+
+ def _construct_from_sequence(self, values):
+ raise NotImplementedError
+
+ def encode_to_stream(self, value, out, nested):
+ # Compatible with Java's IterableLikeCoder.
+ out.write_bigendian_int32(len(value))
+ for elem in value:
+ self._elem_coder.encode_to_stream(elem, out, True)
+
+ def decode_from_stream(self, in_stream, nested):
+ size = in_stream.read_bigendian_int32()
+ return self._construct_from_sequence(
+ [self._elem_coder.decode_from_stream(in_stream, True)
+ for _ in range(size)])
+
+
+class TupleSequenceCoderImpl(SequenceCoderImpl):
+ """A coder for homogeneous tuple objects."""
+
+ def _construct_from_sequence(self, components):
+ return tuple(components)
+
+
+class WindowedValueCoderImpl(StreamCoderImpl):
+ """A coder for windowed values."""
+
+ def __init__(self, value_coder, timestamp_coder, window_coder):
+ self._value_coder = value_coder
+ self._timestamp_coder = timestamp_coder
+ self._windows_coder = TupleSequenceCoderImpl(window_coder)
+
+ def encode_to_stream(self, value, out, nested):
+ self._value_coder.encode_to_stream(value.value, out, True)
+ self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
+ self._windows_coder.encode_to_stream(value.windows, out, True)
+
+ def decode_from_stream(self, in_stream, nested):
+ return WindowedValue(
+ self._value_coder.decode_from_stream(in_stream, True),
+ self._timestamp_coder.decode_from_stream(in_stream, True),
+ self._windows_coder.decode_from_stream(in_stream, True))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
new file mode 100644
index 0000000..16edff0
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -0,0 +1,511 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Collection of useful coders."""
+
+import base64
+import collections
+import cPickle as pickle
+
+from google.cloud.dataflow.coders import coder_impl
+
+
+# pylint: disable=g-import-not-at-top
+# Avoid dependencies on the full SDK.
+try:
+ # Import dill from the pickler module to make sure our monkey-patching of dill
+ # occurs.
+ from google.cloud.dataflow.internal.pickler import dill
+ from google.cloud.dataflow.transforms.timeutil import Timestamp
+except ImportError:
+ # We fall back to using the stock dill library in tests that don't use the
+ # full Python SDK.
+ import dill
+ Timestamp = collections.namedtuple('Timestamp', 'micros')
+
+
+def serialize_coder(coder):
+ from google.cloud.dataflow.internal import pickler
+ return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
+
+
+def deserialize_coder(serialized):
+ from google.cloud.dataflow.internal import pickler
+ return pickler.loads(serialized.split('$', 1)[1])
+# pylint: enable=g-import-not-at-top
+
+
+class Coder(object):
+ """Base class for coders."""
+
+ def encode(self, value):
+ """Encodes the given object into a byte string."""
+ raise NotImplementedError('Encode not implemented: %s.' % self)
+
+ def decode(self, encoded):
+ """Decodes the given byte string into the corresponding object."""
+ raise NotImplementedError('Decode not implemented: %s.' % self)
+
+ def is_deterministic(self):
+ """Whether this coder is guaranteed to encode values deterministically.
+
+ A deterministic coder is required for key coders in GroupByKey operations
+ to produce consistent results.
+
+ For example, note that the default coder, the PickleCoder, is not
+ deterministic: the ordering of pickled entries in maps may vary across
+ executions since there is no defined order, and such a coder is not in
+ general suitable for usage as a key coder in GroupByKey operations, since
+ each instance of the same key may be encoded differently.
+
+ Returns:
+ Whether coder is deterministic.
+ """
+ return False
+
+ # ===========================================================================
+ # Methods below are internal SDK details that don't need to be modified for
+ # user-defined coders.
+ # ===========================================================================
+
+ def _create_impl(self):
+ """Creates a CoderImpl to do the actual encoding and decoding.
+ """
+ return coder_impl.CallbackCoderImpl(self.encode, self.decode)
+
+ def get_impl(self):
+ if not hasattr(self, '_impl'):
+ self._impl = self._create_impl()
+ assert isinstance(self._impl, coder_impl.CoderImpl)
+ return self._impl
+
+ def __getstate__(self):
+ return self._dict_without_impl()
+
+ def _dict_without_impl(self):
+ if hasattr(self, '_impl'):
+ d = dict(self.__dict__)
+ del d['_impl']
+ return d
+ else:
+ return self.__dict__
+
+ @classmethod
+ def from_type_hint(cls, unused_typehint, unused_registry):
+ # If not overridden, just construct the coder without arguments.
+ return cls()
+
+ def is_kv_coder(self):
+ return False
+
+ def key_coder(self):
+ if self.is_kv_coder():
+ raise NotImplementedError('key_coder: %s' % self)
+ else:
+ raise ValueError('Not a KV coder: %s.' % self)
+
+ def value_coder(self):
+ if self.is_kv_coder():
+ raise NotImplementedError('value_coder: %s' % self)
+ else:
+ raise ValueError('Not a KV coder: %s.' % self)
+
+ def _get_component_coders(self):
+ """Returns the internal component coders of this coder."""
+ # This is an internal detail of the Coder API and does not need to be
+ # refined in user-defined Coders.
+ return []
+
+ def as_cloud_object(self):
+ """Returns Google Cloud Dataflow API description of this coder."""
+ # This is an internal detail of the Coder API and does not need to be
+ # refined in user-defined Coders.
+
+ value = {
+ # We pass coders in the form "<coder_name>$<pickled_data>" to make the
+ # job description JSON more readable. Data before the $ is ignored by
+ # the worker.
+ '@type': serialize_coder(self),
+ 'component_encodings': list(
+ component.as_cloud_object()
+ for component in self._get_component_coders())
+ }
+ return value
+
+ def __repr__(self):
+ return self.__class__.__name__
+
+ def __eq__(self, other):
+ # pylint: disable=protected-access
+ return (self.__class__ == other.__class__
+ and self._dict_without_impl() == other._dict_without_impl())
+ # pylint: enable=protected-access
+
+
+class StrUtf8Coder(Coder):
+ """A coder used for reading and writing strings as UTF-8."""
+
+ def encode(self, value):
+ return value.encode('utf-8')
+
+ def decode(self, value):
+ return value.decode('utf-8')
+
+ def is_deterministic(self):
+ return True
+
+
+class ToStringCoder(Coder):
+ """A default string coder used if no sink coder is specified."""
+
+ def encode(self, value):
+ if isinstance(value, unicode):
+ return value.encode('utf-8')
+ elif isinstance(value, str):
+ return value
+ else:
+ return str(value)
+
+ def decode(self, _):
+ raise NotImplementedError('ToStringCoder cannot be used for decoding.')
+
+ def is_deterministic(self):
+ return True
+
+
+class FastCoder(Coder):
+ """Coder subclass used when a (faster) CoderImpl is supplied directly.
+
+ The Coder class defines _create_impl in terms of encode() and decode();
+ this class inverts that, defining encode() and decode() in terms of
+ _create_impl().
+ """
+
+ def encode(self, value):
+ """Encodes the given object into a byte string."""
+ return self.get_impl().encode(value)
+
+ def decode(self, encoded):
+ """Decodes the given byte string into the corresponding object."""
+ return self.get_impl().decode(encoded)
+
+ def _create_impl(self):
+ raise NotImplementedError
+
+
+class BytesCoder(FastCoder):
+ """Byte string coder."""
+
+ def _create_impl(self):
+ return coder_impl.BytesCoderImpl()
+
+ def is_deterministic(self):
+ return True
+
+
+class VarIntCoder(FastCoder):
+ """Variable-length integer coder."""
+
+ def _create_impl(self):
+ return coder_impl.VarIntCoderImpl()
+
+ def is_deterministic(self):
+ return True
+
+
+class FloatCoder(FastCoder):
+ """A coder used for floating-point values."""
+
+ def _create_impl(self):
+ return coder_impl.FloatCoderImpl()
+
+ def is_deterministic(self):
+ return True
+
+
+class TimestampCoder(FastCoder):
+ """A coder used for timeutil.Timestamp values."""
+
+ def _create_impl(self):
+ return coder_impl.TimestampCoderImpl(Timestamp)
+
+ def is_deterministic(self):
+ return True
+
+
+class SingletonCoder(FastCoder):
+ """A coder that always encodes exactly one value."""
+
+ def __init__(self, value):
+ self._value = value
+
+ def _create_impl(self):
+ return coder_impl.SingletonCoderImpl(self._value)
+
+ def is_deterministic(self):
+ return True
+
+
+def maybe_dill_dumps(o):
+ """Pickle using cPickle or the Dill pickler as a fallback."""
+ # We need to use the dill pickler for objects of certain custom classes,
+ # including, for example, ones that contain lambdas.
+ try:
+ return pickle.dumps(o)
+ except Exception: # pylint: disable=broad-except
+ return dill.dumps(o)
+
+
+def maybe_dill_loads(o):
+ """Unpickle using cPickle or the Dill pickler as a fallback."""
+ try:
+ return pickle.loads(o)
+ except Exception: # pylint: disable=broad-except
+ return dill.loads(o)
+
+
+class _PickleCoderBase(FastCoder):
+ """Base class for pickling coders."""
+
+ def is_deterministic(self):
+ # Note that the default coder, the PickleCoder, is not deterministic (for
+ # example, the ordering of pickled entries in maps may vary across
+ # executions), and so is not in general suitable for usage as a key coder in
+ # GroupByKey operations.
+ return False
+
+ def as_cloud_object(self, is_pair_like=True):
+ value = super(_PickleCoderBase, self).as_cloud_object()
+ # We currently use this coder in places where we cannot infer the coder to
+ # use for the value type in a more granular way. In places where the
+ # service expects a pair, it checks for the "is_pair_like" key, in which
+ # case we would fail without the hack below.
+ if is_pair_like:
+ value['is_pair_like'] = True
+ value['component_encodings'] = [
+ self.as_cloud_object(is_pair_like=False),
+ self.as_cloud_object(is_pair_like=False)
+ ]
+
+ return value
+
+ # We allow .key_coder() and .value_coder() to be called on PickleCoder since
+ # we can't always infer the return values of lambdas in ParDo operations, the
+ # result of which may be used in a GroupByKey.
+ def is_kv_coder(self):
+ return True
+
+ def key_coder(self):
+ return self
+
+ def value_coder(self):
+ return self
+
+
+class PickleCoder(_PickleCoderBase):
+ """Coder using Python's pickle functionality."""
+
+ def _create_impl(self):
+ return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
+
+
+class DillCoder(_PickleCoderBase):
+ """Coder using dill's pickle functionality."""
+
+ def _create_impl(self):
+ return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
+
+
+class DeterministicPickleCoder(FastCoder):
+ """Throws runtime errors when pickling non-deterministic values."""
+
+ def __init__(self, pickle_coder, step_label):
+ self._pickle_coder = pickle_coder
+ self._step_label = step_label
+
+ def _create_impl(self):
+ return coder_impl.DeterministicPickleCoderImpl(
+ self._pickle_coder.get_impl(), self._step_label)
+
+ def is_deterministic(self):
+ return True
+
+ def is_kv_coder(self):
+ return True
+
+ def key_coder(self):
+ return self
+
+ def value_coder(self):
+ return self
+
+
+class Base64PickleCoder(Coder):
+ """Coder of objects by Python pickle, then base64 encoding."""
+ # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
+ # than via a special Coder.
+
+ def encode(self, value):
+ return base64.b64encode(pickle.dumps(value))
+
+ def decode(self, encoded):
+ return pickle.loads(base64.b64decode(encoded))
+
+ def is_deterministic(self):
+ # Note that the Base64PickleCoder is not deterministic. See the
+ # corresponding comments for PickleCoder above.
+ return False
+
+ # We allow .key_coder() and .value_coder() to be called on Base64PickleCoder
+ # since we can't always infer the return values of lambdas in ParDo
+ # operations, the result of which may be used in a GroupByKey.
+ #
+ # TODO(ccy): this is currently only used for KV values from Create transforms.
+ # Investigate a way to unify this with PickleCoder.
+ def is_kv_coder(self):
+ return True
+
+ def key_coder(self):
+ return self
+
+ def value_coder(self):
+ return self
+
+
+class TupleCoder(FastCoder):
+ """Coder of tuple objects."""
+
+ def __init__(self, components):
+ self._coders = tuple(components)
+
+ def _create_impl(self):
+ return coder_impl.TupleCoderImpl([c.get_impl() for c in self._coders])
+
+ def is_deterministic(self):
+ return all(c.is_deterministic() for c in self._coders)
+
+ @staticmethod
+ def from_type_hint(typehint, registry):
+ return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
+
+ def as_cloud_object(self):
+ value = super(TupleCoder, self).as_cloud_object()
+ value['is_pair_like'] = True
+ return value
+
+ def _get_component_coders(self):
+ return self.coders()
+
+ def coders(self):
+ return self._coders
+
+ def is_kv_coder(self):
+ return len(self._coders) == 2
+
+ def key_coder(self):
+ if len(self._coders) != 2:
+ raise ValueError('TupleCoder does not have exactly 2 components.')
+ return self._coders[0]
+
+ def value_coder(self):
+ if len(self._coders) != 2:
+ raise ValueError('TupleCoder does not have exactly 2 components.')
+ return self._coders[1]
+
+ def __repr__(self):
+ return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
+
+
+class TupleSequenceCoder(FastCoder):
+ """Coder of homogeneous tuple objects."""
+
+ def __init__(self, elem_coder):
+ self._elem_coder = elem_coder
+
+ def _create_impl(self):
+ return coder_impl.TupleSequenceCoderImpl(self._elem_coder.get_impl())
+
+ def is_deterministic(self):
+ return self._elem_coder.is_deterministic()
+
+ @staticmethod
+ def from_type_hint(typehint, registry):
+ return TupleSequenceCoder(registry.get_coder(typehint.inner_type))
+
+ def _get_component_coders(self):
+ return (self._elem_coder,)
+
+ def __repr__(self):
+ return 'TupleSequenceCoder[%r]' % self._elem_coder
+
+
+class WindowCoder(PickleCoder):
+ """Coder for windows in windowed values."""
+
+ def _create_impl(self):
+ return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
+
+ def is_deterministic(self):
+ # Note that WindowCoder as implemented is not deterministic because the
+ # implementation simply pickles windows. See the corresponding comments
+ # on PickleCoder for more details.
+ return False
+
+ def as_cloud_object(self):
+ return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
+
+
+class WindowedValueCoder(FastCoder):
+ """Coder for windowed values."""
+
+ def __init__(self, wrapped_value_coder, timestamp_coder=None,
+ window_coder=None):
+ if not timestamp_coder:
+ timestamp_coder = TimestampCoder()
+ if not window_coder:
+ window_coder = PickleCoder()
+ self.wrapped_value_coder = wrapped_value_coder
+ self.timestamp_coder = timestamp_coder
+ self.window_coder = window_coder
+
+ def _create_impl(self):
+ return coder_impl.WindowedValueCoderImpl(
+ self.wrapped_value_coder.get_impl(),
+ self.timestamp_coder.get_impl(),
+ self.window_coder.get_impl())
+
+ def is_deterministic(self):
+ return all(c.is_deterministic() for c in [self.wrapped_value_coder,
+ self.timestamp_coder,
+ self.window_coder])
+
+ def as_cloud_object(self):
+ value = super(WindowedValueCoder, self).as_cloud_object()
+ value['is_wrapper'] = True
+ return value
+
+ def _get_component_coders(self):
+ return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]
+
+ def is_kv_coder(self):
+ return self.wrapped_value_coder.is_kv_coder()
+
+ def key_coder(self):
+ return self.wrapped_value_coder.key_coder()
+
+ def value_coder(self):
+ return self.wrapped_value_coder.value_coder()
+
+ def __repr__(self):
+ return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
new file mode 100644
index 0000000..d11d310
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -0,0 +1,60 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import base64
+import logging
+import unittest
+
+from google.cloud.dataflow import coders
+
+
+class PickleCoderTest(unittest.TestCase):
+
+ def test_basics(self):
+ v = ('a' * 10, 'b' * 90)
+ pickler = coders.PickleCoder()
+ self.assertEquals(v, pickler.decode(pickler.encode(v)))
+ pickler = coders.Base64PickleCoder()
+ self.assertEquals(v, pickler.decode(pickler.encode(v)))
+ self.assertEquals(
+ coders.Base64PickleCoder().encode(v),
+ base64.b64encode(coders.PickleCoder().encode(v)))
+
+ def test_equality(self):
+ self.assertEquals(coders.PickleCoder(), coders.PickleCoder())
+ self.assertEquals(coders.Base64PickleCoder(), coders.Base64PickleCoder())
+ self.assertNotEquals(coders.Base64PickleCoder(), coders.PickleCoder())
+ self.assertNotEquals(coders.Base64PickleCoder(), object())
+
+
+class CodersTest(unittest.TestCase):
+
+ def test_str_utf8_coder(self):
+ real_coder = coders.registry.get_coder(str)
+ expected_coder = coders.BytesCoder()
+ self.assertEqual(
+ real_coder.encode('abc'), expected_coder.encode('abc'))
+ self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+ real_coder = coders.registry.get_coder(bytes)
+ expected_coder = coders.BytesCoder()
+ self.assertEqual(
+ real_coder.encode('abc'), expected_coder.encode('abc'))
+ self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
new file mode 100644
index 0000000..29eaace
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -0,0 +1,180 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests common to all coder implementations."""
+
+import logging
+import math
+import sys
+import unittest
+
+import dill
+
+import coders
+
+
+# Defined out of line for picklability.
+class CustomCoder(coders.Coder):
+
+ def encode(self, x):
+ return str(x+1)
+
+ def decode(self, encoded):
+ return int(encoded) - 1
+
+
+class CodersTest(unittest.TestCase):
+
+ # These class methods ensure that we test each defined coder in both
+ # nested and unnested contexts.
+
+ @classmethod
+ def setUpClass(cls):
+ cls.seen = set()
+ cls.seen_nested = set()
+
+ @classmethod
+ def tearDownClass(cls):
+ standard = set(c
+ for c in coders.__dict__.values()
+ if isinstance(c, type) and issubclass(c, coders.Coder) and
+ 'Base' not in c.__name__)
+ standard -= set([coders.Coder,
+ coders.FastCoder,
+ coders.Base64PickleCoder,
+ coders.FloatCoder,
+ coders.TimestampCoder,
+ coders.ToStringCoder,
+ coders.WindowCoder,
+ coders.WindowedValueCoder])
+ assert not standard - cls.seen, standard - cls.seen
+ assert not standard - cls.seen_nested, standard - cls.seen_nested
+
+ @classmethod
+ def _observe(cls, coder):
+ cls.seen.add(type(coder))
+ cls._observe_nested(coder)
+
+ @classmethod
+ def _observe_nested(cls, coder):
+ if isinstance(coder, coders.TupleCoder):
+ for c in coder.coders():
+ cls.seen_nested.add(type(c))
+ cls._observe_nested(c)
+
+ def check_coder(self, coder, *values):
+ self._observe(coder)
+ for v in values:
+ self.assertEqual(v, coder.decode(coder.encode(v)))
+ copy1 = dill.loads(dill.dumps(coder))
+ copy2 = dill.loads(dill.dumps(coder))
+ for v in values:
+ self.assertEqual(v, copy1.decode(copy2.encode(v)))
+
+ def test_custom_coder(self):
+
+ self.check_coder(CustomCoder(), 1, -10, 5)
+ self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())),
+ (1, 'a'), (-10, 'b'), (5, 'c'))
+
+ def test_pickle_coder(self):
+ self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
+
+ def test_deterministic_pickle_coder(self):
+ coder = coders.DeterministicPickleCoder(coders.PickleCoder(), 'step')
+ self.check_coder(coder, 'a', 1, 1.5, (1, 2, 3))
+ with self.assertRaises(TypeError):
+ self.check_coder(coder, dict())
+ with self.assertRaises(TypeError):
+ self.check_coder(coder, [1, dict()])
+
+ self.check_coder(coders.TupleCoder((coder, coders.PickleCoder())),
+ (1, dict()), ('a', [dict()]))
+
+ def test_dill_coder(self):
+ cell_value = (lambda x: lambda: x)(0).func_closure[0]
+ self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
+ self.check_coder(
+ coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
+ (1, cell_value))
+
+ def test_bytes_coder(self):
+ self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
+
+ def test_varint_coder(self):
+ # Small ints.
+ self.check_coder(coders.VarIntCoder(), *range(-10, 10))
+ # Multi-byte encoding starts at 128
+ self.check_coder(coders.VarIntCoder(), *range(120, 140))
+ # Large values
+ self.check_coder(coders.VarIntCoder(),
+ *[int(math.pow(-1, k) * math.exp(k))
+ for k in range(0, int(math.log(sys.maxint)))])
+
+ def test_float_coder(self):
+ self.check_coder(coders.FloatCoder(),
+ *[float(0.1 * x) for x in range(-100, 100)])
+ self.check_coder(coders.FloatCoder(),
+ *[float(2 ** (0.1 * x)) for x in range(-100, 100)])
+ self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf'))
+
+ def test_singleton_coder(self):
+ a = 'anything'
+ b = 'something else'
+ self.check_coder(coders.SingletonCoder(a), a)
+ self.check_coder(coders.SingletonCoder(b), b)
+ self.check_coder(coders.TupleCoder((coders.SingletonCoder(a),
+ coders.SingletonCoder(b))), (a, b))
+
+ def test_timestamp_coder(self):
+ self.check_coder(coders.TimestampCoder(),
+ *[coders.Timestamp(micros=x) for x in range(-100, 100)])
+ self.check_coder(coders.TimestampCoder(),
+ coders.Timestamp(micros=-1234567890),
+ coders.Timestamp(micros=1234567890))
+ self.check_coder(coders.TimestampCoder(),
+ coders.Timestamp(micros=-1234567890123456789),
+ coders.Timestamp(micros=1234567890123456789))
+
+ def test_tuple_coder(self):
+ self.check_coder(
+ coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),
+ (1, 'a'),
+ (-2, 'a' * 100),
+ (300, 'abc\0' * 5))
+ self.check_coder(
+ coders.TupleCoder(
+ (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
+ coders.StrUtf8Coder())),
+ ((1, 2), 'a'),
+ ((-2, 5), u'a\u0101' * 100),
+ ((300, 1), 'abc\0' * 5))
+
+ def test_tuple_sequence_coder(self):
+ int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder())
+ self.check_coder(int_tuple_coder, (1, -1, 0), (), tuple(range(1000)))
+ self.check_coder(
+ coders.TupleCoder((coders.VarIntCoder(), int_tuple_coder)),
+ (1, (1, 2, 3)))
+
+ def test_base64_pickle_coder(self):
+ self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
+
+ def test_utf8_coder(self):
+ self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/fast_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
new file mode 100644
index 0000000..f2f4e6c
--- /dev/null
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -0,0 +1,34 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for compiled implementation of coder impls."""
+
+import logging
+import unittest
+
+
+# Run all the standard coder test cases.
+from google.cloud.dataflow.coders.coders_test_common import *
+
+
+class FastCoders(unittest.TestCase):
+
+ def test_using_fast_impl(self):
+ # pylint: disable=g-import-not-at-top
+ # pylint: disable=unused-variable
+ import google.cloud.dataflow.coders.stream
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
new file mode 100644
index 0000000..8a01752
--- /dev/null
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -0,0 +1,33 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Observable base class for iterables."""
+
+
+class ObservableMixin(object):
+ """An observable iterable.
+
+ Subclasses need to call self.notify_observers with any object yielded.
+ """
+
+ def __init__(self):
+ self.observers = []
+
+ def register_observer(self, callback):
+ self.observers.append(callback)
+
+ def notify_observers(self, value, **kwargs):
+ for o in self.observers:
+ o(value, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/observable_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable_test.py b/sdks/python/apache_beam/coders/observable_test.py
new file mode 100644
index 0000000..2b091bf
--- /dev/null
+++ b/sdks/python/apache_beam/coders/observable_test.py
@@ -0,0 +1,54 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the Observable mixin class."""
+
+import logging
+import unittest
+
+
+from google.cloud.dataflow.coders import observable
+
+
+class ObservableMixinTest(unittest.TestCase):
+ observed_count = 0
+ observed_sum = 0
+ observed_keys = []
+
+ def observer(self, value, key=None):
+ self.observed_count += 1
+ self.observed_sum += value
+ self.observed_keys.append(key)
+
+ def test_observable(self):
+ class Watched(observable.ObservableMixin):
+
+ def __iter__(self):
+ for i in (1, 4, 3):
+ self.notify_observers(i, key='a%d' % i)
+ yield i
+
+ watched = Watched()
+ watched.register_observer(lambda v, key: self.observer(v, key=key))
+ for _ in watched:
+ pass
+
+ self.assertEquals(3, self.observed_count)
+ self.assertEquals(8, self.observed_sum)
+ self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py
new file mode 100644
index 0000000..8cb23ae
--- /dev/null
+++ b/sdks/python/apache_beam/coders/slow_coders_test.py
@@ -0,0 +1,36 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for uncompiled implementation of coder impls."""
+
+import logging
+import unittest
+
+
+# Run all the standard coder test cases.
+from google.cloud.dataflow.coders.coders_test_common import *
+
+
+class SlowCoders(unittest.TestCase):
+
+ def test_using_slow_impl(self):
+ # Assert that we are not using the compiled implementation.
+ with self.assertRaises(ImportError):
+ # pylint: disable=g-import-not-at-top
+ # pylint: disable=unused-variable
+ import google.cloud.dataflow.coders.stream
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
new file mode 100644
index 0000000..ea09d54
--- /dev/null
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -0,0 +1,136 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A pure Python implementation of stream.pyx."""
+
+import struct
+
+
+class OutputStream(object):
+ """A pure Python implementation of stream.OutputStream."""
+
+ def __init__(self):
+ self.data = []
+
+ def write(self, b, nested=False):
+ assert isinstance(b, str)
+ if nested:
+ self.write_var_int64(len(b))
+ self.data.append(b)
+
+ def write_byte(self, val):
+ self.data.append(chr(val))
+
+ def write_var_int64(self, v):
+ if v < 0:
+ v += 1 << 64
+ if v <= 0:
+ raise ValueError('Value too large (negative).')
+ while True:
+ bits = v & 0x7F
+ v >>= 7
+ if v:
+ bits |= 0x80
+ self.write_byte(bits)
+ if not v:
+ break
+
+ def write_bigendian_int64(self, v):
+ self.write(struct.pack('>q', v))
+
+ def write_bigendian_int32(self, v):
+ self.write(struct.pack('>i', v))
+
+ def write_bigendian_double(self, v):
+ self.write(struct.pack('>d', v))
+
+ def get(self):
+ return ''.join(self.data)
+
+
+class ByteCountingOutputStream(OutputStream):
+ """A pure Python implementation of stream.ByteCountingOutputStream."""
+
+ def __init__(self):
+ # Note that we don't actually use any of the data initialized by our super.
+ super(ByteCountingOutputStream, self).__init__()
+ self.count = 0
+
+ def write(self, byte_array, nested=False):
+ blen = len(byte_array)
+ if nested:
+ self.write_var_int64(blen)
+ self.count += blen
+
+ def write_byte(self, _):
+ self.count += 1
+
+ def get_count(self):
+ return self.count
+
+ def get(self):
+ raise NotImplementedError
+
+ def __str__(self):
+ return '<%s %s>' % (self.__class__.__name__, self.count)
+
+
+class InputStream(object):
+ """A pure Python implementation of stream.InputStream."""
+
+ def __init__(self, data):
+ self.data = data
+ self.pos = 0
+
+ def size(self):
+ return len(self.data) - self.pos
+
+ def read(self, size):
+ self.pos += size
+ return self.data[self.pos - size : self.pos]
+
+ def read_all(self, nested):
+ return self.read(self.read_var_int64() if nested else self.size())
+
+ def read_byte(self):
+ self.pos += 1
+ return ord(self.data[self.pos - 1])
+
+ def read_var_int64(self):
+ shift = 0
+ result = 0
+ while True:
+ byte = self.read_byte()
+ if byte < 0:
+ raise RuntimeError('VarLong not terminated.')
+
+ bits = byte & 0x7F
+ if shift >= 64 or (shift >= 63 and bits > 1):
+ raise RuntimeError('VarLong too long.')
+ result |= bits << shift
+ shift += 7
+ if not byte & 0x80:
+ break
+ if result >= 1 << 63:
+ result -= 1 << 64
+ return result
+
+ def read_bigendian_int64(self):
+ return struct.unpack('>q', self.read(8))[0]
+
+ def read_bigendian_int32(self):
+ return struct.unpack('>i', self.read(4))[0]
+
+ def read_bigendian_double(self):
+ return struct.unpack('>d', self.read(8))[0]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
new file mode 100644
index 0000000..3da7324
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdint
+
+
cdef class OutputStream(object):
  # Buffer state: the malloc'ed storage, its capacity in bytes, and the
  # offset at which the next write lands.
  cdef char* data
  cdef size_t size
  cdef size_t pos

  # Writers. The `=*` marks an optional argument whose default value is
  # given in the implementation file.
  cpdef write(self, bytes b, bint nested=*)
  cpdef write_byte(self, unsigned char val)
  cpdef write_var_int64(self, libc.stdint.int64_t v)
  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
  cpdef write_bigendian_double(self, double d)

  # Returns the bytes written so far.
  cpdef bytes get(self)

  # C-only helper that grows the buffer.
  cdef extend(self, size_t missing)
+
+
cdef class ByteCountingOutputStream(OutputStream):
  # Running total of bytes "written".
  cdef size_t count

  # Overrides of the base-class writers.
  cpdef write(self, bytes b, bint nested=*)
  cpdef write_byte(self, unsigned char val)
  cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
  cpdef write_bigendian_int32(self, libc.stdint.int32_t val)

  # Accessor for the total, and the overridden get().
  cpdef size_t get_count(self)
  cpdef bytes get(self)
+
+
cdef class InputStream(object):
  # Read offset, the bytes object being read, and a raw C pointer to the
  # same data.
  cdef size_t pos
  cdef bytes all
  cdef char* allc

  # Readers. `except? -1` makes -1 a possible error marker: when it is
  # returned, the caller checks whether an exception is actually pending.
  cpdef size_t size(self) except? -1
  cpdef bytes read(self, size_t len)
  cpdef long read_byte(self) except? -1
  cpdef libc.stdint.int64_t read_var_int64(self) except? -1
  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
  cpdef double read_bigendian_double(self) except? -1
  cpdef bytes read_all(self, bint nested=*)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
new file mode 100644
index 0000000..6df186a
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -0,0 +1,201 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdlib
+cimport libc.string
+
+
cdef class OutputStream(object):
  """An output string stream implementation supporting write() and get().

  Bytes are appended to a malloc'ed buffer that starts at 1024 bytes and is
  doubled in size whenever a write would not fit.
  """

  #TODO(robertwb): Consider using raw C++ streams.

  def __cinit__(self):
    self.size = 1024
    self.pos = 0
    self.data = <char*>libc.stdlib.malloc(self.size)
    if not self.data:
      # Raise explicitly instead of asserting: assertions can be disabled,
      # which would leave a NULL buffer in use.
      raise MemoryError("OutputStream malloc failed.")

  def __dealloc__(self):
    if self.data:
      libc.stdlib.free(self.data)

  cpdef write(self, bytes b, bint nested=False):
    """Appends b, preceded by its var-int encoded length if nested."""
    cdef size_t blen = len(b)
    if nested:
      self.write_var_int64(blen)
    if self.size < self.pos + blen:
      self.extend(blen)
    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
    self.pos += blen

  cpdef write_byte(self, unsigned char val):
    """Appends a single byte."""
    if self.size < self.pos + 1:
      self.extend(1)
    self.data[self.pos] = val
    self.pos += 1

  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
    """Encode a long using variable-length encoding to a stream."""
    # Work on the unsigned bit pattern so the right shift is logical and the
    # loop terminates for negative inputs.
    cdef libc.stdint.uint64_t v = signed_v
    cdef long bits
    while True:
      bits = v & 0x7F
      v >>= 7
      if v:
        # More groups follow: set the continuation bit.
        bits |= 0x80
      self.write_byte(bits)
      if not v:
        break

  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
    """Appends signed_v as 8 bytes, most significant byte first."""
    cdef libc.stdint.uint64_t v = signed_v
    if self.size < self.pos + 8:
      self.extend(8)
    self.data[self.pos    ] = <unsigned char>(v >> 56)
    self.data[self.pos + 1] = <unsigned char>(v >> 48)
    self.data[self.pos + 2] = <unsigned char>(v >> 40)
    self.data[self.pos + 3] = <unsigned char>(v >> 32)
    self.data[self.pos + 4] = <unsigned char>(v >> 24)
    self.data[self.pos + 5] = <unsigned char>(v >> 16)
    self.data[self.pos + 6] = <unsigned char>(v >>  8)
    self.data[self.pos + 7] = <unsigned char>(v      )
    self.pos += 8

  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
    """Appends signed_v as 4 bytes, most significant byte first."""
    cdef libc.stdint.uint32_t v = signed_v
    if self.size < self.pos + 4:
      self.extend(4)
    self.data[self.pos    ] = <unsigned char>(v >> 24)
    self.data[self.pos + 1] = <unsigned char>(v >> 16)
    self.data[self.pos + 2] = <unsigned char>(v >>  8)
    self.data[self.pos + 3] = <unsigned char>(v      )
    self.pos += 4

  cpdef write_bigendian_double(self, double d):
    """Appends the 8-byte bit pattern of d, most significant byte first."""
    # Reinterpret the double's storage as a 64-bit integer.
    self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])

  cpdef bytes get(self):
    """Returns a bytes copy of everything written so far."""
    return self.data[:self.pos]

  cdef extend(self, size_t missing):
    # Doubles the capacity until `missing` more bytes fit after self.pos.
    cdef size_t new_size = self.size
    cdef char* new_data
    while missing > new_size - self.pos:
      new_size *= 2
    new_data = <char*>libc.stdlib.realloc(self.data, new_size)
    if not new_data:
      # A failed realloc leaves the old block valid, so self.data and
      # self.size are left untouched and __dealloc__ still frees the block.
      raise MemoryError("OutputStream realloc failed.")
    self.data = new_data
    self.size = new_size
+
+
cdef class ByteCountingOutputStream(OutputStream):
  """An output stream that tallies bytes instead of storing them.

  Every write method only advances a counter; nothing is copied anywhere, so
  get() is unsupported. get_count() reports the total, which makes this
  class suitable for measuring the size of an encoding.
  """

  def __cinit__(self):
    self.count = 0

  cpdef write(self, bytes b, bint nested=False):
    """Counts len(b) bytes, plus the var-int length prefix if nested."""
    cdef size_t num_bytes = len(b)
    if nested:
      # write_var_int64 emits through write_byte, which is overridden below,
      # so the prefix is counted rather than stored.
      self.write_var_int64(num_bytes)
    self.count = self.count + num_bytes

  cpdef write_byte(self, unsigned char _):
    """Counts one byte."""
    self.count = self.count + 1

  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
    """Counts the 8 bytes of a 64-bit integer."""
    self.count = self.count + 8

  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
    """Counts the 4 bytes of a 32-bit integer."""
    self.count = self.count + 4

  cpdef size_t get_count(self):
    """Returns the number of bytes counted so far."""
    return self.count

  cpdef bytes get(self):
    """Always raises NotImplementedError."""
    raise NotImplementedError()

  def __str__(self):
    class_name = self.__class__.__name__
    return '<%s %s>' % (class_name, self.count)
+
+
cdef class InputStream(object):
  """An input string stream implementation supporting read() and size().

  None of the read methods checks the offset against the length of the
  data before reading.
  """

  def __init__(self, all):
    # self.all holds a reference to the bytes object; self.allc is a raw
    # pointer into that same object.
    self.allc = self.all = all

  cpdef bytes read(self, size_t size):
    """Advances the offset by size and returns the bytes skipped over."""
    self.pos += size
    return self.allc[self.pos - size : self.pos]

  cpdef long read_byte(self) except? -1:
    """Consumes one byte and returns it as a value in [0, 255]."""
    self.pos += 1
    # Note: the C++ compiler on Dataflow workers treats the char array below as
    # a signed char. This causes incorrect coder behavior unless explicitly
    # cast to an unsigned char here.
    return <long>(<unsigned char> self.allc[self.pos - 1])

  cpdef size_t size(self) except? -1:
    """Returns the number of bytes remaining after the read offset."""
    return len(self.all) - self.pos

  cpdef bytes read_all(self, bint nested=False):
    """Reads a length-prefixed chunk if nested, else everything left."""
    return self.read(self.read_var_int64() if nested else self.size())

  cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
    """Decode a variable-length encoded long from a stream."""
    cdef long byte
    # bits is shifted by up to 63 places below, so it must be 64 bits wide
    # regardless of the platform's width for `long`.
    cdef libc.stdint.int64_t bits
    cdef long shift = 0
    cdef libc.stdint.int64_t result = 0
    while True:
      byte = self.read_byte()
      if byte < 0:
        raise RuntimeError('VarInt not terminated.')

      bits = byte & 0x7F
      # Bound the encoding by the width of the 64-bit result, not by
      # sizeof(long), which is only 32 bits on some platforms.
      if (shift >= sizeof(libc.stdint.int64_t) * 8 or
          (shift >= (sizeof(libc.stdint.int64_t) * 8 - 1) and bits > 1)):
        raise RuntimeError('VarLong too long.')
      result |= bits << shift
      shift += 7
      if not (byte & 0x80):
        break
    return result

  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
    """Reads 8 bytes as a big-endian signed 64-bit integer."""
    self.pos += 8
    return (<unsigned char>self.allc[self.pos - 1]
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] <<  8
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)

  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
    """Reads 4 bytes as a big-endian signed 32-bit integer."""
    self.pos += 4
    return (<unsigned char>self.allc[self.pos - 1]
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] <<  8
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)

  cpdef double read_bigendian_double(self) except? -1:
    """Reads 8 big-endian bytes and reinterprets them as a double."""
    cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
    return (<double*><char*>&as_long)[0]
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py
new file mode 100644
index 0000000..3002116
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -0,0 +1,168 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the stream implementations."""
+
+import logging
+import math
+import unittest
+
+
+from google.cloud.dataflow.coders import slow_stream
+
+
class StreamTest(unittest.TestCase):
  """Round-trip tests for the stream classes.

  The stream classes under test are class attributes so that subclasses can
  rerun every test against a different combination of implementations.
  """
  # pylint: disable=invalid-name
  InputStream = slow_stream.InputStream
  OutputStream = slow_stream.OutputStream
  ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
  # pylint: enable=invalid-name

  def test_read_write(self):
    """Plain and length-prefixed writes read back unchanged."""
    out_s = self.OutputStream()
    out_s.write('abc')
    out_s.write('\0\t\n')
    out_s.write('xyz', True)
    out_s.write('', True)
    in_s = self.InputStream(out_s.get())
    self.assertEqual('abc\0\t\n', in_s.read(6))
    self.assertEqual('xyz', in_s.read_all(True))
    self.assertEqual('', in_s.read_all(True))

  def test_read_all(self):
    """A non-nested read_all returns everything that was written."""
    out_s = self.OutputStream()
    out_s.write('abc')
    in_s = self.InputStream(out_s.get())
    self.assertEqual('abc', in_s.read_all(False))

  def test_read_write_byte(self):
    """Single bytes round-trip, including 0 and 0xFF."""
    out_s = self.OutputStream()
    out_s.write_byte(1)
    out_s.write_byte(0)
    out_s.write_byte(0xFF)
    in_s = self.InputStream(out_s.get())
    self.assertEqual(1, in_s.read_byte())
    self.assertEqual(0, in_s.read_byte())
    self.assertEqual(0xFF, in_s.read_byte())

  def test_read_write_large(self):
    """Writes 32 KiB of int64s and reads them all back."""
    values = range(4 * 1024)
    out_s = self.OutputStream()
    for v in values:
      out_s.write_bigendian_int64(v)
    in_s = self.InputStream(out_s.get())
    for v in values:
      self.assertEqual(v, in_s.read_bigendian_int64())

  def run_read_write_var_int64(self, values):
    """Writes values as var-ints, then checks they decode in order."""
    out_s = self.OutputStream()
    for v in values:
      out_s.write_var_int64(v)
    in_s = self.InputStream(out_s.get())
    for v in values:
      self.assertEqual(v, in_s.read_var_int64())

  def test_small_var_int64(self):
    self.run_read_write_var_int64(range(-10, 30))

  def test_medium_var_int64(self):
    # Powers of a negative base alternate in sign while growing in
    # magnitude; the exponent range keeps |base**exponent| below 2**63.
    base = -1.7
    self.run_read_write_var_int64(
        [int(base**exponent)
         for exponent in range(1, int(63 * math.log(2) / math.log(-base)))])

  def test_large_var_int64(self):
    self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])

  def test_read_write_double(self):
    """Doubles round-trip exactly, including infinity."""
    values = 0, 1, -1, 1e100, 1.0/3, math.pi, float('inf')
    out_s = self.OutputStream()
    for v in values:
      out_s.write_bigendian_double(v)
    in_s = self.InputStream(out_s.get())
    for v in values:
      self.assertEqual(v, in_s.read_bigendian_double())

  def test_read_write_bigendian_int64(self):
    """int64 values round-trip, including both extremes."""
    values = 0, 1, -1, 2**63-1, -2**63, int(2**61 * math.pi)
    out_s = self.OutputStream()
    for v in values:
      out_s.write_bigendian_int64(v)
    in_s = self.InputStream(out_s.get())
    for v in values:
      self.assertEqual(v, in_s.read_bigendian_int64())

  def test_read_write_bigendian_int32(self):
    """int32 values round-trip, including both extremes."""
    values = 0, 1, -1, 2**31-1, -2**31, int(2**29 * math.pi)
    out_s = self.OutputStream()
    for v in values:
      out_s.write_bigendian_int32(v)
    in_s = self.InputStream(out_s.get())
    for v in values:
      self.assertEqual(v, in_s.read_bigendian_int32())

  def test_byte_counting(self):
    """Each kind of write advances the count by its encoded size."""
    bc_s = self.ByteCountingOutputStream()
    self.assertEqual(0, bc_s.get_count())
    bc_s.write('def')
    self.assertEqual(3, bc_s.get_count())
    bc_s.write('')
    self.assertEqual(3, bc_s.get_count())
    bc_s.write_byte(10)
    self.assertEqual(4, bc_s.get_count())
    # "nested" also writes the length of the string, which should
    # cause 1 extra byte to be counted.
    bc_s.write('2345', nested=True)
    self.assertEqual(9, bc_s.get_count())
    bc_s.write_var_int64(63)
    self.assertEqual(10, bc_s.get_count())
    bc_s.write_bigendian_int64(42)
    self.assertEqual(18, bc_s.get_count())
    bc_s.write_bigendian_int32(36)
    self.assertEqual(22, bc_s.get_count())
    bc_s.write_bigendian_double(6.25)
    self.assertEqual(30, bc_s.get_count())
+
+
try:
  # pylint: disable=g-import-not-at-top
  from google.cloud.dataflow.coders import stream
except ImportError:
  # The compiled stream module is unavailable, so the extra test classes
  # below are not defined.
  pass
else:
  class FastStreamTest(StreamTest):
    """Runs the test with the compiled stream classes."""
    InputStream = stream.InputStream
    OutputStream = stream.OutputStream
    ByteCountingOutputStream = stream.ByteCountingOutputStream


  class SlowFastStreamTest(StreamTest):
    """Runs the test with compiled and uncompiled stream classes."""
    InputStream = stream.InputStream
    OutputStream = slow_stream.OutputStream
    ByteCountingOutputStream = slow_stream.ByteCountingOutputStream


  class FastSlowStreamTest(StreamTest):
    """Runs the test with uncompiled and compiled stream classes."""
    InputStream = slow_stream.InputStream
    OutputStream = stream.OutputStream
    ByteCountingOutputStream = stream.ByteCountingOutputStream
+
+
if __name__ == '__main__':
  # Set the root logger to INFO, then run the tests.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()