Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:24 UTC

[49/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

Move all files to apache_beam folder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b14dfadd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b14dfadd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b14dfadd

Branch: refs/heads/python-sdk
Commit: b14dfadd1f414063eb0710eae8237eb2fa9c8a2f
Parents: e507928
Author: Silviu Calinoiu <si...@google.com>
Authored: Tue Jun 14 08:49:04 2016 -0700
Committer: Silviu Calinoiu <si...@google.com>
Committed: Tue Jun 14 12:07:07 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/__init__.py             |   78 +
 sdks/python/apache_beam/coders/__init__.py      |   16 +
 sdks/python/apache_beam/coders/coder_impl.pxd   |  109 +
 sdks/python/apache_beam/coders/coder_impl.py    |  316 ++
 sdks/python/apache_beam/coders/coders.py        |  511 +++
 sdks/python/apache_beam/coders/coders_test.py   |   60 +
 .../apache_beam/coders/coders_test_common.py    |  180 ++
 .../apache_beam/coders/fast_coders_test.py      |   34 +
 sdks/python/apache_beam/coders/observable.py    |   33 +
 .../apache_beam/coders/observable_test.py       |   54 +
 .../apache_beam/coders/slow_coders_test.py      |   36 +
 sdks/python/apache_beam/coders/slow_stream.py   |  136 +
 sdks/python/apache_beam/coders/stream.pxd       |   58 +
 sdks/python/apache_beam/coders/stream.pyx       |  201 ++
 sdks/python/apache_beam/coders/stream_test.py   |  168 +
 sdks/python/apache_beam/coders/typecoders.py    |  154 +
 .../apache_beam/coders/typecoders_test.py       |  114 +
 sdks/python/apache_beam/dataflow_test.py        |  405 +++
 sdks/python/apache_beam/error.py                |   39 +
 sdks/python/apache_beam/examples/__init__.py    |    0
 .../examples/complete/autocomplete.py           |   79 +
 .../examples/complete/autocomplete_test.py      |   78 +
 .../examples/complete/estimate_pi.py            |  109 +
 .../examples/complete/estimate_pi_test.py       |   46 +
 .../complete/juliaset/juliaset/__init__.py      |    0
 .../complete/juliaset/juliaset/juliaset.py      |  119 +
 .../complete/juliaset/juliaset/juliaset_test.py |   83 +
 .../examples/complete/juliaset/juliaset_main.py |   55 +
 .../examples/complete/juliaset/setup.py         |  115 +
 .../apache_beam/examples/complete/tfidf.py      |  196 ++
 .../apache_beam/examples/complete/tfidf_test.py |   88 +
 .../examples/complete/top_wikipedia_sessions.py |  170 +
 .../complete/top_wikipedia_sessions_test.py     |   58 +
 .../examples/cookbook/bigquery_schema.py        |  127 +
 .../examples/cookbook/bigquery_side_input.py    |  114 +
 .../cookbook/bigquery_side_input_test.py        |   59 +
 .../examples/cookbook/bigquery_tornadoes.py     |   96 +
 .../cookbook/bigquery_tornadoes_test.py         |   41 +
 .../apache_beam/examples/cookbook/bigshuffle.py |   84 +
 .../examples/cookbook/bigshuffle_test.py        |   61 +
 .../apache_beam/examples/cookbook/coders.py     |   92 +
 .../examples/cookbook/coders_test.py            |   56 +
 .../examples/cookbook/combiners_test.py         |   73 +
 .../examples/cookbook/custom_ptransform.py      |  132 +
 .../examples/cookbook/custom_ptransform_test.py |   64 +
 .../apache_beam/examples/cookbook/filters.py    |  104 +
 .../examples/cookbook/filters_test.py           |   65 +
 .../examples/cookbook/group_with_coder.py       |  111 +
 .../examples/cookbook/group_with_coder_test.py  |   87 +
 .../examples/cookbook/mergecontacts.py          |  126 +
 .../examples/cookbook/mergecontacts_test.py     |  121 +
 .../examples/cookbook/multiple_output_pardo.py  |  171 +
 .../cookbook/multiple_output_pardo_test.py      |   69 +
 .../apache_beam/examples/snippets/snippets.py   |  872 +++++
 .../examples/snippets/snippets_test.py          |  560 ++++
 .../apache_beam/examples/streaming_wordcap.py   |   61 +
 .../apache_beam/examples/streaming_wordcount.py |   71 +
 sdks/python/apache_beam/examples/wordcount.py   |   99 +
 .../apache_beam/examples/wordcount_debugging.py |  154 +
 .../examples/wordcount_debugging_test.py        |   56 +
 .../apache_beam/examples/wordcount_minimal.py   |  111 +
 .../examples/wordcount_minimal_test.py          |   56 +
 .../apache_beam/examples/wordcount_test.py      |   55 +
 sdks/python/apache_beam/internal/__init__.py    |    0
 sdks/python/apache_beam/internal/apiclient.py   |  935 ++++++
 .../apache_beam/internal/apiclient_test.py      |  110 +
 sdks/python/apache_beam/internal/auth.py        |  161 +
 .../apache_beam/internal/clients/__init__.py    |    0
 .../internal/clients/bigquery/__init__.py       |   10 +
 .../clients/bigquery/bigquery_v2_client.py      |  642 ++++
 .../clients/bigquery/bigquery_v2_messages.py    | 1893 +++++++++++
 .../internal/clients/dataflow/__init__.py       |   10 +
 .../clients/dataflow/dataflow_v1b3_client.py    |  316 ++
 .../clients/dataflow/dataflow_v1b3_messages.py  | 3056 ++++++++++++++++++
 .../internal/clients/storage/__init__.py        |   10 +
 .../clients/storage/storage_v1_client.py        | 1021 ++++++
 .../clients/storage/storage_v1_messages.py      | 1903 +++++++++++
 sdks/python/apache_beam/internal/json_value.py  |  127 +
 .../apache_beam/internal/json_value_test.py     |   63 +
 sdks/python/apache_beam/internal/module_test.py |   59 +
 sdks/python/apache_beam/internal/pickler.py     |  205 ++
 .../python/apache_beam/internal/pickler_test.py |   78 +
 sdks/python/apache_beam/internal/util.py        |   90 +
 sdks/python/apache_beam/internal/util_test.py   |   58 +
 .../python/apache_beam/internal/windmill_pb2.py | 2275 +++++++++++++
 .../internal/windmill_service_pb2.py            |  161 +
 sdks/python/apache_beam/io/__init__.py          |   25 +
 sdks/python/apache_beam/io/bigquery.py          |  826 +++++
 sdks/python/apache_beam/io/bigquery_test.py     |  450 +++
 sdks/python/apache_beam/io/fileio.py            |  747 +++++
 sdks/python/apache_beam/io/fileio_test.py       |  522 +++
 sdks/python/apache_beam/io/gcsio.py             |  602 ++++
 sdks/python/apache_beam/io/gcsio_test.py        |  503 +++
 sdks/python/apache_beam/io/iobase.py            | 1073 ++++++
 sdks/python/apache_beam/io/pubsub.py            |   73 +
 sdks/python/apache_beam/io/range_trackers.py    |  270 ++
 .../apache_beam/io/range_trackers_test.py       |  318 ++
 sdks/python/apache_beam/io/sources_test.py      |   65 +
 sdks/python/apache_beam/pipeline.py             |  435 +++
 sdks/python/apache_beam/pipeline_test.py        |  345 ++
 sdks/python/apache_beam/pvalue.py               |  459 +++
 sdks/python/apache_beam/pvalue_test.py          |   63 +
 sdks/python/apache_beam/python_sdk_releases.py  |   53 +
 sdks/python/apache_beam/runners/__init__.py     |   24 +
 sdks/python/apache_beam/runners/common.pxd      |   28 +
 sdks/python/apache_beam/runners/common.py       |  181 ++
 .../apache_beam/runners/dataflow_runner.py      |  639 ++++
 .../python/apache_beam/runners/direct_runner.py |  326 ++
 sdks/python/apache_beam/runners/runner.py       |  305 ++
 sdks/python/apache_beam/runners/runner_test.py  |   66 +
 sdks/python/apache_beam/transforms/__init__.py  |   23 +
 .../python/apache_beam/transforms/aggregator.py |  105 +
 .../apache_beam/transforms/aggregator_test.py   |   73 +
 sdks/python/apache_beam/transforms/combiners.py |  523 +++
 .../apache_beam/transforms/combiners_test.py    |  225 ++
 sdks/python/apache_beam/transforms/core.py      | 1292 ++++++++
 .../apache_beam/transforms/cy_combiners.pxd     |   89 +
 .../apache_beam/transforms/cy_combiners.py      |  250 ++
 .../python/apache_beam/transforms/ptransform.py |  703 ++++
 .../apache_beam/transforms/ptransform_test.py   | 1814 +++++++++++
 .../python/apache_beam/transforms/sideinputs.py |  145 +
 sdks/python/apache_beam/transforms/timeutil.py  |  310 ++
 .../apache_beam/transforms/timeutil_test.py     |  165 +
 sdks/python/apache_beam/transforms/trigger.py   |  958 ++++++
 .../apache_beam/transforms/trigger_test.py      |  566 ++++
 .../transforms/trigger_transcripts.yaml         |  207 ++
 sdks/python/apache_beam/transforms/util.py      |  227 ++
 sdks/python/apache_beam/transforms/window.py    |  383 +++
 .../apache_beam/transforms/window_test.py       |  201 ++
 .../transforms/write_ptransform_test.py         |  124 +
 sdks/python/apache_beam/typehints/__init__.py   |   19 +
 sdks/python/apache_beam/typehints/decorators.py |  530 +++
 sdks/python/apache_beam/typehints/opcodes.py    |  331 ++
 .../apache_beam/typehints/trivial_inference.py  |  415 +++
 .../typehints/trivial_inference_test.py         |  148 +
 sdks/python/apache_beam/typehints/typecheck.py  |  161 +
 .../typehints/typed_pipeline_test.py            |  248 ++
 sdks/python/apache_beam/typehints/typehints.py  | 1054 ++++++
 .../apache_beam/typehints/typehints_test.py     | 1053 ++++++
 sdks/python/apache_beam/utils/__init__.py       |   19 +
 sdks/python/apache_beam/utils/counters.pxd      |   27 +
 sdks/python/apache_beam/utils/counters.py       |  180 ++
 sdks/python/apache_beam/utils/dependency.py     |  439 +++
 .../python/apache_beam/utils/dependency_test.py |  394 +++
 sdks/python/apache_beam/utils/names.py          |   75 +
 sdks/python/apache_beam/utils/options.py        |  486 +++
 sdks/python/apache_beam/utils/path.py           |   44 +
 sdks/python/apache_beam/utils/path_test.py      |   67 +
 .../apache_beam/utils/pipeline_options_test.py  |  104 +
 .../utils/pipeline_options_validator.py         |  166 +
 .../utils/pipeline_options_validator_test.py    |  234 ++
 sdks/python/apache_beam/utils/processes.py      |   49 +
 sdks/python/apache_beam/utils/processes_test.py |  103 +
 sdks/python/apache_beam/utils/profiler.py       |   66 +
 sdks/python/apache_beam/utils/retry.py          |  194 ++
 sdks/python/apache_beam/utils/retry_test.py     |  165 +
 sdks/python/apache_beam/version.py              |   17 +
 sdks/python/google/cloud/dataflow/__init__.py   |   78 -
 .../google/cloud/dataflow/coders/__init__.py    |   16 -
 .../google/cloud/dataflow/coders/coder_impl.pxd |  109 -
 .../google/cloud/dataflow/coders/coder_impl.py  |  316 --
 .../google/cloud/dataflow/coders/coders.py      |  511 ---
 .../google/cloud/dataflow/coders/coders_test.py |   60 -
 .../cloud/dataflow/coders/coders_test_common.py |  180 --
 .../cloud/dataflow/coders/fast_coders_test.py   |   34 -
 .../google/cloud/dataflow/coders/observable.py  |   33 -
 .../cloud/dataflow/coders/observable_test.py    |   54 -
 .../cloud/dataflow/coders/slow_coders_test.py   |   36 -
 .../google/cloud/dataflow/coders/slow_stream.py |  136 -
 .../google/cloud/dataflow/coders/stream.pxd     |   58 -
 .../google/cloud/dataflow/coders/stream.pyx     |  201 --
 .../google/cloud/dataflow/coders/stream_test.py |  168 -
 .../google/cloud/dataflow/coders/typecoders.py  |  154 -
 .../cloud/dataflow/coders/typecoders_test.py    |  114 -
 .../google/cloud/dataflow/dataflow_test.py      |  405 ---
 sdks/python/google/cloud/dataflow/error.py      |   39 -
 .../google/cloud/dataflow/examples/__init__.py  |    0
 .../dataflow/examples/complete/autocomplete.py  |   79 -
 .../examples/complete/autocomplete_test.py      |   78 -
 .../dataflow/examples/complete/estimate_pi.py   |  109 -
 .../examples/complete/estimate_pi_test.py       |   46 -
 .../complete/juliaset/juliaset/__init__.py      |    0
 .../complete/juliaset/juliaset/juliaset.py      |  119 -
 .../complete/juliaset/juliaset/juliaset_test.py |   83 -
 .../examples/complete/juliaset/juliaset_main.py |   55 -
 .../examples/complete/juliaset/setup.py         |  115 -
 .../cloud/dataflow/examples/complete/tfidf.py   |  196 --
 .../dataflow/examples/complete/tfidf_test.py    |   88 -
 .../examples/complete/top_wikipedia_sessions.py |  170 -
 .../complete/top_wikipedia_sessions_test.py     |   58 -
 .../examples/cookbook/bigquery_schema.py        |  127 -
 .../examples/cookbook/bigquery_side_input.py    |  114 -
 .../cookbook/bigquery_side_input_test.py        |   59 -
 .../examples/cookbook/bigquery_tornadoes.py     |   96 -
 .../cookbook/bigquery_tornadoes_test.py         |   41 -
 .../dataflow/examples/cookbook/bigshuffle.py    |   84 -
 .../examples/cookbook/bigshuffle_test.py        |   61 -
 .../cloud/dataflow/examples/cookbook/coders.py  |   92 -
 .../dataflow/examples/cookbook/coders_test.py   |   56 -
 .../examples/cookbook/combiners_test.py         |   73 -
 .../examples/cookbook/custom_ptransform.py      |  132 -
 .../examples/cookbook/custom_ptransform_test.py |   64 -
 .../cloud/dataflow/examples/cookbook/filters.py |  104 -
 .../dataflow/examples/cookbook/filters_test.py  |   65 -
 .../examples/cookbook/group_with_coder.py       |  111 -
 .../examples/cookbook/group_with_coder_test.py  |   87 -
 .../dataflow/examples/cookbook/mergecontacts.py |  126 -
 .../examples/cookbook/mergecontacts_test.py     |  121 -
 .../examples/cookbook/multiple_output_pardo.py  |  171 -
 .../cookbook/multiple_output_pardo_test.py      |   69 -
 .../dataflow/examples/snippets/snippets.py      |  872 -----
 .../dataflow/examples/snippets/snippets_test.py |  560 ----
 .../dataflow/examples/streaming_wordcap.py      |   61 -
 .../dataflow/examples/streaming_wordcount.py    |   71 -
 .../google/cloud/dataflow/examples/wordcount.py |   99 -
 .../dataflow/examples/wordcount_debugging.py    |  154 -
 .../examples/wordcount_debugging_test.py        |   56 -
 .../dataflow/examples/wordcount_minimal.py      |  111 -
 .../dataflow/examples/wordcount_minimal_test.py |   56 -
 .../cloud/dataflow/examples/wordcount_test.py   |   55 -
 .../google/cloud/dataflow/internal/__init__.py  |    0
 .../google/cloud/dataflow/internal/apiclient.py |  935 ------
 .../cloud/dataflow/internal/apiclient_test.py   |  110 -
 .../google/cloud/dataflow/internal/auth.py      |  161 -
 .../cloud/dataflow/internal/clients/__init__.py |    0
 .../internal/clients/bigquery/__init__.py       |   10 -
 .../clients/bigquery/bigquery_v2_client.py      |  642 ----
 .../clients/bigquery/bigquery_v2_messages.py    | 1893 -----------
 .../internal/clients/dataflow/__init__.py       |   10 -
 .../clients/dataflow/dataflow_v1b3_client.py    |  316 --
 .../clients/dataflow/dataflow_v1b3_messages.py  | 3056 ------------------
 .../internal/clients/storage/__init__.py        |   10 -
 .../clients/storage/storage_v1_client.py        | 1021 ------
 .../clients/storage/storage_v1_messages.py      | 1903 -----------
 .../cloud/dataflow/internal/json_value.py       |  127 -
 .../cloud/dataflow/internal/json_value_test.py  |   63 -
 .../cloud/dataflow/internal/module_test.py      |   59 -
 .../google/cloud/dataflow/internal/pickler.py   |  205 --
 .../cloud/dataflow/internal/pickler_test.py     |   78 -
 .../google/cloud/dataflow/internal/util.py      |   90 -
 .../google/cloud/dataflow/internal/util_test.py |   58 -
 .../cloud/dataflow/internal/windmill_pb2.py     | 2275 -------------
 .../dataflow/internal/windmill_service_pb2.py   |  161 -
 .../python/google/cloud/dataflow/io/__init__.py |   25 -
 .../python/google/cloud/dataflow/io/bigquery.py |  826 -----
 .../google/cloud/dataflow/io/bigquery_test.py   |  450 ---
 sdks/python/google/cloud/dataflow/io/fileio.py  |  747 -----
 .../google/cloud/dataflow/io/fileio_test.py     |  522 ---
 sdks/python/google/cloud/dataflow/io/gcsio.py   |  602 ----
 .../google/cloud/dataflow/io/gcsio_test.py      |  503 ---
 sdks/python/google/cloud/dataflow/io/iobase.py  | 1073 ------
 sdks/python/google/cloud/dataflow/io/pubsub.py  |   73 -
 .../google/cloud/dataflow/io/range_trackers.py  |  270 --
 .../cloud/dataflow/io/range_trackers_test.py    |  318 --
 .../google/cloud/dataflow/io/sources_test.py    |   65 -
 sdks/python/google/cloud/dataflow/pipeline.py   |  435 ---
 .../google/cloud/dataflow/pipeline_test.py      |  345 --
 sdks/python/google/cloud/dataflow/pvalue.py     |  459 ---
 .../python/google/cloud/dataflow/pvalue_test.py |   63 -
 .../cloud/dataflow/python_sdk_releases.py       |   53 -
 .../google/cloud/dataflow/runners/__init__.py   |   24 -
 .../google/cloud/dataflow/runners/common.pxd    |   28 -
 .../google/cloud/dataflow/runners/common.py     |  181 --
 .../cloud/dataflow/runners/dataflow_runner.py   |  639 ----
 .../cloud/dataflow/runners/direct_runner.py     |  326 --
 .../google/cloud/dataflow/runners/runner.py     |  305 --
 .../cloud/dataflow/runners/runner_test.py       |   66 -
 .../cloud/dataflow/transforms/__init__.py       |   23 -
 .../cloud/dataflow/transforms/aggregator.py     |  105 -
 .../dataflow/transforms/aggregator_test.py      |   73 -
 .../cloud/dataflow/transforms/combiners.py      |  523 ---
 .../cloud/dataflow/transforms/combiners_test.py |  225 --
 .../google/cloud/dataflow/transforms/core.py    | 1292 --------
 .../cloud/dataflow/transforms/cy_combiners.pxd  |   89 -
 .../cloud/dataflow/transforms/cy_combiners.py   |  250 --
 .../cloud/dataflow/transforms/ptransform.py     |  703 ----
 .../dataflow/transforms/ptransform_test.py      | 1814 -----------
 .../cloud/dataflow/transforms/sideinputs.py     |  145 -
 .../cloud/dataflow/transforms/timeutil.py       |  310 --
 .../cloud/dataflow/transforms/timeutil_test.py  |  165 -
 .../google/cloud/dataflow/transforms/trigger.py |  958 ------
 .../cloud/dataflow/transforms/trigger_test.py   |  566 ----
 .../transforms/trigger_transcripts.yaml         |  207 --
 .../google/cloud/dataflow/transforms/util.py    |  227 --
 .../google/cloud/dataflow/transforms/window.py  |  383 ---
 .../cloud/dataflow/transforms/window_test.py    |  201 --
 .../transforms/write_ptransform_test.py         |  124 -
 .../google/cloud/dataflow/typehints/__init__.py |   19 -
 .../cloud/dataflow/typehints/decorators.py      |  530 ---
 .../google/cloud/dataflow/typehints/opcodes.py  |  331 --
 .../dataflow/typehints/trivial_inference.py     |  415 ---
 .../typehints/trivial_inference_test.py         |  148 -
 .../cloud/dataflow/typehints/typecheck.py       |  161 -
 .../dataflow/typehints/typed_pipeline_test.py   |  248 --
 .../cloud/dataflow/typehints/typehints.py       | 1054 ------
 .../cloud/dataflow/typehints/typehints_test.py  | 1053 ------
 .../google/cloud/dataflow/utils/__init__.py     |   19 -
 .../google/cloud/dataflow/utils/counters.pxd    |   27 -
 .../google/cloud/dataflow/utils/counters.py     |  180 --
 .../google/cloud/dataflow/utils/dependency.py   |  439 ---
 .../cloud/dataflow/utils/dependency_test.py     |  394 ---
 .../python/google/cloud/dataflow/utils/names.py |   75 -
 .../google/cloud/dataflow/utils/options.py      |  486 ---
 sdks/python/google/cloud/dataflow/utils/path.py |   44 -
 .../google/cloud/dataflow/utils/path_test.py    |   67 -
 .../dataflow/utils/pipeline_options_test.py     |  104 -
 .../utils/pipeline_options_validator.py         |  166 -
 .../utils/pipeline_options_validator_test.py    |  234 --
 .../google/cloud/dataflow/utils/processes.py    |   49 -
 .../cloud/dataflow/utils/processes_test.py      |  103 -
 .../google/cloud/dataflow/utils/profiler.py     |   66 -
 .../python/google/cloud/dataflow/utils/retry.py |  194 --
 .../google/cloud/dataflow/utils/retry_test.py   |  165 -
 sdks/python/google/cloud/dataflow/version.py    |   17 -
 314 files changed, 44598 insertions(+), 44598 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
new file mode 100644
index 0000000..af28d3a
--- /dev/null
+++ b/sdks/python/apache_beam/__init__.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Cloud Dataflow SDK for Python.
+
+Google Cloud Dataflow <http://cloud.google.com/dataflow/>
+provides a simple, powerful programming model for building both batch
+and streaming parallel data processing pipelines.
+
+The Dataflow SDK for Python provides access to Dataflow capabilities
+from the Python programming language.
+
+Status
+------
+The SDK is still early in its development, and significant changes
+should be expected before the first stable version.
+
+Overview
+--------
+The key concepts in this programming model are
+
+* PCollection:  represents a collection of data, which could be
+  bounded or unbounded in size.
+* PTransform:  represents a computation that transforms input
+  PCollections into output PCollections.
+* Pipeline:  manages a directed acyclic graph of PTransforms and
+  PCollections that is ready for execution.
+* Runner:  specifies where and how the Pipeline should execute.
+* Reading and Writing Data:  your pipeline can read from an external
+  source and write to an external data sink.
+
+Typical usage
+-------------
+At the top of your source file::
+
+    import google.cloud.dataflow as df
+
+After this import statement:
+
+* transform classes are available as df.FlatMap, df.GroupByKey, etc.
+* Pipeline class is available as df.Pipeline
+* text source/sink classes are available as df.io.TextFileSource,
+  df.io.TextFileSink
+
+Examples
+--------
+The examples subdirectory contains a collection of example pipelines.
+
+"""
+
+
+import sys
+
+
+if sys.version_info.major != 2:
+  raise RuntimeError(
+      'Dataflow SDK for Python is supported only on Python 2.7. '
+      'It is not supported on Python [%s].' % sys.version)
+
+
+import google.cloud.dataflow.internal.pickler
+
+from google.cloud.dataflow import coders
+from google.cloud.dataflow import io
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.transforms import *
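
For reference, a minimal sketch of the "Typical usage" the docstring above
describes, written against the pre-rename import path. The bucket paths are
placeholders and the label-first transform constructors reflect this SDK
vintage, so read the snippet as illustrative rather than as code from this
commit:

    import re

    import google.cloud.dataflow as df

    # Build and run a small word-count pipeline on the local runner.
    p = df.Pipeline('DirectPipelineRunner')
    (p
     | df.io.Read('read', df.io.TextFileSource('gs://my-bucket/input.txt'))
     | df.FlatMap('split', lambda line: re.findall(r"[A-Za-z']+", line))
     | df.Map('pair', lambda word: (word, 1))
     | df.GroupByKey('group')
     | df.Map('count', lambda (word, ones): (word, sum(ones)))
     | df.io.Write('write', df.io.TextFileSink('gs://my-bucket/output')))
    p.run()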

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
new file mode 100644
index 0000000..610a6ef
--- /dev/null
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -0,0 +1,16 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.cloud.dataflow.coders.coders import *
+from google.cloud.dataflow.coders.typecoders import registry

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
new file mode 100644
index 0000000..663d37d
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -0,0 +1,109 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# cython: profile=True
+
+cimport cython
+
+cimport cpython.ref
+cimport cpython.tuple
+cimport libc.stdint
+cimport libc.stdlib
+cimport libc.string
+
+from .stream cimport InputStream, OutputStream
+
+
+cdef object loads, dumps, create_InputStream, create_OutputStream
+cdef type WindowedValue
+
+
+cdef class CoderImpl(object):
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+  cpdef bytes encode(self, value)
+  cpdef decode(self, bytes encoded)
+
+
+cdef class SimpleCoderImpl(CoderImpl):
+  pass
+
+
+cdef class StreamCoderImpl(CoderImpl):
+  pass
+
+
+cdef class CallbackCoderImpl(CoderImpl):
+  cdef object _encoder
+  cdef object _decoder
+
+
+cdef class DeterministicPickleCoderImpl(CoderImpl):
+  cdef CoderImpl _pickle_coder
+  cdef object _step_label
+  cdef bint _check_safe(self, value) except -1
+
+
+cdef class BytesCoderImpl(CoderImpl):
+  pass
+
+
+cdef class FloatCoderImpl(StreamCoderImpl):
+  pass
+
+
+cdef class TimestampCoderImpl(StreamCoderImpl):
+  cdef object timestamp_class
+
+
+cdef list small_ints
+cdef class VarIntCoderImpl(StreamCoderImpl):
+  @cython.locals(ivalue=libc.stdint.int64_t)
+  cpdef bytes encode(self, value)
+
+
+cdef class SingletonCoderImpl(CoderImpl):
+  cdef object _value
+
+
+cdef class AbstractComponentCoderImpl(StreamCoderImpl):
+  cdef tuple _coder_impls
+
+  cpdef _extract_components(self, value)
+  cpdef _construct_from_components(self, components)
+
+  @cython.locals(c=CoderImpl)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+  @cython.locals(c=CoderImpl)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+
+cdef class TupleCoderImpl(AbstractComponentCoderImpl):
+  pass
+
+
+cdef class SequenceCoderImpl(StreamCoderImpl):
+  cdef CoderImpl _elem_coder
+  cpdef _construct_from_sequence(self, values)
+
+
+cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
+  pass
+
+
+cdef class WindowedValueCoderImpl(StreamCoderImpl):
+  """A coder for windowed values."""
+  cdef CoderImpl _value_coder
+  cdef CoderImpl _timestamp_coder
+  cdef CoderImpl _windows_coder
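
The declarations above only take effect when the matching coder_impl.py is
compiled with Cython; otherwise the pure-Python module runs unchanged. A
hedged sketch of how such optional compilation is commonly wired up in a
setup.py (this fragment is illustrative, not part of this commit):

    # Compile the coder implementation with Cython when it is available,
    # and silently fall back to the pure-Python module when it is not.
    try:
        from Cython.Build import cythonize
        ext_modules = cythonize('apache_beam/coders/coder_impl.py')
    except ImportError:
        ext_modules = []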

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
new file mode 100644
index 0000000..0ce4354
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -0,0 +1,316 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Coder implementations.
+
+The actual encode/decode implementations are split off from coders to
+allow conditional (compiled/pure) implementations, which can be used to
+encode many elements with minimal overhead.
+
+This module may be optionally compiled with Cython, using the corresponding
+coder_impl.pxd file for type hints.
+"""
+
+import collections
+from cPickle import loads, dumps
+
+
+# pylint: disable=g-import-not-at-top
+try:
+  # Don't depend on the full dataflow sdk to test coders.
+  from google.cloud.dataflow.transforms.window import WindowedValue
+except ImportError:
+  WindowedValue = collections.namedtuple(
+      'WindowedValue', ('value', 'timestamp', 'windows'))
+
+try:
+  from stream import InputStream as create_InputStream
+  from stream import OutputStream as create_OutputStream
+except ImportError:
+  from slow_stream import InputStream as create_InputStream
+  from slow_stream import OutputStream as create_OutputStream
+# pylint: enable=g-import-not-at-top
+
+
+class CoderImpl(object):
+
+  def encode_to_stream(self, value, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    raise NotImplementedError
+
+  def decode_from_stream(self, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    raise NotImplementedError
+
+  def encode(self, value):
+    """Encodes an object to an unnested string."""
+    raise NotImplementedError
+
+  def decode(self, encoded):
+    """Encodes an object to an unnested string."""
+    raise NotImplementedError
+
+
+class SimpleCoderImpl(CoderImpl):
+  """Subclass of CoderImpl implementing stream methods using encode/decode."""
+
+  def encode_to_stream(self, value, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    stream.write(self.encode(value), nested)
+
+  def decode_from_stream(self, stream, nested):
+    """Reads object from potentially-nested encoding in stream."""
+    return self.decode(stream.read_all(nested))
+
+
+class StreamCoderImpl(CoderImpl):
+  """Subclass of CoderImpl implementing encode/decode using stream methods."""
+
+  def encode(self, value):
+    out = create_OutputStream()
+    self.encode_to_stream(value, out, False)
+    return out.get()
+
+  def decode(self, encoded):
+    return self.decode_from_stream(create_InputStream(encoded), False)
+
+
+class CallbackCoderImpl(CoderImpl):
+  """A CoderImpl that calls back to the _impl methods on the Coder itself.
+
+  This is the default implementation used if Coder._create_impl()
+  is not overridden.
+  """
+
+  def __init__(self, encoder, decoder):
+    self._encoder = encoder
+    self._decoder = decoder
+
+  def encode_to_stream(self, value, stream, nested):
+    return stream.write(self._encoder(value), nested)
+
+  def decode_from_stream(self, stream, nested):
+    return self._decoder(stream.read_all(nested))
+
+  def encode(self, value):
+    return self._encoder(value)
+
+  def decode(self, encoded):
+    return self._decoder(encoded)
+
+
+class DeterministicPickleCoderImpl(CoderImpl):
+
+  def __init__(self, pickle_coder, step_label):
+    self._pickle_coder = pickle_coder
+    self._step_label = step_label
+
+  def _check_safe(self, value):
+    if isinstance(value, (str, unicode, long, int, float)):
+      pass
+    elif value is None:
+      pass
+    elif isinstance(value, (tuple, list)):
+      for x in value:
+        self._check_safe(x)
+    else:
+      raise TypeError(
+          "Unable to deterministically code '%s' of type '%s', "
+          "please provide a type hint for the input of '%s'" % (
+              value, type(value), self._step_label))
+
+  def encode_to_stream(self, value, stream, nested):
+    self._check_safe(value)
+    return self._pickle_coder.encode_to_stream(value, stream, nested)
+
+  def decode_from_stream(self, stream, nested):
+    return self._pickle_coder.decode_from_stream(stream, nested)
+
+  def encode(self, value):
+    self._check_safe(value)
+    return self._pickle_coder.encode(value)
+
+  def decode(self, encoded):
+    return self._pickle_coder.decode(encoded)
+
+
+class BytesCoderImpl(CoderImpl):
+  """A coder for bytes/str objects."""
+
+  def encode_to_stream(self, value, out, nested):
+    out.write(value, nested)
+
+  def decode_from_stream(self, in_stream, nested):
+    return in_stream.read_all(nested)
+
+  def encode(self, value):
+    assert isinstance(value, bytes), (value, type(value))
+    return value
+
+  def decode(self, encoded):
+    return encoded
+
+
+class FloatCoderImpl(StreamCoderImpl):
+
+  def encode_to_stream(self, value, out, nested):
+    out.write_bigendian_double(value)
+
+  def decode_from_stream(self, in_stream, nested):
+    return in_stream.read_bigendian_double()
+
+
+class TimestampCoderImpl(StreamCoderImpl):
+
+  def __init__(self, timestamp_class):
+    self.timestamp_class = timestamp_class
+
+  def encode_to_stream(self, value, out, nested):
+    out.write_bigendian_int64(value.micros)
+
+  def decode_from_stream(self, in_stream, nested):
+    return self.timestamp_class(micros=in_stream.read_bigendian_int64())
+
+
+small_ints = [chr(_) for _ in range(128)]
+
+
+class VarIntCoderImpl(StreamCoderImpl):
+  """A coder for long/int objects."""
+
+  def encode_to_stream(self, value, out, nested):
+    out.write_var_int64(value)
+
+  def decode_from_stream(self, in_stream, nested):
+    return in_stream.read_var_int64()
+
+  def encode(self, value):
+    ivalue = value  # type cast
+    if 0 <= ivalue < len(small_ints):
+      return small_ints[ivalue]
+    else:
+      return StreamCoderImpl.encode(self, value)
+
+  def decode(self, encoded):
+    if len(encoded) == 1:
+      i = ord(encoded)
+      if 0 <= i < 128:
+        return i
+    return StreamCoderImpl.decode(self, encoded)
+
+
+class SingletonCoderImpl(CoderImpl):
+  """A coder that always encodes exactly one value."""
+
+  def __init__(self, value):
+    self._value = value
+
+  def encode_to_stream(self, value, stream, nested):
+    pass
+
+  def decode_from_stream(self, stream, nested):
+    return self._value
+
+  def encode(self, value):
+    b = ''  # avoid byte vs str vs unicode error
+    return b
+
+  def decode(self, encoded):
+    return self._value
+
+
+class AbstractComponentCoderImpl(StreamCoderImpl):
+
+  def __init__(self, coder_impls):
+    for c in coder_impls:
+      assert isinstance(c, CoderImpl), c
+    self._coder_impls = tuple(coder_impls)
+
+  def _extract_components(self, value):
+    raise NotImplementedError
+
+  def _construct_from_components(self, components):
+    raise NotImplementedError
+
+  def encode_to_stream(self, value, out, nested):
+    values = self._extract_components(value)
+    if len(self._coder_impls) != len(values):
+      raise ValueError(
+          'Number of components does not match number of coders.')
+    for i in range(0, len(self._coder_impls)):
+      c = self._coder_impls[i]   # type cast
+      c.encode_to_stream(values[i], out, True)
+
+  def decode_from_stream(self, in_stream, nested):
+    return self._construct_from_components(
+        [c.decode_from_stream(in_stream, True) for c in self._coder_impls])
+
+
+class TupleCoderImpl(AbstractComponentCoderImpl):
+  """A coder for tuple objects."""
+
+  def _extract_components(self, value):
+    return value
+
+  def _construct_from_components(self, components):
+    return tuple(components)
+
+
+class SequenceCoderImpl(StreamCoderImpl):
+  """A coder for sequences of known length."""
+
+  def __init__(self, elem_coder):
+    self._elem_coder = elem_coder
+
+  def _construct_from_sequence(self, values):
+    raise NotImplementedError
+
+  def encode_to_stream(self, value, out, nested):
+    # Compatible with Java's IterableLikeCoder.
+    out.write_bigendian_int32(len(value))
+    for elem in value:
+      self._elem_coder.encode_to_stream(elem, out, True)
+
+  def decode_from_stream(self, in_stream, nested):
+    size = in_stream.read_bigendian_int32()
+    return self._construct_from_sequence(
+        [self._elem_coder.decode_from_stream(in_stream, True)
+         for _ in range(size)])
+
+
+class TupleSequenceCoderImpl(SequenceCoderImpl):
+  """A coder for homogeneous tuple objects."""
+
+  def _construct_from_sequence(self, components):
+    return tuple(components)
+
+
+class WindowedValueCoderImpl(StreamCoderImpl):
+  """A coder for windowed values."""
+
+  def __init__(self, value_coder, timestamp_coder, window_coder):
+    self._value_coder = value_coder
+    self._timestamp_coder = timestamp_coder
+    self._windows_coder = TupleSequenceCoderImpl(window_coder)
+
+  def encode_to_stream(self, value, out, nested):
+    self._value_coder.encode_to_stream(value.value, out, True)
+    self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
+    self._windows_coder.encode_to_stream(value.windows, out, True)
+
+  def decode_from_stream(self, in_stream, nested):
+    return WindowedValue(
+        self._value_coder.decode_from_stream(in_stream, True),
+        self._timestamp_coder.decode_from_stream(in_stream, True),
+        self._windows_coder.decode_from_stream(in_stream, True))
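
As a quick illustration of the callback-based default described in the module
docstring, CallbackCoderImpl simply delegates to a pair of functions. A
standalone sketch using a hypothetical encoder/decoder pair (ints serialized
as decimal strings):

    impl = CallbackCoderImpl(str, int)
    assert impl.encode(42) == '42'   # encoder callback: str(42)
    assert impl.decode('42') == 42   # decoder callback: int('42')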

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
new file mode 100644
index 0000000..16edff0
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -0,0 +1,511 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Collection of useful coders."""
+
+import base64
+import collections
+import cPickle as pickle
+
+from google.cloud.dataflow.coders import coder_impl
+
+
+# pylint: disable=g-import-not-at-top
+# Avoid dependencies on the full SDK.
+try:
+  # Import dill from the pickler module to make sure our monkey-patching of dill
+  # occurs.
+  from google.cloud.dataflow.internal.pickler import dill
+  from google.cloud.dataflow.transforms.timeutil import Timestamp
+except ImportError:
+  # We fall back to using the stock dill library in tests that don't use the
+  # full Python SDK.
+  import dill
+  Timestamp = collections.namedtuple('Timestamp', 'micros')
+
+
+def serialize_coder(coder):
+  from google.cloud.dataflow.internal import pickler
+  return '%s$%s' % (coder.__class__.__name__, pickler.dumps(coder))
+
+
+def deserialize_coder(serialized):
+  from google.cloud.dataflow.internal import pickler
+  return pickler.loads(serialized.split('$', 1)[1])
+# pylint: enable=g-import-not-at-top
+
+
+class Coder(object):
+  """Base class for coders."""
+
+  def encode(self, value):
+    """Encodes the given object into a byte string."""
+    raise NotImplementedError('Encode not implemented: %s.' % self)
+
+  def decode(self, encoded):
+    """Decodes the given byte string into the corresponding object."""
+    raise NotImplementedError('Decode not implemented: %s.' % self)
+
+  def is_deterministic(self):
+    """Whether this coder is guaranteed to encode values deterministically.
+
+    A deterministic coder is required for key coders in GroupByKey operations
+    to produce consistent results.
+
+    For example, note that the default coder, the PickleCoder, is not
+    deterministic: the ordering of pickled entries in maps may vary across
+    executions since there is no defined order, and such a coder is not in
+    general suitable for usage as a key coder in GroupByKey operations, since
+    each instance of the same key may be encoded differently.
+
+    Returns:
+      Whether coder is deterministic.
+    """
+    return False
+
+  # ===========================================================================
+  # Methods below are internal SDK details that don't need to be modified for
+  # user-defined coders.
+  # ===========================================================================
+
+  def _create_impl(self):
+    """Creates a CoderImpl to do the actual encoding and decoding.
+    """
+    return coder_impl.CallbackCoderImpl(self.encode, self.decode)
+
+  def get_impl(self):
+    if not hasattr(self, '_impl'):
+      self._impl = self._create_impl()
+      assert isinstance(self._impl, coder_impl.CoderImpl)
+    return self._impl
+
+  def __getstate__(self):
+    return self._dict_without_impl()
+
+  def _dict_without_impl(self):
+    if hasattr(self, '_impl'):
+      d = dict(self.__dict__)
+      del d['_impl']
+      return d
+    else:
+      return self.__dict__
+
+  @classmethod
+  def from_type_hint(cls, unused_typehint, unused_registry):
+    # If not overridden, just construct the coder without arguments.
+    return cls()
+
+  def is_kv_coder(self):
+    return False
+
+  def key_coder(self):
+    if self.is_kv_coder():
+      raise NotImplementedError('key_coder: %s' % self)
+    else:
+      raise ValueError('Not a KV coder: %s.' % self)
+
+  def value_coder(self):
+    if self.is_kv_coder():
+      raise NotImplementedError('value_coder: %s' % self)
+    else:
+      raise ValueError('Not a KV coder: %s.' % self)
+
+  def _get_component_coders(self):
+    """Returns the internal component coders of this coder."""
+    # This is an internal detail of the Coder API and does not need to be
+    # refined in user-defined Coders.
+    return []
+
+  def as_cloud_object(self):
+    """Returns Google Cloud Dataflow API description of this coder."""
+    # This is an internal detail of the Coder API and does not need to be
+    # refined in user-defined Coders.
+
+    value = {
+        # We pass coders in the form "<coder_name>$<pickled_data>" to make the
+        # job description JSON more readable.  Data before the $ is ignored by
+        # the worker.
+        '@type': serialize_coder(self),
+        'component_encodings': list(
+            component.as_cloud_object()
+            for component in self._get_component_coders())
+    }
+    return value
+
+  def __repr__(self):
+    return self.__class__.__name__
+
+  def __eq__(self, other):
+    # pylint: disable=protected-access
+    return (self.__class__ == other.__class__
+       and self._dict_without_impl() == other._dict_without_impl())
+    # pylint: enable=protected-access
+
+
+class StrUtf8Coder(Coder):
+  """A coder used for reading and writing strings as UTF-8."""
+
+  def encode(self, value):
+    return value.encode('utf-8')
+
+  def decode(self, value):
+    return value.decode('utf-8')
+
+  def is_deterministic(self):
+    return True
+
+
+class ToStringCoder(Coder):
+  """A default string coder used if no sink coder is specified."""
+
+  def encode(self, value):
+    if isinstance(value, unicode):
+      return value.encode('utf-8')
+    elif isinstance(value, str):
+      return value
+    else:
+      return str(value)
+
+  def decode(self, _):
+    raise NotImplementedError('ToStringCoder cannot be used for decoding.')
+
+  def is_deterministic(self):
+    return True
+
+
+class FastCoder(Coder):
+  """Coder subclass used when a (faster) CoderImpl is supplied directly.
+
+  The Coder class defines _create_impl in terms of encode() and decode();
+  this class inverts that, defining encode() and decode() in terms of
+  _create_impl().
+  """
+
+  def encode(self, value):
+    """Encodes the given object into a byte string."""
+    return self.get_impl().encode(value)
+
+  def decode(self, encoded):
+    """Decodes the given byte string into the corresponding object."""
+    return self.get_impl().decode(encoded)
+
+  def _create_impl(self):
+    raise NotImplementedError
+
+
+class BytesCoder(FastCoder):
+  """Byte string coder."""
+
+  def _create_impl(self):
+    return coder_impl.BytesCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class VarIntCoder(FastCoder):
+  """Variable-length integer coder."""
+
+  def _create_impl(self):
+    return coder_impl.VarIntCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class FloatCoder(FastCoder):
+  """A coder used for floating-point values."""
+
+  def _create_impl(self):
+    return coder_impl.FloatCoderImpl()
+
+  def is_deterministic(self):
+    return True
+
+
+class TimestampCoder(FastCoder):
+  """A coder used for timeutil.Timestamp values."""
+
+  def _create_impl(self):
+    return coder_impl.TimestampCoderImpl(Timestamp)
+
+  def is_deterministic(self):
+    return True
+
+
+class SingletonCoder(FastCoder):
+  """A coder that always encodes exactly one value."""
+
+  def __init__(self, value):
+    self._value = value
+
+  def _create_impl(self):
+    return coder_impl.SingletonCoderImpl(self._value)
+
+  def is_deterministic(self):
+    return True
+
+
+def maybe_dill_dumps(o):
+  """Pickle using cPickle or the Dill pickler as a fallback."""
+  # We need to use the dill pickler for objects of certain custom classes,
+  # including, for example, ones that contain lambdas.
+  try:
+    return pickle.dumps(o)
+  except Exception:  # pylint: disable=broad-except
+    return dill.dumps(o)
+
+
+def maybe_dill_loads(o):
+  """Unpickle using cPickle or the Dill pickler as a fallback."""
+  try:
+    return pickle.loads(o)
+  except Exception:  # pylint: disable=broad-except
+    return dill.loads(o)
+
+
+class _PickleCoderBase(FastCoder):
+  """Base class for pickling coders."""
+
+  def is_deterministic(self):
+    # Note that the default coder, the PickleCoder, is not deterministic (for
+    # example, the ordering of pickled entries in maps may vary across
+    # executions), and so is not in general suitable for usage as a key coder in
+    # GroupByKey operations.
+    return False
+
+  def as_cloud_object(self, is_pair_like=True):
+    value = super(_PickleCoderBase, self).as_cloud_object()
+    # We currently use this coder in places where we cannot infer the coder to
+    # use for the value type in a more granular way.  In places where the
+    # service expects a pair, it checks for the "is_pair_like" key, in which
+    # case we would fail without the hack below.
+    if is_pair_like:
+      value['is_pair_like'] = True
+      value['component_encodings'] = [
+          self.as_cloud_object(is_pair_like=False),
+          self.as_cloud_object(is_pair_like=False)
+      ]
+
+    return value
+
+  # We allow .key_coder() and .value_coder() to be called on PickleCoder since
+  # we can't always infer the return values of lambdas in ParDo operations, the
+  # result of which may be used in a GroupByKey.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
+class PickleCoder(_PickleCoderBase):
+  """Coder using Python's pickle functionality."""
+
+  def _create_impl(self):
+    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
+
+
+class DillCoder(_PickleCoderBase):
+  """Coder using dill's pickle functionality."""
+
+  def _create_impl(self):
+    return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
+
+
+class DeterministicPickleCoder(FastCoder):
+  """Throws runtime errors when pickling non-deterministic values."""
+
+  def __init__(self, pickle_coder, step_label):
+    self._pickle_coder = pickle_coder
+    self._step_label = step_label
+
+  def _create_impl(self):
+    return coder_impl.DeterministicPickleCoderImpl(
+        self._pickle_coder.get_impl(), self._step_label)
+
+  def is_deterministic(self):
+    return True
+
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
+class Base64PickleCoder(Coder):
+  """Coder of objects by Python pickle, then base64 encoding."""
+  # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather
+  # than via a special Coder.
+
+  def encode(self, value):
+    return base64.b64encode(pickle.dumps(value))
+
+  def decode(self, encoded):
+    return pickle.loads(base64.b64decode(encoded))
+
+  def is_deterministic(self):
+    # Note that the Base64PickleCoder is not deterministic.  See the
+    # corresponding comments for PickleCoder above.
+    return False
+
+  # We allow .key_coder() and .value_coder() to be called on Base64PickleCoder
+  # since we can't always infer the return values of lambdas in ParDo
+  # operations, the result of which may be used in a GroupByKey.
+  #
+  # TODO(ccy): this is currently only used for KV values from Create transforms.
+  # Investigate a way to unify this with PickleCoder.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
+class TupleCoder(FastCoder):
+  """Coder of tuple objects."""
+
+  def __init__(self, components):
+    self._coders = tuple(components)
+
+  def _create_impl(self):
+    return coder_impl.TupleCoderImpl([c.get_impl() for c in self._coders])
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self._coders)
+
+  @staticmethod
+  def from_type_hint(typehint, registry):
+    return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
+
+  def as_cloud_object(self):
+    value = super(TupleCoder, self).as_cloud_object()
+    value['is_pair_like'] = True
+    return value
+
+  def _get_component_coders(self):
+    return self.coders()
+
+  def coders(self):
+    return self._coders
+
+  def is_kv_coder(self):
+    return len(self._coders) == 2
+
+  def key_coder(self):
+    if len(self._coders) != 2:
+      raise ValueError('TupleCoder does not have exactly 2 components.')
+    return self._coders[0]
+
+  def value_coder(self):
+    if len(self._coders) != 2:
+      raise ValueError('TupleCoder does not have exactly 2 components.')
+    return self._coders[1]
+
+  def __repr__(self):
+    return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)
+
+
+class TupleSequenceCoder(FastCoder):
+  """Coder of homogeneous tuple objects."""
+
+  def __init__(self, elem_coder):
+    self._elem_coder = elem_coder
+
+  def _create_impl(self):
+    return coder_impl.TupleSequenceCoderImpl(self._elem_coder.get_impl())
+
+  def is_deterministic(self):
+    return self._elem_coder.is_deterministic()
+
+  @staticmethod
+  def from_type_hint(typehint, registry):
+    return TupleSequenceCoder(registry.get_coder(typehint.inner_type))
+
+  def _get_component_coders(self):
+    return (self._elem_coder,)
+
+  def __repr__(self):
+    return 'TupleSequenceCoder[%r]' % self._elem_coder
+
+
+class WindowCoder(PickleCoder):
+  """Coder for windows in windowed values."""
+
+  def _create_impl(self):
+    return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads)
+
+  def is_deterministic(self):
+    # Note that WindowCoder as implemented is not deterministic because the
+    # implementation simply pickles windows.  See the corresponding comments
+    # on PickleCoder for more details.
+    return False
+
+  def as_cloud_object(self):
+    return super(WindowCoder, self).as_cloud_object(is_pair_like=False)
+
+
+class WindowedValueCoder(FastCoder):
+  """Coder for windowed values."""
+
+  def __init__(self, wrapped_value_coder, timestamp_coder=None,
+               window_coder=None):
+    if not timestamp_coder:
+      timestamp_coder = TimestampCoder()
+    if not window_coder:
+      window_coder = PickleCoder()
+    self.wrapped_value_coder = wrapped_value_coder
+    self.timestamp_coder = timestamp_coder
+    self.window_coder = window_coder
+
+  def _create_impl(self):
+    return coder_impl.WindowedValueCoderImpl(
+        self.wrapped_value_coder.get_impl(),
+        self.timestamp_coder.get_impl(),
+        self.window_coder.get_impl())
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in [self.wrapped_value_coder,
+                                              self.timestamp_coder,
+                                              self.window_coder])
+
+  def as_cloud_object(self):
+    value = super(WindowedValueCoder, self).as_cloud_object()
+    value['is_wrapper'] = True
+    return value
+
+  def _get_component_coders(self):
+    return [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]
+
+  def is_kv_coder(self):
+    return self.wrapped_value_coder.is_kv_coder()
+
+  def key_coder(self):
+    return self.wrapped_value_coder.key_coder()
+
+  def value_coder(self):
+    return self.wrapped_value_coder.value_coder()
+
+  def __repr__(self):
+    return 'WindowedValueCoder[%s]' % self.wrapped_value_coder
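
The inversion described in the FastCoder docstring can be seen with a small
hypothetical subclass: instead of writing encode() and decode(), it supplies
the CoderImpl directly and inherits encode()/decode() from FastCoder. A
sketch assuming only the classes defined in this file:

    class UpperCaseCoder(FastCoder):
      """Hypothetical coder that upper-cases byte strings on encode."""

      def _create_impl(self):
        return coder_impl.CallbackCoderImpl(
            lambda value: value.upper(),      # encoder callback
            lambda encoded: encoded.lower())  # decoder callback

      def is_deterministic(self):
        return True

    c = UpperCaseCoder()
    assert c.encode('abc') == 'ABC'
    assert c.decode('ABC') == 'abc'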

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
new file mode 100644
index 0000000..d11d310
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -0,0 +1,60 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import base64
+import logging
+import unittest
+
+from google.cloud.dataflow import coders
+
+
+class PickleCoderTest(unittest.TestCase):
+
+  def test_basics(self):
+    v = ('a' * 10, 'b' * 90)
+    pickler = coders.PickleCoder()
+    self.assertEquals(v, pickler.decode(pickler.encode(v)))
+    pickler = coders.Base64PickleCoder()
+    self.assertEquals(v, pickler.decode(pickler.encode(v)))
+    self.assertEquals(
+        coders.Base64PickleCoder().encode(v),
+        base64.b64encode(coders.PickleCoder().encode(v)))
+
+  def test_equality(self):
+    self.assertEquals(coders.PickleCoder(), coders.PickleCoder())
+    self.assertEquals(coders.Base64PickleCoder(), coders.Base64PickleCoder())
+    self.assertNotEquals(coders.Base64PickleCoder(), coders.PickleCoder())
+    self.assertNotEquals(coders.Base64PickleCoder(), object())
+
+
+class CodersTest(unittest.TestCase):
+
+  def test_str_utf8_coder(self):
+    real_coder = coders.registry.get_coder(str)
+    expected_coder = coders.BytesCoder()
+    self.assertEqual(
+        real_coder.encode('abc'), expected_coder.encode('abc'))
+    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+    real_coder = coders.registry.get_coder(bytes)
+    expected_coder = coders.BytesCoder()
+    self.assertEqual(
+        real_coder.encode('abc'), expected_coder.encode('abc'))
+    self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
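
The registry exercised in the test above also accepts user registrations; a
hedged sketch, where CustomClass and CustomCoder are illustrative names rather
than types from this commit:

    # Associate a user-defined type with a user-defined coder so that
    # coders.registry.get_coder(CustomClass) later returns a CustomCoder.
    coders.registry.register_coder(CustomClass, CustomCoder)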

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
new file mode 100644
index 0000000..29eaace
--- /dev/null
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -0,0 +1,180 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests common to all coder implementations."""
+
+import logging
+import math
+import sys
+import unittest
+
+import dill
+
+import coders
+
+
+# Defined out of line for picklability.
+class CustomCoder(coders.Coder):
+
+  def encode(self, x):
+    return str(x+1)
+
+  def decode(self, encoded):
+    return int(encoded) - 1
+
+
+class CodersTest(unittest.TestCase):
+
+  # These class methods ensure that we test each defined coder in both
+  # nested and unnested context.
+
+  @classmethod
+  def setUpClass(cls):
+    cls.seen = set()
+    cls.seen_nested = set()
+
+  @classmethod
+  def tearDownClass(cls):
+    standard = set(c
+                   for c in coders.__dict__.values()
+                   if isinstance(c, type) and issubclass(c, coders.Coder) and
+                   'Base' not in c.__name__)
+    standard -= set([coders.Coder,
+                     coders.FastCoder,
+                     coders.Base64PickleCoder,
+                     coders.FloatCoder,
+                     coders.TimestampCoder,
+                     coders.ToStringCoder,
+                     coders.WindowCoder,
+                     coders.WindowedValueCoder])
+    assert not standard - cls.seen, standard - cls.seen
+    assert not standard - cls.seen_nested, standard - cls.seen_nested
+
+  @classmethod
+  def _observe(cls, coder):
+    cls.seen.add(type(coder))
+    cls._observe_nested(coder)
+
+  @classmethod
+  def _observe_nested(cls, coder):
+    if isinstance(coder, coders.TupleCoder):
+      for c in coder.coders():
+        cls.seen_nested.add(type(c))
+        cls._observe_nested(c)
+
+  def check_coder(self, coder, *values):
+    self._observe(coder)
+    for v in values:
+      self.assertEqual(v, coder.decode(coder.encode(v)))
+    copy1 = dill.loads(dill.dumps(coder))
+    copy2 = dill.loads(dill.dumps(coder))
+    for v in values:
+      self.assertEqual(v, copy1.decode(copy2.encode(v)))
+
+  def test_custom_coder(self):
+    self.check_coder(CustomCoder(), 1, -10, 5)
+    self.check_coder(coders.TupleCoder((CustomCoder(), coders.BytesCoder())),
+                     (1, 'a'), (-10, 'b'), (5, 'c'))
+
+  def test_pickle_coder(self):
+    self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
+
+  def test_deterministic_pickle_coder(self):
+    coder = coders.DeterministicPickleCoder(coders.PickleCoder(), 'step')
+    self.check_coder(coder, 'a', 1, 1.5, (1, 2, 3))
+    with self.assertRaises(TypeError):
+      self.check_coder(coder, dict())
+    with self.assertRaises(TypeError):
+      self.check_coder(coder, [1, dict()])
+
+    self.check_coder(coders.TupleCoder((coder, coders.PickleCoder())),
+                     (1, dict()), ('a', [dict()]))
+
+  def test_dill_coder(self):
+    cell_value = (lambda x: lambda: x)(0).func_closure[0]
+    self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
+    self.check_coder(
+        coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
+        (1, cell_value))
+
+  def test_bytes_coder(self):
+    self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
+
+  def test_varint_coder(self):
+    # Small ints.
+    self.check_coder(coders.VarIntCoder(), *range(-10, 10))
+    # Multi-byte encoding starts at 128.
+    self.check_coder(coders.VarIntCoder(), *range(120, 140))
+    # Large values.
+    self.check_coder(coders.VarIntCoder(),
+                     *[int(math.pow(-1, k) * math.exp(k))
+                       for k in range(0, int(math.log(sys.maxint)))])
+
+  def test_float_coder(self):
+    self.check_coder(coders.FloatCoder(),
+                     *[float(0.1 * x) for x in range(-100, 100)])
+    self.check_coder(coders.FloatCoder(),
+                     *[float(2 ** (0.1 * x)) for x in range(-100, 100)])
+    self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf'))
+
+  def test_singleton_coder(self):
+    a = 'anything'
+    b = 'something else'
+    self.check_coder(coders.SingletonCoder(a), a)
+    self.check_coder(coders.SingletonCoder(b), b)
+    self.check_coder(coders.TupleCoder((coders.SingletonCoder(a),
+                                        coders.SingletonCoder(b))), (a, b))
+
+  def test_timestamp_coder(self):
+    self.check_coder(coders.TimestampCoder(),
+                     *[coders.Timestamp(micros=x) for x in range(-100, 100)])
+    self.check_coder(coders.TimestampCoder(),
+                     coders.Timestamp(micros=-1234567890),
+                     coders.Timestamp(micros=1234567890))
+    self.check_coder(coders.TimestampCoder(),
+                     coders.Timestamp(micros=-1234567890123456789),
+                     coders.Timestamp(micros=1234567890123456789))
+
+  def test_tuple_coder(self):
+    self.check_coder(
+        coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())),
+        (1, 'a'),
+        (-2, 'a' * 100),
+        (300, 'abc\0' * 5))
+    self.check_coder(
+        coders.TupleCoder(
+            (coders.TupleCoder((coders.PickleCoder(), coders.VarIntCoder())),
+             coders.StrUtf8Coder())),
+        ((1, 2), 'a'),
+        ((-2, 5), u'a\u0101' * 100),
+        ((300, 1), 'abc\0' * 5))
+
+  def test_tuple_sequence_coder(self):
+    int_tuple_coder = coders.TupleSequenceCoder(coders.VarIntCoder())
+    self.check_coder(int_tuple_coder, (1, -1, 0), (), tuple(range(1000)))
+    self.check_coder(
+        coders.TupleCoder((coders.VarIntCoder(), int_tuple_coder)),
+        (1, (1, 2, 3)))
+
+  def test_base64_pickle_coder(self):
+    self.check_coder(coders.Base64PickleCoder(), 'a', 1, 1.5, (1, 2, 3))
+
+  def test_utf8_coder(self):
+    self.check_coder(coders.StrUtf8Coder(), 'a', u'ab\u00FF', u'\u0101\0')
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/fast_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py
new file mode 100644
index 0000000..f2f4e6c
--- /dev/null
+++ b/sdks/python/apache_beam/coders/fast_coders_test.py
@@ -0,0 +1,34 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for compiled implementation of coder impls."""
+
+import logging
+import unittest
+
+
+# Run all the standard coder test cases.
+from apache_beam.coders.coders_test_common import *
+
+
+class FastCoders(unittest.TestCase):
+
+  def test_using_fast_impl(self):
+    # pylint: disable=g-import-not-at-top
+    # pylint: disable=unused-variable
+    import apache_beam.coders.stream
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/observable.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable.py b/sdks/python/apache_beam/coders/observable.py
new file mode 100644
index 0000000..8a01752
--- /dev/null
+++ b/sdks/python/apache_beam/coders/observable.py
@@ -0,0 +1,33 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Observable base class for iterables."""
+
+
+class ObservableMixin(object):
+  """An observable iterable.
+
+  Subclasses need to call self.notify_observers with any object yielded.
+  """
+
+  def __init__(self):
+    self.observers = []
+
+  def register_observer(self, callback):
+    self.observers.append(callback)
+
+  def notify_observers(self, value, **kwargs):
+    for o in self.observers:
+      o(value, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/observable_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/observable_test.py b/sdks/python/apache_beam/coders/observable_test.py
new file mode 100644
index 0000000..2b091bf
--- /dev/null
+++ b/sdks/python/apache_beam/coders/observable_test.py
@@ -0,0 +1,54 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the Observable mixin class."""
+
+import logging
+import unittest
+
+
+from apache_beam.coders import observable
+
+
+class ObservableMixinTest(unittest.TestCase):
+  observed_count = 0
+  observed_sum = 0
+  observed_keys = []
+
+  def observer(self, value, key=None):
+    self.observed_count += 1
+    self.observed_sum += value
+    self.observed_keys.append(key)
+
+  def test_observable(self):
+    class Watched(observable.ObservableMixin):
+
+      def __iter__(self):
+        for i in (1, 4, 3):
+          self.notify_observers(i, key='a%d' % i)
+          yield i
+
+    watched = Watched()
+    watched.register_observer(lambda v, key: self.observer(v, key=key))
+    for _ in watched:
+      pass
+
+    self.assertEquals(3, self.observed_count)
+    self.assertEquals(8, self.observed_sum)
+    self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py
new file mode 100644
index 0000000..8cb23ae
--- /dev/null
+++ b/sdks/python/apache_beam/coders/slow_coders_test.py
@@ -0,0 +1,36 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for uncompiled implementation of coder impls."""
+
+import logging
+import unittest
+
+
+# Run all the standard coder test cases.
+from apache_beam.coders.coders_test_common import *
+
+
+class SlowCoders(unittest.TestCase):
+
+  def test_using_slow_impl(self):
+    # Assert that we are not using the compiled implementation.
+    with self.assertRaises(ImportError):
+      # pylint: disable=g-import-not-at-top
+      # pylint: disable=unused-variable
+      import apache_beam.coders.stream
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/slow_stream.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py
new file mode 100644
index 0000000..ea09d54
--- /dev/null
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -0,0 +1,136 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A pure Python implementation of stream.pyx."""
+
+import struct
+
+
+class OutputStream(object):
+  """A pure Python implementation of stream.OutputStream."""
+
+  def __init__(self):
+    self.data = []
+
+  def write(self, b, nested=False):
+    assert isinstance(b, str)
+    if nested:
+      self.write_var_int64(len(b))
+    self.data.append(b)
+
+  def write_byte(self, val):
+    self.data.append(chr(val))
+
+  def write_var_int64(self, v):
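+    """Encode a long using variable-length encoding.
+
+    Negative values are first mapped to their unsigned 64-bit two's
+    complement representation.  The value is then written as little-endian
+    groups of 7 bits, the high bit of each byte marking that more groups
+    follow; e.g. 300 encodes to the two bytes 0xAC 0x02.
+    """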
+    if v < 0:
+      v += 1 << 64
+      if v <= 0:
+        raise ValueError('Value too large (negative).')
+    while True:
+      bits = v & 0x7F
+      v >>= 7
+      if v:
+        bits |= 0x80
+      self.write_byte(bits)
+      if not v:
+        break
+
+  def write_bigendian_int64(self, v):
+    self.write(struct.pack('>q', v))
+
+  def write_bigendian_int32(self, v):
+    self.write(struct.pack('>i', v))
+
+  def write_bigendian_double(self, v):
+    self.write(struct.pack('>d', v))
+
+  def get(self):
+    return ''.join(self.data)
+
+
+class ByteCountingOutputStream(OutputStream):
+  """A pure Python implementation of stream.ByteCountingOutputStream."""
+
+  def __init__(self):
+    # Note that we don't actually use any of the data initialized by our super.
+    super(ByteCountingOutputStream, self).__init__()
+    self.count = 0
+
+  def write(self, byte_array, nested=False):
+    blen = len(byte_array)
+    if nested:
+      self.write_var_int64(blen)
+    self.count += blen
+
+  def write_byte(self, _):
+    self.count += 1
+
+  def get_count(self):
+    return self.count
+
+  def get(self):
+    raise NotImplementedError
+
+  def __str__(self):
+    return '<%s %s>' % (self.__class__.__name__, self.count)
+
+
+class InputStream(object):
+  """A pure Python implementation of stream.InputStream."""
+
+  def __init__(self, data):
+    self.data = data
+    self.pos = 0
+
+  def size(self):
+    return len(self.data) - self.pos
+
+  def read(self, size):
+    self.pos += size
+    return self.data[self.pos - size : self.pos]
+
+  def read_all(self, nested=False):
+    return self.read(self.read_var_int64() if nested else self.size())
+
+  def read_byte(self):
+    self.pos += 1
+    return ord(self.data[self.pos - 1])
+
+  def read_var_int64(self):
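+    """Decode a variable-length encoded long.
+
+    Accumulates 7-bit groups in little-endian order until a byte without
+    the continuation bit is read; results at or above 2**63 are mapped
+    back into the negative two's complement range.
+    """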
+    shift = 0
+    result = 0
+    while True:
+      byte = self.read_byte()
+      if byte < 0:
+        raise RuntimeError('VarInt not terminated.')
+
+      bits = byte & 0x7F
+      if shift >= 64 or (shift >= 63 and bits > 1):
+        raise RuntimeError('VarLong too long.')
+      result |= bits << shift
+      shift += 7
+      if not byte & 0x80:
+        break
+    if result >= 1 << 63:
+      result -= 1 << 64
+    return result
+
+  def read_bigendian_int64(self):
+    return struct.unpack('>q', self.read(8))[0]
+
+  def read_bigendian_int32(self):
+    return struct.unpack('>i', self.read(4))[0]
+
+  def read_bigendian_double(self):
+    return struct.unpack('>d', self.read(8))[0]
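
For orientation, a minimal round trip through these pure Python streams
might look like the following sketch (assuming the apache_beam.coders
import path introduced by this move):

    from apache_beam.coders import slow_stream

    out = slow_stream.OutputStream()
    out.write_var_int64(300)           # varint: the two bytes 0xAC 0x02
    out.write('payload', nested=True)  # length prefix, then the raw bytes

    # ByteCountingOutputStream sizes the same encoding without buffering it.
    counter = slow_stream.ByteCountingOutputStream()
    counter.write_var_int64(300)
    counter.write('payload', nested=True)
    assert counter.get_count() == len(out.get())

    in_s = slow_stream.InputStream(out.get())
    assert in_s.read_var_int64() == 300
    assert in_s.read_all(nested=True) == 'payload'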

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd
new file mode 100644
index 0000000..3da7324
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdint
+
+
+cdef class OutputStream(object):
+  cdef char* data
+  cdef size_t size
+  cdef size_t pos
+
+  cpdef write(self, bytes b, bint nested=*)
+  cpdef write_byte(self, unsigned char val)
+  cpdef write_var_int64(self, libc.stdint.int64_t v)
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
+  cpdef write_bigendian_double(self, double d)
+
+  cpdef bytes get(self)
+
+  cdef extend(self, size_t missing)
+
+
+cdef class ByteCountingOutputStream(OutputStream):
+  cdef size_t count
+
+  cpdef write(self, bytes b, bint nested=*)
+  cpdef write_byte(self, unsigned char val)
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t val)
+  cpdef size_t get_count(self)
+  cpdef bytes get(self)
+
+
+cdef class InputStream(object):
+  cdef size_t pos
+  cdef bytes all
+  cdef char* allc
+
+  cpdef size_t size(self) except? -1
+  cpdef bytes read(self, size_t len)
+  cpdef long read_byte(self) except? -1
+  cpdef libc.stdint.int64_t read_var_int64(self) except? -1
+  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
+  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
+  cpdef double read_bigendian_double(self) except? -1
+  cpdef bytes read_all(self, bint nested=*)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx
new file mode 100644
index 0000000..6df186a
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -0,0 +1,201 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cimport libc.stdlib
+cimport libc.string
+
+
+cdef class OutputStream(object):
+  """An output string stream implementation supporting write() and get()."""
+
+  # TODO(robertwb): Consider using raw C++ streams.
+
+  def __cinit__(self):
+    self.size = 1024
+    self.pos = 0
+    self.data = <char*>libc.stdlib.malloc(self.size)
+    assert self.data, "OutputStream malloc failed."
+
+  def __dealloc__(self):
+    if self.data:
+      libc.stdlib.free(self.data)
+
+  cpdef write(self, bytes b, bint nested=False):
+    cdef size_t blen = len(b)
+    if nested:
+      self.write_var_int64(blen)
+    if self.size < self.pos + blen:
+      self.extend(blen)
+    libc.string.memcpy(self.data + self.pos, <char*>b, blen)
+    self.pos += blen
+
+  cpdef write_byte(self, unsigned char val):
+    if self.size < self.pos + 1:
+      self.extend(1)
+    self.data[self.pos] = val
+    self.pos += 1
+
+  cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
+    """Encode a long using variable-length encoding to a stream."""
+    cdef libc.stdint.uint64_t v = signed_v
+    cdef long bits
+    while True:
+      bits = v & 0x7F
+      v >>= 7
+      if v:
+        bits |= 0x80
+      self.write_byte(bits)
+      if not v:
+        break
+
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
+    cdef libc.stdint.uint64_t v = signed_v
+    if self.size < self.pos + 8:
+      self.extend(8)
+    self.data[self.pos    ] = <unsigned char>(v >> 56)
+    self.data[self.pos + 1] = <unsigned char>(v >> 48)
+    self.data[self.pos + 2] = <unsigned char>(v >> 40)
+    self.data[self.pos + 3] = <unsigned char>(v >> 32)
+    self.data[self.pos + 4] = <unsigned char>(v >> 24)
+    self.data[self.pos + 5] = <unsigned char>(v >> 16)
+    self.data[self.pos + 6] = <unsigned char>(v >>  8)
+    self.data[self.pos + 7] = <unsigned char>(v      )
+    self.pos += 8
+
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
+    cdef libc.stdint.uint32_t v = signed_v
+    if self.size < self.pos + 4:
+      self.extend(4)
+    self.data[self.pos    ] = <unsigned char>(v >> 24)
+    self.data[self.pos + 1] = <unsigned char>(v >> 16)
+    self.data[self.pos + 2] = <unsigned char>(v >>  8)
+    self.data[self.pos + 3] = <unsigned char>(v      )
+    self.pos += 4
+
+  cpdef write_bigendian_double(self, double d):
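+    # Reinterpret the double's IEEE-754 bit pattern as an int64 and write
+    # it big-endian; this matches struct.pack('>d', ...) in slow_stream.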
+    self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])
+
+  cpdef bytes get(self):
+    return self.data[:self.pos]
+
+  cdef extend(self, size_t missing):
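+    # Grow geometrically: double the capacity until the outstanding bytes
+    # fit, then realloc once (preserving the already-written prefix).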
+    while missing > self.size - self.pos:
+      self.size *= 2
+    self.data = <char*>libc.stdlib.realloc(self.data, self.size)
+    assert self.data, "OutputStream realloc failed."
+
+
+cdef class ByteCountingOutputStream(OutputStream):
+  """An output string stream implementation that only counts the bytes.
+
+  This implementation counts the number of bytes it "writes" but does
+  not actually store them, so get() is unsupported.  get_count()
+  returns how many bytes were written.
+
+  This is useful for sizing an encoding.
+  """
+
+  def __cinit__(self):
+    self.count = 0
+
+  cpdef write(self, bytes b, bint nested=False):
+    cdef size_t blen = len(b)
+    if nested:
+      self.write_var_int64(blen)
+    self.count += blen
+
+  cpdef write_byte(self, unsigned char _):
+    self.count += 1
+
+  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
+    self.count += 8
+
+  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
+    self.count += 4
+
+  cpdef size_t get_count(self):
+    return self.count
+
+  cpdef bytes get(self):
+    raise NotImplementedError
+
+  def __str__(self):
+    return '<%s %s>' % (self.__class__.__name__, self.count)
+
+
+cdef class InputStream(object):
+  """An input string stream implementation supporting read() and size()."""
+
+  def __init__(self, all):
+    self.allc = self.all = all
+
+  cpdef bytes read(self, size_t size):
+    self.pos += size
+    return self.allc[self.pos - size : self.pos]
+
+  cpdef long read_byte(self) except? -1:
+    self.pos += 1
+    # Note: the C++ compiler on Dataflow workers treats the char array below as
+    # a signed char.  This causes incorrect coder behavior unless explicitly
+    # cast to an unsigned char here.
+    return <long>(<unsigned char> self.allc[self.pos - 1])
+
+  cpdef size_t size(self) except? -1:
+    return len(self.all) - self.pos
+
+  cpdef bytes read_all(self, bint nested=False):
+    return self.read(self.read_var_int64() if nested else self.size())
+
+  cpdef libc.stdint.int64_t read_var_int64(self) except? -1:
+    """Decode a variable-length encoded long from a stream."""
+    cdef long byte
+    cdef long bits
+    cdef long shift = 0
+    cdef libc.stdint.int64_t result = 0
+    while True:
+      byte = self.read_byte()
+      if byte < 0:
+        raise RuntimeError('VarInt not terminated.')
+
+      bits = byte & 0x7F
+      if shift >= 64 or (shift >= 63 and bits > 1):
+        raise RuntimeError('VarLong too long.')
+      result |= bits << shift
+      shift += 7
+      if not (byte & 0x80):
+        break
+    return result
+
+  cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
+    self.pos += 8
+    return (<unsigned char>self.allc[self.pos - 1]
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 2] <<  8
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 3] << 16
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 4] << 24
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 5] << 32
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 6] << 40
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 7] << 48
+      | <libc.stdint.uint64_t><unsigned char>self.allc[self.pos - 8] << 56)
+
+  cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1:
+    self.pos += 4
+    return (<unsigned char>self.allc[self.pos - 1]
+      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 2] <<  8
+      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 3] << 16
+      | <libc.stdint.uint32_t><unsigned char>self.allc[self.pos - 4] << 24)
+
+  cpdef double read_bigendian_double(self) except? -1:
+    cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
+    return (<double*><char*>&as_long)[0]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py
new file mode 100644
index 0000000..3002116
--- /dev/null
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -0,0 +1,168 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the stream implementations."""
+
+import logging
+import math
+import unittest
+
+
+from apache_beam.coders import slow_stream
+
+
+class StreamTest(unittest.TestCase):
+  # pylint: disable=invalid-name
+  InputStream = slow_stream.InputStream
+  OutputStream = slow_stream.OutputStream
+  ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
+  # pylint: enable=invalid-name
+
+  def test_read_write(self):
+    out_s = self.OutputStream()
+    out_s.write('abc')
+    out_s.write('\0\t\n')
+    out_s.write('xyz', True)
+    out_s.write('', True)
+    in_s = self.InputStream(out_s.get())
+    self.assertEquals('abc\0\t\n', in_s.read(6))
+    self.assertEquals('xyz', in_s.read_all(True))
+    self.assertEquals('', in_s.read_all(True))
+
+  def test_read_all(self):
+    out_s = self.OutputStream()
+    out_s.write('abc')
+    in_s = self.InputStream(out_s.get())
+    self.assertEquals('abc', in_s.read_all(False))
+
+  def test_read_write_byte(self):
+    out_s = self.OutputStream()
+    out_s.write_byte(1)
+    out_s.write_byte(0)
+    out_s.write_byte(0xFF)
+    in_s = self.InputStream(out_s.get())
+    self.assertEquals(1, in_s.read_byte())
+    self.assertEquals(0, in_s.read_byte())
+    self.assertEquals(0xFF, in_s.read_byte())
+
+  def test_read_write_large(self):
+    values = range(4 * 1024)
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_bigendian_int64(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEquals(v, in_s.read_bigendian_int64())
+
+  def run_read_write_var_int64(self, values):
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_var_int64(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEquals(v, in_s.read_var_int64())
+
+  def test_small_var_int64(self):
+    self.run_read_write_var_int64(range(-10, 30))
+
+  def test_medium_var_int64(self):
+    base = -1.7
+    self.run_read_write_var_int64(
+        [int(base**exponent)
+         for exponent in range(1, int(63 * math.log(2) / math.log(-base)))])
+
+  def test_large_var_int64(self):
+    self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])
+
+  def test_read_write_double(self):
+    values = 0, 1, -1, 1e100, 1.0/3, math.pi, float('inf')
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_bigendian_double(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEquals(v, in_s.read_bigendian_double())
+
+  def test_read_write_bigendian_int64(self):
+    values = 0, 1, -1, 2**63-1, -2**63, int(2**61 * math.pi)
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_bigendian_int64(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEquals(v, in_s.read_bigendian_int64())
+
+  def test_read_write_bigendian_int32(self):
+    values = 0, 1, -1, 2**31-1, -2**31, int(2**29 * math.pi)
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_bigendian_int32(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEquals(v, in_s.read_bigendian_int32())
+
+  def test_byte_counting(self):
+    bc_s = self.ByteCountingOutputStream()
+    self.assertEquals(0, bc_s.get_count())
+    bc_s.write('def')
+    self.assertEquals(3, bc_s.get_count())
+    bc_s.write('')
+    self.assertEquals(3, bc_s.get_count())
+    bc_s.write_byte(10)
+    self.assertEquals(4, bc_s.get_count())
+    # "nested" also writes the length of the string, which should
+    # cause 1 extra byte to be counted.
+    bc_s.write('2345', nested=True)
+    self.assertEquals(9, bc_s.get_count())
+    bc_s.write_var_int64(63)
+    self.assertEquals(10, bc_s.get_count())
+    bc_s.write_bigendian_int64(42)
+    self.assertEquals(18, bc_s.get_count())
+    bc_s.write_bigendian_int32(36)
+    self.assertEquals(22, bc_s.get_count())
+    bc_s.write_bigendian_double(6.25)
+    self.assertEquals(30, bc_s.get_count())
+
+
+try:
+  # pylint: disable=g-import-not-at-top
+  from apache_beam.coders import stream
+
+  class FastStreamTest(StreamTest):
+    """Runs the test with the compiled stream classes."""
+    InputStream = stream.InputStream
+    OutputStream = stream.OutputStream
+    ByteCountingOutputStream = stream.ByteCountingOutputStream
+
+
+  class SlowFastStreamTest(StreamTest):
+    """Runs the test with compiled and uncompiled stream classes."""
+    InputStream = stream.InputStream
+    OutputStream = slow_stream.OutputStream
+    ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
+
+
+  class FastSlowStreamTest(StreamTest):
+    """Runs the test with uncompiled and compiled stream classes."""
+    InputStream = slow_stream.InputStream
+    OutputStream = stream.OutputStream
+    ByteCountingOutputStream = stream.ByteCountingOutputStream
+
+except ImportError:
+  pass
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()