Posted to commits@beam.apache.org by al...@apache.org on 2017/04/14 20:06:50 UTC
[1/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -3
Repository: beam
Updated Branches:
refs/heads/master cf9ac454d -> 89ff0b145
[BEAM-1964] Fix lint issues for linter upgrade -3
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd79f4d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd79f4d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd79f4d8
Branch: refs/heads/master
Commit: bd79f4d8bacba116a4c7f188cad0cdbf507d36d8
Parents: bf474a0
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Apr 14 11:22:25 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 74 +++++++++-----------
.../apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/fileio.py | 6 +-
sdks/python/apache_beam/io/filesystems_util.py | 3 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++--
sdks/python/apache_beam/io/iobase.py | 7 +-
sdks/python/apache_beam/io/localfilesystem.py | 3 +-
sdks/python/apache_beam/io/range_trackers.py | 19 +++--
sdks/python/apache_beam/io/source_test_utils.py | 7 +-
sdks/python/apache_beam/io/textio.py | 13 ++--
sdks/python/apache_beam/transforms/combiners.py | 29 ++++----
11 files changed, 81 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 1656180..de51f0f 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -84,8 +84,7 @@ class ConcatSource(iobase.BoundedSource):
# Getting coder from the first sub-sources. This assumes all sub-sources
# to produce the same coder.
return self._source_bundles[0].source.default_output_coder()
- else:
- return super(ConcatSource, self).default_output_coder()
+ return super(ConcatSource, self).default_output_coder()
class ConcatRangeTracker(iobase.RangeTracker):
@@ -165,13 +164,12 @@ class ConcatRangeTracker(iobase.RangeTracker):
return False
elif source_ix == self._end[0] and self._end[1] is None:
return False
- else:
- assert source_ix >= self._claimed_source_ix
- self._claimed_source_ix = source_ix
- if source_pos is None:
- return True
- else:
- return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+ assert source_ix >= self._claimed_source_ix
+ self._claimed_source_ix = source_ix
+ if source_pos is None:
+ return True
+ return self.sub_range_tracker(source_ix).try_claim(source_pos)
def try_split(self, pos):
source_ix, source_pos = pos
@@ -185,24 +183,24 @@ class ConcatRangeTracker(iobase.RangeTracker):
elif source_ix == self._end[0] and self._end[1] is None:
# At/after end.
return None
+
+ if source_ix > self._claimed_source_ix:
+ # Prefer to split on even boundary.
+ split_pos = None
+ ratio = self._cumulative_weights[source_ix]
else:
- if source_ix > self._claimed_source_ix:
- # Prefer to split on even boundary.
- split_pos = None
- ratio = self._cumulative_weights[source_ix]
- else:
- # Split the current subsource.
- split = self.sub_range_tracker(source_ix).try_split(
- source_pos)
- if not split:
- return None
- split_pos, frac = split
- ratio = self.local_to_global(source_ix, frac)
-
- self._end = source_ix, split_pos
- self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
- return (source_ix, split_pos), ratio
+ # Split the current subsource.
+ split = self.sub_range_tracker(source_ix).try_split(
+ source_pos)
+ if not split:
+ return None
+ split_pos, frac = split
+ ratio = self.local_to_global(source_ix, frac)
+
+ self._end = source_ix, split_pos
+ self._cumulative_weights = [min(w / ratio, 1)
+ for w in self._cumulative_weights]
+ return (source_ix, split_pos), ratio
def set_current_position(self, pos):
raise NotImplementedError('Should only be called on sub-trackers')
@@ -212,10 +210,9 @@ class ConcatRangeTracker(iobase.RangeTracker):
last = self._end[0] if self._end[1] is None else self._end[0] + 1
if source_ix == last:
return (source_ix, None)
- else:
- return (source_ix,
- self.sub_range_tracker(source_ix).position_at_fraction(
- source_frac))
+ return (source_ix,
+ self.sub_range_tracker(source_ix).position_at_fraction(
+ source_frac))
def fraction_consumed(self):
with self._lock:
@@ -234,15 +231,14 @@ class ConcatRangeTracker(iobase.RangeTracker):
if frac == 1:
last = self._end[0] if self._end[1] is None else self._end[0] + 1
return (last, None)
- else:
- cw = self._cumulative_weights
- # Find the last source that starts at or before frac.
- source_ix = bisect.bisect(cw, frac) - 1
- # Return this source, converting what's left of frac after starting
- # this source into a value in [0.0, 1.0) representing how far we are
- # towards the next source.
- return (source_ix,
- (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+ cw = self._cumulative_weights
+ # Find the last source that starts at or before frac.
+ source_ix = bisect.bisect(cw, frac) - 1
+ # Return this source, converting what's left of frac after starting
+ # this source into a value in [0.0, 1.0) representing how far we are
+ # towards the next source.
+ return (source_ix,
+ (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
def sub_range_tracker(self, source_ix):
assert self._start[0] <= source_ix <= self._end[0]
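
All of the concat_source.py hunks apply the same pylint cleanup (no-else-return): when every branch of an if/elif chain returns, the trailing else is redundant and its return can be dedented to function level. A minimal, self-contained sketch of the pattern (illustrative only, not Beam code):

def pick_tracker(trackers, index):
  # Before the fix, the last return lived under an `else:` branch.
  if index < 0:
    return trackers[0]
  elif index < len(trackers):
    return trackers[index]
  # After the fix the fall-through return sits at function level, which
  # pylint's no-else-return check prefers; behavior is unchanged.
  return trackers[-1]
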
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7b7ec8a..24a31b1 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -115,10 +115,10 @@ def _write_prepared_data(data, directory=None,
def write_prepared_pattern(data, suffixes=None):
+ assert data, 'Data (%s) seems to be empty' % data
if suffixes is None:
suffixes = [''] * len(data)
temp_dir = tempfile.mkdtemp()
- assert len(data) > 0
for i, d in enumerate(data):
file_name = _write_prepared_data(d, temp_dir, prefix='mytemp',
suffix=suffixes[i])
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index b128dc5..dc8957e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -65,7 +65,7 @@ class ChannelFactory(object):
def rename_batch(src_dest_pairs):
sources = [s for s, _ in src_dest_pairs]
destinations = [d for _, d in src_dest_pairs]
- if len(sources) == 0:
+ if not sources:
return []
bfs = get_filesystem(sources[0])
try:
@@ -165,7 +165,7 @@ class FileSink(iobase.Sink):
if shard_name_template is None:
shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
- elif shard_name_template is '':
+ elif shard_name_template == '':
num_shards = 1
self.file_path_prefix = file_path_prefix
self.file_name_suffix = file_name_suffix
@@ -275,7 +275,7 @@ class FileSink(iobase.Sink):
return exceptions
except BeamIOError as exp:
if exp.exception_details is None:
- raise exp
+ raise
for (src, dest), exception in exp.exception_details.iteritems():
if exception:
logging.warning('Rename not successful: %s -> %s, %s', src, dest,
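
Two distinct fixes appear in fileio.py: comparing a string against '' with == rather than `is` (identity comparison of literals only works by accident via interning and trips the linter), and re-raising the active exception with a bare `raise` so the original traceback is preserved. A small sketch of both idioms (hypothetical names, not the FileSink code):

def shards_for_template(shard_name_template):
  # Equality, not identity, when comparing against a string literal.
  if shard_name_template == '':
    return 1
  return None

def rename_with_logging(do_rename):
  try:
    do_rename()
  except IOError:
    # Bare `raise` keeps the original traceback; `raise exc` can reset it
    # to this line under Python 2, which is what the hunk above avoids.
    raise
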
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 6d21298..5034068 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -32,5 +32,4 @@ def get_filesystem(path):
'Google Cloud Platform IO not available, '
'please install apache_beam[gcp]')
return GCSFileSystem()
- else:
- return LocalFileSystem()
+ return LocalFileSystem()
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9a8174a..25f544d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -967,13 +967,12 @@ class BigQueryWrapper(object):
% (project_id, dataset_id, table_id))
if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
return found_table
- else:
- # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
- # the table before this point.
- return self._create_table(project_id=project_id,
- dataset_id=dataset_id,
- table_id=table_id,
- schema=schema or found_table.schema)
+ # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
+ # the table before this point.
+ return self._create_table(project_id=project_id,
+ dataset_id=dataset_id,
+ table_id=table_id,
+ schema=schema or found_table.schema)
def run_query(self, project_id, query, use_legacy_sql, flatten_results,
dry_run=False):
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 512824b..d9df5c4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -805,8 +805,7 @@ class Read(ptransform.PTransform):
def _infer_output_coder(self, input_type=None, input_coder=None):
if isinstance(self.source, BoundedSource):
return self.source.default_output_coder()
- else:
- return self.source.coder
+ return self.source.coder
def display_data(self):
return {'source': DisplayDataItem(self.source.__class__,
@@ -945,8 +944,8 @@ class _WriteKeyedBundleDoFn(core.DoFn):
def process(self, element, init_result):
bundle = element
writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
- for element in bundle[1]: # values
- writer.write(element)
+ for e in bundle[1]: # values
+ writer.write(e)
return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
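
The _WriteKeyedBundleDoFn change renames the loop variable so it no longer shadows the `element` parameter, the kind of redefinition the upgraded linter flags. A short sketch of the hazard (hypothetical writer API):

def process(element, writer):
  # Re-using `element` as the loop variable would silently rebind the
  # parameter; a distinct name keeps the outer value intact.
  for value in element[1]:
    writer.write(value)
  return element[0]
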
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 46589b0..7637f2a 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -93,8 +93,7 @@ class LocalFileSystem(FileSystem):
raw_file = open(path, mode)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
- else:
- return CompressedFile(raw_file, compression_type=compression_type)
+ return CompressedFile(raw_file, compression_type=compression_type)
def create(self, path, mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 8627f76..6e7b84f 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -42,9 +42,9 @@ class OffsetRangeTracker(iobase.RangeTracker):
raise ValueError('Start offset must not be \'None\'')
if end is None:
raise ValueError('End offset must not be \'None\'')
- assert isinstance(start, int) or isinstance(start, long)
+ assert isinstance(start, (int, long))
if end != self.OFFSET_INFINITY:
- assert isinstance(end, int) or isinstance(end, long)
+ assert isinstance(end, (int, long))
assert start <= end
@@ -91,8 +91,8 @@ class OffsetRangeTracker(iobase.RangeTracker):
'The first record [starting at %d] must be at a split point' %
record_start)
- if (split_point and self._offset_of_last_split_point is not -1 and
- record_start is self._offset_of_last_split_point):
+ if (split_point and self._offset_of_last_split_point != -1 and
+ record_start == self._offset_of_last_split_point):
raise ValueError(
'Record at a split point has same offset as the previous split '
'point: %d' % record_start)
@@ -354,8 +354,7 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
if self._stop_position is None or position < self._stop_position:
self._last_claim = position
return True
- else:
- return False
+ return False
def position_at_fraction(self, fraction):
return self.fraction_to_position(
@@ -373,15 +372,13 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
position, start=self._start_position, end=self._stop_position)
self._stop_position = position
return position, fraction
- else:
- return None
+ return None
def fraction_consumed(self):
if self._last_claim is self.UNSTARTED:
return 0
- else:
- return self.position_to_fraction(
- self._last_claim, self._start_position, self._stop_position)
+ return self.position_to_fraction(
+ self._last_claim, self._start_position, self._stop_position)
def position_to_fraction(self, pos, start, end):
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 07de738..542e9f6 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -611,10 +611,9 @@ def _assertSplitAtFractionConcurrent(
def read_or_split(test_params):
if test_params[0]:
return [val for val in test_params[1]]
- else:
- position = test_params[1].position_at_fraction(test_params[2])
- result = test_params[1].try_split(position)
- return result
+ position = test_params[1].position_at_fraction(test_params[2])
+ result = test_params[1].try_split(position)
+ return result
inputs = []
pool = thread_pool if thread_pool else _ThreadPool(2)
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9217e74..b6a24b0 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource):
if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
# Found a '\r\n'. Accepting that as the next separator.
return (next_lf - 1, next_lf + 1)
- else:
- # Found a '\n'. Accepting that as the next separator.
- return (next_lf, next_lf + 1)
+
+ # Found a '\n'. Accepting that as the next separator.
+ return (next_lf, next_lf + 1)
current_pos = len(read_buffer.data)
@@ -256,10 +256,9 @@ class _TextSource(filebasedsource.FileBasedSource):
# Current record should not contain the separator.
return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
sep_bounds[1] - record_start_position_in_buffer)
- else:
- # Current record should contain the separator.
- return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
- sep_bounds[1] - record_start_position_in_buffer)
+ # Current record should contain the separator.
+ return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
+ sep_bounds[1] - record_start_position_in_buffer)
class _TextSink(fileio.FileSink):
http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index a4cd462..f812832 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -531,27 +531,26 @@ def curry_combine_fn(fn, args, kwargs):
if not args and not kwargs:
return fn
- else:
+ # Create CurriedFn class for the combiner
+ class CurriedFn(core.CombineFn):
+ """CombineFn that applies extra arguments."""
- class CurriedFn(core.CombineFn):
- """CombineFn that applies extra arguments."""
+ def create_accumulator(self):
+ return fn.create_accumulator(*args, **kwargs)
- def create_accumulator(self):
- return fn.create_accumulator(*args, **kwargs)
+ def add_input(self, accumulator, element):
+ return fn.add_input(accumulator, element, *args, **kwargs)
- def add_input(self, accumulator, element):
- return fn.add_input(accumulator, element, *args, **kwargs)
+ def merge_accumulators(self, accumulators):
+ return fn.merge_accumulators(accumulators, *args, **kwargs)
- def merge_accumulators(self, accumulators):
- return fn.merge_accumulators(accumulators, *args, **kwargs)
+ def extract_output(self, accumulator):
+ return fn.extract_output(accumulator, *args, **kwargs)
- def extract_output(self, accumulator):
- return fn.extract_output(accumulator, *args, **kwargs)
+ def apply(self, elements):
+ return fn.apply(elements, *args, **kwargs)
- def apply(self, elements):
- return fn.apply(elements, *args, **kwargs)
-
- return CurriedFn()
+ return CurriedFn()
class PhasedCombineFnExecutor(object):
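
The reindented curry_combine_fn returns the original CombineFn unchanged when no extra arguments are given, and otherwise wraps it in a CurriedFn that forwards *args/**kwargs to every lifecycle method. A hedged usage sketch; SumWithOffsetFn is hypothetical and exists only to show where the forwarded argument lands, while curry_combine_fn and core.CombineFn are the names used in this module:

from apache_beam.transforms import core
from apache_beam.transforms.combiners import curry_combine_fn

class SumWithOffsetFn(core.CombineFn):
  # Each method accepts the extra `offset` that CurriedFn forwards.
  def create_accumulator(self, offset):
    return 0
  def add_input(self, accumulator, element, offset):
    return accumulator + element + offset
  def merge_accumulators(self, accumulators, offset):
    return sum(accumulators)
  def extract_output(self, accumulator, offset):
    return accumulator

curried = curry_combine_fn(SumWithOffsetFn(), (10,), {})
acc = curried.create_accumulator()    # calls create_accumulator(10)
acc = curried.add_input(acc, 1)       # 0 + 1 + 10
result = curried.extract_output(acc)  # == 11
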
[2/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -2
Posted by al...@apache.org.
[BEAM-1964] Fix lint issues for linter upgrade -2
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf474a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf474a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf474a0b
Branch: refs/heads/master
Commit: bf474a0b72beb2e946be39ce04e3f07800a3b307
Parents: cf9ac45
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu Apr 13 17:19:56 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700
----------------------------------------------------------------------
.../io/gcp/datastore/v1/datastoreio.py | 4 +--
.../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++-------
.../io/gcp/datastore/v1/query_splitter.py | 2 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 +-
.../io/gcp/tests/bigquery_matcher.py | 3 +-
sdks/python/apache_beam/metrics/cells.py | 28 +++++++++----------
sdks/python/apache_beam/metrics/execution.py | 3 +-
sdks/python/apache_beam/metrics/metric.py | 9 ++----
sdks/python/apache_beam/runners/common.py | 9 ++----
.../runners/dataflow/dataflow_metrics_test.py | 3 +-
.../runners/dataflow/dataflow_runner.py | 6 ++--
.../runners/dataflow/internal/apiclient.py | 8 +++---
.../runners/dataflow/internal/dependency.py | 6 ++--
.../runners/dataflow/test_dataflow_runner.py | 4 ---
.../runners/direct/bundle_factory.py | 14 ++++------
.../runners/direct/evaluation_context.py | 10 +++----
.../apache_beam/runners/direct/executor.py | 9 +-----
.../runners/direct/transform_evaluator.py | 7 -----
sdks/python/apache_beam/runners/runner.py | 3 +-
.../apache_beam/tests/pipeline_verifiers.py | 7 ++---
sdks/python/apache_beam/transforms/combiners.py | 29 +++++++-------------
.../apache_beam/transforms/combiners_test.py | 4 +--
sdks/python/apache_beam/typehints/decorators.py | 3 +-
sdks/python/apache_beam/typehints/typehints.py | 3 +-
24 files changed, 68 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index e8ca05d..d9b3598 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -253,7 +253,7 @@ class ReadFromDatastore(PTransform):
query = helper.make_latest_timestamp_query(namespace)
req = helper.make_request(project, namespace, query)
resp = datastore.run_query(req)
- if len(resp.batch.entity_results) == 0:
+ if not resp.batch.entity_results:
raise RuntimeError("Datastore total statistics unavailable.")
entity = resp.batch.entity_results[0].entity
@@ -281,7 +281,7 @@ class ReadFromDatastore(PTransform):
req = helper.make_request(project, namespace, kind_stats_query)
resp = datastore.run_query(req)
- if len(resp.batch.entity_results) == 0:
+ if not resp.batch.entity_results:
raise RuntimeError("Datastore statistics for kind %s unavailable" % kind)
entity = resp.batch.entity_results[0].entity
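
The datastoreio change tests a protobuf repeated field with plain truthiness instead of `len(...) == 0`; like Python lists, repeated fields are falsy when empty, so `if not resp.batch.entity_results:` reads the same and satisfies the linter. A small, library-free sketch of the idiom:

def first_entity(entity_results):
  # `if not seq` covers empty lists, tuples and protobuf repeated fields.
  if not entity_results:
    raise RuntimeError("Datastore total statistics unavailable.")
  return entity_results[0]
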
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index b1ef9af..d544226 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -62,8 +62,7 @@ def key_comparator(k1, k2):
k2_path = next(k2_iter, None)
if k2_path:
return -1
- else:
- return 0
+ return 0
def compare_path(p1, p2):
@@ -99,8 +98,7 @@ def str_compare(s1, s2):
return 0
elif s1 < s2:
return -1
- else:
- return 1
+ return 1
def get_datastore(project):
@@ -131,13 +129,9 @@ def make_partition(project, namespace):
def retry_on_rpc_error(exception):
"""A retry filter for Cloud Datastore RPCErrors."""
if isinstance(exception, RPCError):
- if exception.code >= 500:
- return True
- else:
- return False
- else:
- # TODO(vikasrk): Figure out what other errors should be retried.
- return False
+ return exception.code >= 500
+ # TODO(vikasrk): Figure out what other errors should be retried.
+ return False
def fetch_entities(project, namespace, query, datastore):
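
retry_on_rpc_error now returns the comparison itself rather than routing True/False through an if/else, the "simplifiable if" cleanup. A standalone before/after sketch with a hypothetical error type:

class FakeRPCError(Exception):
  def __init__(self, code):
    self.code = code

def retry_on_rpc_error(exception):
  # Before: if exception.code >= 500: return True / else: return False.
  # After: the comparison is already a boolean, so return it directly.
  if isinstance(exception, FakeRPCError):
    return exception.code >= 500
  return False
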
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
index 8ced170..d5674f9 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
@@ -97,7 +97,7 @@ def _validate_query(query):
if len(query.kind) != 1:
raise ValueError('Query must have exactly one kind.')
- if len(query.order) != 0:
+ if query.order:
raise ValueError('Query cannot have any sort orders.')
if query.HasField('limit'):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b2bc809..a10a3d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -93,8 +93,7 @@ class GCSFileSystem(FileSystem):
raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
- else:
- return CompressedFile(raw_file, compression_type=compression_type)
+ return CompressedFile(raw_file, compression_type=compression_type)
def create(self, path, mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index cc26689..66d99b3 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -38,8 +38,7 @@ MAX_RETRIES = 4
def retry_on_http_and_value_error(exception):
"""Filter allowing retries on Bigquery errors and value error."""
- return isinstance(exception, GoogleCloudError) or \
- isinstance(exception, ValueError)
+ return isinstance(exception, (GoogleCloudError, ValueError))
class BigqueryMatcher(BaseMatcher):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 5a571f5..c421949 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -97,9 +97,8 @@ class CellCommitState(object):
with self._lock:
if self._state == CellCommitState.CLEAN:
return False
- else:
- self._state = CellCommitState.COMMITTING
- return True
+ self._state = CellCommitState.COMMITTING
+ return True
class MetricCell(object):
@@ -218,8 +217,7 @@ class DistributionResult(object):
"""
if self.data.count == 0:
return None
- else:
- return float(self.data.sum)/self.data.count
+ return float(self.data.sum)/self.data.count
class DistributionData(object):
@@ -257,16 +255,16 @@ class DistributionData(object):
def combine(self, other):
if other is None:
return self
- else:
- new_min = (None if self.min is None and other.min is None else
- min(x for x in (self.min, other.min) if x is not None))
- new_max = (None if self.max is None and other.max is None else
- max(x for x in (self.max, other.max) if x is not None))
- return DistributionData(
- self.sum + other.sum,
- self.count + other.count,
- new_min,
- new_max)
+
+ new_min = (None if self.min is None and other.min is None else
+ min(x for x in (self.min, other.min) if x is not None))
+ new_max = (None if self.max is None and other.max is None else
+ max(x for x in (self.max, other.max) if x is not None))
+ return DistributionData(
+ self.sum + other.sum,
+ self.count + other.count,
+ new_min,
+ new_max)
@classmethod
def singleton(cls, value):
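
The reworked DistributionData.combine merges two distributions while treating None as "no value yet": the generator expressions drop None before taking min/max, so merging with an empty distribution never produces a None bound. A standalone sketch of that None-aware merge (hypothetical helper, not the Beam class):

def merge_bounds(a_min, b_min, a_max, b_max):
  # min()/max() over only the non-None candidates; None means "unset".
  new_min = (None if a_min is None and b_min is None else
             min(x for x in (a_min, b_min) if x is not None))
  new_max = (None if a_max is None and b_max is None else
             max(x for x in (a_max, b_max) if x is not None))
  return new_min, new_max

# merge_bounds(None, 3, None, 7) -> (3, 7)
# merge_bounds(None, None, None, None) -> (None, None)
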
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index f6c8990..887423b 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -129,8 +129,7 @@ class _MetricsEnvironment(object):
index = len(self.PER_THREAD.container) - 1
if index < 0:
return None
- else:
- return self.PER_THREAD.container[index]
+ return self.PER_THREAD.container[index]
def set_current_container(self, container):
self.set_container_stack()
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index f6a0923..33db4e1 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -103,8 +103,7 @@ class MetricResults(object):
(filter.names and
metric_key.metric.name in filter.names)):
return True
- else:
- return False
+ return False
@staticmethod
def _matches_sub_path(actual_scope, filter_scope):
@@ -117,8 +116,7 @@ class MetricResults(object):
return False # The first entry was not exactly matched
elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
return False # The last entry was not exactly matched
- else:
- return True
+ return True
@staticmethod
def _matches_scope(filter, metric_key):
@@ -139,8 +137,7 @@ class MetricResults(object):
if (MetricResults._matches_name(filter, metric_key) and
MetricResults._matches_scope(filter, metric_key)):
return True
- else:
- return False
+ return False
def query(self, filter=None):
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 2c1032d..8f86b75 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -414,10 +414,8 @@ def get_logging_context(maybe_logger, **kwargs):
maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
if isinstance(maybe_context, LoggingContext):
return maybe_context
- else:
- return _LoggingContextAdapter(maybe_context)
- else:
- return LoggingContext()
+ return _LoggingContextAdapter(maybe_context)
+ return LoggingContext()
class _ReceiverAdapter(Receiver):
@@ -432,5 +430,4 @@ class _ReceiverAdapter(Receiver):
def as_receiver(maybe_receiver):
if isinstance(maybe_receiver, Receiver):
return maybe_receiver
- else:
- return _ReceiverAdapter(maybe_receiver)
+ return _ReceiverAdapter(maybe_receiver)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 95027a3..ffee3e5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -38,8 +38,7 @@ class DictToObject(object):
def _wrap(self, value):
if isinstance(value, (tuple, list, set, frozenset)):
return type(value)([self._wrap(v) for v in value])
- else:
- return DictToObject(value) if isinstance(value, dict) else value
+ return DictToObject(value) if isinstance(value, dict) else value
class TestDataflowMetrics(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a92c26..2e9fc52 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -92,8 +92,7 @@ class DataflowRunner(PipelineRunner):
return -1
elif 'Traceback' in msg:
return 1
- else:
- return 0
+ return 0
job_id = result.job_id()
while True:
@@ -194,8 +193,7 @@ class DataflowRunner(PipelineRunner):
return coders.WindowedValueCoder(
coders.registry.get_coder(typehint),
window_coder=window_coder)
- else:
- return coders.registry.get_coder(typehint)
+ return coders.registry.get_coder(typehint)
def _get_cloud_encoding(self, coder):
"""Returns an encoding based on a coder object."""
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6a8aa93..8d44dff 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -436,10 +436,10 @@ class DataflowApplicationClient(object):
if not template_location:
return self.submit_job_description(job)
- else:
- logging.info('A template was just created at location %s',
- template_location)
- return None
+
+ logging.info('A template was just created at location %s',
+ template_location)
+ return None
def create_job_description(self, job):
"""Creates a job described by the workflow proto."""
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 22de5c6..1f28b26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -493,8 +493,7 @@ def get_sdk_name_and_version():
container_version = get_required_container_version()
if container_version == BEAM_CONTAINER_VERSION:
return ('Apache Beam SDK for Python', beam_version.__version__)
- else:
- return ('Google Cloud Dataflow SDK for Python', container_version)
+ return ('Google Cloud Dataflow SDK for Python', container_version)
def get_sdk_package_name():
@@ -502,8 +501,7 @@ def get_sdk_package_name():
container_version = get_required_container_version()
if container_version == BEAM_CONTAINER_VERSION:
return BEAM_PACKAGE_NAME
- else:
- return GOOGLE_PACKAGE_NAME
+ return GOOGLE_PACKAGE_NAME
def _download_pypi_sdk_package(temp_dir):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 046313a..4cf4131 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -23,10 +23,6 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
class TestDataflowRunner(DataflowRunner):
-
- def __init__(self):
- super(TestDataflowRunner, self).__init__()
-
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
options = pipeline.options.view_as(TestOptions)
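
TestDataflowRunner.__init__ did nothing beyond calling the parent constructor, so the override is dropped; Python falls back to DataflowRunner.__init__ automatically, and newer pylint versions flag such pass-through constructors (useless-super-delegation). The same cleanup removes _TimerCompletionCallback and the _ParDoEvaluator constructor below. A minimal illustration with hypothetical classes:

class Base(object):
  def __init__(self):
    self.ready = True

class Before(Base):
  # Flagged: adds nothing over the inherited constructor.
  def __init__(self):
    super(Before, self).__init__()

class After(Base):
  # No __init__ needed; Base.__init__ runs automatically.
  pass

assert After().ready
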
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 647b5f2..42c8095 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -127,8 +127,7 @@ class Bundle(object):
if not self._stacked:
if self._committed and not make_copy:
return self._elements
- else:
- return list(self._elements)
+ return list(self._elements)
def iterable_stacked_or_elements(elements):
for e in elements:
@@ -140,9 +139,8 @@ class Bundle(object):
if self._committed and not make_copy:
return iterable_stacked_or_elements(self._elements)
- else:
- # returns a copy.
- return [e for e in iterable_stacked_or_elements(self._elements)]
+ # returns a copy.
+ return [e for e in iterable_stacked_or_elements(self._elements)]
def has_elements(self):
return len(self._elements) > 0
@@ -171,9 +169,9 @@ class Bundle(object):
if not self._stacked:
self._elements.append(element)
return
- if (len(self._elements) > 0 and
- (isinstance(self._elements[-1], WindowedValue) or
- isinstance(self._elements[-1], Bundle.StackedWindowedValues)) and
+ if (self._elements and
+ (isinstance(self._elements[-1], (WindowedValue,
+ Bundle.StackedWindowedValues))) and
self._elements[-1].timestamp == element.timestamp and
self._elements[-1].windows == element.windows):
if isinstance(self._elements[-1], WindowedValue):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8114104..2169c7c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -281,11 +281,11 @@ class EvaluationContext(object):
"""
if transform:
return self._is_transform_done(transform)
- else:
- for applied_ptransform in self._step_names:
- if not self._is_transform_done(applied_ptransform):
- return False
- return True
+
+ for applied_ptransform in self._step_names:
+ if not self._is_transform_done(applied_ptransform):
+ return False
+ return True
def _is_transform_done(self, transform):
tw = self._watermark_manager.get_watermarks(transform)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index ce6356c..f6a1d7f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -240,13 +240,6 @@ class _CompletionCallback(object):
_ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
-class _TimerCompletionCallback(_CompletionCallback):
-
- def __init__(self, evaluation_context, all_updates, timers):
- super(_TimerCompletionCallback, self).__init__(
- evaluation_context, all_updates, timers)
-
-
class TransformExecutor(ExecutorService.CallableTask):
"""TransformExecutor will evaluate a bundle using an applied ptransform.
@@ -529,7 +522,7 @@ class _ExecutorServiceParallelExecutor(object):
empty_bundle = (
self._executor.evaluation_context.create_empty_committed_bundle(
applied_ptransform.inputs[0]))
- timer_completion_callback = _TimerCompletionCallback(
+ timer_completion_callback = _CompletionCallback(
self._executor.evaluation_context, self._executor.all_updates,
applied_ptransform)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 662c61d..f34513c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -278,13 +278,6 @@ class _TaggedReceivers(dict):
class _ParDoEvaluator(_TransformEvaluator):
"""TransformEvaluator for ParDo transform."""
-
- def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs, scoped_metrics_container):
- super(_ParDoEvaluator, self).__init__(
- evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs, scoped_metrics_container)
-
def start_bundle(self):
transform = self._applied_ptransform.transform
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7e7ec24..6c05951 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -111,8 +111,7 @@ def group_by_key_input_visitor():
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import GroupByKey, GroupByKeyOnly
from apache_beam import typehints
- if (isinstance(transform_node.transform, GroupByKey) or
- isinstance(transform_node.transform, GroupByKeyOnly)):
+ if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
pcoll = transform_node.inputs[0]
input_type = pcoll.element_type
# If input_type is not specified, then treat it as `Any`.
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 3cac658..51302b0 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -66,11 +66,8 @@ class PipelineStateMatcher(BaseMatcher):
def retry_on_io_error_and_server_error(exception):
"""Filter allowing retries on file I/O errors and service error."""
- if isinstance(exception, IOError) or \
- (HttpError is not None and isinstance(exception, HttpError)):
- return True
- else:
- return False
+ return isinstance(exception, IOError) or \
+ (HttpError is not None and isinstance(exception, HttpError))
class FileChecksumMatcher(BaseMatcher):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index f55d46a..a4cd462 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -95,8 +95,7 @@ class MeanCombineFn(core.CombineFn):
return cy_combiners.MeanInt64Fn()
elif input_type is float:
return cy_combiners.MeanFloatFn()
- else:
- return self
+ return self
class Count(object):
@@ -310,23 +309,19 @@ class TopCombineFn(core.CombineFn):
if len(buffer) < self._n:
if not buffer:
return element_key, [element]
- else:
- buffer.append(element)
- if lt(element_key, threshold): # element_key < threshold
- return element_key, buffer
- else:
- return accumulator # with mutated buffer
+ buffer.append(element)
+ if lt(element_key, threshold): # element_key < threshold
+ return element_key, buffer
+ return accumulator # with mutated buffer
elif lt(threshold, element_key): # threshold < element_key
buffer.append(element)
if len(buffer) < self._buffer_size:
return accumulator
- else:
- self._sort_buffer(buffer, lt)
- min_element = buffer[-self._n]
- threshold = self._key_fn(min_element) if self._key_fn else min_element
- return threshold, buffer[-self._n:]
- else:
- return accumulator
+ self._sort_buffer(buffer, lt)
+ min_element = buffer[-self._n]
+ threshold = self._key_fn(min_element) if self._key_fn else min_element
+ return threshold, buffer[-self._n:]
+ return accumulator
def merge_accumulators(self, accumulators, *args, **kwargs):
accumulators = list(accumulators)
@@ -357,10 +352,6 @@ class TopCombineFn(core.CombineFn):
class Largest(TopCombineFn):
-
- def __init__(self, n):
- super(Largest, self).__init__(n)
-
def default_label(self):
return 'Largest(%s)' % self._n
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 6c101fe..af76889 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -164,10 +164,10 @@ class CombineTest(unittest.TestCase):
DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
DisplayDataItemMatcher('combine_fn',
transform.fn.__class__)]
- if len(args) > 0:
+ if args:
expected_items.append(
DisplayDataItemMatcher('args', str(args)))
- if len(kwargs) > 0:
+ if kwargs:
expected_items.append(
DisplayDataItemMatcher('kwargs', str(kwargs)))
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index d8f0b1b..af6c499 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -237,8 +237,7 @@ def _unpack_positional_arg_hints(arg, hint):
if isinstance(hint, typehints.TupleConstraint):
return tuple(_unpack_positional_arg_hints(a, t)
for a, t in zip(arg, hint.tuple_types))
- else:
- return (typehints.Any,) * len(arg)
+ return (typehints.Any,) * len(arg)
return hint
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 1557d85..9b41adb 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1039,8 +1039,7 @@ def is_consistent_with(sub, base):
if isinstance(base, TypeConstraint):
if isinstance(sub, UnionConstraint):
return all(is_consistent_with(c, base) for c in sub.union_types)
- else:
- return base._consistent_with_check_(sub)
+ return base._consistent_with_check_(sub)
elif isinstance(sub, TypeConstraint):
# Nothing but object lives above any type constraints.
return base == object
[3/3] beam git commit: This closes #2539
Posted by al...@apache.org.
This closes #2539
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89ff0b14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89ff0b14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89ff0b14
Branch: refs/heads/master
Commit: 89ff0b145fac64ec6f5a38d8df708d226890c7ac
Parents: cf9ac45 bd79f4d
Author: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Authored: Fri Apr 14 13:06:32 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:32 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 74 +++++++++-----------
.../apache_beam/io/filebasedsource_test.py | 2 +-
sdks/python/apache_beam/io/fileio.py | 6 +-
sdks/python/apache_beam/io/filesystems_util.py | 3 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++--
.../io/gcp/datastore/v1/datastoreio.py | 4 +-
.../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++---
.../io/gcp/datastore/v1/query_splitter.py | 2 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 +-
.../io/gcp/tests/bigquery_matcher.py | 3 +-
sdks/python/apache_beam/io/iobase.py | 7 +-
sdks/python/apache_beam/io/localfilesystem.py | 3 +-
sdks/python/apache_beam/io/range_trackers.py | 19 +++--
sdks/python/apache_beam/io/source_test_utils.py | 7 +-
sdks/python/apache_beam/io/textio.py | 13 ++--
sdks/python/apache_beam/metrics/cells.py | 28 ++++----
sdks/python/apache_beam/metrics/execution.py | 3 +-
sdks/python/apache_beam/metrics/metric.py | 9 +--
sdks/python/apache_beam/runners/common.py | 9 +--
.../runners/dataflow/dataflow_metrics_test.py | 3 +-
.../runners/dataflow/dataflow_runner.py | 6 +-
.../runners/dataflow/internal/apiclient.py | 8 +--
.../runners/dataflow/internal/dependency.py | 6 +-
.../runners/dataflow/test_dataflow_runner.py | 4 --
.../runners/direct/bundle_factory.py | 14 ++--
.../runners/direct/evaluation_context.py | 10 +--
.../apache_beam/runners/direct/executor.py | 9 +--
.../runners/direct/transform_evaluator.py | 7 --
sdks/python/apache_beam/runners/runner.py | 3 +-
.../apache_beam/tests/pipeline_verifiers.py | 7 +-
sdks/python/apache_beam/transforms/combiners.py | 58 +++++++--------
.../apache_beam/transforms/combiners_test.py | 4 +-
sdks/python/apache_beam/typehints/decorators.py | 3 +-
sdks/python/apache_beam/typehints/typehints.py | 3 +-
34 files changed, 149 insertions(+), 220 deletions(-)
----------------------------------------------------------------------