You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/12 22:54:37 UTC
[01/18] beam git commit: Add futurize to pylint script
Repository: beam
Updated Branches:
refs/heads/master 655227ab0 -> 9c2a6e771
Add futurize to pylint script
Break down futurize pylint
Fix the pylint script checking futurize
Consists of:
Quote the futurize options in the shell script
Jenkins seems to be having weird behaviour with futurize_filtered=, temporary debugging commit
Try quoting the count line.
Do the style fixes from shellcheck on pylint, parallelize futurize command & simplify the grep on output
Fall through on grep exit code of 1 which happens when everything is filtered out & change count check to 0
Remove some debugging from pylint
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd6a7899
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd6a7899
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd6a7899
Branch: refs/heads/master
Commit: dd6a78992fb66e49ff53ae69114f0d682be0888f
Parents: 22f9263
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:53:42 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:07 2017 -0700
----------------------------------------------------------------------
sdks/python/run_pylint.sh | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dd6a7899/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index df71e44..d53bb14 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -33,7 +33,7 @@ usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; }
if test $# -gt 0; then
case "$@" in
--help) usage; exit 1;;
- *) MODULE="$@";;
+ *) MODULE="$*";;
esac
fi
@@ -59,15 +59,16 @@ done
echo "Skipping lint for generated files: $FILES_TO_IGNORE"
echo "Running pylint for module $MODULE:"
-pylint $MODULE --ignore-patterns="$FILES_TO_IGNORE"
+pylint "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
echo "Running pycodestyle for module $MODULE:"
-pycodestyle $MODULE --exclude="$FILES_TO_IGNORE"
+pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE"
echo "Running isort for module $MODULE:"
# Skip files where isort is behaving weirdly
ISORT_EXCLUDED=(
"apiclient.py"
"avroio_test.py"
"datastore_wordcount.py"
+ "datastoreio_test.py"
"iobase_test.py"
"fast_coders_test.py"
"slow_coders_test.py"
@@ -79,6 +80,15 @@ done
for file in "${EXCLUDED_GENERATED_FILES[@]}"; do
SKIP_PARAM="$SKIP_PARAM --skip $(basename $file)"
done
-pushd $MODULE
+pushd "$MODULE"
isort -p apache_beam -w 120 -y -c -ot -cs -sl ${SKIP_PARAM}
popd
+echo "Checking for files requiring stage 1 refactoring from futurize"
+futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
+futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivial_inference.py' || echo "")
+count=${#futurize_filtered}
+if [ "$count" != "0" ]; then
+ echo "Some of the changes require futurize stage 1 changes."
+ echo "$futurize_filtered"
+ exit 1
+fi
[09/18] beam git commit: Add future to deps and explicitly set isort
req and update pylint in tox ini
Posted by ro...@apache.org.
Add future to deps and explicitly set isort req and update pylint in tox ini
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe0dc2e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe0dc2e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe0dc2e2
Branch: refs/heads/master
Commit: fe0dc2e225a9fca16dbf6ccd77de36fcae4a2484
Parents: c0bc584
Author: Holden Karau <ho...@us.ibm.com>
Authored: Sat Sep 2 00:02:24 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700
----------------------------------------------------------------------
sdks/python/tox.ini | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fe0dc2e2/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index e82b685..039b0e8 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -90,7 +90,9 @@ passenv = TRAVIS*
deps=
nose==1.3.7
pycodestyle==2.3.1
- pylint==1.7.1
+ pylint==1.7.2
+ future==0.16.0
+ isort==4.2.15
whitelist_externals=time
commands =
time pip install -e .[test]
[16/18] beam git commit: Break out the exclusion list for futurize
Posted by ro...@apache.org.
Break out the exclusion list for futurize
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf2be415
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf2be415
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf2be415
Branch: refs/heads/master
Commit: cf2be415b9b407b2d3791310a74d76b3e46fc1fa
Parents: 14cf6a1
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 23:38:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
sdks/python/run_pylint.sh | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cf2be415/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 06f2072..4c57e75 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -83,9 +83,15 @@ done
pushd "$MODULE"
isort -p apache_beam -w 120 -y -c -ot -cs -sl ${SKIP_PARAM}
popd
+FUTURIZE_EXCLUDED=(
+ "typehints.py"
+ "pb2"
+ "trivial_infernce.py"
+)
+FUTURIZE_GREP_PARAM=$( IFS='|'; echo "${ids[*]}" )
echo "Checking for files requiring stage 1 refactoring from futurize"
futurize_results=$(futurize -j 8 --stage1 apache_beam 2>&1 |grep Refactored)
-futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivial_inference.py' || echo "")
+futurize_filtered=$(echo "$futurize_results" |grep -v "$FUTURIZE_GREP_PARAM" || echo "")
count=${#futurize_filtered}
if [ "$count" != "0" ]; then
echo "Some of the changes require futurize stage 1 changes."
[18/18] beam git commit: Closes #3804
Posted by ro...@apache.org.
Closes #3804
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c2a6e77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c2a6e77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c2a6e77
Branch: refs/heads/master
Commit: 9c2a6e771d30e3e120b941bd2d450a00d8b7f091
Parents: 655227a 9de10e2
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Oct 12 15:54:09 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:54:09 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 18 +++--
sdks/python/apache_beam/coders/coders.py | 5 +-
.../apache_beam/coders/coders_test_common.py | 6 +-
.../apache_beam/coders/standard_coders_test.py | 5 +-
.../examples/complete/autocomplete.py | 6 +-
.../examples/complete/autocomplete_test.py | 2 +-
.../examples/complete/game/leader_board.py | 7 +-
.../examples/complete/game/user_score.py | 7 +-
.../complete/juliaset/juliaset/juliaset.py | 8 +-
.../examples/complete/juliaset/setup.py | 5 +-
.../apache_beam/examples/complete/tfidf.py | 15 ++--
.../apache_beam/examples/complete/tfidf_test.py | 6 +-
.../examples/cookbook/bigquery_tornadoes.py | 2 +-
.../examples/cookbook/datastore_wordcount.py | 12 ++-
.../examples/cookbook/group_with_coder.py | 2 +-
.../examples/cookbook/mergecontacts.py | 36 ++++++---
.../examples/cookbook/multiple_output_pardo.py | 14 +++-
.../apache_beam/examples/snippets/snippets.py | 38 +++++++---
.../examples/snippets/snippets_test.py | 8 +-
.../apache_beam/examples/streaming_wordcount.py | 6 +-
.../apache_beam/examples/windowed_wordcount.py | 6 +-
sdks/python/apache_beam/examples/wordcount.py | 12 ++-
.../apache_beam/examples/wordcount_debugging.py | 12 ++-
.../apache_beam/examples/wordcount_minimal.py | 6 +-
sdks/python/apache_beam/internal/pickler.py | 2 +-
sdks/python/apache_beam/internal/util.py | 4 +-
sdks/python/apache_beam/io/filebasedsource.py | 2 +-
.../io/gcp/datastore/v1/datastoreio_test.py | 5 +-
.../apache_beam/io/gcp/datastore/v1/helper.py | 2 +-
.../io/gcp/datastore/v1/helper_test.py | 2 +-
sdks/python/apache_beam/io/gcp/gcsio.py | 2 +-
.../apache_beam/metrics/execution_test.py | 4 +-
.../apache_beam/options/pipeline_options.py | 2 +-
sdks/python/apache_beam/runners/common.py | 4 +-
.../runners/dataflow/test_dataflow_runner.py | 1 +
.../runners/direct/transform_evaluator.py | 2 +-
.../portability/maptask_executor_runner.py | 10 ++-
.../portability/maptask_executor_runner_test.py | 6 +-
.../runners/worker/bundle_processor.py | 2 +-
.../apache_beam/runners/worker/operations.py | 2 +-
.../apache_beam/runners/worker/sdk_worker.py | 4 +-
.../runners/worker/statesampler_test.py | 3 +-
sdks/python/apache_beam/testing/util.py | 2 +-
sdks/python/apache_beam/transforms/combiners.py | 6 +-
sdks/python/apache_beam/transforms/core.py | 10 +--
.../python/apache_beam/transforms/ptransform.py | 5 +-
.../apache_beam/transforms/ptransform_test.py | 38 ++++++----
.../apache_beam/transforms/sideinputs_test.py | 2 +-
.../apache_beam/transforms/trigger_test.py | 10 ++-
sdks/python/apache_beam/transforms/util.py | 12 +--
.../apache_beam/transforms/window_test.py | 8 +-
sdks/python/apache_beam/typehints/decorators.py | 4 +-
sdks/python/apache_beam/typehints/opcodes.py | 27 ++++---
.../apache_beam/typehints/trivial_inference.py | 80 +++++++++++++-------
.../typehints/trivial_inference_test.py | 8 +-
.../typehints/typed_pipeline_test.py | 9 ++-
sdks/python/apache_beam/typehints/typehints.py | 36 +++++++--
sdks/python/apache_beam/utils/retry.py | 2 +-
sdks/python/run_pylint.sh | 27 ++++++-
sdks/python/tox.ini | 4 +-
60 files changed, 400 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
[08/18] beam git commit: Parallelize pylint
Posted by ro...@apache.org.
Parallelize pylint
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/252c6798
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/252c6798
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/252c6798
Branch: refs/heads/master
Commit: 252c67982a036e5dee4518d96479622cea0d662b
Parents: a795e54
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Sep 6 17:10:00 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700
----------------------------------------------------------------------
sdks/python/run_pylint.sh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/252c6798/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index ccd2e31..91d5c4a 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -59,7 +59,7 @@ done
echo "Skipping lint for generated files: $FILES_TO_IGNORE"
echo "Running pylint for module $MODULE:"
-pylint "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
+pylint -j8 "$MODULE" --ignore-patterns="$FILES_TO_IGNORE"
echo "Running pycodestyle for module $MODULE:"
pycodestyle "$MODULE" --exclude="$FILES_TO_IGNORE"
echo "Running isort for module $MODULE:"
[12/18] beam git commit: Fix snippets
Posted by ro...@apache.org.
Fix snippets
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9de10e2d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9de10e2d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9de10e2d
Branch: refs/heads/master
Commit: 9de10e2d80ce15c5881b26f6436d28cccf60e18b
Parents: 3a04dbe
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Oct 11 23:46:28 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/examples/snippets/snippets.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9de10e2d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 048b31a..a7751a7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1179,9 +1179,9 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
return '%s; %s; %s' %\
(name, sorted(info['emails']), sorted(info['phones']))
- contact_lines = result | beam.Map(join_info)
+ contact_lines = results | beam.Map(join_info)
# [END model_group_by_key_cogroupbykey_tuple]
- formatted_results | beam.io.WriteToText(output_path)
+ contact_lines | beam.io.WriteToText(output_path)
def model_join_using_side_inputs(
[02/18] beam git commit: Futurize stage 1 run (AUTOMATED futurize)
Posted by ro...@apache.org.
Futurize stage 1 run (AUTOMATED futurize)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22f9263e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22f9263e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22f9263e
Branch: refs/heads/master
Commit: 22f9263e66aaf5ba1c591850f36e31a1481da6e4
Parents: 655227a
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:43:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:07 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 17 +++---
sdks/python/apache_beam/coders/coders.py | 5 +-
.../apache_beam/coders/coders_test_common.py | 5 +-
.../apache_beam/coders/standard_coders_test.py | 5 +-
.../examples/complete/autocomplete.py | 2 +-
.../examples/complete/autocomplete_test.py | 2 +-
.../examples/complete/game/leader_board.py | 2 +-
.../examples/complete/game/user_score.py | 2 +-
.../complete/juliaset/juliaset/juliaset.py | 4 +-
.../examples/complete/juliaset/setup.py | 5 +-
.../apache_beam/examples/complete/tfidf.py | 11 ++--
.../apache_beam/examples/complete/tfidf_test.py | 2 +-
.../examples/cookbook/bigquery_tornadoes.py | 2 +-
.../examples/cookbook/datastore_wordcount.py | 4 +-
.../examples/cookbook/group_with_coder.py | 2 +-
.../examples/cookbook/mergecontacts.py | 16 ++---
.../examples/cookbook/multiple_output_pardo.py | 6 +-
.../apache_beam/examples/snippets/snippets.py | 25 ++++----
.../examples/snippets/snippets_test.py | 8 ++-
.../apache_beam/examples/streaming_wordcount.py | 2 +-
.../apache_beam/examples/windowed_wordcount.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 4 +-
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 2 +-
sdks/python/apache_beam/internal/pickler.py | 2 +-
sdks/python/apache_beam/internal/util.py | 4 +-
sdks/python/apache_beam/io/filebasedsource.py | 2 +-
.../io/gcp/datastore/v1/datastoreio_test.py | 3 +-
.../apache_beam/io/gcp/datastore/v1/helper.py | 2 +-
.../io/gcp/datastore/v1/helper_test.py | 2 +-
sdks/python/apache_beam/io/gcp/gcsio.py | 2 +-
.../apache_beam/metrics/execution_test.py | 4 +-
.../apache_beam/options/pipeline_options.py | 2 +-
sdks/python/apache_beam/runners/common.py | 4 +-
.../runners/dataflow/test_dataflow_runner.py | 1 +
.../runners/direct/transform_evaluator.py | 2 +-
.../portability/maptask_executor_runner.py | 6 +-
.../portability/maptask_executor_runner_test.py | 6 +-
.../runners/worker/bundle_processor.py | 2 +-
.../apache_beam/runners/worker/operations.py | 2 +-
.../apache_beam/runners/worker/sdk_worker.py | 2 +-
.../runners/worker/statesampler_test.py | 3 +-
sdks/python/apache_beam/testing/util.py | 2 +-
sdks/python/apache_beam/transforms/combiners.py | 6 +-
sdks/python/apache_beam/transforms/core.py | 10 ++--
.../python/apache_beam/transforms/ptransform.py | 5 +-
.../apache_beam/transforms/ptransform_test.py | 22 ++++---
.../apache_beam/transforms/sideinputs_test.py | 2 +-
.../apache_beam/transforms/trigger_test.py | 4 +-
sdks/python/apache_beam/transforms/util.py | 12 ++--
.../apache_beam/transforms/window_test.py | 8 +--
sdks/python/apache_beam/typehints/decorators.py | 4 +-
sdks/python/apache_beam/typehints/opcodes.py | 26 +++++----
.../apache_beam/typehints/trivial_inference.py | 61 ++++++++++----------
.../typehints/trivial_inference_test.py | 3 +-
.../typehints/typed_pipeline_test.py | 9 ++-
sdks/python/apache_beam/typehints/typehints.py | 6 +-
sdks/python/apache_beam/utils/retry.py | 2 +-
58 files changed, 203 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 5070297..9f7b739 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -26,6 +26,7 @@ coder_impl.pxd file for type hints.
For internal use only; no backwards-compatibility guarantees.
"""
+from __future__ import absolute_import
from types import NoneType
from apache_beam.coders import observable
@@ -36,18 +37,18 @@ from apache_beam.utils.timestamp import Timestamp
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
- from stream import InputStream as create_InputStream
- from stream import OutputStream as create_OutputStream
- from stream import ByteCountingOutputStream
- from stream import get_varint_size
+ from .stream import InputStream as create_InputStream
+ from .stream import OutputStream as create_OutputStream
+ from .stream import ByteCountingOutputStream
+ from .stream import get_varint_size
globals()['create_InputStream'] = create_InputStream
globals()['create_OutputStream'] = create_OutputStream
globals()['ByteCountingOutputStream'] = ByteCountingOutputStream
except ImportError:
- from slow_stream import InputStream as create_InputStream
- from slow_stream import OutputStream as create_OutputStream
- from slow_stream import ByteCountingOutputStream
- from slow_stream import get_varint_size
+ from .slow_stream import InputStream as create_InputStream
+ from .slow_stream import OutputStream as create_OutputStream
+ from .slow_stream import ByteCountingOutputStream
+ from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cbea98f..67d5adb 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -19,6 +19,7 @@
Only those coders listed in __all__ are part of the public API of this module.
"""
+from __future__ import absolute_import
import base64
import cPickle as pickle
@@ -32,9 +33,9 @@ from apache_beam.utils import urns
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
- from stream import get_varint_size
+ from .stream import get_varint_size
except ImportError:
- from slow_stream import get_varint_size
+ from .slow_stream import get_varint_size
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 29ff229..d42e637 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -16,6 +16,7 @@
#
"""Tests common to all coder implementations."""
+from __future__ import absolute_import
import logging
import math
@@ -23,7 +24,7 @@ import unittest
import dill
-import observable
+from . import observable
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.runners import pipeline_context
@@ -119,7 +120,7 @@ class CodersTest(unittest.TestCase):
(1, dict()), ('a', [dict()]))
def test_dill_coder(self):
- cell_value = (lambda x: lambda: x)(0).func_closure[0]
+ cell_value = (lambda x: lambda: x)(0).__closure__[0]
self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
self.check_coder(
coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/coders/standard_coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index ca4dffb..ca13b80 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -17,6 +17,7 @@
"""Unit tests for coders that must be consistent across all Beam SDKs.
"""
+from __future__ import print_function
import json
import logging
@@ -125,14 +126,14 @@ class StandardCodersTest(unittest.TestCase):
@classmethod
def tearDownClass(cls):
if cls.fix and cls.to_fix:
- print "FIXING", len(cls.to_fix), "TESTS"
+ print("FIXING", len(cls.to_fix), "TESTS")
doc_sep = '\n---\n'
docs = open(STANDARD_CODERS_YAML).read().split(doc_sep)
def quote(s):
return json.dumps(s.decode('latin1')).replace(r'\u0000', r'\0')
for (doc_ix, expected_encoded), actual_encoded in cls.to_fix.items():
- print quote(expected_encoded), "->", quote(actual_encoded)
+ print(quote(expected_encoded), "->", quote(actual_encoded))
docs[doc_ix] = docs[doc_ix].replace(
quote(expected_encoded) + ':', quote(actual_encoded) + ':')
open(STANDARD_CODERS_YAML, 'w').write(doc_sep.join(docs))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index ab3397c..81c5351 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -51,7 +51,7 @@ def run(argv=None):
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
| 'format' >> beam.Map(
- lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+ lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], prefix_candidates[1]))
| 'write' >> WriteToText(known_args.output))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index e2c84d6..888ce44 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -35,7 +35,7 @@ class AutocompleteTest(unittest.TestCase):
words = p | beam.Create(self.WORDS)
result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
- result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
+ result = result | beam.Map(lambda k_vs: (k_vs[0], tuple(k_vs[1])))
assert_that(result, equal_to(
[
('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 69676f8..6fc7b5d 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -333,7 +333,7 @@ def run(argv=None):
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
| 'FormatUserScoreSums' >> beam.Map(
- lambda (user, score): {'user': user, 'total_score': score})
+ lambda user_score: {'user': user_score[0], 'total_score': user_score[1]})
| 'WriteUserScoreSums' >> WriteToBigQuery(
args.table_name + '_users', args.dataset, {
'user': 'STRING',
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index cf9976d..5e093bb 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -142,7 +142,7 @@ def run(argv=None):
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'UserScore' >> UserScore()
| 'FormatUserScoreSums' >> beam.Map(
- lambda (user, score): 'user: %s, total_score: %s' % (user, score))
+ lambda user_score: 'user: %s, total_score: %s' % (user_score[0], user_score[1]))
| 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
# [END main]
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 61e3fd1..1013168 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -108,10 +108,10 @@ def run(argv=None): # pylint: disable=missing-docstring
# to the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
(coordinates
- | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+ | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2])))
| 'x coord' >> beam.GroupByKey()
| 'format' >> beam.Map(
- lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords))
+ lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1]))
| WriteToText(known_args.coordinate_output))
# Optionally render the image and save it to a file.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/juliaset/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/setup.py b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
index 2062e2a..cbf5f3d 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/setup.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
@@ -24,6 +24,7 @@ then installed in the workers when they start running.
This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""
+from __future__ import print_function
import subprocess
from distutils.command.build import build as _build
@@ -76,14 +77,14 @@ class CustomCommands(setuptools.Command):
pass
def RunCustomCommand(self, command_list):
- print 'Running command: %s' % command_list
+ print('Running command: %s' % command_list)
p = subprocess.Popen(
command_list,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Can use communicate(input='y\n'.encode()) if the command run requires
# some confirmation.
stdout_data, _ = p.communicate()
- print 'Command output: %s' % stdout_data
+ print('Command output: %s' % stdout_data)
if p.returncode != 0:
raise RuntimeError(
'Command %s failed: exit code: %s' % (command_list, p.returncode))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 2132fbb..0300505 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -68,7 +68,8 @@ class TfIdf(beam.PTransform):
# Create a collection of pairs mapping a URI to each of the words
# in the document associated with that that URI.
- def split_into_words((uri, line)):
+ def split_into_words(xxx_todo_changeme):
+ (uri, line) = xxx_todo_changeme
return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
uri_to_words = (
@@ -102,7 +103,7 @@ class TfIdf(beam.PTransform):
uri_to_word_and_count = (
uri_and_word_to_count
| 'ShiftKeys' >> beam.Map(
- lambda ((uri, word), count): (uri, (word, count))))
+ lambda uri_word_count: (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))))
# Perform a CoGroupByKey (a sort of pre-join) on the prepared
# uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
@@ -125,7 +126,8 @@ class TfIdf(beam.PTransform):
# that word occurs in the document divided by the total number of words in
# the document.
- def compute_term_frequency((uri, count_and_total)):
+ def compute_term_frequency(xxx_todo_changeme1):
+ (uri, count_and_total) = xxx_todo_changeme1
word_and_count = count_and_total['word counts']
# We have an iterable for one element that we want extracted.
[word_total] = count_and_total['word totals']
@@ -165,7 +167,8 @@ class TfIdf(beam.PTransform):
# basic version that is the term frequency divided by the log of the
# document frequency.
- def compute_tf_idf((word, tf_and_df)):
+ def compute_tf_idf(xxx_todo_changeme2):
+ (word, tf_and_df) = xxx_todo_changeme2
[docf] = tf_and_df['df']
for uri, tf in tf_and_df['tf']:
yield word, (uri, tf * math.log(1 / docf))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 518a47c..957f4c7 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -58,7 +58,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+ | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])))
assert_that(result, equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 1ca49c5..7b40353 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -58,7 +58,7 @@ def count_tornadoes(input_data):
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
| 'monthly count' >> beam.CombinePerKey(sum)
| 'format' >> beam.Map(
- lambda (k, v): {'month': k, 'tornado_count': v}))
+ lambda k_v: {'month': k_v[0], 'tornado_count': k_v[1]}))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index c42596f..13c5998 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -184,10 +184,10 @@ def read_from_datastore(project, user_options, pipeline_options):
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+ | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
# Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 4c86f46..d5dbecf 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -114,7 +114,7 @@ def run(args=None):
# is registered for the Player class above, a PlayerCoder will be used to
# encode Player objects as keys for this combine operation.
| beam.CombinePerKey(sum)
- | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
+ | beam.Map(lambda k_v: '%s,%d' % (k_v[0].name, k_v[1]))
| WriteToText(known_args.output))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 9acdd90..5a35e51 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -96,19 +96,19 @@ def run(argv=None, assert_results=None):
# Prepare tab-delimited output; something like this:
# "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
tsv_lines = grouped | beam.Map(
- lambda (name, (email, phone, snailmail)): '\t'.join(
- ['"%s"' % name,
- '"%s"' % ','.join(email),
- '"%s"' % ','.join(phone),
- '"%s"' % next(iter(snailmail), '')]))
+ lambda name_email_phone_snailmail: '\t'.join(
+ ['"%s"' % name_email_phone_snailmail[0],
+ '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
+ '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
+ '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
# Compute some stats about our database of people.
luddites = grouped | beam.Filter( # People without email.
- lambda (name, (email, phone, snailmail)): not next(iter(email), None))
+ lambda name_email_phone_snailmail1: not next(iter(name_email_phone_snailmail1[1][0]), None))
writers = grouped | beam.Filter( # People without phones.
- lambda (name, (email, phone, snailmail)): not next(iter(phone), None))
+ lambda name_email_phone_snailmail2: not next(iter(name_email_phone_snailmail2[1][1]), None))
nomads = grouped | beam.Filter( # People without addresses.
- lambda (name, (e, p, snailmail)): not next(iter(snailmail), None))
+ lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), None))
num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 2316c66..259f95d 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -122,8 +122,8 @@ class CountWords(beam.PTransform):
return (pcoll
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
- | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)))
+ | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+ | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
def run(argv=None):
@@ -163,7 +163,7 @@ def run(argv=None):
(character_count
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
+ | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
| 'write chars' >> WriteToText(known_args.output + '-chars'))
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0ced3f1..873f3c3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -436,7 +436,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_count]
# [START examples_wordcount_minimal_map]
- | beam.Map(lambda (word, count): '%s: %s' % (word, count))
+ | beam.Map(lambda word_count: '%s: %s' % (word_count[0], word_count[1]))
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
@@ -541,8 +541,8 @@ def examples_wordcount_templated(renames):
lambda x: re.findall(r'[A-Za-z\']+', x))
| 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
| 'Group' >> beam.GroupByKey()
- | 'Sum' >> beam.Map(lambda (word, ones): (word, sum(ones)))
- | 'Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+ | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], word_c2[1]))
| 'Write' >> WriteToText(wordcount_options.output)
)
@@ -612,7 +612,7 @@ def examples_wordcount_debugging(renames):
# [END example_wordcount_debugging_assert]
output = (filtered_words
- | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], word_c1[1]))
| 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
p.visit(SnippetUtils.RenameFiles(renames))
@@ -1046,7 +1046,7 @@ def model_composite_transform_example(contents, output_path):
return (pcoll
| beam.FlatMap(lambda x: re.findall(r'\w+', x))
| beam.combiners.Count.PerElement()
- | beam.Map(lambda (word, c): '%s: %s' % (word, c)))
+ | beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
@@ -1133,7 +1133,7 @@ def model_group_by_key(contents, output_path):
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(grouped_words
- | 'count words' >> beam.Map(lambda (word, counts): (word, sum(counts)))
+ | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], sum(word_counts[1])))
| beam.io.WriteToText(output_path))
@@ -1162,10 +1162,13 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
results = ({'emails': emails_pcoll, 'phones': phones_pcoll}
| beam.CoGroupByKey())
- formatted_results = results | beam.Map(
- lambda (name, info):\
- '%s; %s; %s' %\
- (name, sorted(info['emails']), sorted(info['phones'])))
+ def join_info(xxx_todo_changeme):
+ (name, info) = xxx_todo_changeme
+ return '%s; %s; %s' %\
+ (name, sorted(info['emails']), sorted(info['phones']))
+
+
+ contact_lines = result | beam.Map(join_info)
# [END model_group_by_key_cogroupbykey_tuple]
formatted_results | beam.io.WriteToText(output_path)
@@ -1211,7 +1214,7 @@ def model_join_using_side_inputs(
class Keys(beam.PTransform):
def expand(self, pcoll):
- return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
+ return pcoll | 'Keys' >> beam.Map(lambda k_v: k_v[0])
# [END model_library_transforms_keys]
# pylint: enable=invalid-name
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 269a241..8a9695d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -358,7 +358,7 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_deterministic_key]
assert_that(
- totals | beam.Map(lambda (k, v): (k.name, v)),
+ totals | beam.Map(lambda k_v: (k_v[0].name, k_v[1])),
equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
@@ -882,14 +882,16 @@ class CombineTest(unittest.TestCase):
def create_accumulator(self):
return (0.0, 0)
- def add_input(self, (sum, count), input):
+ def add_input(self, xxx_todo_changeme, input):
+ (sum, count) = xxx_todo_changeme
return sum + input, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, (sum, count)):
+ def extract_output(self, xxx_todo_changeme1):
+ (sum, count) = xxx_todo_changeme1
return sum / count if count else float('NaN')
# [END combine_custom_average_define]
# [START combine_custom_average_execute]
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 94d4c70..8a05991 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -65,7 +65,7 @@ def run(argv=None):
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
| 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
# Write to PubSub.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 508f18d..680314b 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -75,7 +75,7 @@ def run(argv=None):
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(2*60, 0))
| 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+ | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
| 'Format' >> beam.ParDo(FormatDoFn()))
# Write to BigQuery.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 34dedb2..e21e91d 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -93,10 +93,10 @@ def run(argv=None):
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+ | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
# Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index c0ffd35..bdc4c16 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -98,7 +98,7 @@ class CountWords(beam.PTransform):
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))))
+ | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
def run(argv=None):
@@ -142,7 +142,7 @@ def run(argv=None):
# a "Write" transform that has side effects.
# pylint: disable=unused-variable
output = (filtered_words
- | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+ | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
| 'write' >> WriteToText(known_args.output))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 76b0a22..01c3955 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,7 +106,7 @@ def run(argv=None):
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
- output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
+ output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], w_c[1]))
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index e049a71..102cf23 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -52,7 +52,7 @@ def _find_containing_class(nested_class):
for k, v in outer.__dict__.items():
if v is nested_class:
return outer, k
- elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
+ elif isinstance(v, type) and hasattr(v, '__dict__'):
res = _find_containing_class_inner(v)
if res: return res
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 3e943b0..e4f230b 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -100,10 +100,10 @@ def insert_values_in_args(args, kwargs, values):
# Use a local iterator so that we don't modify values.
v_iter = iter(values)
new_args = [
- v_iter.next() if isinstance(arg, ArgumentPlaceholder) else arg
+ next(v_iter) if isinstance(arg, ArgumentPlaceholder) else arg
for arg in args]
new_kwargs = dict(
- (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
+ (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v)
for k, v in sorted(kwargs.iteritems()))
return (new_args, new_kwargs)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 7b019ed..052c2f3 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -370,7 +370,7 @@ class _Reshard(PTransform):
return (keyed_pc | 'GroupByKey' >> GroupByKey()
# Using FlatMap below due to the possibility of key collisions.
- | 'DropKey' >> FlatMap(lambda (k, values): values))
+ | 'DropKey' >> FlatMap(lambda k_values: k_values[1]))
class _ReadRange(DoFn):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index fa7310f..7c73a06 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -238,7 +239,7 @@ class DatastoreioTest(unittest.TestCase):
elif req == kind_stat_req:
return kind_stat_resp
else:
- print kind_stat_req
+ print(kind_stat_req)
raise ValueError("Unknown req: %s" % req)
self._mock_datastore.run_query.side_effect = fake_run_query
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 01ced6a..b86a2fa 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -277,7 +277,7 @@ class QueryIterator(object):
self._project = project
self._namespace = namespace
self._start_cursor = None
- self._limit = self._query.limit.value or sys.maxint
+ self._limit = self._query.limit.value or sys.maxsize
self._req = make_request(project, namespace, query)
@retry.with_exponential_backoff(num_retries=5,
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 36cfb15..90a3668 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -154,7 +154,7 @@ class HelperTest(unittest.TestCase):
self.assertEqual(entity, entities[i].entity)
i += 1
- limit = query.limit.value if query.HasField('limit') else sys.maxint
+ limit = query.limit.value if query.HasField('limit') else sys.maxsize
self.assertEqual(i, min(num_entities, limit))
def test_is_key_valid(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0db4ba5..448a0c9 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -473,7 +473,7 @@ class GcsBufferedReader(object):
def __next__(self):
"""Read one line delimited by '\\n' from the file.
"""
- return self.next()
+ return next(self)
def next(self):
"""Read one line delimited by '\\n' from the file.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/metrics/execution_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
index abf23e3..855f54c 100644
--- a/sdks/python/apache_beam/metrics/execution_test.py
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -29,9 +29,9 @@ from apache_beam.metrics.metricbase import MetricName
class TestMetricsContainer(unittest.TestCase):
def test_create_new_counter(self):
mc = MetricsContainer('astep')
- self.assertFalse(mc.counters.has_key(MetricName('namespace', 'name')))
+ self.assertFalse(MetricName('namespace', 'name') in mc.counters)
mc.get_counter(MetricName('namespace', 'name'))
- self.assertTrue(mc.counters.has_key(MetricName('namespace', 'name')))
+ self.assertTrue(MetricName('namespace', 'name') in mc.counters)
def test_scoped_container(self):
c1 = MetricsContainer('mystep')
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3abcbf2..2598551 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -50,7 +50,7 @@ def _static_value_provider_of(value_type):
"""
def _f(value):
- _f.func_name = value_type.__name__
+ _f.__name__ = value_type.__name__
return StaticValueProvider(value_type, value)
return _f
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 66c033f..64abe41 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -248,14 +248,14 @@ class PerWindowInvoker(DoFnInvoker):
elif d == core.DoFn.SideInputParam:
# If no more args are present then the value must be passed via kwarg
try:
- args_with_placeholders.append(remaining_args_iter.next())
+ args_with_placeholders.append(next(remaining_args_iter))
except StopIteration:
if a not in input_kwargs:
raise ValueError("Value for sideinput %s not provided" % a)
else:
# If no more args are present then the value must be passed via kwarg
try:
- args_with_placeholders.append(remaining_args_iter.next())
+ args_with_placeholders.append(next(remaining_args_iter))
except StopIteration:
pass
args_with_placeholders.extend(list(remaining_args_iter))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index e7c8d06..b2330c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -16,6 +16,7 @@
#
"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
+from __future__ import print_function
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import GoogleCloudOptions
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 26d2019..16a2991 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -137,7 +137,7 @@ class TransformEvaluatorRegistry(object):
(core._GroupByKeyOnly,
_StreamingGroupByKeyOnly,
_StreamingGroupAlsoByWindow,
- _NativeWrite,))
+ _NativeWrite))
class RootBundleProvider(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index afb96fa..d4063df 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -435,7 +435,7 @@ class PartialGroupByKeyCombineValues(beam.PTransform):
def to_accumulator(v):
return self.combine_fn.add_input(
self.combine_fn.create_accumulator(), v)
- return input | beam.Map(lambda (k, v): (k, to_accumulator(v)))
+ return input | beam.Map(lambda k_v: (k_v[0], to_accumulator(k_v[1])))
class MergeAccumulators(beam.PTransform):
@@ -449,7 +449,7 @@ class MergeAccumulators(beam.PTransform):
return beam.pvalue.PCollection(input.pipeline)
else:
merge_accumulators = self.combine_fn.merge_accumulators
- return input | beam.Map(lambda (k, vs): (k, merge_accumulators(vs)))
+ return input | beam.Map(lambda k_vs: (k_vs[0], merge_accumulators(k_vs[1])))
class ExtractOutputs(beam.PTransform):
@@ -463,7 +463,7 @@ class ExtractOutputs(beam.PTransform):
return beam.pvalue.PCollection(input.pipeline)
else:
extract_output = self.combine_fn.extract_output
- return input | beam.Map(lambda (k, v): (k, extract_output(v)))
+ return input | beam.Map(lambda k_v1: (k_v1[0], extract_output(k_v1[1])))
class WorkerRunnerResult(PipelineResult):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 519ab6e..4c0d3b3 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -152,7 +152,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
derived = ((pcoll,) | beam.Flatten()
| beam.Map(lambda x: (x, x))
| beam.GroupByKey()
- | 'Unkey' >> beam.Map(lambda (x, _): x))
+ | 'Unkey' >> beam.Map(lambda x__: x__[0]))
assert_that(
pcoll | beam.FlatMap(cross_product, AsList(derived)),
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
@@ -162,7 +162,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
res = (p
| beam.Create([('a', 1), ('a', 2), ('b', 3)])
| beam.GroupByKey()
- | beam.Map(lambda (k, vs): (k, sorted(vs))))
+ | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))
def test_flatten(self):
@@ -199,7 +199,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
| beam.Map(lambda t: TimestampedValue(('k', t), t))
| beam.WindowInto(beam.transforms.window.Sessions(10))
| beam.GroupByKey()
- | beam.Map(lambda (k, vs): (k, sorted(vs))))
+ | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
def test_errors(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b69d002..f44490b 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -112,7 +112,7 @@ class DataInputOperation(RunnerIOOperation):
# We must do this manually as we don't have a spec or spec.output_coders.
self.receivers = [
operations.ConsumerSet(self.counter_factory, self.step_name, 0,
- consumers.itervalues().next(),
+ next(consumers.itervalues()),
self.windowed_coder)]
def process(self, windowed_value):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1136d99..ed9d84d 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -434,7 +434,7 @@ class PGBKCVOperation(Operation):
fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
self.combine_fn = curry_combine_fn(fn, args, kwargs)
if (getattr(fn.add_input, 'im_func', None)
- is core.CombineFn.add_input.im_func):
+ is core.CombineFn.add_input.__func__):
# Old versions of the SDK have CombineFns that don't implement add_input.
self.combine_fn_add_input = (
lambda a, e: self.combine_fn.add_inputs(a, [e]))
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index ef33c6f..5786105 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -74,7 +74,7 @@ class SdkHarness(object):
return self.worker.do_instruction(request)
except Exception as e: # pylint: disable=broad-except
traceback_str = traceback.format_exc(e)
- raise StandardError("Error processing request. Original traceback "
+ raise Exception("Error processing request. Original traceback "
"is\n%s\n" % traceback_str)
def handle_response(request, response_future):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/runners/worker/statesampler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py
index 2a85610..44b2f72 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_test.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py
@@ -16,6 +16,7 @@
#
"""Tests for state sampler."""
+from __future__ import absolute_import
import logging
import time
@@ -32,7 +33,7 @@ class StateSamplerTest(unittest.TestCase):
try:
# pylint: disable=global-variable-not-assigned
global statesampler
- import statesampler
+ from . import statesampler
except ImportError:
raise SkipTest('State sampler not compiled.')
super(StateSamplerTest, self).setUp()
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/testing/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 9bb18cc..34c15f9 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -103,7 +103,7 @@ def assert_that(actual, matcher, label='assert_that'):
| "ToVoidKey" >> Map(lambda v: (None, v)))
_ = ((keyed_singleton, keyed_actual)
| "Group" >> CoGroupByKey()
- | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values)
+ | "Unkey" >> Map(lambda k___actual_values: k___actual_values[1][1])
| "Match" >> Map(matcher))
def default_label(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 60bf2d1..3348790 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -77,14 +77,16 @@ class MeanCombineFn(core.CombineFn):
def create_accumulator(self):
return (0, 0)
- def add_input(self, (sum_, count), element):
+ def add_input(self, xxx_todo_changeme, element):
+ (sum_, count) = xxx_todo_changeme
return sum_ + element, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, (sum_, count)):
+ def extract_output(self, xxx_todo_changeme1):
+ (sum_, count) = xxx_todo_changeme1
if count == 0:
return float('NaN')
return sum_ / float(count)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index e5f35c4..ff2428e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -367,10 +367,10 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""Checks if an object is a bound method on an instance."""
if not isinstance(self.process, types.MethodType):
return False # Not a method
- if self.process.im_self is None:
+ if self.process.__self__ is None:
return False # Method is not bound
- if issubclass(self.process.im_class, type) or \
- self.process.im_class is types.ClassType:
+ if issubclass(self.process.__self__.__class__, type) or \
+ self.process.__self__.__class__ is type:
return False # Method is a classmethod
return True
@@ -383,7 +383,7 @@ def _fn_takes_side_inputs(fn):
except TypeError:
# We can't tell; maybe it does.
return True
- is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+ is_bound = isinstance(fn, types.MethodType) and fn.__self__ is not None
return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
@@ -1111,7 +1111,7 @@ class CombineGlobally(PTransform):
KV[None, pcoll.element_type]))
| 'CombinePerKey' >> CombinePerKey(
self.fn, *self.args, **self.kwargs)
- | 'UnKey' >> Map(lambda (k, v): v))
+ | 'UnKey' >> Map(lambda k_v: k_v[1]))
if not self.has_defaults and not self.as_view:
return combined
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2e6255a..d333ace 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -58,6 +58,7 @@ from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import validate_composite_type_param
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
+from functools import reduce
__all__ = [
'PTransform',
@@ -714,8 +715,8 @@ def label_from_callable(fn):
elif hasattr(fn, '__name__'):
if fn.__name__ == '<lambda>':
return '<lambda at %s:%s>' % (
- os.path.basename(fn.func_code.co_filename),
- fn.func_code.co_firstlineno)
+ os.path.basename(fn.__code__.co_filename),
+ fn.__code__.co_firstlineno)
return fn.__name__
return str(fn)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8237c52..112c092 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -18,6 +18,7 @@
"""Unit tests for the PTransform and descendants."""
from __future__ import absolute_import
+from __future__ import print_function
import collections
import operator
@@ -47,6 +48,7 @@ from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.typehints.typehints_test import TypeHintTestCase
from apache_beam.utils.windowed_value import WindowedValue
+from functools import reduce
# Disable frequent lint warning due to pipe operator for chaining transforms.
# pylint: disable=expression-not-assigned
@@ -353,14 +355,16 @@ class PTransformTest(unittest.TestCase):
def create_accumulator(self):
return (0, 0)
- def add_input(self, (sum_, count), element):
+ def add_input(self, xxx_todo_changeme, element):
+ (sum_, count) = xxx_todo_changeme
return sum_ + element, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, (sum_, count)):
+ def extract_output(self, xxx_todo_changeme3):
+ (sum_, count) = xxx_todo_changeme3
if not count:
return float('nan')
return sum_ / float(count)
@@ -619,7 +623,7 @@ class PTransformTest(unittest.TestCase):
pipeline = TestPipeline()
t = (beam.Map(lambda x: (x, 1))
| beam.GroupByKey()
- | beam.Map(lambda (x, ones): (x, sum(ones))))
+ | beam.Map(lambda x_ones: (x_ones[0], sum(x_ones[1]))))
result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
assert_that(result, equal_to([('a', 2), ('b', 1)]))
pipeline.run()
@@ -643,7 +647,7 @@ class PTransformTest(unittest.TestCase):
| beam.Flatten()
| beam.Map(lambda x: (x, None))
| beam.GroupByKey()
- | beam.Map(lambda (x, _): x))
+ | beam.Map(lambda x__: x__[0]))
self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
def test_apply_to_crazy_pvaluish(self):
@@ -720,7 +724,7 @@ class PTransformLabelsTest(unittest.TestCase):
pipeline = TestPipeline()
map1 = 'Map1' >> beam.Map(lambda x: (x, 1))
gbk = 'Gbk' >> beam.GroupByKey()
- map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones)))
+ map2 = 'Map2' >> beam.Map(lambda x_ones2: (x_ones2[0], sum(x_ones2[1])))
t = (map1 | gbk | map2)
result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t
self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels)
@@ -1320,7 +1324,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | ('Add' >> beam.FlatMap(lambda (x, y): [x + y])
+ | ('Add' >> beam.FlatMap(lambda x_y: [x_y[0] + x_y[1]])
.with_input_types(typehints.Tuple[int, int]).with_output_types(int))
)
self.p.run()
@@ -1339,9 +1343,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
# The type hint applied via the 'returns()' method indicates the ParDo
# should output an instance of type 'int', however a 'float' will be
# generated instead.
- print "HINTS", ('ToInt' >> beam.FlatMap(
+ print("HINTS", ('ToInt' >> beam.FlatMap(
lambda x: [float(x)]).with_input_types(int).with_output_types(
- int)).get_type_hints()
+ int)).get_type_hints())
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([1, 2, 3])
@@ -1368,7 +1372,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
| beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
- | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y])
+ | ('Swap' >> beam.FlatMap(lambda x_y1: [x_y1[0] + x_y1[1]])
.with_input_types(typehints.Tuple[int, float])
.with_output_types(typehints.Tuple[float, int]))
)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 0a69c3b..1d58834 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -55,7 +55,7 @@ class SideInputsTest(unittest.TestCase):
side |= beam.Map(lambda x: ('k%s' % x, 'v%s' % x))
res = main | beam.Map(lambda x, s: (x, s), side_input_type(side, **kw))
if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
- res |= beam.Map(lambda (x, s): (x, sorted(s)))
+ res |= beam.Map(lambda x_s: (x_s[0], sorted(x_s[1])))
assert_that(res, equal_to(expected))
def test_global_global_windows(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index b2fd761..5a56c7a 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -407,11 +407,11 @@ class TriggerPipelineTest(unittest.TestCase):
result = (p
| beam.Create([1, 2, 3, 4, 5, 10, 11])
| beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
- | beam.Map(lambda (k, t): TimestampedValue((k, t), t))
+ | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), k_t[1]))
| beam.WindowInto(FixedWindows(10), trigger=AfterCount(3),
accumulation_mode=AccumulationMode.DISCARDING)
| beam.GroupByKey()
- | beam.Map(lambda (k, v): ('%s-%s' % (k, len(v)), set(v))))
+ | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))))
assert_that(result, equal_to(
{
'A-5': {1, 2, 3, 4, 5},
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 81b8c22..4509ed4 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,14 +98,16 @@ class CoGroupByKey(PTransform):
def expand(self, pcolls):
"""Performs CoGroupByKey on argument pcolls; see class docstring."""
# For associating values in K-V pairs with the PCollections they came from.
- def _pair_tag_with_value((key, value), tag):
+ def _pair_tag_with_value(xxx_todo_changeme, tag):
+ (key, value) = xxx_todo_changeme
return (key, (tag, value))
# Creates the key, value pairs for the output PCollection. Values are either
# lists or dicts (per the class docstring), initialized by the result of
# result_ctor(result_ctor_arg).
- def _merge_tagged_vals_under_key((key, grouped), result_ctor,
+ def _merge_tagged_vals_under_key(xxx_todo_changeme3, result_ctor,
result_ctor_arg):
+ (key, grouped) = xxx_todo_changeme3
result_value = result_ctor(result_ctor_arg)
for tag, value in grouped:
result_value[tag].append(value)
@@ -141,17 +143,17 @@ class CoGroupByKey(PTransform):
def Keys(label='Keys'): # pylint: disable=invalid-name
"""Produces a PCollection of first elements of 2-tuples in a PCollection."""
- return label >> Map(lambda (k, v): k)
+ return label >> Map(lambda k_v: k_v[0])
def Values(label='Values'): # pylint: disable=invalid-name
"""Produces a PCollection of second elements of 2-tuples in a PCollection."""
- return label >> Map(lambda (k, v): v)
+ return label >> Map(lambda k_v1: k_v1[1])
def KvSwap(label='KvSwap'): # pylint: disable=invalid-name
"""Produces a PCollection reversing 2-tuples in a PCollection."""
- return label >> Map(lambda (k, v): (v, k))
+ return label >> Map(lambda k_v2: (k_v2[1], k_v2[0]))
@ptransform_fn
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 71c0622..7c1d4e9 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -51,7 +51,7 @@ def context(element, timestamp):
return WindowFn.AssignContext(timestamp, element)
-sort_values = Map(lambda (k, vs): (k, sorted(vs)))
+sort_values = Map(lambda k_vs: (k_vs[0], sorted(k_vs[1])))
class ReifyWindowsFn(core.DoFn):
@@ -195,7 +195,7 @@ class WindowTest(unittest.TestCase):
with TestPipeline() as p:
result = (p
| 'start' >> Create([(k, k) for k in range(10)])
- | Map(lambda (x, t): TimestampedValue(x, t))
+ | Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
| 'w' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
| GroupByKey())
@@ -206,7 +206,7 @@ class WindowTest(unittest.TestCase):
with TestPipeline() as p:
result = (p
| Create([(k, k) for k in range(10)])
- | Map(lambda (x, t): TimestampedValue(x, t))
+ | Map(lambda x_t1: TimestampedValue(x_t1[0], x_t1[1]))
| 'window' >> WindowInto(SlidingWindows(period=2, size=6))
# Per the model, each element is now duplicated across
# three windows. Rewindowing must preserve this duplication.
@@ -232,7 +232,7 @@ class WindowTest(unittest.TestCase):
# Now there are values 5 ms apart and since Map propagates the
# windowing function from input to output the output PCollection
# will have elements falling into different 5ms windows.
- | Map(lambda (x, t): TimestampedValue(x, t))
+ | Map(lambda x_t2: TimestampedValue(x_t2[0], x_t2[1]))
# We add a 'key' to each value representing the index of the
# window. This is important since there is no guarantee of
# order for the elements of a PCollection.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index 694433a..89dc6af 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -117,7 +117,7 @@ def getargspec(func):
try:
return _original_getargspec(func)
except TypeError:
- if isinstance(func, (type, types.ClassType)):
+ if isinstance(func, type):
argspec = getargspec(func.__init__)
del argspec.args[0]
return argspec
@@ -261,7 +261,7 @@ def getcallargs_forhints(func, *typeargs, **typekwargs):
packed_typeargs += list(typeargs[len(packed_typeargs):])
try:
callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs)
- except TypeError, e:
+ except TypeError as e:
raise TypeCheckError(e)
if argspec.defaults:
# Declare any default arguments to be Any.
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index 923b848..c3ba92a 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -26,19 +26,21 @@ are handled inline rather than here.
For internal use only; no backwards-compatibility guarantees.
"""
+from __future__ import absolute_import
import types
-import typehints
-from trivial_inference import BoundMethod
-from trivial_inference import Const
-from trivial_inference import element_type
-from trivial_inference import union
-from typehints import Any
-from typehints import Dict
-from typehints import Iterable
-from typehints import List
-from typehints import Tuple
-from typehints import Union
+from . import typehints
+from .trivial_inference import BoundMethod
+from .trivial_inference import Const
+from .trivial_inference import element_type
+from .trivial_inference import union
+from .typehints import Any
+from .typehints import Dict
+from .typehints import Iterable
+from .typehints import List
+from .typehints import Tuple
+from .typehints import Union
+from functools import reduce
def pop_one(state, unused_arg):
@@ -262,7 +264,7 @@ def load_attr(state, arg):
name = state.get_name(arg)
if isinstance(o, Const) and hasattr(o.value, name):
state.stack.append(Const(getattr(o.value, name)))
- elif (isinstance(o, (type, types.ClassType))
+ elif (isinstance(o, type)
and isinstance(getattr(o, name, None), types.MethodType)):
state.stack.append(Const(BoundMethod(getattr(o, name))))
else:
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index c740596..df96900 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -19,6 +19,8 @@
For internal use only; no backwards-compatibility guarantees.
"""
+from __future__ import print_function
+from __future__ import absolute_import
import __builtin__
import collections
import dis
@@ -28,6 +30,7 @@ import types
from apache_beam.typehints import Any
from apache_beam.typehints import typehints
+from functools import reduce
class TypeInferenceError(ValueError):
@@ -103,7 +106,7 @@ class FrameState(object):
def __init__(self, f, local_vars=None, stack=()):
self.f = f
- self.co = f.func_code
+ self.co = f.__code__
self.vars = list(local_vars)
self.stack = list(stack)
@@ -120,12 +123,12 @@ class FrameState(object):
ncellvars = len(self.co.co_cellvars)
if i < ncellvars:
return Any
- return Const(self.f.func_closure[i - ncellvars].cell_contents)
+ return Const(self.f.__closure__[i - ncellvars].cell_contents)
def get_global(self, i):
name = self.get_name(i)
- if name in self.f.func_globals:
- return Const(self.f.func_globals[name])
+ if name in self.f.__globals__:
+ return Const(self.f.__globals__[name])
if name in __builtin__.__dict__:
return Const(__builtin__.__dict__[name])
return Any
@@ -227,14 +230,14 @@ def infer_return_type(c, input_types, debug=False, depth=5):
elif isinstance(c, types.FunctionType):
return infer_return_type_func(c, input_types, debug, depth)
elif isinstance(c, types.MethodType):
- if c.im_self is not None:
- input_types = [Const(c.im_self)] + input_types
- return infer_return_type_func(c.im_func, input_types, debug, depth)
+ if c.__self__ is not None:
+ input_types = [Const(c.__self__)] + input_types
+ return infer_return_type_func(c.__func__, input_types, debug, depth)
elif isinstance(c, BoundMethod):
- input_types = [c.unbound.im_class] + input_types
+ input_types = [c.unbound.__self__.__class__] + input_types
return infer_return_type_func(
- c.unbound.im_func, input_types, debug, depth)
- elif isinstance(c, (type, types.ClassType)):
+ c.unbound.__func__, input_types, debug, depth)
+ elif isinstance(c, type):
if c in typehints.DISALLOWED_PRIMITIVE_TYPES:
return {
list: typehints.List[Any],
@@ -272,12 +275,12 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
TypeInferenceError: if no type can be inferred.
"""
if debug:
- print
- print f, id(f), input_types
- import opcodes
+ print()
+ print(f, id(f), input_types)
+ from . import opcodes
simple_ops = dict((k.upper(), v) for k, v in opcodes.__dict__.items())
- co = f.func_code
+ co = f.__code__
code = co.co_code
end = len(code)
pc = 0
@@ -299,38 +302,38 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
op = ord(code[pc])
if debug:
- print '-->' if pc == last_pc else ' ',
- print repr(pc).rjust(4),
- print dis.opname[op].ljust(20),
+ print('-->' if pc == last_pc else ' ', end=' ')
+ print(repr(pc).rjust(4), end=' ')
+ print(dis.opname[op].ljust(20), end=' ')
pc += 1
if op >= dis.HAVE_ARGUMENT:
arg = ord(code[pc]) + ord(code[pc + 1]) * 256 + extended_arg
extended_arg = 0
pc += 2
if op == dis.EXTENDED_ARG:
- extended_arg = arg * 65536L
+ extended_arg = arg * 65536
if debug:
- print str(arg).rjust(5),
+ print(str(arg).rjust(5), end=' ')
if op in dis.hasconst:
- print '(' + repr(co.co_consts[arg]) + ')',
+ print('(' + repr(co.co_consts[arg]) + ')', end=' ')
elif op in dis.hasname:
- print '(' + co.co_names[arg] + ')',
+ print('(' + co.co_names[arg] + ')', end=' ')
elif op in dis.hasjrel:
- print '(to ' + repr(pc + arg) + ')',
+ print('(to ' + repr(pc + arg) + ')', end=' ')
elif op in dis.haslocal:
- print '(' + co.co_varnames[arg] + ')',
+ print('(' + co.co_varnames[arg] + ')', end=' ')
elif op in dis.hascompare:
- print '(' + dis.cmp_op[arg] + ')',
+ print('(' + dis.cmp_op[arg] + ')', end=' ')
elif op in dis.hasfree:
if free is None:
free = co.co_cellvars + co.co_freevars
- print '(' + free[arg] + ')',
+ print('(' + free[arg] + ')', end=' ')
# Actually emulate the op.
if state is None and states[start] is None:
# No control reaches here (yet).
if debug:
- print
+ print()
continue
state |= states[start]
@@ -398,8 +401,8 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
states[jmp] = new_state
if debug:
- print
- print state
+ print()
+ print(state)
pprint.pprint(dict(item for item in states.items() if item[1]))
if yields:
@@ -408,5 +411,5 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
result = reduce(union, Const.unwrap_all(returns))
if debug:
- print f, id(f), input_types, '->', result
+ print(f, id(f), input_types, '->', result)
return result
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 38259c8..8af9dd6 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -39,7 +39,8 @@ class TrivialInferenceTest(unittest.TestCase):
typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
def testUnpack(self):
- def reverse((a, b)):
+ def reverse(xxx_todo_changeme):
+ (a, b) = xxx_todo_changeme
return b, a
any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
self.assertReturnType(
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 1df1104..501715b 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -103,14 +103,16 @@ class NativeTypesTest(unittest.TestCase):
def test_good_main_input(self):
@typehints.with_input_types(typing.Tuple[str, int])
- def munge((s, i)):
+ def munge(xxx_todo_changeme):
+ (s, i) = xxx_todo_changeme
return (s + 's', i * 2)
result = [('apple', 5), ('pear', 3)] | beam.Map(munge)
self.assertEqual([('apples', 10), ('pears', 6)], sorted(result))
def test_bad_main_input(self):
@typehints.with_input_types(typing.Tuple[str, str])
- def munge((s, i)):
+ def munge(xxx_todo_changeme1):
+ (s, i) = xxx_todo_changeme1
return (s + 's', i * 2)
with self.assertRaises(typehints.TypeCheckError):
[('apple', 5), ('pear', 3)] | beam.Map(munge)
@@ -118,7 +120,8 @@ class NativeTypesTest(unittest.TestCase):
def test_bad_main_output(self):
@typehints.with_input_types(typing.Tuple[int, int])
@typehints.with_output_types(typing.Tuple[str, str])
- def munge((a, b)):
+ def munge(xxx_todo_changeme2):
+ (a, b) = xxx_todo_changeme2
return (str(a), str(b))
with self.assertRaises(typehints.TypeCheckError):
[(5, 4), (3, 2)] | beam.Map(munge) | 'Again' >> beam.Map(munge)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index c19916f..a27dd7e 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -315,7 +315,7 @@ def validate_composite_type_param(type_param, error_msg_prefix):
"""
# Must either be a TypeConstraint instance or a basic Python type.
is_not_type_constraint = (
- not isinstance(type_param, (type, types.ClassType, TypeConstraint))
+ not isinstance(type_param, (type, TypeConstraint))
and type_param is not None)
is_forbidden_type = (isinstance(type_param, type) and
type_param in DISALLOWED_PRIMITIVE_TYPES)
@@ -340,7 +340,7 @@ def _unified_repr(o):
A qualified name for the passed Python object fit for string formatting.
"""
return repr(o) if isinstance(
- o, (TypeConstraint, types.NoneType)) else o.__name__
+ o, (TypeConstraint, type(None))) else o.__name__
def check_constraint(type_constraint, object_instance):
@@ -491,7 +491,7 @@ class UnionHint(CompositeTypeHint):
if Any in params:
return Any
elif len(params) == 1:
- return iter(params).next()
+ return next(iter(params))
return self.UnionConstraint(params)
http://git-wip-us.apache.org/repos/asf/beam/blob/22f9263e/sdks/python/apache_beam/utils/retry.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 08223b3..927da14 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -182,7 +182,7 @@ def with_exponential_backoff(
exn_traceback = sys.exc_info()[2]
try:
try:
- sleep_interval = retry_intervals.next()
+ sleep_interval = next(retry_intervals)
except StopIteration:
# Re-raise the original exception since we finished the retries.
raise exn, None, exn_traceback # pylint: disable=raising-bad-type
[14/18] beam git commit: pr/cl-feedback from c-y-koo
Posted by ro...@apache.org.
pr/cl-feedback from c-y-koo
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a04dbe4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a04dbe4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a04dbe4
Branch: refs/heads/master
Commit: 3a04dbe46debc2c4aa188b210870a24dc1408bf1
Parents: 3b0ad58
Author: Holden Karau <ho...@us.ibm.com>
Authored: Tue Oct 10 12:31:16 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/complete/game/leader_board.py | 2 +-
.../python/apache_beam/examples/cookbook/mergecontacts.py | 9 +++------
sdks/python/apache_beam/examples/snippets/snippets.py | 1 -
.../runners/portability/maptask_executor_runner_test.py | 2 +-
sdks/python/apache_beam/transforms/ptransform_test.py | 10 +++++-----
5 files changed, 10 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index a5bde05..e207f26 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -329,11 +329,11 @@ def run(argv=None):
'processing_time': 'STRING',
}))
- # Get user scores and write the results to BigQuery
def format_user_score_sums(user_score):
(user, score) = user_score
return {'user': user, 'total_score': score}
+ # Get user scores and write the results to BigQuery
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
| 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index b07b98d..237d4ca 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -118,12 +118,9 @@ def run(argv=None, assert_results=None):
(_, (_, _, snailmail)) = name_email_phone_snailmail
return not next(iter(snailmail), None)
- luddites = grouped | beam.Filter( # People without email.
- without_email)
- writers = grouped | beam.Filter( # People without phones.
- without_phones)
- nomads = grouped | beam.Filter( # People without addresses.
- without_address)
+ luddites = grouped | beam.Filter(without_email) # People without email.
+ writers = grouped | beam.Filter(without_phones) # People without phones.
+ nomads = grouped | beam.Filter(without_address) # People without addresses.
num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 54abd8c..048b31a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1179,7 +1179,6 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
return '%s; %s; %s' %\
(name, sorted(info['emails']), sorted(info['phones']))
-
contact_lines = result | beam.Map(join_info)
# [END model_group_by_key_cogroupbykey_tuple]
formatted_results | beam.io.WriteToText(output_path)
http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
index 4c0d3b3..0f8637f 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py
@@ -152,7 +152,7 @@ class MapTaskExecutorRunnerTest(unittest.TestCase):
derived = ((pcoll,) | beam.Flatten()
| beam.Map(lambda x: (x, x))
| beam.GroupByKey()
- | 'Unkey' >> beam.Map(lambda x__: x__[0]))
+ | 'Unkey' >> beam.Map(lambda kv: kv[0]))
assert_that(
pcoll | beam.FlatMap(cross_product, AsList(derived)),
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
http://git-wip-us.apache.org/repos/asf/beam/blob/3a04dbe4/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 0d2bb7a..dac2c4f 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -647,7 +647,7 @@ class PTransformTest(unittest.TestCase):
| beam.Flatten()
| beam.Map(lambda x: (x, None))
| beam.GroupByKey()
- | beam.Map(lambda x__: x__[0]))
+ | beam.Map(lambda kv: kv[0]))
self.assertEqual([1, 2, 3], sorted(([1, 2], [2, 3]) | DisjointUnion()))
def test_apply_to_crazy_pvaluish(self):
@@ -1593,8 +1593,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(range(5)).with_output_types(int)
| 'Mean' >> combine.Mean.Globally())
- assert_that(d, equal_to([2.0]))
self.assertEqual(float, d.element_type)
+ assert_that(d, equal_to([2.0]))
self.p.run()
def test_mean_globally_pipeline_checking_violated(self):
@@ -1616,8 +1616,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(range(5)).with_output_types(int)
| 'Mean' >> combine.Mean.Globally())
- assert_that(d, equal_to([2.0]))
self.assertEqual(float, d.element_type)
+ assert_that(d, equal_to([2.0]))
self.p.run()
def test_mean_globally_runtime_checking_violated(self):
@@ -1709,8 +1709,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'P' >> beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.Globally())
- assert_that(d, equal_to([5]))
self.assertEqual(int, d.element_type)
+ assert_that(d, equal_to([5]))
self.p.run()
def test_count_globally_runtime_type_checking_satisfied(self):
@@ -1720,8 +1720,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'P' >> beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.Globally())
- assert_that(d, equal_to([5]))
self.assertEqual(int, d.element_type)
+ assert_that(d, equal_to([5]))
self.p.run()
def test_count_perkey_pipeline_type_checking_satisfied(self):
[04/18] beam git commit: Change the ptransform_test to use
assertEqual rather than assertTrue for improved debugging,
fix the error message we're looking for since we now throw a tuple exception
instead
Posted by ro...@apache.org.
Change the ptransform_test to use assertEqual rather than assertTrue for improved debugging, fix the error message we're looking for since we now throw a tuple exception instead
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5cc6b5f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5cc6b5f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5cc6b5f1
Branch: refs/heads/master
Commit: 5cc6b5f17e0e1e117a8cd237d4c7be8d7fce8a17
Parents: 7d0040d
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 20:49:40 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700
----------------------------------------------------------------------
.../apache_beam/transforms/ptransform_test.py | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5cc6b5f1/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index c137b14..2f2734d 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1332,9 +1332,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
self.assertStartswith(
e.exception.message,
"Runtime type violation detected within ParDo(Add): "
- "Type-hint for argument: 'y' violated. "
- "Expected an instance of <type 'int'>, "
- "instead found 3.0, an instance of <type 'float'>.")
+ "Type-hint for argument: 'x_y' violated: "
+ "Tuple[int, int] hint type-constraint violated. "
+ "The type of element #1 in the passed tuple is incorrect. "
+ "Expected an instance of type int, instead received an instance "
+ "of type float.")
def test_pipeline_runtime_checking_violation_simple_type_output(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1591,8 +1593,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(range(5)).with_output_types(int)
| 'Mean' >> combine.Mean.Globally())
- self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
+ self.assertEqual(float, d.element_type)
self.p.run()
def test_mean_globally_pipeline_checking_violated(self):
@@ -1614,8 +1616,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'C' >> beam.Create(range(5)).with_output_types(int)
| 'Mean' >> combine.Mean.Globally())
- self.assertTrue(d.element_type is float)
assert_that(d, equal_to([2.0]))
+ self.assertEqual(float, d.element_type)
self.p.run()
def test_mean_globally_runtime_checking_violated(self):
@@ -1707,8 +1709,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'P' >> beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.Globally())
- self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
+ self.assertEqual(int, d.element_type)
self.p.run()
def test_count_globally_runtime_type_checking_satisfied(self):
@@ -1718,8 +1720,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
| 'P' >> beam.Create(range(5)).with_output_types(int)
| 'CountInt' >> combine.Count.Globally())
- self.assertTrue(d.element_type is int)
assert_that(d, equal_to([5]))
+ self.assertEqual(int, d.element_type)
self.p.run()
def test_count_perkey_pipeline_type_checking_satisfied(self):
[13/18] beam git commit: Re-order comment (reviewer feedback)
Posted by ro...@apache.org.
Re-order comment (reviewer feedback)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/14cf6a13
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/14cf6a13
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/14cf6a13
Branch: refs/heads/master
Commit: 14cf6a13a3e3152ca8f168535500a95f16db07b2
Parents: bf910c1
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 23:38:37 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
.../examples/complete/juliaset/juliaset/juliaset.py | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/14cf6a13/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index bb5b185..165237d 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -104,12 +104,12 @@ def run(argv=None): # pylint: disable=missing-docstring
coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
- # Group each coordinate triplet by its x value, then write the coordinates
- # to the output file with an x-coordinate grouping per line.
- # pylint: disable=expression-not-assigned
def x_coord_key(x_y_i):
return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+ # Group each coordinate triplet by its x value, then write the coordinates
+ # to the output file with an x-coordinate grouping per line.
+ # pylint: disable=expression-not-assigned
(coordinates
| 'x coord key' >> beam.Map(x_coord_key)
| 'x coord' >> beam.GroupByKey()
[11/18] beam git commit: Change the xxx_todo_changeme[x]s to
reasonable values
Posted by ro...@apache.org.
Change the xxx_todo_changeme[x]s to reasonable values
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a795e545
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a795e545
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a795e545
Branch: refs/heads/master
Commit: a795e545157d20099c82970e4e2c302aa151413b
Parents: fe0dc2e
Author: Holden Karau <ho...@us.ibm.com>
Authored: Wed Sep 6 16:15:10 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/examples/complete/tfidf.py | 12 ++++++------
sdks/python/apache_beam/examples/snippets/snippets.py | 4 ++--
.../apache_beam/examples/snippets/snippets_test.py | 8 ++++----
sdks/python/apache_beam/transforms/combiners.py | 8 ++++----
sdks/python/apache_beam/transforms/ptransform_test.py | 8 ++++----
sdks/python/apache_beam/transforms/util.py | 8 ++++----
.../apache_beam/typehints/trivial_inference_test.py | 4 ++--
.../python/apache_beam/typehints/typed_pipeline_test.py | 12 ++++++------
8 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 55404df..065e4b3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -68,8 +68,8 @@ class TfIdf(beam.PTransform):
# Create a collection of pairs mapping a URI to each of the words
# in the document associated with that URI.
- def split_into_words(xxx_todo_changeme):
- (uri, line) = xxx_todo_changeme
+ def split_into_words(uri_line):
+ (uri, line) = uri_line
return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]
uri_to_words = (
@@ -128,8 +128,8 @@ class TfIdf(beam.PTransform):
# that word occurs in the document divided by the total number of words in
# the document.
- def compute_term_frequency(xxx_todo_changeme1):
- (uri, count_and_total) = xxx_todo_changeme1
+ def compute_term_frequency(uri_count_and_total):
+ (uri, count_and_total) = uri_count_and_total
word_and_count = count_and_total['word counts']
# We have an iterable for one element that we want extracted.
[word_total] = count_and_total['word totals']
@@ -169,8 +169,8 @@ class TfIdf(beam.PTransform):
# basic version that is the term frequency divided by the log of the
# document frequency.
- def compute_tf_idf(xxx_todo_changeme2):
- (word, tf_and_df) = xxx_todo_changeme2
+ def compute_tf_idf(word_tf_and_df):
+ (word, tf_and_df) = word_tf_and_df
[docf] = tf_and_df['df']
for uri, tf in tf_and_df['tf']:
yield word, (uri, tf * math.log(1 / docf))
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 10080c9..01118b3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1171,8 +1171,8 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
results = ({'emails': emails_pcoll, 'phones': phones_pcoll}
| beam.CoGroupByKey())
- def join_info(xxx_todo_changeme):
- (name, info) = xxx_todo_changeme
+ def join_info(name_info):
+ (name, info) = name_info
return '%s; %s; %s' %\
(name, sorted(info['emails']), sorted(info['phones']))
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 8a9695d..8f88ab9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -882,16 +882,16 @@ class CombineTest(unittest.TestCase):
def create_accumulator(self):
return (0.0, 0)
- def add_input(self, xxx_todo_changeme, input):
- (sum, count) = xxx_todo_changeme
+ def add_input(self, sum_count, input):
+ (sum, count) = sum_count
return sum + input, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, xxx_todo_changeme1):
- (sum, count) = xxx_todo_changeme1
+ def extract_output(self, sum_count):
+ (sum, count) = sum_count
return sum / count if count else float('NaN')
# [END combine_custom_average_define]
# [START combine_custom_average_execute]
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 3348790..ce5e942 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -77,16 +77,16 @@ class MeanCombineFn(core.CombineFn):
def create_accumulator(self):
return (0, 0)
- def add_input(self, xxx_todo_changeme, element):
- (sum_, count) = xxx_todo_changeme
+ def add_input(self, sum_count, element):
+ (sum_, count) = sum_count
return sum_ + element, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, xxx_todo_changeme1):
- (sum_, count) = xxx_todo_changeme1
+ def extract_output(self, sum_count):
+ (sum_, count) = sum_count
if count == 0:
return float('NaN')
return sum_ / float(count)
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 2f2734d..0d2bb7a 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -355,16 +355,16 @@ class PTransformTest(unittest.TestCase):
def create_accumulator(self):
return (0, 0)
- def add_input(self, xxx_todo_changeme, element):
- (sum_, count) = xxx_todo_changeme
+ def add_input(self, sum_count, element):
+ (sum_, count) = sum_count
return sum_ + element, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
- def extract_output(self, xxx_todo_changeme3):
- (sum_, count) = xxx_todo_changeme3
+ def extract_output(self, sum_count):
+ (sum_, count) = sum_count
if not count:
return float('nan')
return sum_ / float(count)
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 4509ed4..6a7e269 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
def expand(self, pcolls):
"""Performs CoGroupByKey on argument pcolls; see class docstring."""
# For associating values in K-V pairs with the PCollections they came from.
- def _pair_tag_with_value(xxx_todo_changeme, tag):
- (key, value) = xxx_todo_changeme
+ def _pair_tag_with_value(k_v, tag):
+ (key, value) = k_v
return (key, (tag, value))
# Creates the key, value pairs for the output PCollection. Values are either
# lists or dicts (per the class docstring), initialized by the result of
# result_ctor(result_ctor_arg).
- def _merge_tagged_vals_under_key(xxx_todo_changeme3, result_ctor,
+ def _merge_tagged_vals_under_key(k_grouped, result_ctor,
result_ctor_arg):
- (key, grouped) = xxx_todo_changeme3
+ (key, grouped) = k_grouped
result_value = result_ctor(result_ctor_arg)
for tag, value in grouped:
result_value[tag].append(value)
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 7b7b6a8..37b2258 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -44,8 +44,8 @@ class TrivialInferenceTest(unittest.TestCase):
typehints.Tuple[str, int, float], lambda x: (x, 0, 1.0), [str])
def testUnpack(self):
- def reverse(xxx_todo_changeme):
- (a, b) = xxx_todo_changeme
+ def reverse(a_b):
+ (a, b) = a_b
return b, a
any_tuple = typehints.Tuple[typehints.Any, typehints.Any]
self.assertReturnType(
http://git-wip-us.apache.org/repos/asf/beam/blob/a795e545/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 501715b..2581457 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -103,16 +103,16 @@ class NativeTypesTest(unittest.TestCase):
def test_good_main_input(self):
@typehints.with_input_types(typing.Tuple[str, int])
- def munge(xxx_todo_changeme):
- (s, i) = xxx_todo_changeme
+ def munge(s_i):
+ (s, i) = s_i
return (s + 's', i * 2)
result = [('apple', 5), ('pear', 3)] | beam.Map(munge)
self.assertEqual([('apples', 10), ('pears', 6)], sorted(result))
def test_bad_main_input(self):
@typehints.with_input_types(typing.Tuple[str, str])
- def munge(xxx_todo_changeme1):
- (s, i) = xxx_todo_changeme1
+ def munge(s_i):
+ (s, i) = s_i
return (s + 's', i * 2)
with self.assertRaises(typehints.TypeCheckError):
[('apple', 5), ('pear', 3)] | beam.Map(munge)
@@ -120,8 +120,8 @@ class NativeTypesTest(unittest.TestCase):
def test_bad_main_output(self):
@typehints.with_input_types(typing.Tuple[int, int])
@typehints.with_output_types(typing.Tuple[str, str])
- def munge(xxx_todo_changeme2):
- (a, b) = xxx_todo_changeme2
+ def munge(a_b):
+ (a, b) = a_b
return (str(a), str(b))
with self.assertRaises(typehints.TypeCheckError):
[(5, 4), (3, 2)] | beam.Map(munge) | 'Again' >> beam.Map(munge)
[15/18] beam git commit: Clarify docstring and update run_pylint to
be more clear about what steps the developer can take
Posted by ro...@apache.org.
Clarify docstring and update run_pylint to be more clear about what steps the developer can take
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf910c11
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf910c11
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf910c11
Branch: refs/heads/master
Commit: bf910c118b6940fef951d4441762b4c55b199ec1
Parents: 252c679
Author: Holden Karau <ho...@us.ibm.com>
Authored: Thu Sep 7 21:26:51 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/typehints/typehints.py | 6 +++++-
sdks/python/run_pylint.sh | 2 ++
2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bf910c11/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index b78ead2..6e1d8b7 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -191,7 +191,11 @@ class IndexableTypeConstraint(TypeConstraint):
"""
def _constraint_for_index(self, idx):
- """Returns the type at the given index."""
+ """Returns the type at the given index. This is used to allow type inference
+ to determine the correct type for a specific index. On lists this will always
+ be the same, however for tuples the value will depend on the position. This
+ was added as part of the futurize changes since more of the expressions now
+ index into tuples."""
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/bf910c11/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index 91d5c4a..06f2072 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -89,7 +89,9 @@ futurize_filtered=$(echo "$futurize_results" |grep -v 'pb2\|typehints.py\|trivia
count=${#futurize_filtered}
if [ "$count" != "0" ]; then
echo "Some of the changes require futurize stage 1 changes."
+ echo "The files with required changes:"
echo "$futurize_filtered"
+ echo "You can run futurize apache_beam to see the proposed changes."
exit 1
fi
echo "No future changes needed"
[17/18] beam git commit: Change the examples with long-lambdas which
were pulled out into defs to use explicit tuple unpacking instead of
indexing. Note: this still keeps the lambdas which are short enough and those
use indexing (as required).
Posted by ro...@apache.org.
Change the examples with long-lambdas which were pulled out into defs to use explicit tuple unpacking instead of indexing. Note: this still keeps the lambdas which are short enough and those use indexing (as required).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b0ad58c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b0ad58c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b0ad58c
Branch: refs/heads/master
Commit: 3b0ad58cd68ee0790ac1f10ddffa96a866d85a0c
Parents: cf2be41
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 8 12:56:27 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:53:55 2017 -0700
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 3 +-
.../examples/complete/game/leader_board.py | 3 +-
.../examples/complete/game/user_score.py | 3 +-
.../complete/juliaset/juliaset/juliaset.py | 3 +-
.../apache_beam/examples/complete/tfidf_test.py | 3 +-
.../examples/cookbook/datastore_wordcount.py | 8 ++++--
.../examples/cookbook/mergecontacts.py | 30 ++++++++++++--------
.../examples/cookbook/multiple_output_pardo.py | 10 ++++---
.../apache_beam/examples/snippets/snippets.py | 13 +++++----
.../apache_beam/examples/streaming_wordcount.py | 3 +-
.../apache_beam/examples/windowed_wordcount.py | 3 +-
sdks/python/apache_beam/examples/wordcount.py | 8 ++++--
.../apache_beam/examples/wordcount_debugging.py | 8 ++++--
.../apache_beam/examples/wordcount_minimal.py | 5 ++--
sdks/python/apache_beam/transforms/util.py | 8 +++---
15 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c09b78e..b556e65 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -46,7 +46,8 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
def format_result(prefix_candidates):
- return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
+ (prefix, candidates) = prefix_candidates
+ return '%s: %s' % (prefix, candidates)
(p # pylint: disable=expression-not-assigned
| 'read' >> ReadFromText(known_args.input)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index c1b15e9..a5bde05 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -331,7 +331,8 @@ def run(argv=None):
# Get user scores and write the results to BigQuery
def format_user_score_sums(user_score):
- return {'user': user_score[0], 'total_score': user_score[1]}
+ (user, score) = user_score
+ return {'user': user, 'total_score': score}
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 0405bab..3e3e547 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -139,7 +139,8 @@ def run(argv=None):
with beam.Pipeline(argv=pipeline_args) as p:
def format_user_score_sums(user_score):
- return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+ (user, score) = user_score
+ return 'user: %s, total_score: %s' % (user, score)
(p # pylint: disable=expression-not-assigned
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 165237d..3f3ef03 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,7 +105,8 @@ def run(argv=None): # pylint: disable=missing-docstring
coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)
def x_coord_key(x_y_i):
- return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+ (x, y, i) = x_y_i
+ return (x, (x, y, i))
# Group each coordinate triplet by its x value, then write the coordinates
# to the output file with an x-coordinate grouping per line.
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 71e71e3..637d10a 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -52,7 +52,8 @@ class TfIdfTest(unittest.TestCase):
def test_tfidf_transform(self):
with TestPipeline() as p:
def re_key(word_uri_tfidf):
- return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+ (word, (uri, tfidf)) = word_uri_tfidf
+ return (word, uri, tfidf)
uri_to_line = p | 'create sample' >> beam.Create(
[('1.txt', 'abc def ghi'),
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 03fcd8a..099fb08 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -180,7 +180,8 @@ def read_from_datastore(project, user_options, pipeline_options):
# Count the occurrences of each word.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -190,8 +191,9 @@ def read_from_datastore(project, user_options, pipeline_options):
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index fdfa286..b07b98d 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -95,22 +95,28 @@ def run(argv=None, assert_results=None):
# Prepare tab-delimited output; something like this:
# "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only"
- tsv_lines = grouped | beam.Map(
- lambda name_email_phone_snailmail: '\t'.join(
- ['"%s"' % name_email_phone_snailmail[0],
- '"%s"' % ','.join(name_email_phone_snailmail[1][0]),
- '"%s"' % ','.join(name_email_phone_snailmail[1][1]),
- '"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
+ def format_as_tsv(name_email_phone_snailmail):
+ (name, (email, phone, snailmail)) = name_email_phone_snailmail
+ return '\t'.join(
+ ['"%s"' % name,
+ '"%s"' % ','.join(email),
+ '"%s"' % ','.join(phone),
+ '"%s"' % next(iter(snailmail), '')])
+
+ tsv_lines = grouped | beam.Map(format_as_tsv)
# Compute some stats about our database of people.
- def without_email(name_email_phone_snailmail1):
- return not next(iter(name_email_phone_snailmail1[1][0]), None)
+ def without_email(name_email_phone_snailmail):
+ (_, (email, _, _)) = name_email_phone_snailmail
+ return not next(iter(email), None)
- def without_phones(name_email_phone_snailmail2):
- return not next(iter(name_email_phone_snailmail2[1][1]), None)
+ def without_phones(name_email_phone_snailmail):
+ (_, (_, phone, _)) = name_email_phone_snailmail
+ return not next(iter(phone), None)
- def without_address(name_e_p_snailmail):
- return not next(iter(name_e_p_snailmail[1][2]), None)
+ def without_address(name_email_phone_snailmail):
+ (_, (_, _, snailmail)) = name_email_phone_snailmail
+ return not next(iter(snailmail), None)
luddites = grouped | beam.Filter( # People without email.
without_email)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index fe7929e..e3df3a8 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -120,10 +120,12 @@ class CountWords(beam.PTransform):
def expand(self, pcoll):
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
return (pcoll
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
@@ -169,7 +171,7 @@ def run(argv=None):
(character_count
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda __counts: sum(__counts[1]))
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
| 'write chars' >> WriteToText(known_args.output + '-chars'))
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 01118b3..54abd8c 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,8 +535,9 @@ def examples_wordcount_templated(renames):
lines = p | 'Read' >> ReadFromText(wordcount_options.input)
# [END example_wordcount_templated]
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
(
lines
@@ -614,8 +615,9 @@ def examples_wordcount_debugging(renames):
[('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = (filtered_words
| 'format' >> beam.Map(format_result)
@@ -1126,7 +1128,8 @@ def model_group_by_key(contents, output_path):
import apache_beam as beam
with TestPipeline() as p: # Use TestPipeline for testing.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
words_and_counts = (
p
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index d1af4ce..df8a99b 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -59,7 +59,8 @@ def run(argv=None):
# Capitalize the characters in each line.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
transformed = (lines
# Use a pre-defined function that imports the re package.
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index f88b942..4c7eee1 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -70,7 +70,8 @@ def run(argv=None):
# Capitalize the characters in each line.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
transformed = (lines
| 'Split' >> (beam.FlatMap(find_words)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 60f1ffe..b1c4a5e 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -89,7 +89,8 @@ def run(argv=None):
# Count the occurrences of each word.
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
@@ -99,8 +100,9 @@ def run(argv=None):
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 7d18c0d..6ff8f26 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -94,7 +94,8 @@ class CountWords(beam.PTransform):
"""
def expand(self, pcoll):
def count_ones(word_ones):
- return (word_ones[0], sum(word_ones[1]))
+ (word, ones) = word_ones
+ return (word, sum(ones))
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
@@ -144,8 +145,9 @@ def run(argv=None):
# Format the counts into a PCollection of strings and write the output using
# a "Write" transform that has side effects.
# pylint: disable=unused-variable
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = (filtered_words
| 'format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 54fd1f1..390c8c0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,8 +106,9 @@ def run(argv=None):
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
- def format_result(w_c):
- return '%s: %s' % (w_c[0], w_c[1])
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %s' % (word, count)
output = counts | 'Format' >> beam.Map(format_result)
http://git-wip-us.apache.org/repos/asf/beam/blob/3b0ad58c/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 6a7e269..647781f 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -98,16 +98,16 @@ class CoGroupByKey(PTransform):
def expand(self, pcolls):
"""Performs CoGroupByKey on argument pcolls; see class docstring."""
# For associating values in K-V pairs with the PCollections they came from.
- def _pair_tag_with_value(k_v, tag):
- (key, value) = k_v
+ def _pair_tag_with_value(key_value, tag):
+ (key, value) = key_value
return (key, (tag, value))
# Creates the key, value pairs for the output PCollection. Values are either
# lists or dicts (per the class docstring), initialized by the result of
# result_ctor(result_ctor_arg).
- def _merge_tagged_vals_under_key(k_grouped, result_ctor,
+ def _merge_tagged_vals_under_key(key_grouped, result_ctor,
result_ctor_arg):
- (key, grouped) = k_grouped
+ (key, grouped) = key_grouped
result_value = result_ctor(result_ctor_arg)
for tag, value in grouped:
result_value[tag].append(value)
[03/18] beam git commit: Add special work to handle indexable return types. Introduce a base IndexableTypeConstraint.
Posted by ro...@apache.org.
Add special work to handle indexable return types. Introduce a base IndexableTypeConstraint.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f364248b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f364248b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f364248b
Branch: refs/heads/master
Commit: f364248b984e8ff884a34b37d49a17fa1a50cc26
Parents: 5cc6b5f
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 20:51:18 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700
----------------------------------------------------------------------
.../apache_beam/typehints/trivial_inference.py | 20 +++++++++++++-
.../typehints/trivial_inference_test.py | 5 ++++
sdks/python/apache_beam/typehints/typehints.py | 28 +++++++++++++++++---
3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index 51d3db2..a68bd18 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -107,7 +107,10 @@ class FrameState(object):
def __init__(self, f, local_vars=None, stack=()):
self.f = f
- self.co = f.__code__
+ if sys.version_info[0] >= 3:
+ self.co = f.__code__
+ else:
+ self.co = f.func_code
self.vars = list(local_vars)
self.stack = list(stack)
@@ -362,7 +365,22 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
else:
return_type = Any
state.stack[-pop_count:] = [return_type]
+ elif (opname == 'BINARY_SUBSCR'
+ and isinstance(state.stack[1], Const)
+ and isinstance(state.stack[0], typehints.IndexableTypeConstraint)):
+ if debug:
+ print("Executing special case binary subscript")
+ idx = state.stack.pop()
+ src = state.stack.pop()
+ try:
+ state.stack.append(src._constraint_for_index(idx.value))
+ except Exception as e:
+ if debug:
+ print("Exception {0} during special case indexing".format(e))
+ state.stack.append(Any)
elif opname in simple_ops:
+ if debug:
+ print("Executing simple op " + opname)
simple_ops[opname](state, arg)
elif opname == 'RETURN_VALUE':
returns.add(state.stack[-1])
http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/trivial_inference_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 8af9dd6..7b7b6a8 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -32,6 +32,11 @@ class TrivialInferenceTest(unittest.TestCase):
def testIdentity(self):
self.assertReturnType(int, lambda x: x, [int])
+ def testIndexing(self):
+ self.assertReturnType(int, lambda x: x[0], [typehints.Tuple[int, str]])
+ self.assertReturnType(str, lambda x: x[1], [typehints.Tuple[int, str]])
+ self.assertReturnType(str, lambda x: x[1], [typehints.List[str]])
+
def testTuples(self):
self.assertReturnType(
typehints.Tuple[typehints.Tuple[()], int], lambda x: ((), x), [int])
http://git-wip-us.apache.org/repos/asf/beam/blob/f364248b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index a27dd7e..b78ead2 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -65,6 +65,7 @@ In addition, type-hints can be used to implement run-time type-checking via the
import collections
import copy
+import sys
import types
__all__ = [
@@ -184,7 +185,17 @@ def bind_type_variables(type_constraint, bindings):
return type_constraint
-class SequenceTypeConstraint(TypeConstraint):
+class IndexableTypeConstraint(TypeConstraint):
+ """An internal common base-class for all type constraints with indexing.
+ E.G. SequenceTypeConstraint + Tuple's of fixed size.
+ """
+
+ def _constraint_for_index(self, idx):
+ """Returns the type at the given index."""
+ raise NotImplementedError
+
+
+class SequenceTypeConstraint(IndexableTypeConstraint):
"""A common base-class for all sequence related type-constraint classes.
A sequence is defined as an arbitrary length homogeneous container type. Type
@@ -214,6 +225,10 @@ class SequenceTypeConstraint(TypeConstraint):
def _inner_types(self):
yield self.inner_type
+ def _constraint_for_index(self, idx):
+ """Returns the type at the given index."""
+ return self.inner_type
+
def _consistent_with_check_(self, sub):
return (isinstance(sub, self.__class__)
and is_consistent_with(sub.inner_type, self.inner_type))
@@ -314,8 +329,11 @@ def validate_composite_type_param(type_param, error_msg_prefix):
parameter for a :class:`CompositeTypeHint`.
"""
# Must either be a TypeConstraint instance or a basic Python type.
+ possible_classes = [type, TypeConstraint]
+ if sys.version_info[0] == 2:
+ possible_classes.append(types.ClassType)
is_not_type_constraint = (
- not isinstance(type_param, (type, TypeConstraint))
+ not isinstance(type_param, tuple(possible_classes))
and type_param is not None)
is_forbidden_type = (isinstance(type_param, type) and
type_param in DISALLOWED_PRIMITIVE_TYPES)
@@ -546,7 +564,7 @@ class TupleHint(CompositeTypeHint):
for elem in sub.tuple_types)
return super(TupleSequenceConstraint, self)._consistent_with_check_(sub)
- class TupleConstraint(TypeConstraint):
+ class TupleConstraint(IndexableTypeConstraint):
def __init__(self, type_params):
self.tuple_types = tuple(type_params)
@@ -566,6 +584,10 @@ class TupleHint(CompositeTypeHint):
for t in self.tuple_types:
yield t
+ def _constraint_for_index(self, idx):
+ """Returns the type at the given index."""
+ return self.tuple_types[idx]
+
def _consistent_with_check_(self, sub):
return (isinstance(sub, self.__class__)
and len(sub.tuple_types) == len(self.tuple_types)
[07/18] beam git commit: re-order imports (AUTOMATED isort)
Posted by ro...@apache.org.
re-order imports (AUTOMATED isort)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94739377
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94739377
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94739377
Branch: refs/heads/master
Commit: 94739377e02e992d21f24257189acd1c4ece9d42
Parents: dd6a789
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:56:11 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 1 +
.../apache_beam/coders/coders_test_common.py | 3 ++-
.../io/gcp/datastore/v1/datastoreio_test.py | 24 ++++++++++++--------
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 2 +-
sdks/python/apache_beam/typehints/opcodes.py | 3 ++-
.../apache_beam/typehints/trivial_inference.py | 5 ++--
7 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 9f7b739..172ee74 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -27,6 +27,7 @@ coder_impl.pxd file for type hints.
For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
+
from types import NoneType
from apache_beam.coders import observable
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index d42e637..fc7279d 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -24,7 +24,6 @@ import unittest
import dill
-from . import observable
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.runners import pipeline_context
@@ -34,6 +33,8 @@ from apache_beam.utils import timestamp
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from . import observable
+
# Defined out of line for picklability.
class CustomCoder(coders.Coder):
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 7c73a06..96866ce 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,4 +1,18 @@
from __future__ import print_function
+
+import unittest
+
+from mock import MagicMock
+from mock import call
+from mock import patch
+
+from apache_beam.io.gcp.datastore.v1 import fake_datastore
+from apache_beam.io.gcp.datastore.v1 import helper
+from apache_beam.io.gcp.datastore.v1 import query_splitter
+from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
+
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -16,18 +30,8 @@ from __future__ import print_function
# limitations under the License.
#
-import unittest
-from mock import MagicMock
-from mock import call
-from mock import patch
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.io.gcp.datastore.v1 import query_splitter
-from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index d333ace..0b6d608 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -41,6 +41,7 @@ import inspect
import operator
import os
import sys
+from functools import reduce
from google.protobuf import wrappers_pb2
@@ -58,7 +59,6 @@ from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import validate_composite_type_param
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
-from functools import reduce
__all__ = [
'PTransform',
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 112c092..c137b14 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -24,6 +24,7 @@ import collections
import operator
import re
import unittest
+from functools import reduce
import hamcrest as hc
from nose.plugins.attrib import attr
@@ -48,7 +49,6 @@ from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.typehints.typehints_test import TypeHintTestCase
from apache_beam.utils.windowed_value import WindowedValue
-from functools import reduce
# Disable frequent lint warning due to pipe operator for chaining transforms.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/typehints/opcodes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index c3ba92a..dcca6d0 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -27,7 +27,9 @@ are handled inline rather than here.
For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
+
import types
+from functools import reduce
from . import typehints
from .trivial_inference import BoundMethod
@@ -40,7 +42,6 @@ from .typehints import Iterable
from .typehints import List
from .typehints import Tuple
from .typehints import Union
-from functools import reduce
def pop_one(state, unused_arg):
http://git-wip-us.apache.org/repos/asf/beam/blob/94739377/sdks/python/apache_beam/typehints/trivial_inference.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index df96900..51d3db2 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -19,18 +19,19 @@
For internal use only; no backwards-compatibility guarantees.
"""
-from __future__ import print_function
from __future__ import absolute_import
+from __future__ import print_function
+
import __builtin__
import collections
import dis
import pprint
import sys
import types
+from functools import reduce
from apache_beam.typehints import Any
from apache_beam.typehints import typehints
-from functools import reduce
class TypeInferenceError(ValueError):
[05/18] beam git commit: Cleanup whitespace (AUTOMATED autopep8)
Posted by ro...@apache.org.
Cleanup whitespace (AUTOMATED autopep8)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d0040db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d0040db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d0040db
Branch: refs/heads/master
Commit: 7d0040dbc680e4c144311303014c3feffdc90b60
Parents: 9473937
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 19:57:19 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py | 2 --
sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +-
2 files changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d0040db/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 96866ce..b1d18fe 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -31,8 +31,6 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
#
-
-
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
http://git-wip-us.apache.org/repos/asf/beam/blob/7d0040db/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 5786105..1ad65fe 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -75,7 +75,7 @@ class SdkHarness(object):
except Exception as e: # pylint: disable=broad-except
traceback_str = traceback.format_exc(e)
raise Exception("Error processing request. Original traceback "
- "is\n%s\n" % traceback_str)
+ "is\n%s\n" % traceback_str)
def handle_response(request, response_future):
try:
[06/18] beam git commit: Fix remaining style issues from auto conversion, switch ref in pylint to stage 1 (stages in futurize are apparently 1 indexed)
Posted by ro...@apache.org.
Fix remaining style issues from auto conversion, switch ref in pylint to stage 1 (stages in futurize are apparently 1 indexed)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd702b3e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd702b3e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd702b3e
Branch: refs/heads/master
Commit: cd702b3e7ea81fe41952066cfe9a1892d5433bd9
Parents: f364248
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 21:38:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:08 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/complete/autocomplete.py | 5 +++--
.../examples/complete/game/leader_board.py | 6 ++++--
.../apache_beam/examples/complete/game/user_score.py | 6 ++++--
.../examples/complete/juliaset/juliaset/juliaset.py | 5 ++++-
sdks/python/apache_beam/examples/complete/tfidf.py | 6 ++++--
.../apache_beam/examples/complete/tfidf_test.py | 5 ++++-
.../examples/cookbook/datastore_wordcount.py | 10 ++++++++--
.../apache_beam/examples/cookbook/mergecontacts.py | 15 ++++++++++++---
.../examples/cookbook/multiple_output_pardo.py | 10 ++++++++--
.../python/apache_beam/examples/snippets/snippets.py | 15 ++++++++++++---
.../apache_beam/examples/streaming_wordcount.py | 5 ++++-
.../apache_beam/examples/windowed_wordcount.py | 5 ++++-
sdks/python/apache_beam/examples/wordcount.py | 10 ++++++++--
.../apache_beam/examples/wordcount_debugging.py | 10 ++++++++--
.../python/apache_beam/examples/wordcount_minimal.py | 5 ++++-
.../runners/portability/maptask_executor_runner.py | 6 +++++-
sdks/python/apache_beam/transforms/trigger_test.py | 10 ++++++++--
17 files changed, 104 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 81c5351..c09b78e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -45,13 +45,14 @@ def run(argv=None):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
+ def format_result(prefix_candidates):
+ return '%s: %s' % (prefix_candidates[0], prefix_candidates[1])
(p # pylint: disable=expression-not-assigned
| 'read' >> ReadFromText(known_args.input)
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
- | 'format' >> beam.Map(
- lambda prefix_candidates: '%s: %s' % (prefix_candidates[0], prefix_candidates[1]))
+ | 'format' >> beam.Map(format_result)
| 'write' >> WriteToText(known_args.output))
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/leader_board.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index 6fc7b5d..c1b15e9 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -330,10 +330,12 @@ def run(argv=None):
}))
# Get user scores and write the results to BigQuery
+ def format_user_score_sums(user_score):
+ return {'user': user_score[0], 'total_score': user_score[1]}
+
(events # pylint: disable=expression-not-assigned
| 'CalculateUserScores' >> CalculateUserScores(args.allowed_lateness)
- | 'FormatUserScoreSums' >> beam.Map(
- lambda user_score: {'user': user_score[0], 'total_score': user_score[1]})
+ | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
| 'WriteUserScoreSums' >> WriteToBigQuery(
args.table_name + '_users', args.dataset, {
'user': 'STRING',
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 5e093bb..0405bab 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -138,11 +138,13 @@ def run(argv=None):
args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
+ def format_user_score_sums(user_score):
+ return 'user: %s, total_score: %s' % (user_score[0], user_score[1])
+
(p # pylint: disable=expression-not-assigned
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'UserScore' >> UserScore()
- | 'FormatUserScoreSums' >> beam.Map(
- lambda user_score: 'user: %s, total_score: %s' % (user_score[0], user_score[1]))
+ | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
| 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
# [END main]
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1013168..bb5b185 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -107,8 +107,11 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates
# to the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
+ def x_coord_key(x_y_i):
+ return (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2]))
+
(coordinates
- | 'x coord key' >> beam.Map(lambda x_y_i: (x_y_i[0], (x_y_i[0], x_y_i[1], x_y_i[2])))
+ | 'x coord key' >> beam.Map(x_coord_key)
| 'x coord' >> beam.GroupByKey()
| 'format' >> beam.Map(
lambda k_coords: ' '.join('(%s, %s, %s)' % c for c in k_coords[1]))
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 0300505..55404df 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -100,10 +100,12 @@ class TfIdf(beam.PTransform):
# Adjust the above collection to a mapping from (URI, word) pairs to counts
# into an isomorphic mapping from URI to (word, count) pairs, to prepare
# for a join by the URI key.
+ def shift_keys(uri_word_count):
+ return (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))
+
uri_to_word_and_count = (
uri_and_word_to_count
- | 'ShiftKeys' >> beam.Map(
- lambda uri_word_count: (uri_word_count[0][0], (uri_word_count[0][1], uri_word_count[1]))))
+ | 'ShiftKeys' >> beam.Map(shift_keys))
# Perform a CoGroupByKey (a sort of pre-join) on the prepared
# uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 957f4c7..71e71e3 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -51,6 +51,9 @@ class TfIdfTest(unittest.TestCase):
def test_tfidf_transform(self):
with TestPipeline() as p:
+ def re_key(word_uri_tfidf):
+ return (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])
+
uri_to_line = p | 'create sample' >> beam.Create(
[('1.txt', 'abc def ghi'),
('2.txt', 'abc def'),
@@ -58,7 +61,7 @@ class TfIdfTest(unittest.TestCase):
result = (
uri_to_line
| tfidf.TfIdf()
- | beam.Map(lambda word_uri_tfidf: (word_uri_tfidf[0], word_uri_tfidf[1][0], word_uri_tfidf[1][1])))
+ | beam.Map(re_key))
assert_that(result, equal_to(EXPECTED_RESULTS))
# Run the pipeline. Note that the assert_that above adds to the pipeline
# a check that the result PCollection contains expected values.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 13c5998..03fcd8a 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -179,15 +179,21 @@ def read_from_datastore(project, user_options, pipeline_options):
project, query, user_options.namespace)
# Count the occurrences of each word.
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+ | 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
+ output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 5a35e51..fdfa286 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -103,12 +103,21 @@ def run(argv=None, assert_results=None):
'"%s"' % next(iter(name_email_phone_snailmail[1][2]), '')]))
# Compute some stats about our database of people.
+ def without_email(name_email_phone_snailmail1):
+ return not next(iter(name_email_phone_snailmail1[1][0]), None)
+
+ def without_phones(name_email_phone_snailmail2):
+ return not next(iter(name_email_phone_snailmail2[1][1]), None)
+
+ def without_address(name_e_p_snailmail):
+ return not next(iter(name_e_p_snailmail[1][2]), None)
+
luddites = grouped | beam.Filter( # People without email.
- lambda name_email_phone_snailmail1: not next(iter(name_email_phone_snailmail1[1][0]), None))
+ without_email)
writers = grouped | beam.Filter( # People without phones.
- lambda name_email_phone_snailmail2: not next(iter(name_email_phone_snailmail2[1][1]), None))
+ without_phones)
nomads = grouped | beam.Filter( # People without addresses.
- lambda name_e_p_snailmail: not next(iter(name_e_p_snailmail[1][2]), None))
+ without_address)
num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally()
num_writers = writers | 'Writers' >> beam.combiners.Count.Globally()
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 259f95d..fe7929e 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -119,11 +119,17 @@ class CountWords(beam.PTransform):
"""
def expand(self, pcoll):
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
return (pcoll
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
- | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1])))
+ | 'count' >> beam.Map(count_ones)
+ | 'format' >> beam.Map(format_result))
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 873f3c3..10080c9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -535,6 +535,9 @@ def examples_wordcount_templated(renames):
lines = p | 'Read' >> ReadFromText(wordcount_options.input)
# [END example_wordcount_templated]
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
(
lines
| 'ExtractWords' >> beam.FlatMap(
@@ -542,7 +545,7 @@ def examples_wordcount_templated(renames):
| 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
| 'Group' >> beam.GroupByKey()
| 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
- | 'Format' >> beam.Map(lambda word_c2: '%s: %s' % (word_c2[0], word_c2[1]))
+ | 'Format' >> beam.Map(format_result)
| 'Write' >> WriteToText(wordcount_options.output)
)
@@ -611,8 +614,11 @@ def examples_wordcount_debugging(renames):
[('Flourish', 3), ('stomach', 1)]))
# [END example_wordcount_debugging_assert]
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
output = (filtered_words
- | 'format' >> beam.Map(lambda word_c1: '%s: %s' % (word_c1[0], word_c1[1]))
+ | 'format' >> beam.Map(format_result)
| 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
p.visit(SnippetUtils.RenameFiles(renames))
@@ -1119,6 +1125,9 @@ def model_group_by_key(contents, output_path):
import apache_beam as beam
with TestPipeline() as p: # Use TestPipeline for testing.
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
words_and_counts = (
p
| beam.Create(contents)
@@ -1133,7 +1142,7 @@ def model_group_by_key(contents, output_path):
grouped_words = words_and_counts | beam.GroupByKey()
# [END model_group_by_key_transform]
(grouped_words
- | 'count words' >> beam.Map(lambda word_counts: (word_counts[0], sum(word_counts[1])))
+ | 'count words' >> beam.Map(count_ones)
| beam.io.WriteToText(output_path))
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 8a05991..d1af4ce 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -58,6 +58,9 @@ def run(argv=None):
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
# Capitalize the characters in each line.
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
transformed = (lines
# Use a pre-defined function that imports the re package.
| 'Split' >> (
@@ -65,7 +68,7 @@ def run(argv=None):
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+ | 'Count' >> beam.Map(count_ones)
| 'Format' >> beam.Map(lambda tup: '%s: %d' % tup))
# Write to PubSub.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 680314b..f88b942 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -69,13 +69,16 @@ def run(argv=None):
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
# Capitalize the characters in each line.
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
transformed = (lines
| 'Split' >> (beam.FlatMap(find_words)
.with_output_types(unicode))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(2*60, 0))
| 'Group' >> beam.GroupByKey()
- | 'Count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
+ | 'Count' >> beam.Map(count_ones)
| 'Format' >> beam.ParDo(FormatDoFn()))
# Write to BigQuery.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index e21e91d..60f1ffe 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -88,15 +88,21 @@ def run(argv=None):
lines = p | 'read' >> ReadFromText(known_args.input)
# Count the occurrences of each word.
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+ | 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
- output = counts | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
+ output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bdc4c16..7d18c0d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -93,12 +93,15 @@ class CountWords(beam.PTransform):
PCollection of (word, count) tuples.
"""
def expand(self, pcoll):
+ def count_ones(word_ones):
+ return (word_ones[0], sum(word_ones[1]))
+
return (pcoll
| 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
- | 'count' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))))
+ | 'count' >> beam.Map(count_ones))
def run(argv=None):
@@ -141,8 +144,11 @@ def run(argv=None):
# Format the counts into a PCollection of strings and write the output using
# a "Write" transform that has side effects.
# pylint: disable=unused-variable
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
output = (filtered_words
- | 'format' >> beam.Map(lambda word_c: '%s: %s' % (word_c[0], word_c[1]))
+ | 'format' >> beam.Map(format_result)
| 'write' >> WriteToText(known_args.output))
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 01c3955..54fd1f1 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -106,7 +106,10 @@ def run(argv=None):
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
- output = counts | 'Format' >> beam.Map(lambda w_c: '%s: %s' % (w_c[0], w_c[1]))
+ def format_result(w_c):
+ return '%s: %s' % (w_c[0], w_c[1])
+
+ output = counts | 'Format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index d4063df..5b580a6 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -449,7 +449,11 @@ class MergeAccumulators(beam.PTransform):
return beam.pvalue.PCollection(input.pipeline)
else:
merge_accumulators = self.combine_fn.merge_accumulators
- return input | beam.Map(lambda k_vs: (k_vs[0], merge_accumulators(k_vs[1])))
+
+ def merge_with_existing_key(k_vs):
+ return (k_vs[0], merge_accumulators(k_vs[1]))
+
+ return input | beam.Map(merge_with_existing_key)
class ExtractOutputs(beam.PTransform):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd702b3e/sdks/python/apache_beam/transforms/trigger_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 5a56c7a..3afabaf 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -404,14 +404,20 @@ class TriggerPipelineTest(unittest.TestCase):
def test_after_count(self):
with TestPipeline() as p:
+ def construct_timestamped(k_t):
+ return TimestampedValue((k_t[0], k_t[1]), k_t[1])
+
+ def format_result(k_v):
+ return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))
+
result = (p
| beam.Create([1, 2, 3, 4, 5, 10, 11])
| beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
- | beam.Map(lambda k_t: TimestampedValue((k_t[0], k_t[1]), k_t[1]))
+ | beam.Map(construct_timestamped)
| beam.WindowInto(FixedWindows(10), trigger=AfterCount(3),
accumulation_mode=AccumulationMode.DISCARDING)
| beam.GroupByKey()
- | beam.Map(lambda k_v: ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))))
+ | beam.Map(format_result))
assert_that(result, equal_to(
{
'A-5': {1, 2, 3, 4, 5},
[10/18] beam git commit: Restore license position that got screwed up
Posted by ro...@apache.org.
Restore license position that got screwed up
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0bc5840
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0bc5840
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0bc5840
Branch: refs/heads/master
Commit: c0bc5840084e46e68d9fcff38aacf125f01a8613
Parents: cd702b3
Author: Holden Karau <ho...@us.ibm.com>
Authored: Fri Sep 1 21:43:35 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Oct 12 15:50:09 2017 -0700
----------------------------------------------------------------------
.../io/gcp/datastore/v1/datastoreio_test.py | 30 ++++++++++----------
sdks/python/run_pylint.sh | 1 +
2 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c0bc5840/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index b1d18fe..e131f93 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -1,18 +1,3 @@
-from __future__ import print_function
-
-import unittest
-
-from mock import MagicMock
-from mock import call
-from mock import patch
-
-from apache_beam.io.gcp.datastore.v1 import fake_datastore
-from apache_beam.io.gcp.datastore.v1 import helper
-from apache_beam.io.gcp.datastore.v1 import query_splitter
-from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -30,6 +15,21 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
# limitations under the License.
#
+from __future__ import print_function
+
+import unittest
+
+from mock import MagicMock
+from mock import call
+from mock import patch
+
+from apache_beam.io.gcp.datastore.v1 import fake_datastore
+from apache_beam.io.gcp.datastore.v1 import helper
+from apache_beam.io.gcp.datastore.v1 import query_splitter
+from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.io.gcp.datastore.v1.datastoreio import _Mutate
+
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
http://git-wip-us.apache.org/repos/asf/beam/blob/c0bc5840/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index d53bb14..ccd2e31 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -92,3 +92,4 @@ if [ "$count" != "0" ]; then
echo "$futurize_filtered"
exit 1
fi
+echo "No future changes needed"