You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/12 22:54:37 UTC

[01/18] beam git commit: Add futurize to pylint script

Repository: beam
Updated Branches:
  refs/heads/master 655227ab0 -> 9c2a6e771


Add futurize to pylint script

Break down futurize pylint

Fix the pylint script checking futurize

Consists of:

Quote the futurize options shell scripting

Jenkins seems to be having weird behaviour with futurize_filtered=, temporary debugging commit

Try quoting the count line.

Do the style fixes from shellcheck on pylint, parallilize futurize command & simplify the grep on output

Fall through on grep exit code of 1 which happens when everything is filtered out & change count check to 0

Remove some debugging from pylint


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd6a7899
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd6a7899
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd6a7899

Branch: refs/heads/master
Commit: dd6a78992fb66e49ff53ae69114f0d682be0888f
Parents: 22f9263
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:53:42 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:07 2017 -0700

----------------------------------------------------------------------
 sdks/python/run_pylint.sh | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dd6a7899/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index df71e44..d53bb14 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -33,7 +33,7 @@ usage(){ echo "Usage: $0 [MODULE|--help]  # The default MODULE is $MODULE"; }
 if test $# -gt 0; then
   case "$@" in
     --help) usage; exit 1;;
-	 *)      MODULE="$@";;
+	 *)      MODULE="$*";;
   esac
 fi
 
@@ -59,15 +59,16 @@ done
 echo "Skipping lint for generated files: $FILES_TO_IGNORE"
 
 echo "Running pylint for module $MODULE:"
-pylint $MODULE --ignore-patterns="$FILES_TO_IGNORE"
+pylint "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
 echo "Running pycodestyle for module $MODULE:"
-pycodestyle $MODULE --exclude="$FILES_TO_IGNORE"
+pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE"
 echo "Running isort for module $MODULE:"
 # Skip files where isort is behaving weirdly
 ISORT_EXCLUDED=(
   "apiclient.py"
   "avroio_test.py"
   "datastore_wordcount.py"
+  "datastoreio_test.py"
   "iobase_test.py"
   "fast_coders_test.py"
   "slow_coders_test.py"
@@ -79,6 +80,15 @@ done
 for file in "${EXCLUDED_GENERATED_FILES[@]}"; do
   SKIP_PARAM="$SKIP_PARAM --skip $(basename $file)"
 done
-pushd $MODULE
+pushd "$MODULE"
 isort -p apache_beam -w 120 -y -c -ot -cs -sl ${SKIP_PARAM}
 popd
+echo "Checking for files requiring stage 1 refactoring from futurize"
+futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
+futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivial_inference.py' || echo "")
+count=${#futurize_filtered}
+if [ "$count" != "0" ]; then
+  echo "Some of the changes require futurize stage 1 changes."
+  echo "$futurize_filtered"
+  exit 1
+fi


[09/18] beam git commit: Add future to deps and explicitly set isort req and update pylint in tox ini

Posted by ro...@apache.org.
Add future to deps and explicitly set isort req and update pylint in tox ini


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe0dc2e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe0dc2e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe0dc2e2

Branch: refs/heads/master
Commit: fe0dc2e225a9fca16dbf6ccd77de36fcae4a2484
Parents: c0bc584
Author: Holden Karau <ho...@us.ibm.com>
Authored: Sat Sep 2 00:02:24 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700

----------------------------------------------------------------------
 sdks/python/tox.ini | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fe0dc2e2/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index e82b685..039b0e8 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -90,7 +90,9 @@ passenv = TRAVIS*
 deps=
   nose==1.3.7
   pycodestyle==2.3.1
-  pylint==1.7.1
+  pylint==1.7.2
+  future==0.16.0
+  isort==4.2.15
 whitelist_externals=time
 commands =
   time pip install -e .[test]


[16/18] beam git commit: Break out the exclusion list for futurize

Posted by ro...@apache.org.
Break out the exclusion list for futurize


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf2be415
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf2be415
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf2be415

Branch: refs/heads/master
Commit: cf2be415b9b407b2d3791310a74d76b3e46fc1fa
Parents: 14cf6a1
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 23:38:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/run_pylint.sh | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cf2be415/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 06f2072..4c57e75 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -83,9 +83,15 @@ done
 pushd "$MODULE"
 isort -p apache_beam -w 120 -y -c -ot -cs -sl ${SKIP_PARAM}
 popd
+FUTURIZE_EXCLUDED=(
+  "typehints.py"
+  "pb2"
+  "trivial_infernce.py"
+)
+FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" )
 echo "Checking for files requiring stage 1 refactoring from futurize"
 futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
-futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivial_inference.py' || echo "")
+futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" || echo "")
 count=${#futurize_filtered}
 if [ "$count" != "0" ]; then
   echo "Some of the changes require futurize stage 1 changes."


[18/18] beam git commit: Closes #3804

Posted by ro...@apache.org.
Closes #3804


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c2a6e77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c2a6e77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c2a6e77

Branch: refs/heads/master
Commit: 9c2a6e771d30e3e120b941bd2d450a00d8b7f091
Parents: 655227a 9de10e2
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Oct 12 15:54:09 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:54:09 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 18 +++--
 sdks/python/apache_beam/coders/coders.py        |  5 +-
 .../apache_beam/coders/coders_test_common.py    |  6 +-
 .../apache_beam/coders/standard_coders_test.py  |  5 +-
 .../examples/complete/autocomplete.py           |  6 +-
 .../examples/complete/autocomplete_test.py      |  2 +-
 .../examples/complete/game/leader_board.py      |  7 +-
 .../examples/complete/game/user_score.py        |  7 +-
 .../complete/juliaset/juliaset/juliaset.py      |  8 +-
 .../examples/complete/juliaset/setup.py         |  5 +-
 .../apache_beam/examples/complete/tfidf.py      | 15 ++--
 .../apache_beam/examples/complete/tfidf_test.py |  6 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  2 +-
 .../examples/cookbook/datastore_wordcount.py    | 12 ++-
 .../examples/cookbook/group_with_coder.py       |  2 +-
 .../examples/cookbook/mergecontacts.py          | 36 ++++++---
 .../examples/cookbook/multiple_output_pardo.py  | 14 +++-
 .../apache_beam/examples/snippets/snippets.py   | 38 +++++++---
 .../examples/snippets/snippets_test.py          |  8 +-
 .../apache_beam/examples/streaming_wordcount.py |  6 +-
 .../apache_beam/examples/windowed_wordcount.py  |  6 +-
 sdks/python/apache_beam/examples/wordcount.py   | 12 ++-
 .../apache_beam/examples/wordcount_debugging.py | 12 ++-
 .../apache_beam/examples/wordcount_minimal.py   |  6 +-
 sdks/python/apache_beam/internal/pickler.py     |  2 +-
 sdks/python/apache_beam/internal/util.py        |  4 +-
 sdks/python/apache_beam/io/filebasedsource.py   |  2 +-
 .../io/gcp/datastore/v1/datastoreio_test.py     |  5 +-
 .../apache_beam/io/gcp/datastore/v1/helper.py   |  2 +-
 .../io/gcp/datastore/v1/helper_test.py          |  2 +-
 sdks/python/apache_beam/io/gcp/gcsio.py         |  2 +-
 .../apache_beam/metrics/execution_test.py       |  4 +-
 .../apache_beam/options/pipeline_options.py     |  2 +-
 sdks/python/apache_beam/runners/common.py       |  4 +-
 .../runners/dataflow/test_dataflow_runner.py    |  1 +
 .../runners/direct/transform_evaluator.py       |  2 +-
 .../portability/maptask_executor_runner.py      | 10 ++-
 .../portability/maptask_executor_runner_test.py |  6 +-
 .../runners/worker/bundle_processor.py          |  2 +-
 .../apache_beam/runners/worker/operations.py    |  2 +-
 .../apache_beam/runners/worker/sdk_worker.py    |  4 +-
 .../runners/worker/statesampler_test.py         |  3 +-
 sdks/python/apache_beam/testing/util.py         |  2 +-
 sdks/python/apache_beam/transforms/combiners.py |  6 +-
 sdks/python/apache_beam/transforms/core.py      | 10 +--
 .../python/apache_beam/transforms/ptransform.py |  5 +-
 .../apache_beam/transforms/ptransform_test.py   | 38 ++++++----
 .../apache_beam/transforms/sideinputs_test.py   |  2 +-
 .../apache_beam/transforms/trigger_test.py      | 10 ++-
 sdks/python/apache_beam/transforms/util.py      | 12 +--
 .../apache_beam/transforms/window_test.py       |  8 +-
 sdks/python/apache_beam/typehints/decorators.py |  4 +-
 sdks/python/apache_beam/typehints/opcodes.py    | 27 ++++---
 .../apache_beam/typehints/trivial_inference.py  | 80 +++++++++++++-------
 .../typehints/trivial_inference_test.py         |  8 +-
 .../typehints/typed_pipeline_test.py            |  9 ++-
 sdks/python/apache_beam/typehints/typehints.py  | 36 +++++++--
 sdks/python/apache_beam/utils/retry.py          |  2 +-
 sdks/python/run_pylint.sh                       | 27 ++++++-
 sdks/python/tox.ini                             |  4 +-
 60 files changed, 400 insertions(+), 193 deletions(-)
----------------------------------------------------------------------



[08/18] beam git commit: Parallelize pylint

Posted by ro...@apache.org.
Parallelize pylint


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/252c6798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/252c6798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/252c6798

Branch: refs/heads/master
Commit: 252c67982a036e5dee4518d96479622cea0d662b
Parents: a795e54
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Sep 6 17:10:00 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700

----------------------------------------------------------------------
 sdks/python/run_pylint.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/252c6798/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index ccd2e31..91d5c4a 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -59,7 +59,7 @@ done
 echo "Skipping lint for generated files: $FILES_TO_IGNORE"
 
 echo "Running pylint for module $MODULE:"
-pylint "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
+pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
 echo "Running pycodestyle for module $MODULE:"
 pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE"
 echo "Running isort for module $MODULE:"


[12/18] beam git commit: Fix snippets

Posted by ro...@apache.org.
Fix snippets


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9de10e2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9de10e2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9de10e2d

Branch: refs/heads/master
Commit: 9de10e2d80ce15c5881b26f6436d28cccf60e18b
Parents: 3a04dbe
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Oct 11 23:46:28 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/snippets/snippets.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9de10e2d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 048b31a..a7751a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1179,9 +1179,9 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
       return '%s; %s; %s' %\
         (name, sorted(info['emails']), sorted(info['phones']))
 
-    contact_lines = result | beam.Map(join_info)
+    contact_lines = results | beam.Map(join_info)
     # [END model_group_by_key_cogroupbykey_tuple]
-    formatted_results | beam.io.WriteToText(output_path)
+    contact_lines | beam.io.WriteToText(output_path)
 
 
 def model_join_using_side_inputs(


[02/18] beam git commit: Futurize stage 1 run (AUTOMATED futurize)

Posted by ro...@apache.org.
Futurize stage 1 run (AUTOMATED futurize)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22f9263e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22f9263e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22f9263e

Branch: refs/heads/master
Commit: 22f9263e66aaf5ba1c591850f36e31a1481da6e4
Parents: 655227a
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:43:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:07 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    | 17 +++---
 sdks/python/apache_beam/coders/coders.py        |  5 +-
 .../apache_beam/coders/coders_test_common.py    |  5 +-
 .../apache_beam/coders/standard_coders_test.py  |  5 +-
 .../examples/complete/autocomplete.py           |  2 +-
 .../examples/complete/autocomplete_test.py      |  2 +-
 .../examples/complete/game/leader_board.py      |  2 +-
 .../examples/complete/game/user_score.py        |  2 +-
 .../complete/juliaset/juliaset/juliaset.py      |  4 +-
 .../examples/complete/juliaset/setup.py         |  5 +-
 .../apache_beam/examples/complete/tfidf.py      | 11 ++--
 .../apache_beam/examples/complete/tfidf_test.py |  2 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  2 +-
 .../examples/cookbook/datastore_wordcount.py    |  4 +-
 .../examples/cookbook/group_with_coder.py       |  2 +-
 .../examples/cookbook/mergecontacts.py          | 16 ++---
 .../examples/cookbook/multiple_output_pardo.py  |  6 +-
 .../apache_beam/examples/snippets/snippets.py   | 25 ++++----
 .../examples/snippets/snippets_test.py          |  8 ++-
 .../apache_beam/examples/streaming_wordcount.py |  2 +-
 .../apache_beam/examples/windowed_wordcount.py  |  2 +-
 sdks/python/apache_beam/examples/wordcount.py   |  4 +-
 .../apache_beam/examples/wordcount_debugging.py |  4 +-
 .../apache_beam/examples/wordcount_minimal.py   |  2 +-
 sdks/python/apache_beam/internal/pickler.py     |  2 +-
 sdks/python/apache_beam/internal/util.py        |  4 +-
 sdks/python/apache_beam/io/filebasedsource.py   |  2 +-
 .../io/gcp/datastore/v1/datastoreio_test.py     |  3 +-
 .../apache_beam/io/gcp/datastore/v1/helper.py   |  2 +-
 .../io/gcp/datastore/v1/helper_test.py          |  2 +-
 sdks/python/apache_beam/io/gcp/gcsio.py         |  2 +-
 .../apache_beam/metrics/execution_test.py       |  4 +-
 .../apache_beam/options/pipeline_options.py     |  2 +-
 sdks/python/apache_beam/runners/common.py       |  4 +-
 .../runners/dataflow/test_dataflow_runner.py    |  1 +
 .../runners/direct/transform_evaluator.py       |  2 +-
 .../portability/maptask_executor_runner.py      |  6 +-
 .../portability/maptask_executor_runner_test.py |  6 +-
 .../runners/worker/bundle_processor.py          |  2 +-
 .../apache_beam/runners/worker/operations.py    |  2 +-
 .../apache_beam/runners/worker/sdk_worker.py    |  2 +-
 .../runners/worker/statesampler_test.py         |  3 +-
 sdks/python/apache_beam/testing/util.py         |  2 +-
 sdks/python/apache_beam/transforms/combiners.py |  6 +-
 sdks/python/apache_beam/transforms/core.py      | 10 ++--
 .../python/apache_beam/transforms/ptransform.py |  5 +-
 .../apache_beam/transforms/ptransform_test.py   | 22 ++++---
 .../apache_beam/transforms/sideinputs_test.py   |  2 +-
 .../apache_beam/transforms/trigger_test.py      |  4 +-
 sdks/python/apache_beam/transforms/util.py      | 12 ++--
 .../apache_beam/transforms/window_test.py       |  8 +--
 sdks/python/apache_beam/typehints/decorators.py |  4 +-
 sdks/python/apache_beam/typehints/opcodes.py    | 26 +++++----
 .../apache_beam/typehints/trivial_inference.py  | 61 ++++++++++----------
 .../typehints/trivial_inference_test.py         |  3 +-
 .../typehints/typed_pipeline_test.py            |  9 ++-
 sdks/python/apache_beam/typehints/typehints.py  |  6 +-
 sdks/python/apache_beam/utils/retry.py          |  2 +-
 58 files changed, 203 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 5070297..9f7b739 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -26,6 +26,7 @@ coder_impl.pxd file for type hints.
 
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
 from types import NoneType
 
 from apache_beam.coders import observable
@@ -36,18 +37,18 @@ from apache_beam.utils.timestamp import Timestamp
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
-  from stream import InputStream as create_InputStream
-  from stream import OutputStream as create_OutputStream
-  from stream import ByteCountingOutputStream
-  from stream import get_varint_size
+  from .stream import InputStream as create_InputStream
+  from .stream import OutputStream as create_OutputStream
+  from .stream import ByteCountingOutputStream
+  from .stream import get_varint_size
   globals()['create_InputStream'] = create_InputStream
   globals()['create_OutputStream'] = create_OutputStream
   globals()['ByteCountingOutputStream'] = ByteCountingOutputStream
 except ImportError:
-  from slow_stream import InputStream as create_InputStream
-  from slow_stream import OutputStream as create_OutputStream
-  from slow_stream import ByteCountingOutputStream
-  from slow_stream import get_varint_size
+  from .slow_stream import InputStream as create_InputStream
+  from .slow_stream import OutputStream as create_OutputStream
+  from .slow_stream import ByteCountingOutputStream
+  from .slow_stream import get_varint_size
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cbea98f..67d5adb 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -19,6 +19,7 @@
 
 Only those coders listed in __all__ are part of the public API of this module.
 """
+from __future__ import absolute_import
 
 import base64
 import cPickle as pickle
@@ -32,9 +33,9 @@ from apache_beam.utils import urns
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
-  from stream import get_varint_size
+  from .stream import get_varint_size
 except ImportError:
-  from slow_stream import get_varint_size
+  from .slow_stream import get_varint_size
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 29ff229..d42e637 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -16,6 +16,7 @@
 #
 
 """Tests common to all coder implementations."""
+from __future__ import absolute_import
 
 import logging
 import math
@@ -23,7 +24,7 @@ import unittest
 
 import dill
 
-import observable
+from . import observable
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.runners import pipeline_context
@@ -119,7 +120,7 @@ class CodersTest(unittest.TestCase):
                      (1, dict()), ('a', [dict()]))
 
   def test_dill_coder(self):
-    cell_value = (lambda x: lambda: x)(0).func_closure[0]
+    cell_value = (lambda x: lambda: x)(0).__closure__[0]
     self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
     self.check_coder(
         coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index ca4dffb..ca13b80 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -17,6 +17,7 @@
 
 """Unit tests for coders that must be consistent across all Beam SDKs.
 """
+from __future__ import print_function
 
 import json
 import logging
@@ -125,14 +126,14 @@ class StandardCodersTest(unittest.TestCase):
   @classmethod
   def tearDownClass(cls):
     if cls.fix and cls.to_fix:
-      print "FIXING", len(cls.to_fix), "TESTS"
+      print("FIXING", len(cls.to_fix), "TESTS")
       doc_sep = '\n---\n'
       docs = open(STANDARD_CODERS_YAML).read().split(doc_sep)
 
       def quote(s):
         return json.dumps(s.decode('latin1')).replace(r'\u0000', r'\0')
       for (doc_ix, expected_encoded), actual_encoded in cls.to_fix.items():
-        print quote(expected_encoded), "->", quote(actual_encoded)
+        print(quote(expected_encoded), "->", quote(actual_encoded))
         docs[doc_ix] = docs[doc_ix].replace(
             quote(expected_encoded) + ':', quote(actual_encoded) + ':')
       open(STANDARD_CODERS_YAML, 'w').write(doc_sep.join(docs))

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index ab3397c..81c5351 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -51,7 +51,7 @@ def run(argv=None):
      | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
      | 'TopPerPrefix' >> TopPerPrefix(5)
      | 'format' >> beam.Map(
-         lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+         lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], prefix_candidates[1]))
      | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index e2c84d6..888ce44 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -35,7 +35,7 @@ class AutocompleteTest(unittest.TestCase):
       words = p | beam.Create(self.WORDS)
       result = words | autocomplete.TopPerPrefix(5)
       # values must be hashable for now
-      result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
+      result = result | beam.Map(lambda k_vs: (k_vs[0], tuple(k_vs[1])))
       assert_that(result, equal_to(
           [
               ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 69676f8..6fc7b5d 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -333,7 +333,7 @@ def run(argv=None):
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
      | 'FormatUserScoreSums' >> beam.Map(
-         lambda (user, score): {'user': user, 'total_score': score})
+         lambda user_score: {'user': user_score[0], 'total_score': user_score[1]})
      | 'WriteUserScoreSums' >> WriteToBigQuery(
          args.table_name + '_users', args.dataset, {
              'user': 'STRING',

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index cf9976d..5e093bb 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -142,7 +142,7 @@ def run(argv=None):
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
      | 'UserScore' >> UserScore()
      | 'FormatUserScoreSums' >> beam.Map(
-         lambda (user, score): 'user: %s, total_score: %s' % (user, score))
+         lambda user_score: 'user: %s, total_score: %s' % (user_score[0], user_score[1]))
      | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
 # [END main]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 61e3fd1..1013168 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -108,10 +108,10 @@ def run(argv=None):  # pylint: disable=missing-docstring
     # to the output file with an x-coordinate grouping per line.
     # pylint: disable=expression-not-assigned
     (coordinates
-     | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+     | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2])))
      | 'x coord' >> beam.GroupByKey()
      | 'format' >> beam.Map(
-         lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords))
+         lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1]))
      | WriteToText(known_args.coordinate_output))
 
     # Optionally render the image and save it to a file.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/juliaset/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/setup.py b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
index 2062e2a..cbf5f3d 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/setup.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
@@ -24,6 +24,7 @@ then installed in the workers when they start running.
 This behavior is triggered by specifying the --setup_file command line option
 when running the workflow for remote execution.
 """
+from __future__ import print_function
 
 import subprocess
 from distutils.command.build import build as _build
@@ -76,14 +77,14 @@ class CustomCommands(setuptools.Command):
     pass
 
   def RunCustomCommand(self, command_list):
-    print 'Running command: %s' % command_list
+    print('Running command: %s' % command_list)
     p = subprocess.Popen(
         command_list,
         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
     # Can use communicate(input='y\n'.encode()) if the command run requires
     # some confirmation.
     stdout_data, _ = p.communicate()
-    print 'Command output: %s' % stdout_data
+    print('Command output: %s' % stdout_data)
     if p.returncode != 0:
       raise RuntimeError(
           'Command %s failed: exit code: %s' % (command_list, p.returncode))

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 2132fbb..0300505 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -68,7 +68,8 @@ class TfIdf(beam.PTransform):
     # Create a collection of pairs mapping a URI to each of the words
     # in the document associated with that that URI.
 
-    def split_into_words((uri, line)):
+    def split_into_words(xxx_todo_changeme):
+      (uri, line) = xxx_todo_changeme
       return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
 
     uri_to_words = (
@@ -102,7 +103,7 @@ class TfIdf(beam.PTransform):
     uri_to_word_and_count = (
         uri_and_word_to_count
         | 'ShiftKeys' >> beam.Map(
-            lambda ((uri, word), count): (uri, (word, count))))
+            lambda uri_word_count: (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))))
 
     # Perform a CoGroupByKey (a sort of pre-join) on the prepared
     # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
@@ -125,7 +126,8 @@ class TfIdf(beam.PTransform):
     # that word occurs in the document divided by the total number of words in
     # the document.
 
-    def compute_term_frequency((uri, count_and_total)):
+    def compute_term_frequency(xxx_todo_changeme1):
+      (uri, count_and_total) = xxx_todo_changeme1
       word_and_count = count_and_total['word counts']
       # We have an iterable for one element that we want extracted.
       [word_total] = count_and_total['word totals']
@@ -165,7 +167,8 @@ class TfIdf(beam.PTransform):
     # basic version that is the term frequency divided by the log of the
     # document frequency.
 
-    def compute_tf_idf((word, tf_and_df)):
+    def compute_tf_idf(xxx_todo_changeme2):
+      (word, tf_and_df) = xxx_todo_changeme2
       [docf] = tf_and_df['df']
       for uri, tf in tf_and_df['tf']:
         yield word, (uri, tf * math.log(1 / docf))

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 518a47c..957f4c7 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -58,7 +58,7 @@ class TfIdfTest(unittest.TestCase):
       result = (
           uri_to_line
           | tfidf.TfIdf()
-          | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+          | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])))
       assert_that(result, equal_to(EXPECTED_RESULTS))
       # Run the pipeline. Note that the assert_that above adds to the pipeline
       # a check that the result PCollection contains expected values.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 1ca49c5..7b40353 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -58,7 +58,7 @@ def count_tornadoes(input_data):
               lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
           | 'monthly count' >> beam.CombinePerKey(sum)
           | 'format' >> beam.Map(
-              lambda (k, v): {'month': k, 'tornado_count': v}))
+              lambda k_v: {'month': k_v[0], 'tornado_count': k_v[1]}))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index c42596f..13c5998 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -184,10 +184,10 @@ def read_from_datastore(project, user_options, pipeline_options):
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 4c86f46..d5dbecf 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -114,7 +114,7 @@ def run(args=None):
      # is registered for the Player class above, a PlayerCoder will be used to
      # encode Player objects as keys for this combine operation.
      | beam.CombinePerKey(sum)
-     | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+     | beam.Map(lambda k_v: '%s,%d' % (k_v[0].name, k_v[1]))
      | WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9acdd90..5a35e51 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -96,19 +96,19 @@ def run(argv=None, assert_results=None):
     # Prepare tab-delimited output; something like this:
     # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
     tsv_lines = grouped | beam.Map(
-        lambda (name, (email, phone, snailmail)): '\t'.join(
-            ['"%s"' % name,
-             '"%s"' % ','.join(email),
-             '"%s"' % ','.join(phone),
-             '"%s"' % next(iter(snailmail), '')]))
+        lambda name_email_phone_snailmail: '\t'.join(
+            ['"%s"' % name_email_phone_snailmail[0],
+             '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
+             '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
+             '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
 
     # Compute some stats about our database of people.
     luddites = grouped | beam.Filter(  # People without email.
-        lambda (name, (email, phone, snailmail)): not next(iter(email), None))
+        lambda name_email_phone_snailmail1: not next(iter(name_email_phone_snailmail1[1][0]), None))
     writers = grouped | beam.Filter(   # People without phones.
-        lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
+        lambda name_email_phone_snailmail2: not next(iter(name_email_phone_snailmail2[1][1]), None))
     nomads = grouped | beam.Filter(    # People without addresses.
-        lambda (name, (e, p, snailmail)): not next(iter(snailmail), None))
+        lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), None))
 
     num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
     num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 2316c66..259f95d 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -122,8 +122,8 @@ class CountWords(beam.PTransform):
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-            | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
+            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+            | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
 
 
 def run(argv=None):
@@ -163,7 +163,7 @@ def run(argv=None):
     (character_count
      | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
      | beam.GroupByKey()
-     | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
+     | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
      | 'write chars' >> WriteToText(known_args.output + '-chars'))
 
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0ced3f1..873f3c3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -436,7 +436,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_count]
 
       # [START examples_wordcount_minimal_map]
-      | beam.Map(lambda (word, count): '%s: %s' % (word, count))
+      | beam.Map(lambda word_count: '%s: %s' % (word_count[0], word_count[1]))
       # [END examples_wordcount_minimal_map]
 
       # [START examples_wordcount_minimal_write]
@@ -541,8 +541,8 @@ def examples_wordcount_templated(renames):
           lambda x: re.findall(r'[A-Za-z\']+', x))
       | 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
       | 'Group' >> beam.GroupByKey()
-      | 'Sum' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-      | 'Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+      | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+      | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], word_c2[1]))
       | 'Write' >> WriteToText(wordcount_options.output)
   )
 
@@ -612,7 +612,7 @@ def examples_wordcount_debugging(renames):
     # [END example_wordcount_debugging_assert]
 
     output = (filtered_words
-              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+              | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], word_c1[1]))
               | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
 
     p.visit(SnippetUtils.RenameFiles(renames))
@@ -1046,7 +1046,7 @@ def model_composite_transform_example(contents, output_path):
       return (pcoll
               | beam.FlatMap(lambda x: re.findall(r'\w+', x))
               | beam.combiners.Count.PerElement()
-              | beam.Map(lambda (word, c): '%s: %s' % (word, c)))
+              | beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
   # [END composite_ptransform_apply_method]
   # [END composite_transform_example]
 
@@ -1133,7 +1133,7 @@ def model_group_by_key(contents, output_path):
     grouped_words = words_and_counts | beam.GroupByKey()
     # [END model_group_by_key_transform]
     (grouped_words
-     | 'count words' >> beam.Map(lambda (word, counts): (word, sum(counts)))
+     | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], sum(word_counts[1])))
      | beam.io.WriteToText(output_path))
 
 
@@ -1162,10 +1162,13 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
     results = ({'emails': emails_pcoll, 'phones': phones_pcoll}
                | beam.CoGroupByKey())
 
-    formatted_results = results | beam.Map(
-        lambda (name, info):\
-           '%s; %s; %s' %\
-           (name, sorted(info['emails']), sorted(info['phones'])))
+    def join_info(xxx_todo_changeme):
+      (name, info) = xxx_todo_changeme
+      return '%s; %s; %s' %\
+        (name, sorted(info['emails']), sorted(info['phones']))
+
+
+    contact_lines = result | beam.Map(join_info)
     # [END model_group_by_key_cogroupbykey_tuple]
     formatted_results | beam.io.WriteToText(output_path)
 
@@ -1211,7 +1214,7 @@ def model_join_using_side_inputs(
 class Keys(beam.PTransform):
 
   def expand(self, pcoll):
-    return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
+    return pcoll | 'Keys' >> beam.Map(lambda k_v: k_v[0])
 # [END model_library_transforms_keys]
 # pylint: enable=invalid-name
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 269a241..8a9695d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -358,7 +358,7 @@ class TypeHintsTest(unittest.TestCase):
       # [END type_hints_deterministic_key]
 
       assert_that(
-          totals | beam.Map(lambda (k, v): (k.name, v)),
+          totals | beam.Map(lambda k_v: (k_v[0].name, k_v[1])),
           equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
 
 
@@ -882,14 +882,16 @@ class CombineTest(unittest.TestCase):
       def create_accumulator(self):
         return (0.0, 0)
 
-      def add_input(self, (sum, count), input):
+      def add_input(self, xxx_todo_changeme, input):
+        (sum, count) = xxx_todo_changeme
         return sum + input, count + 1
 
       def merge_accumulators(self, accumulators):
         sums, counts = zip(*accumulators)
         return sum(sums), sum(counts)
 
-      def extract_output(self, (sum, count)):
+      def extract_output(self, xxx_todo_changeme1):
+        (sum, count) = xxx_todo_changeme1
         return sum / count if count else float('NaN')
     # [END combine_custom_average_define]
     # [START combine_custom_average_execute]

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 94d4c70..8a05991 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -65,7 +65,7 @@ def run(argv=None):
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
                    | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
     # Write to PubSub.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 508f18d..680314b 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -75,7 +75,7 @@ def run(argv=None):
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(2*60, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
                    | 'Format' >> beam.ParDo(FormatDoFn()))
 
     # Write to BigQuery.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 34dedb2..e21e91d 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -93,10 +93,10 @@ def run(argv=None):
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index c0ffd35..bdc4c16 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
 
 
 def run(argv=None):
@@ -142,7 +142,7 @@ def run(argv=None):
     # a "Write" transform that has side effects.
     # pylint: disable=unused-variable
     output = (filtered_words
-              | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+              | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
               | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 76b0a22..01c3955 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,7 +106,7 @@ def run(argv=None):
         | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
     # Format the counts into a PCollection of strings.
-    output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
+    output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], w_c[1]))
 
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index e049a71..102cf23 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -52,7 +52,7 @@ def _find_containing_class(nested_class):
     for k, v in outer.__dict__.items():
       if v is nested_class:
         return outer, k
-      elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
+      elif isinstance(v, type) and hasattr(v, '__dict__'):
         res = _find_containing_class_inner(v)
         if res: return res
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 3e943b0..e4f230b 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -100,10 +100,10 @@ def insert_values_in_args(args, kwargs, values):
   # Use a local iterator so that we don't modify values.
   v_iter = iter(values)
   new_args = [
-      v_iter.next() if isinstance(arg, ArgumentPlaceholder) else arg
+      next(v_iter) if isinstance(arg, ArgumentPlaceholder) else arg
       for arg in args]
   new_kwargs = dict(
-      (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
+      (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v)
       for k, v in sorted(kwargs.iteritems()))
   return (new_args, new_kwargs)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 7b019ed..052c2f3 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -370,7 +370,7 @@ class _Reshard(PTransform):
 
     return (keyed_pc | 'GroupByKey' >> GroupByKey()
             # Using FlatMap below due to the possibility of key collisions.
-            | 'DropKey' >> FlatMap(lambda (k, values): values))
+            | 'DropKey' >> FlatMap(lambda k_values: k_values[1]))
 
 
 class _ReadRange(DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index fa7310f..7c73a06 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -238,7 +239,7 @@ class DatastoreioTest(unittest.TestCase):
       elif req == kind_stat_req:
         return kind_stat_resp
       else:
-        print kind_stat_req
+        print(kind_stat_req)
         raise ValueError("Unknown req: %s" % req)
 
     self._mock_datastore.run_query.side_effect = fake_run_query

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 01ced6a..b86a2fa 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -277,7 +277,7 @@ class QueryIterator(object):
     self._project = project
     self._namespace = namespace
     self._start_cursor = None
-    self._limit = self._query.limit.value or sys.maxint
+    self._limit = self._query.limit.value or sys.maxsize
     self._req = make_request(project, namespace, query)
 
   @retry.with_exponential_backoff(num_retries=5,

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 36cfb15..90a3668 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -154,7 +154,7 @@ class HelperTest(unittest.TestCase):
       self.assertEqual(entity, entities[i].entity)
       i += 1
 
-    limit = query.limit.value if query.HasField('limit') else sys.maxint
+    limit = query.limit.value if query.HasField('limit') else sys.maxsize
     self.assertEqual(i, min(num_entities, limit))
 
   def test_is_key_valid(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0db4ba5..448a0c9 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -473,7 +473,7 @@ class GcsBufferedReader(object):
   def __next__(self):
     """Read one line delimited by '\\n' from the file.
     """
-    return self.next()
+    return next(self)
 
   def next(self):
     """Read one line delimited by '\\n' from the file.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/metrics/execution_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
index abf23e3..855f54c 100644
--- a/sdks/python/apache_beam/metrics/execution_test.py
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -29,9 +29,9 @@ from apache_beam.metrics.metricbase import MetricName
 class TestMetricsContainer(unittest.TestCase):
   def test_create_new_counter(self):
     mc = MetricsContainer('astep')
-    self.assertFalse(mc.counters.has_key(MetricName('namespace', 'name')))
+    self.assertFalse(MetricName('namespace', 'name') in mc.counters)
     mc.get_counter(MetricName('namespace', 'name'))
-    self.assertTrue(mc.counters.has_key(MetricName('namespace', 'name')))
+    self.assertTrue(MetricName('namespace', 'name') in mc.counters)
 
   def test_scoped_container(self):
     c1 = MetricsContainer('mystep')

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3abcbf2..2598551 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -50,7 +50,7 @@ def _static_value_provider_of(value_type):
 
   """
   def _f(value):
-    _f.func_name = value_type.__name__
+    _f.__name__ = value_type.__name__
     return StaticValueProvider(value_type, value)
   return _f
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 66c033f..64abe41 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -248,14 +248,14 @@ class PerWindowInvoker(DoFnInvoker):
       elif d == core.DoFn.SideInputParam:
         # If no more args are present then the value must be passed via kwarg
         try:
-          args_with_placeholders.append(remaining_args_iter.next())
+          args_with_placeholders.append(next(remaining_args_iter))
         except StopIteration:
           if a not in input_kwargs:
             raise ValueError("Value for sideinput %s not provided" % a)
       else:
         # If no more args are present then the value must be passed via kwarg
         try:
-          args_with_placeholders.append(remaining_args_iter.next())
+          args_with_placeholders.append(next(remaining_args_iter))
         except StopIteration:
           pass
     args_with_placeholders.extend(list(remaining_args_iter))

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index e7c8d06..b2330c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -16,6 +16,7 @@
 #
 
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
+from __future__ import print_function
 
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import GoogleCloudOptions

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 26d2019..16a2991 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -137,7 +137,7 @@ class TransformEvaluatorRegistry(object):
                       (core._GroupByKeyOnly,
                        _StreamingGroupByKeyOnly,
                        _StreamingGroupAlsoByWindow,
-                       _NativeWrite,))
+                       _NativeWrite))
 
 
 class RootBundleProvider(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index afb96fa..d4063df 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -435,7 +435,7 @@ class PartialGroupByKeyCombineValues(beam.PTransform):
       def to_accumulator(v):
         return self.combine_fn.add_input(
             self.combine_fn.create_accumulator(), v)
-      return input | beam.Map(lambda (k, v): (k, to_accumulator(v)))
+      return input | beam.Map(lambda k_v: (k_v[0], to_accumulator(k_v[1])))
 
 
 class MergeAccumulators(beam.PTransform):
@@ -449,7 +449,7 @@ class MergeAccumulators(beam.PTransform):
       return beam.pvalue.PCollection(input.pipeline)
     else:
       merge_accumulators = self.combine_fn.merge_accumulators
-      return input | beam.Map(lambda (k, vs): (k, merge_accumulators(vs)))
+      return input | beam.Map(lambda k_vs: (k_vs[0], merge_accumulators(k_vs[1])))
 
 
 class ExtractOutputs(beam.PTransform):
@@ -463,7 +463,7 @@ class ExtractOutputs(beam.PTransform):
       return beam.pvalue.PCollection(input.pipeline)
     else:
       extract_output = self.combine_fn.extract_output
-      return input | beam.Map(lambda (k, v): (k, extract_output(v)))
+      return input | beam.Map(lambda k_v1: (k_v1[0], extract_output(k_v1[1])))
 
 
 class WorkerRunnerResult(PipelineResult):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 519ab6e..4c0d3b3 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -152,7 +152,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
       derived = ((pcoll,) | beam.Flatten()
                  | beam.Map(lambda x: (x, x))
                  | beam.GroupByKey()
-                 | 'Unkey' >> beam.Map(lambda (x, _): x))
+                 | 'Unkey' >> beam.Map(lambda x__: x__[0]))
       assert_that(
           pcoll | beam.FlatMap(cross_product, AsList(derived)),
           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
@@ -162,7 +162,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
       res = (p
              | beam.Create([('a', 1), ('a', 2), ('b', 3)])
              | beam.GroupByKey()
-             | beam.Map(lambda (k, vs): (k, sorted(vs))))
+             | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
       assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))
 
   def test_flatten(self):
@@ -199,7 +199,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
              | beam.Map(lambda t: TimestampedValue(('k', t), t))
              | beam.WindowInto(beam.transforms.window.Sessions(10))
              | beam.GroupByKey()
-             | beam.Map(lambda (k, vs): (k, sorted(vs))))
+             | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
       assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
 
   def test_errors(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b69d002..f44490b 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -112,7 +112,7 @@ class DataInputOperation(RunnerIOOperation):
     # We must do this manually as we don't have a spec or spec.output_coders.
     self.receivers = [
         operations.ConsumerSet(self.counter_factory, self.step_name, 0,
-                               consumers.itervalues().next(),
+                               next(consumers.itervalues()),
                                self.windowed_coder)]
 
   def process(self, windowed_value):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1136d99..ed9d84d 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -434,7 +434,7 @@ class PGBKCVOperation(Operation):
     fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
     self.combine_fn = curry_combine_fn(fn, args, kwargs)
     if (getattr(fn.add_input, 'im_func', None)
-        is core.CombineFn.add_input.im_func):
+        is core.CombineFn.add_input.__func__):
       # Old versions of the SDK have CombineFns that don't implement add_input.
       self.combine_fn_add_input = (
           lambda a, e: self.combine_fn.add_inputs(a, [e]))

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index ef33c6f..5786105 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -74,7 +74,7 @@ class SdkHarness(object):
           return self.worker.do_instruction(request)
         except Exception as e:  # pylint: disable=broad-except
           traceback_str = traceback.format_exc(e)
-          raise StandardError("Error processing request. Original traceback "
+          raise Exception("Error processing request. Original traceback "
                               "is\n%s\n" % traceback_str)
 
       def handle_response(request, response_future):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/statesampler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 2a85610..44b2f72 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -16,6 +16,7 @@
 #
 
 """Tests for state sampler."""
+from __future__ import absolute_import
 
 import logging
 import time
@@ -32,7 +33,7 @@ class StateSamplerTest(unittest.TestCase):
     try:
       # pylint: disable=global-variable-not-assigned
       global statesampler
-      import statesampler
+      from . import statesampler
     except ImportError:
       raise SkipTest('State sampler not compiled.')
     super(StateSamplerTest, self).setUp()

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 9bb18cc..34c15f9 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -103,7 +103,7 @@ def assert_that(actual, matcher, label='assert_that'):
           | "ToVoidKey" >> Map(lambda v: (None, v)))
       _ = ((keyed_singleton, keyed_actual)
            | "Group" >> CoGroupByKey()
-           | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+           | "Unkey" >> Map(lambda k___actual_values: k___actual_values[1][1])
            | "Match" >> Map(matcher))
 
     def default_label(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 60bf2d1..3348790 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -77,14 +77,16 @@ class MeanCombineFn(core.CombineFn):
   def create_accumulator(self):
     return (0, 0)
 
-  def add_input(self, (sum_, count), element):
+  def add_input(self, xxx_todo_changeme, element):
+    (sum_, count) = xxx_todo_changeme
     return sum_ + element, count + 1
 
   def merge_accumulators(self, accumulators):
     sums, counts = zip(*accumulators)
     return sum(sums), sum(counts)
 
-  def extract_output(self, (sum_, count)):
+  def extract_output(self, xxx_todo_changeme1):
+    (sum_, count) = xxx_todo_changeme1
     if count == 0:
       return float('NaN')
     return sum_ / float(count)

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index e5f35c4..ff2428e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -367,10 +367,10 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     """Checks if an object is a bound method on an instance."""
     if not isinstance(self.process, types.MethodType):
       return False # Not a method
-    if self.process.im_self is None:
+    if self.process.__self__ is None:
       return False # Method is not bound
-    if issubclass(self.process.im_class, type) or \
-        self.process.im_class is types.ClassType:
+    if issubclass(self.process.__self__.__class__, type) or \
+        self.process.__self__.__class__ is type:
       return False # Method is a classmethod
     return True
 
@@ -383,7 +383,7 @@ def _fn_takes_side_inputs(fn):
   except TypeError:
     # We can't tell; maybe it does.
     return True
-  is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+  is_bound = isinstance(fn, types.MethodType) and fn.__self__ is not None
   return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
 
 
@@ -1111,7 +1111,7 @@ class CombineGlobally(PTransform):
                         KV[None, pcoll.element_type]))
                 | 'CombinePerKey' >> CombinePerKey(
                     self.fn, *self.args, **self.kwargs)
-                | 'UnKey' >> Map(lambda (k, v): v))
+                | 'UnKey' >> Map(lambda k_v: k_v[1]))
 
     if not self.has_defaults and not self.as_view:
       return combined

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2e6255a..d333ace 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -58,6 +58,7 @@ from apache_beam.typehints.trivial_inference import instance_to_type
 from apache_beam.typehints.typehints import validate_composite_type_param
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
+from functools import reduce
 
 __all__ = [
     'PTransform',
@@ -714,8 +715,8 @@ def label_from_callable(fn):
   elif hasattr(fn, '__name__'):
     if fn.__name__ == '<lambda>':
       return '<lambda at %s:%s>' % (
-          os.path.basename(fn.func_code.co_filename),
-          fn.func_code.co_firstlineno)
+          os.path.basename(fn.__code__.co_filename),
+          fn.__code__.co_firstlineno)
     return fn.__name__
   return str(fn)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8237c52..112c092 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -18,6 +18,7 @@
 """Unit tests for the PTransform and descendants."""
 
 from __future__ import absolute_import
+from __future__ import print_function
 
 import collections
 import operator
@@ -47,6 +48,7 @@ from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
 from apache_beam.utils.windowed_value import WindowedValue
+from functools import reduce
 
 # Disable frequent lint warning due to pipe operator for chaining transforms.
 # pylint: disable=expression-not-assigned
@@ -353,14 +355,16 @@ class PTransformTest(unittest.TestCase):
     def create_accumulator(self):
       return (0, 0)
 
-    def add_input(self, (sum_, count), element):
+    def add_input(self, xxx_todo_changeme, element):
+      (sum_, count) = xxx_todo_changeme
       return sum_ + element, count + 1
 
     def merge_accumulators(self, accumulators):
       sums, counts = zip(*accumulators)
       return sum(sums), sum(counts)
 
-    def extract_output(self, (sum_, count)):
+    def extract_output(self, xxx_todo_changeme3):
+      (sum_, count) = xxx_todo_changeme3
       if not count:
         return float('nan')
       return sum_ / float(count)
@@ -619,7 +623,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = TestPipeline()
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
-         | beam.Map(lambda (x, ones): (x, sum(ones))))
+         | beam.Map(lambda x_ones: (x_ones[0], sum(x_ones[1]))))
     result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
     assert_that(result, equal_to([('a', 2), ('b', 1)]))
     pipeline.run()
@@ -643,7 +647,7 @@ class PTransformTest(unittest.TestCase):
                 | beam.Flatten()
                 | beam.Map(lambda x: (x, None))
                 | beam.GroupByKey()
-                | beam.Map(lambda (x, _): x))
+                | beam.Map(lambda x__: x__[0]))
     self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
 
   def test_apply_to_crazy_pvaluish(self):
@@ -720,7 +724,7 @@ class PTransformLabelsTest(unittest.TestCase):
     pipeline = TestPipeline()
     map1 = 'Map1' >> beam.Map(lambda x: (x, 1))
     gbk = 'Gbk' >> beam.GroupByKey()
-    map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
+    map2 = 'Map2' >> beam.Map(lambda x_ones2: (x_ones2[0], sum(x_ones2[1])))
     t = (map1 | gbk | map2)
     result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
     self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels)
@@ -1320,7 +1324,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | ('Add' >> beam.FlatMap(lambda (x, y): [x + y])
+       | ('Add' >> beam.FlatMap(lambda x_y: [x_y[0] + x_y[1]])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
@@ -1339,9 +1343,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # The type-hinted applied via the 'returns()' method indicates the ParDo
     # should output an instance of type 'int', however a 'float' will be
     # generated instead.
-    print "HINTS", ('ToInt' >> beam.FlatMap(
+    print("HINTS", ('ToInt' >> beam.FlatMap(
         lambda x: [float(x)]).with_input_types(int).with_output_types(
-            int)).get_type_hints()
+            int)).get_type_hints())
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([1, 2, 3])
@@ -1368,7 +1372,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
        | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
-       | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y])
+       | ('Swap' >> beam.FlatMap(lambda x_y1: [x_y1[0] + x_y1[1]])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0a69c3b..1d58834 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -55,7 +55,7 @@ class SideInputsTest(unittest.TestCase):
         side |= beam.Map(lambda x: ('k%s' % x, 'v%s' % x))
       res = main | beam.Map(lambda x, s: (x, s), side_input_type(side, **kw))
       if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
-        res |= beam.Map(lambda (x, s): (x, sorted(s)))
+        res |= beam.Map(lambda x_s: (x_s[0], sorted(x_s[1])))
       assert_that(res, equal_to(expected))
 
   def test_global_global_windows(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index b2fd761..5a56c7a 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -407,11 +407,11 @@ class TriggerPipelineTest(unittest.TestCase):
       result = (p
                 | beam.Create([1, 2, 3, 4, 5, 10, 11])
                 | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
-                | beam.Map(lambda (k, t): TimestampedValue((k, t), t))
+                | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), k_t[1]))
                 | beam.WindowInto(FixedWindows(10), trigger=AfterCount(3),
                                   accumulation_mode=AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
-                | beam.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v))))
+                | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))))
       assert_that(result, equal_to(
           {
               'A-5': {1, 2, 3, 4, 5},

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 81b8c22..4509ed4 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,14 +98,16 @@ class CoGroupByKey(PTransform):
   def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
-    def _pair_tag_with_value((key, value), tag):
+    def _pair_tag_with_value(xxx_todo_changeme, tag):
+      (key, value) = xxx_todo_changeme
       return (key, (tag, value))
 
     # Creates the key, value pairs for the output PCollection. Values are either
     # lists or dicts (per the class docstring), initialized by the result of
     # result_ctor(result_ctor_arg).
-    def _merge_tagged_vals_under_key((key, grouped), result_ctor,
+    def _merge_tagged_vals_under_key(xxx_todo_changeme3, result_ctor,
                                      result_ctor_arg):
+      (key, grouped) = xxx_todo_changeme3
       result_value = result_ctor(result_ctor_arg)
       for tag, value in grouped:
         result_value[tag].append(value)
@@ -141,17 +143,17 @@ class CoGroupByKey(PTransform):
 
 def Keys(label='Keys'):  # pylint: disable=invalid-name
   """Produces a PCollection of first elements of 2-tuples in a PCollection."""
-  return label >> Map(lambda (k, v): k)
+  return label >> Map(lambda k_v: k_v[0])
 
 
 def Values(label='Values'):  # pylint: disable=invalid-name
   """Produces a PCollection of second elements of 2-tuples in a PCollection."""
-  return label >> Map(lambda (k, v): v)
+  return label >> Map(lambda k_v1: k_v1[1])
 
 
 def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
   """Produces a PCollection reversing 2-tuples in a PCollection."""
-  return label >> Map(lambda (k, v): (v, k))
+  return label >> Map(lambda k_v2: (k_v2[1], k_v2[0]))
 
 
 @ptransform_fn

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 71c0622..7c1d4e9 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -51,7 +51,7 @@ def context(element, timestamp):
   return WindowFn.AssignContext(timestamp, element)
 
 
-sort_values = Map(lambda (k, vs): (k, sorted(vs)))
+sort_values = Map(lambda k_vs: (k_vs[0], sorted(k_vs[1])))
 
 
 class ReifyWindowsFn(core.DoFn):
@@ -195,7 +195,7 @@ class WindowTest(unittest.TestCase):
     with TestPipeline() as p:
       result = (p
                 | 'start' >> Create([(k, k) for k in range(10)])
-                | Map(lambda (x, t): TimestampedValue(x, t))
+                | Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
                 | 'w' >> WindowInto(FixedWindows(5))
                 | Map(lambda v: ('key', v))
                 | GroupByKey())
@@ -206,7 +206,7 @@ class WindowTest(unittest.TestCase):
     with TestPipeline() as p:
       result = (p
                 | Create([(k, k) for k in range(10)])
-                | Map(lambda (x, t): TimestampedValue(x, t))
+                | Map(lambda x_t1: TimestampedValue(x_t1[0], x_t1[1]))
                 | 'window' >> WindowInto(SlidingWindows(period=2, size=6))
                 # Per the model, each element is now duplicated across
                 # three windows. Rewindowing must preserve this duplication.
@@ -232,7 +232,7 @@ class WindowTest(unittest.TestCase):
                 # Now there are values 5 ms apart and since Map propagates the
                 # windowing function from input to output the output PCollection
                 # will have elements falling into different 5ms windows.
-                | Map(lambda (x, t): TimestampedValue(x, t))
+                | Map(lambda x_t2: TimestampedValue(x_t2[0], x_t2[1]))
                 # We add a 'key' to each value representing the index of the
                 # window. This is important since there is no guarantee of
                 # order for the elements of a PCollection.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index 694433a..89dc6af 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -117,7 +117,7 @@ def getargspec(func):
   try:
     return _original_getargspec(func)
   except TypeError:
-    if isinstance(func, (type, types.ClassType)):
+    if isinstance(func, type):
       argspec = getargspec(func.__init__)
       del argspec.args[0]
       return argspec
@@ -261,7 +261,7 @@ def getcallargs_forhints(func, *typeargs, **typekwargs):
   packed_typeargs += list(typeargs[len(packed_typeargs):])
   try:
     callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs)
-  except TypeError, e:
+  except TypeError as e:
     raise TypeCheckError(e)
   if argspec.defaults:
     # Declare any default arguments to be Any.

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index 923b848..c3ba92a 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -26,19 +26,21 @@ are handled inline rather than here.
 
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import absolute_import
 import types
 
-import typehints
-from trivial_inference import BoundMethod
-from trivial_inference import Const
-from trivial_inference import element_type
-from trivial_inference import union
-from typehints import Any
-from typehints import Dict
-from typehints import Iterable
-from typehints import List
-from typehints import Tuple
-from typehints import Union
+from . import typehints
+from .trivial_inference import BoundMethod
+from .trivial_inference import Const
+from .trivial_inference import element_type
+from .trivial_inference import union
+from .typehints import Any
+from .typehints import Dict
+from .typehints import Iterable
+from .typehints import List
+from .typehints import Tuple
+from .typehints import Union
+from functools import reduce
 
 
 def pop_one(state, unused_arg):
@@ -262,7 +264,7 @@ def load_attr(state, arg):
   name = state.get_name(arg)
   if isinstance(o, Const) and hasattr(o.value, name):
     state.stack.append(Const(getattr(o.value, name)))
-  elif (isinstance(o, (type, types.ClassType))
+  elif (isinstance(o, type)
         and isinstance(getattr(o, name, None), types.MethodType)):
     state.stack.append(Const(BoundMethod(getattr(o, name))))
   else:

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index c740596..df96900 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -19,6 +19,8 @@
 
 For internal use only; no backwards-compatibility guarantees.
 """
+from __future__ import print_function
+from __future__ import absolute_import
 import __builtin__
 import collections
 import dis
@@ -28,6 +30,7 @@ import types
 
 from apache_beam.typehints import Any
 from apache_beam.typehints import typehints
+from functools import reduce
 
 
 class TypeInferenceError(ValueError):
@@ -103,7 +106,7 @@ class FrameState(object):
 
   def __init__(self, f, local_vars=None, stack=()):
     self.f = f
-    self.co = f.func_code
+    self.co = f.__code__
     self.vars = list(local_vars)
     self.stack = list(stack)
 
@@ -120,12 +123,12 @@ class FrameState(object):
     ncellvars = len(self.co.co_cellvars)
     if i < ncellvars:
       return Any
-    return Const(self.f.func_closure[i - ncellvars].cell_contents)
+    return Const(self.f.__closure__[i - ncellvars].cell_contents)
 
   def get_global(self, i):
     name = self.get_name(i)
-    if name in self.f.func_globals:
-      return Const(self.f.func_globals[name])
+    if name in self.f.__globals__:
+      return Const(self.f.__globals__[name])
     if name in __builtin__.__dict__:
       return Const(__builtin__.__dict__[name])
     return Any
@@ -227,14 +230,14 @@ def infer_return_type(c, input_types, debug=False, depth=5):
     elif isinstance(c, types.FunctionType):
       return infer_return_type_func(c, input_types, debug, depth)
     elif isinstance(c, types.MethodType):
-      if c.im_self is not None:
-        input_types = [Const(c.im_self)] + input_types
-      return infer_return_type_func(c.im_func, input_types, debug, depth)
+      if c.__self__ is not None:
+        input_types = [Const(c.__self__)] + input_types
+      return infer_return_type_func(c.__func__, input_types, debug, depth)
     elif isinstance(c, BoundMethod):
-      input_types = [c.unbound.im_class] + input_types
+      input_types = [c.unbound.__self__.__class__] + input_types
       return infer_return_type_func(
-          c.unbound.im_func, input_types, debug, depth)
-    elif isinstance(c, (type, types.ClassType)):
+          c.unbound.__func__, input_types, debug, depth)
+    elif isinstance(c, type):
       if c in typehints.DISALLOWED_PRIMITIVE_TYPES:
         return {
             list: typehints.List[Any],
@@ -272,12 +275,12 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
     TypeInferenceError: if no type can be inferred.
   """
   if debug:
-    print
-    print f, id(f), input_types
-  import opcodes
+    print()
+    print(f, id(f), input_types)
+  from . import opcodes
   simple_ops = dict((k.upper(), v) for k, v in opcodes.__dict__.items())
 
-  co = f.func_code
+  co = f.__code__
   code = co.co_code
   end = len(code)
   pc = 0
@@ -299,38 +302,38 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
     op = ord(code[pc])
 
     if debug:
-      print '-->' if pc == last_pc else '    ',
-      print repr(pc).rjust(4),
-      print dis.opname[op].ljust(20),
+      print('-->' if pc == last_pc else '    ', end=' ')
+      print(repr(pc).rjust(4), end=' ')
+      print(dis.opname[op].ljust(20), end=' ')
     pc += 1
     if op >= dis.HAVE_ARGUMENT:
       arg = ord(code[pc]) + ord(code[pc + 1]) * 256 + extended_arg
       extended_arg = 0
       pc += 2
       if op == dis.EXTENDED_ARG:
-        extended_arg = arg * 65536L
+        extended_arg = arg * 65536
       if debug:
-        print str(arg).rjust(5),
+        print(str(arg).rjust(5), end=' ')
         if op in dis.hasconst:
-          print '(' + repr(co.co_consts[arg]) + ')',
+          print('(' + repr(co.co_consts[arg]) + ')', end=' ')
         elif op in dis.hasname:
-          print '(' + co.co_names[arg] + ')',
+          print('(' + co.co_names[arg] + ')', end=' ')
         elif op in dis.hasjrel:
-          print '(to ' + repr(pc + arg) + ')',
+          print('(to ' + repr(pc + arg) + ')', end=' ')
         elif op in dis.haslocal:
-          print '(' + co.co_varnames[arg] + ')',
+          print('(' + co.co_varnames[arg] + ')', end=' ')
         elif op in dis.hascompare:
-          print '(' + dis.cmp_op[arg] + ')',
+          print('(' + dis.cmp_op[arg] + ')', end=' ')
         elif op in dis.hasfree:
           if free is None:
             free = co.co_cellvars + co.co_freevars
-          print '(' + free[arg] + ')',
+          print('(' + free[arg] + ')', end=' ')
 
     # Acutally emulate the op.
     if state is None and states[start] is None:
       # No control reaches here (yet).
       if debug:
-        print
+        print()
       continue
     state |= states[start]
 
@@ -398,8 +401,8 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
       states[jmp] = new_state
 
     if debug:
-      print
-      print state
+      print()
+      print(state)
       pprint.pprint(dict(item for item in states.items() if item[1]))
 
   if yields:
@@ -408,5 +411,5 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
     result = reduce(union, Const.unwrap_all(returns))
 
   if debug:
-    print f, id(f), input_types, '->', result
+    print(f, id(f), input_types, '->', result)
   return result

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 38259c8..8af9dd6 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -39,7 +39,8 @@ class TrivialInferenceTest(unittest.TestCase):
         typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
 
   def testUnpack(self):
-    def reverse((a, b)):
+    def reverse(xxx_todo_changeme):
+      (a, b) = xxx_todo_changeme
       return b, a
     any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
     self.assertReturnType(

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 1df1104..501715b 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -103,14 +103,16 @@ class NativeTypesTest(unittest.TestCase):
 
   def test_good_main_input(self):
     @typehints.with_input_types(typing.Tuple[str, int])
-    def munge((s, i)):
+    def munge(xxx_todo_changeme):
+      (s, i) = xxx_todo_changeme
       return (s + 's', i * 2)
     result = [('apple', 5), ('pear', 3)] | beam.Map(munge)
     self.assertEqual([('apples', 10), ('pears', 6)], sorted(result))
 
   def test_bad_main_input(self):
     @typehints.with_input_types(typing.Tuple[str, str])
-    def munge((s, i)):
+    def munge(xxx_todo_changeme1):
+      (s, i) = xxx_todo_changeme1
       return (s + 's', i * 2)
     with self.assertRaises(typehints.TypeCheckError):
       [('apple', 5), ('pear', 3)] | beam.Map(munge)
@@ -118,7 +120,8 @@ class NativeTypesTest(unittest.TestCase):
   def test_bad_main_output(self):
     @typehints.with_input_types(typing.Tuple[int, int])
     @typehints.with_output_types(typing.Tuple[str, str])
-    def munge((a, b)):
+    def munge(xxx_todo_changeme2):
+      (a, b) = xxx_todo_changeme2
       return (str(a), str(b))
     with self.assertRaises(typehints.TypeCheckError):
       [(5, 4), (3, 2)] | beam.Map(munge) | 'Again' >> beam.Map(munge)

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index c19916f..a27dd7e 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -315,7 +315,7 @@ def validate_composite_type_param(type_param, error_msg_prefix):
   """
   # Must either be a TypeConstraint instance or a basic Python type.
   is_not_type_constraint = (
-      not isinstance(type_param, (type, types.ClassType, TypeConstraint))
+      not isinstance(type_param, (type, TypeConstraint))
       and type_param is not None)
   is_forbidden_type = (isinstance(type_param, type) and
                        type_param in DISALLOWED_PRIMITIVE_TYPES)
@@ -340,7 +340,7 @@ def _unified_repr(o):
     A qualified name for the passed Python object fit for string formatting.
   """
   return repr(o) if isinstance(
-      o, (TypeConstraint, types.NoneType)) else o.__name__
+      o, (TypeConstraint, type(None))) else o.__name__
 
 
 def check_constraint(type_constraint, object_instance):
@@ -491,7 +491,7 @@ class UnionHint(CompositeTypeHint):
     if Any in params:
       return Any
     elif len(params) == 1:
-      return iter(params).next()
+      return next(iter(params))
     return self.UnionConstraint(params)
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 08223b3..927da14 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -182,7 +182,7 @@ def with_exponential_backoff(
           exn_traceback = sys.exc_info()[2]
           try:
             try:
-              sleep_interval = retry_intervals.next()
+              sleep_interval = next(retry_intervals)
             except StopIteration:
               # Re-raise the original exception since we finished the retries.
               raise exn, None, exn_traceback  # pylint: disable=raising-bad-type


[14/18] beam git commit: pr/cl-feedback from c-y-koo

Posted by ro...@apache.org.
pr/cl-feedback from c-y-koo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a04dbe4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a04dbe4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a04dbe4

Branch: refs/heads/master
Commit: 3a04dbe46debc2c4aa188b210870a24dc1408bf1
Parents: 3b0ad58
Author: Holden Karau <ho...@us.ibm.com>
Authored: Tue Oct 10 12:31:16 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/complete/game/leader_board.py    |  2 +-
 .../python/apache_beam/examples/cookbook/mergecontacts.py |  9 +++------
 sdks/python/apache_beam/examples/snippets/snippets.py     |  1 -
 .../runners/portability/maptask_executor_runner_test.py   |  2 +-
 sdks/python/apache_beam/transforms/ptransform_test.py     | 10 +++++-----
 5 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index a5bde05..e207f26 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -329,11 +329,11 @@ def run(argv=None):
              'processing_time': 'STRING',
          }))
 
-    # Get user scores and write the results to BigQuery
     def format_user_score_sums(user_score):
       (user, score) = user_score
       return {'user': user, 'total_score': score}
 
+    # Get user scores and write the results to BigQuery
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
      | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)

http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index b07b98d..237d4ca 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -118,12 +118,9 @@ def run(argv=None, assert_results=None):
       (_, (_, _, snailmail)) = name_email_phone_snailmail
       return not next(iter(snailmail), None)
 
-    luddites = grouped | beam.Filter(  # People without email.
-        without_email)
-    writers = grouped | beam.Filter(   # People without phones.
-        without_phones)
-    nomads = grouped | beam.Filter(    # People without addresses.
-        without_address)
+    luddites = grouped | beam.Filter(without_email) # People without email.
+    writers = grouped | beam.Filter(without_phones) # People without phones.
+    nomads = grouped | beam.Filter(without_address) # People without addresses.
 
     num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
     num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()

http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 54abd8c..048b31a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1179,7 +1179,6 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
       return '%s; %s; %s' %\
         (name, sorted(info['emails']), sorted(info['phones']))
 
-
     contact_lines = result | beam.Map(join_info)
     # [END model_group_by_key_cogroupbykey_tuple]
     formatted_results | beam.io.WriteToText(output_path)

http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 4c0d3b3..0f8637f 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -152,7 +152,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
       derived = ((pcoll,) | beam.Flatten()
                  | beam.Map(lambda x: (x, x))
                  | beam.GroupByKey()
-                 | 'Unkey' >> beam.Map(lambda x__: x__[0]))
+                 | 'Unkey' >> beam.Map(lambda kv: kv[0]))
       assert_that(
           pcoll | beam.FlatMap(cross_product, AsList(derived)),
           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))

http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 0d2bb7a..dac2c4f 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -647,7 +647,7 @@ class PTransformTest(unittest.TestCase):
                 | beam.Flatten()
                 | beam.Map(lambda x: (x, None))
                 | beam.GroupByKey()
-                | beam.Map(lambda x__: x__[0]))
+                | beam.Map(lambda kv: kv[0]))
     self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
 
   def test_apply_to_crazy_pvaluish(self):
@@ -1593,8 +1593,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'C' >> beam.Create(range(5)).with_output_types(int)
          | 'Mean' >> combine.Mean.Globally())
 
-    assert_that(d, equal_to([2.0]))
     self.assertEqual(float, d.element_type)
+    assert_that(d, equal_to([2.0]))
     self.p.run()
 
   def test_mean_globally_pipeline_checking_violated(self):
@@ -1616,8 +1616,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'C' >> beam.Create(range(5)).with_output_types(int)
          | 'Mean' >> combine.Mean.Globally())
 
-    assert_that(d, equal_to([2.0]))
     self.assertEqual(float, d.element_type)
+    assert_that(d, equal_to([2.0]))
     self.p.run()
 
   def test_mean_globally_runtime_checking_violated(self):
@@ -1709,8 +1709,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'P' >> beam.Create(range(5)).with_output_types(int)
          | 'CountInt' >> combine.Count.Globally())
 
-    assert_that(d, equal_to([5]))
     self.assertEqual(int, d.element_type)
+    assert_that(d, equal_to([5]))
     self.p.run()
 
   def test_count_globally_runtime_type_checking_satisfied(self):
@@ -1720,8 +1720,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'P' >> beam.Create(range(5)).with_output_types(int)
          | 'CountInt' >> combine.Count.Globally())
 
-    assert_that(d, equal_to([5]))
     self.assertEqual(int, d.element_type)
+    assert_that(d, equal_to([5]))
     self.p.run()
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):


[04/18] beam git commit: Change the ptransform_test to use assertEqual rather than assertTrue for improved debugging, fix the error message we're looking for since we now throw a tuple exception instead

Posted by ro...@apache.org.
Change the ptransform_test to use assertEqual rather than assertTrue for improved debugging, fix the error message we're looking for since we now throw a tuple exception instead


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5cc6b5f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5cc6b5f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5cc6b5f1

Branch: refs/heads/master
Commit: 5cc6b5f17e0e1e117a8cd237d4c7be8d7fce8a17
Parents: 7d0040d
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 20:49:40 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/ptransform_test.py       | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5cc6b5f1/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index c137b14..2f2734d 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1332,9 +1332,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.assertStartswith(
         e.exception.message,
         "Runtime type violation detected within ParDo(Add): "
-        "Type-hint for argument: 'y' violated. "
-        "Expected an instance of <type 'int'>, "
-        "instead found 3.0, an instance of <type 'float'>.")
+        "Type-hint for argument: 'x_y' violated: "
+        "Tuple[int, int] hint type-constraint violated. "
+        "The type of element #1 in the passed tuple is incorrect. "
+        "Expected an instance of type int, instead received an instance "
+        "of type float.")
 
   def test_pipeline_runtime_checking_violation_simple_type_output(self):
     self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1591,8 +1593,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'C' >> beam.Create(range(5)).with_output_types(int)
          | 'Mean' >> combine.Mean.Globally())
 
-    self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
+    self.assertEqual(float, d.element_type)
     self.p.run()
 
   def test_mean_globally_pipeline_checking_violated(self):
@@ -1614,8 +1616,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'C' >> beam.Create(range(5)).with_output_types(int)
          | 'Mean' >> combine.Mean.Globally())
 
-    self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
+    self.assertEqual(float, d.element_type)
     self.p.run()
 
   def test_mean_globally_runtime_checking_violated(self):
@@ -1707,8 +1709,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'P' >> beam.Create(range(5)).with_output_types(int)
          | 'CountInt' >> combine.Count.Globally())
 
-    self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
+    self.assertEqual(int, d.element_type)
     self.p.run()
 
   def test_count_globally_runtime_type_checking_satisfied(self):
@@ -1718,8 +1720,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | 'P' >> beam.Create(range(5)).with_output_types(int)
          | 'CountInt' >> combine.Count.Globally())
 
-    self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
+    self.assertEqual(int, d.element_type)
     self.p.run()
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):


[13/18] beam git commit: Re-order comment (reviewer feedback)

Posted by ro...@apache.org.
Re-order comment (reviewer feedback)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14cf6a13
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14cf6a13
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14cf6a13

Branch: refs/heads/master
Commit: 14cf6a13a3e3152ca8f168535500a95f16db07b2
Parents: bf910c1
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 23:38:37 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/juliaset/juliaset/juliaset.py            | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/14cf6a13/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index bb5b185..165237d 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -104,12 +104,12 @@ def run(argv=None):  # pylint: disable=missing-docstring
 
     coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
 
-    # Group each coordinate triplet by its x value, then write the coordinates
-    # to the output file with an x-coordinate grouping per line.
-    # pylint: disable=expression-not-assigned
     def x_coord_key(x_y_i):
       return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
 
+    # Group each coordinate triplet by its x value, then write the coordinates
+    # to the output file with an x-coordinate grouping per line.
+    # pylint: disable=expression-not-assigned
     (coordinates
      | 'x coord key' >> beam.Map(x_coord_key)
      | 'x coord' >> beam.GroupByKey()


[11/18] beam git commit: Change the xxx_todo_changeme[x]s to reasonable values

Posted by ro...@apache.org.
Change the xxx_todo_changeme[x]s to reasonable values


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a795e545
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a795e545
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a795e545

Branch: refs/heads/master
Commit: a795e545157d20099c82970e4e2c302aa151413b
Parents: fe0dc2e
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Sep 6 16:15:10 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/complete/tfidf.py      | 12 ++++++------
 sdks/python/apache_beam/examples/snippets/snippets.py   |  4 ++--
 .../apache_beam/examples/snippets/snippets_test.py      |  8 ++++----
 sdks/python/apache_beam/transforms/combiners.py         |  8 ++++----
 sdks/python/apache_beam/transforms/ptransform_test.py   |  8 ++++----
 sdks/python/apache_beam/transforms/util.py              |  8 ++++----
 .../apache_beam/typehints/trivial_inference_test.py     |  4 ++--
 .../python/apache_beam/typehints/typed_pipeline_test.py | 12 ++++++------
 8 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 55404df..065e4b3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -68,8 +68,8 @@ class TfIdf(beam.PTransform):
     # Create a collection of pairs mapping a URI to each of the words
     # in the document associated with that that URI.
 
-    def split_into_words(xxx_todo_changeme):
-      (uri, line) = xxx_todo_changeme
+    def split_into_words(uri_line):
+      (uri, line) = uri_line
       return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
 
     uri_to_words = (
@@ -128,8 +128,8 @@ class TfIdf(beam.PTransform):
     # that word occurs in the document divided by the total number of words in
     # the document.
 
-    def compute_term_frequency(xxx_todo_changeme1):
-      (uri, count_and_total) = xxx_todo_changeme1
+    def compute_term_frequency(uri_count_and_total):
+      (uri, count_and_total) = uri_count_and_total
       word_and_count = count_and_total['word counts']
       # We have an iterable for one element that we want extracted.
       [word_total] = count_and_total['word totals']
@@ -169,8 +169,8 @@ class TfIdf(beam.PTransform):
     # basic version that is the term frequency divided by the log of the
     # document frequency.
 
-    def compute_tf_idf(xxx_todo_changeme2):
-      (word, tf_and_df) = xxx_todo_changeme2
+    def compute_tf_idf(word_tf_and_df):
+      (word, tf_and_df) = word_tf_and_df
       [docf] = tf_and_df['df']
       for uri, tf in tf_and_df['tf']:
         yield word, (uri, tf * math.log(1 / docf))

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 10080c9..01118b3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1171,8 +1171,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
     results = ({'emails': emails_pcoll, 'phones': phones_pcoll}
                | beam.CoGroupByKey())
 
-    def join_info(xxx_todo_changeme):
-      (name, info) = xxx_todo_changeme
+    def join_info(name_info):
+      (name, info) = name_info
       return '%s; %s; %s' %\
         (name, sorted(info['emails']), sorted(info['phones']))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 8a9695d..8f88ab9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -882,16 +882,16 @@ class CombineTest(unittest.TestCase):
       def create_accumulator(self):
         return (0.0, 0)
 
-      def add_input(self, xxx_todo_changeme, input):
-        (sum, count) = xxx_todo_changeme
+      def add_input(self, sum_count, input):
+        (sum, count) = sum_count
         return sum + input, count + 1
 
       def merge_accumulators(self, accumulators):
         sums, counts = zip(*accumulators)
         return sum(sums), sum(counts)
 
-      def extract_output(self, xxx_todo_changeme1):
-        (sum, count) = xxx_todo_changeme1
+      def extract_output(self, sum_count):
+        (sum, count) = sum_count
         return sum / count if count else float('NaN')
     # [END combine_custom_average_define]
     # [START combine_custom_average_execute]

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 3348790..ce5e942 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -77,16 +77,16 @@ class MeanCombineFn(core.CombineFn):
   def create_accumulator(self):
     return (0, 0)
 
-  def add_input(self, xxx_todo_changeme, element):
-    (sum_, count) = xxx_todo_changeme
+  def add_input(self, sum_count, element):
+    (sum_, count) = sum_count
     return sum_ + element, count + 1
 
   def merge_accumulators(self, accumulators):
     sums, counts = zip(*accumulators)
     return sum(sums), sum(counts)
 
-  def extract_output(self, xxx_todo_changeme1):
-    (sum_, count) = xxx_todo_changeme1
+  def extract_output(self, sum_count):
+    (sum_, count) = sum_count
     if count == 0:
       return float('NaN')
     return sum_ / float(count)

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 2f2734d..0d2bb7a 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -355,16 +355,16 @@ class PTransformTest(unittest.TestCase):
     def create_accumulator(self):
       return (0, 0)
 
-    def add_input(self, xxx_todo_changeme, element):
-      (sum_, count) = xxx_todo_changeme
+    def add_input(self, sum_count, element):
+      (sum_, count) = sum_count
       return sum_ + element, count + 1
 
     def merge_accumulators(self, accumulators):
       sums, counts = zip(*accumulators)
       return sum(sums), sum(counts)
 
-    def extract_output(self, xxx_todo_changeme3):
-      (sum_, count) = xxx_todo_changeme3
+    def extract_output(self, sum_count):
+      (sum_, count) = sum_count
       if not count:
         return float('nan')
       return sum_ / float(count)

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 4509ed4..6a7e269 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
   def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
-    def _pair_tag_with_value(xxx_todo_changeme, tag):
-      (key, value) = xxx_todo_changeme
+    def _pair_tag_with_value(k_v, tag):
+      (key, value) = k_v
       return (key, (tag, value))
 
     # Creates the key, value pairs for the output PCollection. Values are either
     # lists or dicts (per the class docstring), initialized by the result of
     # result_ctor(result_ctor_arg).
-    def _merge_tagged_vals_under_key(xxx_todo_changeme3, result_ctor,
+    def _merge_tagged_vals_under_key(k_grouped, result_ctor,
                                      result_ctor_arg):
-      (key, grouped) = xxx_todo_changeme3
+      (key, grouped) = k_grouped
       result_value = result_ctor(result_ctor_arg)
       for tag, value in grouped:
         result_value[tag].append(value)

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 7b7b6a8..37b2258 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -44,8 +44,8 @@ class TrivialInferenceTest(unittest.TestCase):
         typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
 
   def testUnpack(self):
-    def reverse(xxx_todo_changeme):
-      (a, b) = xxx_todo_changeme
+    def reverse(a_b):
+      (a, b) = a_b
       return b, a
     any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
     self.assertReturnType(

http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 501715b..2581457 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -103,16 +103,16 @@ class NativeTypesTest(unittest.TestCase):
 
   def test_good_main_input(self):
     @typehints.with_input_types(typing.Tuple[str, int])
-    def munge(xxx_todo_changeme):
-      (s, i) = xxx_todo_changeme
+    def munge(s_i):
+      (s, i) = s_i
       return (s + 's', i * 2)
     result = [('apple', 5), ('pear', 3)] | beam.Map(munge)
     self.assertEqual([('apples', 10), ('pears', 6)], sorted(result))
 
   def test_bad_main_input(self):
     @typehints.with_input_types(typing.Tuple[str, str])
-    def munge(xxx_todo_changeme1):
-      (s, i) = xxx_todo_changeme1
+    def munge(s_i):
+      (s, i) = s_i
       return (s + 's', i * 2)
     with self.assertRaises(typehints.TypeCheckError):
       [('apple', 5), ('pear', 3)] | beam.Map(munge)
@@ -120,8 +120,8 @@ class NativeTypesTest(unittest.TestCase):
   def test_bad_main_output(self):
     @typehints.with_input_types(typing.Tuple[int, int])
     @typehints.with_output_types(typing.Tuple[str, str])
-    def munge(xxx_todo_changeme2):
-      (a, b) = xxx_todo_changeme2
+    def munge(a_b):
+      (a, b) = a_b
       return (str(a), str(b))
     with self.assertRaises(typehints.TypeCheckError):
       [(5, 4), (3, 2)] | beam.Map(munge) | 'Again' >> beam.Map(munge)


[15/18] beam git commit: Clarify docstring and update run_pylint to be more clear about what steps the developer can take

Posted by ro...@apache.org.
Clarify docstring and update run_pylint to be more clear about what steps the developer can take


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf910c11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf910c11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf910c11

Branch: refs/heads/master
Commit: bf910c118b6940fef951d4441762b4c55b199ec1
Parents: 252c679
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 21:26:51 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/typehints/typehints.py | 6 +++++-
 sdks/python/run_pylint.sh                      | 2 ++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bf910c11/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index b78ead2..6e1d8b7 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -191,7 +191,11 @@ class IndexableTypeConstraint(TypeConstraint):
   """
 
   def _constraint_for_index(self, idx):
-    """Returns the type at the given index."""
+    """Returns the type at the given index. This is used to allow type inference
+    to determine the correct type for a specific index. On lists this will also
+    be the same, however for tuples the value will depend on the position. This
+    was added as part of the futurize changes since more of the expressions now
+    index into tuples."""
     raise NotImplementedError
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf910c11/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 91d5c4a..06f2072 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -89,7 +89,9 @@ futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivia
 count=${#futurize_filtered}
 if [ "$count" != "0" ]; then
   echo "Some of the changes require futurize stage 1 changes."
+  echo "The files with required changes:"
   echo "$futurize_filtered"
+  echo "You can run futurize apache_beam to see the proposed changes."
   exit 1
 fi
 echo "No future changes needed"


[17/18] beam git commit: Change the examples with long-lambdas which were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: this still keeps the lambdas which are short enough and those use indexing (as required).

Posted by ro...@apache.org.
Change the examples with long-lambdas which were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: this still keeps the lambdas which are short enough and those use indexing (as required).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b0ad58c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b0ad58c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b0ad58c

Branch: refs/heads/master
Commit: 3b0ad58cd68ee0790ac1f10ddffa96a866d85a0c
Parents: cf2be41
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 8 12:56:27 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |  3 +-
 .../examples/complete/game/leader_board.py      |  3 +-
 .../examples/complete/game/user_score.py        |  3 +-
 .../complete/juliaset/juliaset/juliaset.py      |  3 +-
 .../apache_beam/examples/complete/tfidf_test.py |  3 +-
 .../examples/cookbook/datastore_wordcount.py    |  8 ++++--
 .../examples/cookbook/mergecontacts.py          | 30 ++++++++++++--------
 .../examples/cookbook/multiple_output_pardo.py  | 10 ++++---
 .../apache_beam/examples/snippets/snippets.py   | 13 +++++----
 .../apache_beam/examples/streaming_wordcount.py |  3 +-
 .../apache_beam/examples/windowed_wordcount.py  |  3 +-
 sdks/python/apache_beam/examples/wordcount.py   |  8 ++++--
 .../apache_beam/examples/wordcount_debugging.py |  8 ++++--
 .../apache_beam/examples/wordcount_minimal.py   |  5 ++--
 sdks/python/apache_beam/transforms/util.py      |  8 +++---
 15 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c09b78e..b556e65 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -46,7 +46,8 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   with beam.Pipeline(options=pipeline_options) as p:
     def format_result(prefix_candidates):
-      return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
+      (prefix, candidates) = prefix_candidates
+      return '%s: %s' % (prefix, candidates)
 
     (p  # pylint: disable=expression-not-assigned
      | 'read' >> ReadFromText(known_args.input)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index c1b15e9..a5bde05 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -331,7 +331,8 @@ def run(argv=None):
 
     # Get user scores and write the results to BigQuery
     def format_user_score_sums(user_score):
-      return {'user': user_score[0], 'total_score': user_score[1]}
+      (user, score) = user_score
+      return {'user': user, 'total_score': score}
 
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 0405bab..3e3e547 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -139,7 +139,8 @@ def run(argv=None):
 
   with beam.Pipeline(argv=pipeline_args) as p:
     def format_user_score_sums(user_score):
-      return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+      (user, score) = user_score
+      return 'user: %s, total_score: %s' % (user, score)
 
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 165237d..3f3ef03 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,7 +105,8 @@ def run(argv=None):  # pylint: disable=missing-docstring
     coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
 
     def x_coord_key(x_y_i):
-      return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+      (x, y, i) = x_y_i
+      return (x, (x, y, i))
 
     # Group each coordinate triplet by its x value, then write the coordinates
     # to the output file with an x-coordinate grouping per line.

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 71e71e3..637d10a 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -52,7 +52,8 @@ class TfIdfTest(unittest.TestCase):
   def test_tfidf_transform(self):
     with TestPipeline() as p:
       def re_key(word_uri_tfidf):
-        return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+        (word, (uri, tfidf)) = word_uri_tfidf
+        return (word, uri, tfidf)
 
       uri_to_line = p | 'create sample' >> beam.Create(
           [('1.txt', 'abc def ghi'),

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 03fcd8a..099fb08 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -180,7 +180,8 @@ def read_from_datastore(project, user_options, pipeline_options):
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
-    return (word_ones[0], sum(word_ones[1]))
+    (word, ones) = word_ones
+    return (word, sum(ones))
 
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -190,8 +191,9 @@ def read_from_datastore(project, user_options, pipeline_options):
             | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   output = counts | 'format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index fdfa286..b07b98d 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -95,22 +95,28 @@ def run(argv=None, assert_results=None):
 
     # Prepare tab-delimited output; something like this:
     # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
-    tsv_lines = grouped | beam.Map(
-        lambda name_email_phone_snailmail: '\t'.join(
-            ['"%s"' % name_email_phone_snailmail[0],
-             '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
-             '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
-             '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
+    def format_as_tsv(name_email_phone_snailmail):
+      (name, (email, phone, snailmail)) = name_email_phone_snailmail
+      return '\t'.join(
+          ['"%s"' % name,
+           '"%s"' % ','.join(email),
+           '"%s"' % ','.join(phone),
+           '"%s"' % next(iter(snailmail), '')])
+
+    tsv_lines = grouped | beam.Map(format_as_tsv)
 
     # Compute some stats about our database of people.
-    def without_email(name_email_phone_snailmail1):
-      return not next(iter(name_email_phone_snailmail1[1][0]), None)
+    def without_email(name_email_phone_snailmail):
+      (_, (email, _, _)) = name_email_phone_snailmail
+      return not next(iter(email), None)
 
-    def without_phones(name_email_phone_snailmail2):
-      return not next(iter(name_email_phone_snailmail2[1][1]), None)
+    def without_phones(name_email_phone_snailmail):
+      (_, (_, phone, _)) = name_email_phone_snailmail
+      return not next(iter(phone), None)
 
-    def without_address(name_e_p_snailmail):
-      return not next(iter(name_e_p_snailmail[1][2]), None)
+    def without_address(name_email_phone_snailmail):
+      (_, (_, _, snailmail)) = name_email_phone_snailmail
+      return not next(iter(snailmail), None)
 
     luddites = grouped | beam.Filter(  # People without email.
         without_email)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index fe7929e..e3df3a8 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -120,10 +120,12 @@ class CountWords(beam.PTransform):
 
   def expand(self, pcoll):
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
@@ -169,7 +171,7 @@ def run(argv=None):
     (character_count
      | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
      | beam.GroupByKey()
-     | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
+     | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
      | 'write chars' >> WriteToText(known_args.output + '-chars'))
 
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 01118b3..54abd8c 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,8 +535,9 @@ def examples_wordcount_templated(renames):
   lines = p | 'Read' >> ReadFromText(wordcount_options.input)
   # [END example_wordcount_templated]
 
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   (
       lines
@@ -614,8 +615,9 @@ def examples_wordcount_debugging(renames):
             [('Flourish', 3), ('stomach', 1)]))
     # [END example_wordcount_debugging_assert]
 
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = (filtered_words
               | 'format' >> beam.Map(format_result)
@@ -1126,7 +1128,8 @@ def model_group_by_key(contents, output_path):
   import apache_beam as beam
   with TestPipeline() as p:  # Use TestPipeline for testing.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     words_and_counts = (
         p

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index d1af4ce..df8a99b 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -59,7 +59,8 @@ def run(argv=None):
 
     # Capitalize the characters in each line.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     transformed = (lines
                    # Use a pre-defined function that imports the re package.

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index f88b942..4c7eee1 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -70,7 +70,8 @@ def run(argv=None):
 
     # Capitalize the characters in each line.
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     transformed = (lines
                    | 'Split' >> (beam.FlatMap(find_words)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 60f1ffe..b1c4a5e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -89,7 +89,8 @@ def run(argv=None):
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
-    return (word_ones[0], sum(word_ones[1]))
+    (word, ones) = word_ones
+    return (word, sum(ones))
 
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -99,8 +100,9 @@ def run(argv=None):
             | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  def format_result(w_c):
-    return '%s: %s' % (w_c[0], w_c[1])
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
 
   output = counts | 'format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 7d18c0d..6ff8f26 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -94,7 +94,8 @@ class CountWords(beam.PTransform):
   """
   def expand(self, pcoll):
     def count_ones(word_ones):
-      return (word_ones[0], sum(word_ones[1]))
+      (word, ones) = word_ones
+      return (word, sum(ones))
 
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
@@ -144,8 +145,9 @@ def run(argv=None):
     # Format the counts into a PCollection of strings and write the output using
     # a "Write" transform that has side effects.
     # pylint: disable=unused-variable
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = (filtered_words
               | 'format' >> beam.Map(format_result)

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 54fd1f1..390c8c0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,8 +106,9 @@ def run(argv=None):
         | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
     # Format the counts into a PCollection of strings.
-    def format_result(w_c):
-      return '%s: %s' % (w_c[0], w_c[1])
+    def format_result(word_count):
+      (word, count) = word_count
+      return '%s: %s' % (word, count)
 
     output = counts | 'Format' >> beam.Map(format_result)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 6a7e269..647781f 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
   def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
-    def _pair_tag_with_value(k_v, tag):
-      (key, value) = k_v
+    def _pair_tag_with_value(key_value, tag):
+      (key, value) = key_value
       return (key, (tag, value))
 
     # Creates the key, value pairs for the output PCollection. Values are either
     # lists or dicts (per the class docstring), initialized by the result of
     # result_ctor(result_ctor_arg).
-    def _merge_tagged_vals_under_key(k_grouped, result_ctor,
+    def _merge_tagged_vals_under_key(key_grouped, result_ctor,
                                      result_ctor_arg):
-      (key, grouped) = k_grouped
+      (key, grouped) = key_grouped
       result_value = result_ctor(result_ctor_arg)
       for tag, value in grouped:
         result_value[tag].append(value)


[03/18] beam git commit: Add special work to handle indexable return types. Introduce a base IndexableTypeConstraint.

Posted by ro...@apache.org.
Add special work to handle indexable return types. Introduce a base IndexableTypeConstraint.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f364248b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f364248b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f364248b

Branch: refs/heads/master
Commit: f364248b984e8ff884a34b37d49a17fa1a50cc26
Parents: 5cc6b5f
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 20:51:18 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/typehints/trivial_inference.py  | 20 +++++++++++++-
 .../typehints/trivial_inference_test.py         |  5 ++++
 sdks/python/apache_beam/typehints/typehints.py  | 28 +++++++++++++++++---
 3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index 51d3db2..a68bd18 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -107,7 +107,10 @@ class FrameState(object):
 
   def __init__(self, f, local_vars=None, stack=()):
     self.f = f
-    self.co = f.__code__
+    if sys.version_info[0] >= 3:
+      self.co = f.__code__
+    else:
+      self.co = f.func_code
     self.vars = list(local_vars)
     self.stack = list(stack)
 
@@ -362,7 +365,22 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
       else:
         return_type = Any
       state.stack[-pop_count:] = [return_type]
+    elif (opname == 'BINARY_SUBSCR'
+          and isinstance(state.stack[1], Const)
+          and isinstance(state.stack[0], typehints.IndexableTypeConstraint)):
+      if debug:
+        print("Executing special case binary subscript")
+      idx = state.stack.pop()
+      src = state.stack.pop()
+      try:
+        state.stack.append(src._constraint_for_index(idx.value))
+      except Exception as e:
+        if debug:
+          print("Exception {0} during special case indexing".format(e))
+        state.stack.append(Any)
     elif opname in simple_ops:
+      if debug:
+        print("Executing simple op " + opname)
       simple_ops[opname](state, arg)
     elif opname == 'RETURN_VALUE':
       returns.add(state.stack[-1])

http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 8af9dd6..7b7b6a8 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -32,6 +32,11 @@ class TrivialInferenceTest(unittest.TestCase):
   def testIdentity(self):
     self.assertReturnType(int, lambda x: x, [int])
 
+  def testIndexing(self):
+    self.assertReturnType(int, lambda x: x[0], [typehints.Tuple[int, str]])
+    self.assertReturnType(str, lambda x: x[1], [typehints.Tuple[int, str]])
+    self.assertReturnType(str, lambda x: x[1], [typehints.List[str]])
+
   def testTuples(self):
     self.assertReturnType(
         typehints.Tuple[typehints.Tuple[()], int], lambda x: ((), x), [int])

http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index a27dd7e..b78ead2 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -65,6 +65,7 @@ In addition, type-hints can be used to implement run-time type-checking via the
 
 import collections
 import copy
+import sys
 import types
 
 __all__ = [
@@ -184,7 +185,17 @@ def bind_type_variables(type_constraint, bindings):
   return type_constraint
 
 
-class SequenceTypeConstraint(TypeConstraint):
+class IndexableTypeConstraint(TypeConstraint):
+  """An internal common base-class for all type constraints with indexing.
+  E.G. SequenceTypeConstraint + Tuple's of fixed size.
+  """
+
+  def _constraint_for_index(self, idx):
+    """Returns the type at the given index."""
+    raise NotImplementedError
+
+
+class SequenceTypeConstraint(IndexableTypeConstraint):
   """A common base-class for all sequence related type-constraint classes.
 
   A sequence is defined as an arbitrary length homogeneous container type. Type
@@ -214,6 +225,10 @@ class SequenceTypeConstraint(TypeConstraint):
   def _inner_types(self):
     yield self.inner_type
 
+  def _constraint_for_index(self, idx):
+    """Returns the type at the given index."""
+    return self.inner_type
+
   def _consistent_with_check_(self, sub):
     return (isinstance(sub, self.__class__)
             and is_consistent_with(sub.inner_type, self.inner_type))
@@ -314,8 +329,11 @@ def validate_composite_type_param(type_param, error_msg_prefix):
       parameter for a :class:`CompositeTypeHint`.
   """
   # Must either be a TypeConstraint instance or a basic Python type.
+  possible_classes = [type, TypeConstraint]
+  if sys.version_info[0] == 2:
+    possible_classes.append(types.ClassType)
   is_not_type_constraint = (
-      not isinstance(type_param, (type, TypeConstraint))
+      not isinstance(type_param, tuple(possible_classes))
       and type_param is not None)
   is_forbidden_type = (isinstance(type_param, type) and
                        type_param in DISALLOWED_PRIMITIVE_TYPES)
@@ -546,7 +564,7 @@ class TupleHint(CompositeTypeHint):
                    for elem in sub.tuple_types)
       return super(TupleSequenceConstraint, self)._consistent_with_check_(sub)
 
-  class TupleConstraint(TypeConstraint):
+  class TupleConstraint(IndexableTypeConstraint):
 
     def __init__(self, type_params):
       self.tuple_types = tuple(type_params)
@@ -566,6 +584,10 @@ class TupleHint(CompositeTypeHint):
       for t in self.tuple_types:
         yield t
 
+    def _constraint_for_index(self, idx):
+      """Returns the type at the given index."""
+      return self.tuple_types[idx]
+
     def _consistent_with_check_(self, sub):
       return (isinstance(sub, self.__class__)
               and len(sub.tuple_types) == len(self.tuple_types)


[07/18] beam git commit: re-order imports (AUTOMATED isort)

Posted by ro...@apache.org.
re-order imports (AUTOMATED isort)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94739377
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94739377
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94739377

Branch: refs/heads/master
Commit: 94739377e02e992d21f24257189acd1c4ece9d42
Parents: dd6a789
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:56:11 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    |  1 +
 .../apache_beam/coders/coders_test_common.py    |  3 ++-
 .../io/gcp/datastore/v1/datastoreio_test.py     | 24 ++++++++++++--------
 .../python/apache_beam/transforms/ptransform.py |  2 +-
 .../apache_beam/transforms/ptransform_test.py   |  2 +-
 sdks/python/apache_beam/typehints/opcodes.py    |  3 ++-
 .../apache_beam/typehints/trivial_inference.py  |  5 ++--
 7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 9f7b739..172ee74 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -27,6 +27,7 @@ coder_impl.pxd file for type hints.
 For internal use only; no backwards-compatibility guarantees.
 """
 from __future__ import absolute_import
+
 from types import NoneType
 
 from apache_beam.coders import observable

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index d42e637..fc7279d 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -24,7 +24,6 @@ import unittest
 
 import dill
 
-from . import observable
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.runners import pipeline_context
@@ -34,6 +33,8 @@ from apache_beam.utils import timestamp
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
+from . import observable
+
 
 # Defined out of line for picklability.
 class CustomCoder(coders.Coder):

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 7c73a06..96866ce 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,4 +1,18 @@
 from __future__ import print_function
+
+import unittest
+
+from mock import MagicMock
+from mock import call
+from mock import patch
+
+from apache_beam.io.gcp.datastore.v1 import fake_datastore
+from apache_beam.io.gcp.datastore.v1 import helper
+from apache_beam.io.gcp.datastore.v1 import query_splitter
+from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -16,18 +30,8 @@ from __future__ import print_function
 # limitations under the License.
 #
 
-import unittest
 
-from mock import MagicMock
-from mock import call
-from mock import patch
 
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.io.gcp.datastore.v1 import query_splitter
-from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index d333ace..0b6d608 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -41,6 +41,7 @@ import inspect
 import operator
 import os
 import sys
+from functools import reduce
 
 from google.protobuf import wrappers_pb2
 
@@ -58,7 +59,6 @@ from apache_beam.typehints.trivial_inference import instance_to_type
 from apache_beam.typehints.typehints import validate_composite_type_param
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
-from functools import reduce
 
 __all__ = [
     'PTransform',

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 112c092..c137b14 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -24,6 +24,7 @@ import collections
 import operator
 import re
 import unittest
+from functools import reduce
 
 import hamcrest as hc
 from nose.plugins.attrib import attr
@@ -48,7 +49,6 @@ from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
 from apache_beam.utils.windowed_value import WindowedValue
-from functools import reduce
 
 # Disable frequent lint warning due to pipe operator for chaining transforms.
 # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index c3ba92a..dcca6d0 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -27,7 +27,9 @@ are handled inline rather than here.
 For internal use only; no backwards-compatibility guarantees.
 """
 from __future__ import absolute_import
+
 import types
+from functools import reduce
 
 from . import typehints
 from .trivial_inference import BoundMethod
@@ -40,7 +42,6 @@ from .typehints import Iterable
 from .typehints import List
 from .typehints import Tuple
 from .typehints import Union
-from functools import reduce
 
 
 def pop_one(state, unused_arg):

http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index df96900..51d3db2 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -19,18 +19,19 @@
 
 For internal use only; no backwards-compatibility guarantees.
 """
-from __future__ import print_function
 from __future__ import absolute_import
+from __future__ import print_function
+
 import __builtin__
 import collections
 import dis
 import pprint
 import sys
 import types
+from functools import reduce
 
 from apache_beam.typehints import Any
 from apache_beam.typehints import typehints
-from functools import reduce
 
 
 class TypeInferenceError(ValueError):


[05/18] beam git commit: Cleanup whitespace (AUTOMATED autopep8)

Posted by ro...@apache.org.
Cleanup whitespace (AUTOMATED autopep8)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d0040db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d0040db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d0040db

Branch: refs/heads/master
Commit: 7d0040dbc680e4c144311303014c3feffdc90b60
Parents: 9473937
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:57:19 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py | 2 --
 sdks/python/apache_beam/runners/worker/sdk_worker.py            | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d0040db/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 96866ce..b1d18fe 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -31,8 +31,6 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
 #
 
 
-
-
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:

http://git-wip-us.apache.org/repos/asf/beam/blob/7d0040db/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 5786105..1ad65fe 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -75,7 +75,7 @@ class SdkHarness(object):
         except Exception as e:  # pylint: disable=broad-except
           traceback_str = traceback.format_exc(e)
           raise Exception("Error processing request. Original traceback "
-                              "is\n%s\n" % traceback_str)
+                          "is\n%s\n" % traceback_str)
 
       def handle_response(request, response_future):
         try:


[06/18] beam git commit: Fix remaining style issues from auto conversion, switch ref in pylint to stage 1 (stages in futurize are apparently 1 indexed)

Posted by ro...@apache.org.
Fix remaining style issues from auto conversion, switch ref in pylint to stage 1 (stages in futurize are apparently 1 indexed)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd702b3e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd702b3e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd702b3e

Branch: refs/heads/master
Commit: cd702b3e7ea81fe41952066cfe9a1892d5433bd9
Parents: f364248
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 21:38:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/complete/autocomplete.py    |  5 +++--
 .../examples/complete/game/leader_board.py           |  6 ++++--
 .../apache_beam/examples/complete/game/user_score.py |  6 ++++--
 .../examples/complete/juliaset/juliaset/juliaset.py  |  5 ++++-
 sdks/python/apache_beam/examples/complete/tfidf.py   |  6 ++++--
 .../apache_beam/examples/complete/tfidf_test.py      |  5 ++++-
 .../examples/cookbook/datastore_wordcount.py         | 10 ++++++++--
 .../apache_beam/examples/cookbook/mergecontacts.py   | 15 ++++++++++++---
 .../examples/cookbook/multiple_output_pardo.py       | 10 ++++++++--
 .../python/apache_beam/examples/snippets/snippets.py | 15 ++++++++++++---
 .../apache_beam/examples/streaming_wordcount.py      |  5 ++++-
 .../apache_beam/examples/windowed_wordcount.py       |  5 ++++-
 sdks/python/apache_beam/examples/wordcount.py        | 10 ++++++++--
 .../apache_beam/examples/wordcount_debugging.py      | 10 ++++++++--
 .../python/apache_beam/examples/wordcount_minimal.py |  5 ++++-
 .../runners/portability/maptask_executor_runner.py   |  6 +++++-
 sdks/python/apache_beam/transforms/trigger_test.py   | 10 ++++++++--
 17 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 81c5351..c09b78e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,13 +45,14 @@ def run(argv=None):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   with beam.Pipeline(options=pipeline_options) as p:
+    def format_result(prefix_candidates):
+      return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
 
     (p  # pylint: disable=expression-not-assigned
      | 'read' >> ReadFromText(known_args.input)
      | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
      | 'TopPerPrefix' >> TopPerPrefix(5)
-     | 'format' >> beam.Map(
-         lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], prefix_candidates[1]))
+     | 'format' >> beam.Map(format_result)
      | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 6fc7b5d..c1b15e9 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -330,10 +330,12 @@ def run(argv=None):
          }))
 
     # Get user scores and write the results to BigQuery
+    def format_user_score_sums(user_score):
+      return {'user': user_score[0], 'total_score': user_score[1]}
+
     (events  # pylint: disable=expression-not-assigned
      | 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
-     | 'FormatUserScoreSums' >> beam.Map(
-         lambda user_score: {'user': user_score[0], 'total_score': user_score[1]})
+     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
      | 'WriteUserScoreSums' >> WriteToBigQuery(
          args.table_name + '_users', args.dataset, {
              'user': 'STRING',

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 5e093bb..0405bab 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -138,11 +138,13 @@ def run(argv=None):
   args, pipeline_args = parser.parse_known_args(argv)
 
   with beam.Pipeline(argv=pipeline_args) as p:
+    def format_user_score_sums(user_score):
+      return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+
     (p  # pylint: disable=expression-not-assigned
      | 'ReadInputText' >> beam.io.ReadFromText(args.input)
      | 'UserScore' >> UserScore()
-     | 'FormatUserScoreSums' >> beam.Map(
-         lambda user_score: 'user: %s, total_score: %s' % (user_score[0], user_score[1]))
+     | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
      | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
 # [END main]
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1013168..bb5b185 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -107,8 +107,11 @@ def run(argv=None):  # pylint: disable=missing-docstring
     # Group each coordinate triplet by its x value, then write the coordinates
     # to the output file with an x-coordinate grouping per line.
     # pylint: disable=expression-not-assigned
+    def x_coord_key(x_y_i):
+      return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+
     (coordinates
-     | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2])))
+     | 'x coord key' >> beam.Map(x_coord_key)
      | 'x coord' >> beam.GroupByKey()
      | 'format' >> beam.Map(
          lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1]))

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 0300505..55404df 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -100,10 +100,12 @@ class TfIdf(beam.PTransform):
     # Adjust the above collection to a mapping from (URI, word) pairs to counts
     # into an isomorphic mapping from URI to (word, count) pairs, to prepare
     # for a join by the URI key.
+    def shift_keys(uri_word_count):
+      return (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))
+
     uri_to_word_and_count = (
         uri_and_word_to_count
-        | 'ShiftKeys' >> beam.Map(
-            lambda uri_word_count: (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))))
+        | 'ShiftKeys' >> beam.Map(shift_keys))
 
     # Perform a CoGroupByKey (a sort of pre-join) on the prepared
     # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 957f4c7..71e71e3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -51,6 +51,9 @@ class TfIdfTest(unittest.TestCase):
 
   def test_tfidf_transform(self):
     with TestPipeline() as p:
+      def re_key(word_uri_tfidf):
+        return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+
       uri_to_line = p | 'create sample' >> beam.Create(
           [('1.txt', 'abc def ghi'),
            ('2.txt', 'abc def'),
@@ -58,7 +61,7 @@ class TfIdfTest(unittest.TestCase):
       result = (
           uri_to_line
           | tfidf.TfIdf()
-          | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])))
+          | beam.Map(re_key))
       assert_that(result, equal_to(EXPECTED_RESULTS))
       # Run the pipeline. Note that the assert_that above adds to the pipeline
       # a check that the result PCollection contains expected values.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 13c5998..03fcd8a 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -179,15 +179,21 @@ def read_from_datastore(project, user_options, pipeline_options):
       project, query, user_options.namespace)
 
   # Count the occurrences of each word.
+  def count_ones(word_ones):
+    return (word_ones[0], sum(word_ones[1]))
+
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
+  output = counts | 'format' >> beam.Map(format_result)
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5a35e51..fdfa286 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -103,12 +103,21 @@ def run(argv=None, assert_results=None):
              '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
 
     # Compute some stats about our database of people.
+    def without_email(name_email_phone_snailmail1):
+      return not next(iter(name_email_phone_snailmail1[1][0]), None)
+
+    def without_phones(name_email_phone_snailmail2):
+      return not next(iter(name_email_phone_snailmail2[1][1]), None)
+
+    def without_address(name_e_p_snailmail):
+      return not next(iter(name_e_p_snailmail[1][2]), None)
+
     luddites = grouped | beam.Filter(  # People without email.
-        lambda name_email_phone_snailmail1: not next(iter(name_email_phone_snailmail1[1][0]), None))
+        without_email)
     writers = grouped | beam.Filter(   # People without phones.
-        lambda name_email_phone_snailmail2: not next(iter(name_email_phone_snailmail2[1][1]), None))
+        without_phones)
     nomads = grouped | beam.Filter(    # People without addresses.
-        lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), None))
+        without_address)
 
     num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
     num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 259f95d..fe7929e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -119,11 +119,17 @@ class CountWords(beam.PTransform):
   """
 
   def expand(self, pcoll):
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
-            | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
+            | 'count' >> beam.Map(count_ones)
+            | 'format' >> beam.Map(format_result))
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 873f3c3..10080c9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,6 +535,9 @@ def examples_wordcount_templated(renames):
   lines = p | 'Read' >> ReadFromText(wordcount_options.input)
   # [END example_wordcount_templated]
 
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
   (
       lines
       | 'ExtractWords' >> beam.FlatMap(
@@ -542,7 +545,7 @@ def examples_wordcount_templated(renames):
       | 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
       | 'Group' >> beam.GroupByKey()
       | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
-      | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], word_c2[1]))
+      | 'Format' >> beam.Map(format_result)
       | 'Write' >> WriteToText(wordcount_options.output)
   )
 
@@ -611,8 +614,11 @@ def examples_wordcount_debugging(renames):
             [('Flourish', 3), ('stomach', 1)]))
     # [END example_wordcount_debugging_assert]
 
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     output = (filtered_words
-              | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], word_c1[1]))
+              | 'format' >> beam.Map(format_result)
               | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
 
     p.visit(SnippetUtils.RenameFiles(renames))
@@ -1119,6 +1125,9 @@ def model_group_by_key(contents, output_path):
 
   import apache_beam as beam
   with TestPipeline() as p:  # Use TestPipeline for testing.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     words_and_counts = (
         p
         | beam.Create(contents)
@@ -1133,7 +1142,7 @@ def model_group_by_key(contents, output_path):
     grouped_words = words_and_counts | beam.GroupByKey()
     # [END model_group_by_key_transform]
     (grouped_words
-     | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], sum(word_counts[1])))
+     | 'count words' >> beam.Map(count_ones)
      | beam.io.WriteToText(output_path))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 8a05991..d1af4ce 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -58,6 +58,9 @@ def run(argv=None):
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     transformed = (lines
                    # Use a pre-defined function that imports the re package.
                    | 'Split' >> (
@@ -65,7 +68,7 @@ def run(argv=None):
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+                   | 'Count' >> beam.Map(count_ones)
                    | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
 
     # Write to PubSub.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 680314b..f88b942 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -69,13 +69,16 @@ def run(argv=None):
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
     # Capitalize the characters in each line.
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     transformed = (lines
                    | 'Split' >> (beam.FlatMap(find_words)
                                  .with_output_types(unicode))
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(2*60, 0))
                    | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+                   | 'Count' >> beam.Map(count_ones)
                    | 'Format' >> beam.ParDo(FormatDoFn()))
 
     # Write to BigQuery.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e21e91d..60f1ffe 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -88,15 +88,21 @@ def run(argv=None):
   lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
+  def count_ones(word_ones):
+    return (word_ones[0], sum(word_ones[1]))
+
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
   # Format the counts into a PCollection of strings.
-  output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+  def format_result(w_c):
+    return '%s: %s' % (w_c[0], w_c[1])
+
+  output = counts | 'format' >> beam.Map(format_result)
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bdc4c16..7d18c0d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -93,12 +93,15 @@ class CountWords(beam.PTransform):
   PCollection of (word, count) tuples.
   """
   def expand(self, pcoll):
+    def count_ones(word_ones):
+      return (word_ones[0], sum(word_ones[1]))
+
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                           .with_output_types(unicode))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()
-            | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+            | 'count' >> beam.Map(count_ones))
 
 
 def run(argv=None):
@@ -141,8 +144,11 @@ def run(argv=None):
     # Format the counts into a PCollection of strings and write the output using
     # a "Write" transform that has side effects.
     # pylint: disable=unused-variable
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
     output = (filtered_words
-              | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+              | 'format' >> beam.Map(format_result)
               | 'write' >> WriteToText(known_args.output))
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 01c3955..54fd1f1 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,7 +106,10 @@ def run(argv=None):
         | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
     # Format the counts into a PCollection of strings.
-    output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], w_c[1]))
+    def format_result(w_c):
+      return '%s: %s' % (w_c[0], w_c[1])
+
+    output = counts | 'Format' >> beam.Map(format_result)
 
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index d4063df..5b580a6 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -449,7 +449,11 @@ class MergeAccumulators(beam.PTransform):
       return beam.pvalue.PCollection(input.pipeline)
     else:
       merge_accumulators = self.combine_fn.merge_accumulators
-      return input | beam.Map(lambda k_vs: (k_vs[0], merge_accumulators(k_vs[1])))
+
+      def merge_with_existing_key(k_vs):
+        return (k_vs[0], merge_accumulators(k_vs[1]))
+
+      return input | beam.Map(merge_with_existing_key)
 
 
 class ExtractOutputs(beam.PTransform):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 5a56c7a..3afabaf 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -404,14 +404,20 @@ class TriggerPipelineTest(unittest.TestCase):
 
   def test_after_count(self):
     with TestPipeline() as p:
+      def construct_timestamped(k_t):
+        return TimestampedValue((k_t[0], k_t[1]), k_t[1])
+
+      def format_result(k_v):
+        return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))
+
       result = (p
                 | beam.Create([1, 2, 3, 4, 5, 10, 11])
                 | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
-                | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), k_t[1]))
+                | beam.Map(construct_timestamped)
                 | beam.WindowInto(FixedWindows(10), trigger=AfterCount(3),
                                   accumulation_mode=AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
-                | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))))
+                | beam.Map(format_result))
       assert_that(result, equal_to(
           {
               'A-5': {1, 2, 3, 4, 5},


[10/18] beam git commit: Restore license position that got screwed up

Posted by ro...@apache.org.
Restore license position that got screwed up


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0bc5840
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0bc5840
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0bc5840

Branch: refs/heads/master
Commit: c0bc5840084e46e68d9fcff38aacf125f01a8613
Parents: cd702b3
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 21:43:35 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/datastoreio_test.py     | 30 ++++++++++----------
 sdks/python/run_pylint.sh                       |  1 +
 2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c0bc5840/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index b1d18fe..e131f93 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,18 +1,3 @@
-from __future__ import print_function
-
-import unittest
-
-from mock import MagicMock
-from mock import call
-from mock import patch
-
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.io.gcp.datastore.v1 import query_splitter
-from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
-
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -30,6 +15,21 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
 # limitations under the License.
 #
 
+from __future__ import print_function
+
+import unittest
+
+from mock import MagicMock
+from mock import call
+from mock import patch
+
+from apache_beam.io.gcp.datastore.v1 import fake_datastore
+from apache_beam.io.gcp.datastore.v1 import helper
+from apache_beam.io.gcp.datastore.v1 import query_splitter
+from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
+
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports

http://git-wip-us.apache.org/repos/asf/beam/blob/c0bc5840/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index d53bb14..ccd2e31 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -92,3 +92,4 @@ if [ "$count" != "0" ]; then
   echo "$futurize_filtered"
   exit 1
 fi
+echo "No future changes needed"