You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/14 20:06:50 UTC

[1/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -3

Repository: beam
Updated Branches:
  refs/heads/master cf9ac454d -> 89ff0b145


[BEAM-1964] Fix lint issues for linter upgrade -3


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd79f4d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd79f4d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd79f4d8

Branch: refs/heads/master
Commit: bd79f4d8bacba116a4c7f188cad0cdbf507d36d8
Parents: bf474a0
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Apr 14 11:22:25 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 74 +++++++++-----------
 .../apache_beam/io/filebasedsource_test.py      |  2 +-
 sdks/python/apache_beam/io/fileio.py            |  6 +-
 sdks/python/apache_beam/io/filesystems_util.py  |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++--
 sdks/python/apache_beam/io/iobase.py            |  7 +-
 sdks/python/apache_beam/io/localfilesystem.py   |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 19 +++--
 sdks/python/apache_beam/io/source_test_utils.py |  7 +-
 sdks/python/apache_beam/io/textio.py            | 13 ++--
 sdks/python/apache_beam/transforms/combiners.py | 29 ++++----
 11 files changed, 81 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 1656180..de51f0f 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -84,8 +84,7 @@ class ConcatSource(iobase.BoundedSource):
       # Getting coder from the first sub-sources. This assumes all sub-sources
       # to produce the same coder.
       return self._source_bundles[0].source.default_output_coder()
-    else:
-      return super(ConcatSource, self).default_output_coder()
+    return super(ConcatSource, self).default_output_coder()
 
 
 class ConcatRangeTracker(iobase.RangeTracker):
@@ -165,13 +164,12 @@ class ConcatRangeTracker(iobase.RangeTracker):
         return False
       elif source_ix == self._end[0] and self._end[1] is None:
         return False
-      else:
-        assert source_ix >= self._claimed_source_ix
-        self._claimed_source_ix = source_ix
-        if source_pos is None:
-          return True
-        else:
-          return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+      assert source_ix >= self._claimed_source_ix
+      self._claimed_source_ix = source_ix
+      if source_pos is None:
+        return True
+      return self.sub_range_tracker(source_ix).try_claim(source_pos)
 
   def try_split(self, pos):
     source_ix, source_pos = pos
@@ -185,24 +183,24 @@ class ConcatRangeTracker(iobase.RangeTracker):
       elif source_ix == self._end[0] and self._end[1] is None:
         # At/after end.
         return None
+
+      if source_ix > self._claimed_source_ix:
+        # Prefer to split on even boundary.
+        split_pos = None
+        ratio = self._cumulative_weights[source_ix]
       else:
-        if source_ix > self._claimed_source_ix:
-          # Prefer to split on even boundary.
-          split_pos = None
-          ratio = self._cumulative_weights[source_ix]
-        else:
-          # Split the current subsource.
-          split = self.sub_range_tracker(source_ix).try_split(
-              source_pos)
-          if not split:
-            return None
-          split_pos, frac = split
-          ratio = self.local_to_global(source_ix, frac)
-
-        self._end = source_ix, split_pos
-        self._cumulative_weights = [min(w / ratio, 1)
-                                    for w in self._cumulative_weights]
-        return (source_ix, split_pos), ratio
+        # Split the current subsource.
+        split = self.sub_range_tracker(source_ix).try_split(
+            source_pos)
+        if not split:
+          return None
+        split_pos, frac = split
+        ratio = self.local_to_global(source_ix, frac)
+
+      self._end = source_ix, split_pos
+      self._cumulative_weights = [min(w / ratio, 1)
+                                  for w in self._cumulative_weights]
+      return (source_ix, split_pos), ratio
 
   def set_current_position(self, pos):
     raise NotImplementedError('Should only be called on sub-trackers')
@@ -212,10 +210,9 @@ class ConcatRangeTracker(iobase.RangeTracker):
     last = self._end[0] if self._end[1] is None else self._end[0] + 1
     if source_ix == last:
       return (source_ix, None)
-    else:
-      return (source_ix,
-              self.sub_range_tracker(source_ix).position_at_fraction(
-                  source_frac))
+    return (source_ix,
+            self.sub_range_tracker(source_ix).position_at_fraction(
+                source_frac))
 
   def fraction_consumed(self):
     with self._lock:
@@ -234,15 +231,14 @@ class ConcatRangeTracker(iobase.RangeTracker):
     if frac == 1:
       last = self._end[0] if self._end[1] is None else self._end[0] + 1
       return (last, None)
-    else:
-      cw = self._cumulative_weights
-      # Find the last source that starts at or before frac.
-      source_ix = bisect.bisect(cw, frac) - 1
-      # Return this source, converting what's left of frac after starting
-      # this source into a value in [0.0, 1.0) representing how far we are
-      # towards the next source.
-      return (source_ix,
-              (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+    cw = self._cumulative_weights
+    # Find the last source that starts at or before frac.
+    source_ix = bisect.bisect(cw, frac) - 1
+    # Return this source, converting what's left of frac after starting
+    # this source into a value in [0.0, 1.0) representing how far we are
+    # towards the next source.
+    return (source_ix,
+            (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
 
   def sub_range_tracker(self, source_ix):
     assert self._start[0] <= source_ix <= self._end[0]

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7b7ec8a..24a31b1 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -115,10 +115,10 @@ def _write_prepared_data(data, directory=None,
 
 
 def write_prepared_pattern(data, suffixes=None):
+  assert data, 'Data (%s) seems to be empty' % data
   if suffixes is None:
     suffixes = [''] * len(data)
   temp_dir = tempfile.mkdtemp()
-  assert len(data) > 0
   for i, d in enumerate(data):
     file_name = _write_prepared_data(d, temp_dir, prefix='mytemp',
                                      suffix=suffixes[i])

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index b128dc5..dc8957e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -65,7 +65,7 @@ class ChannelFactory(object):
   def rename_batch(src_dest_pairs):
     sources = [s for s, _ in src_dest_pairs]
     destinations = [d for _, d in src_dest_pairs]
-    if len(sources) == 0:
+    if not sources:
       return []
     bfs = get_filesystem(sources[0])
     try:
@@ -165,7 +165,7 @@ class FileSink(iobase.Sink):
 
     if shard_name_template is None:
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
-    elif shard_name_template is '':
+    elif shard_name_template == '':
       num_shards = 1
     self.file_path_prefix = file_path_prefix
     self.file_name_suffix = file_name_suffix
@@ -275,7 +275,7 @@ class FileSink(iobase.Sink):
         return exceptions
       except BeamIOError as exp:
         if exp.exception_details is None:
-          raise exp
+          raise
         for (src, dest), exception in exp.exception_details.iteritems():
           if exception:
             logging.warning('Rename not successful: %s -> %s, %s', src, dest,

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 6d21298..5034068 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -32,5 +32,4 @@ def get_filesystem(path):
           'Google Cloud Platform IO not available, '
           'please install apache_beam[gcp]')
     return GCSFileSystem()
-  else:
-    return LocalFileSystem()
+  return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9a8174a..25f544d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -967,13 +967,12 @@ class BigQueryWrapper(object):
           % (project_id, dataset_id, table_id))
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
-    else:
-      # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
-      # the table before this point.
-      return self._create_table(project_id=project_id,
-                                dataset_id=dataset_id,
-                                table_id=table_id,
-                                schema=schema or found_table.schema)
+    # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
+    # the table before this point.
+    return self._create_table(project_id=project_id,
+                              dataset_id=dataset_id,
+                              table_id=table_id,
+                              schema=schema or found_table.schema)
 
   def run_query(self, project_id, query, use_legacy_sql, flatten_results,
                 dry_run=False):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 512824b..d9df5c4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -805,8 +805,7 @@ class Read(ptransform.PTransform):
   def _infer_output_coder(self, input_type=None, input_coder=None):
     if isinstance(self.source, BoundedSource):
       return self.source.default_output_coder()
-    else:
-      return self.source.coder
+    return self.source.coder
 
   def display_data(self):
     return {'source': DisplayDataItem(self.source.__class__,
@@ -945,8 +944,8 @@ class _WriteKeyedBundleDoFn(core.DoFn):
   def process(self, element, init_result):
     bundle = element
     writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    for element in bundle[1]:  # values
-      writer.write(element)
+    for e in bundle[1]:  # values
+      writer.write(e)
     return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 46589b0..7637f2a 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -93,8 +93,7 @@ class LocalFileSystem(FileSystem):
     raw_file = open(path, mode)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
-    else:
-      return CompressedFile(raw_file, compression_type=compression_type)
+    return CompressedFile(raw_file, compression_type=compression_type)
 
   def create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 8627f76..6e7b84f 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -42,9 +42,9 @@ class OffsetRangeTracker(iobase.RangeTracker):
       raise ValueError('Start offset must not be \'None\'')
     if end is None:
       raise ValueError('End offset must not be \'None\'')
-    assert isinstance(start, int) or isinstance(start, long)
+    assert isinstance(start, (int, long))
     if end != self.OFFSET_INFINITY:
-      assert isinstance(end, int) or isinstance(end, long)
+      assert isinstance(end, (int, long))
 
     assert start <= end
 
@@ -91,8 +91,8 @@ class OffsetRangeTracker(iobase.RangeTracker):
           'The first record [starting at %d] must be at a split point' %
           record_start)
 
-    if (split_point and self._offset_of_last_split_point is not -1 and
-        record_start is self._offset_of_last_split_point):
+    if (split_point and self._offset_of_last_split_point != -1 and
+        record_start == self._offset_of_last_split_point):
       raise ValueError(
           'Record at a split point has same offset as the previous split '
           'point: %d' % record_start)
@@ -354,8 +354,7 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
       if self._stop_position is None or position < self._stop_position:
         self._last_claim = position
         return True
-      else:
-        return False
+      return False
 
   def position_at_fraction(self, fraction):
     return self.fraction_to_position(
@@ -373,15 +372,13 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
             position, start=self._start_position, end=self._stop_position)
         self._stop_position = position
         return position, fraction
-      else:
-        return None
+      return None
 
   def fraction_consumed(self):
     if self._last_claim is self.UNSTARTED:
       return 0
-    else:
-      return self.position_to_fraction(
-          self._last_claim, self._start_position, self._stop_position)
+    return self.position_to_fraction(
+        self._last_claim, self._start_position, self._stop_position)
 
   def position_to_fraction(self, pos, start, end):
     """

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 07de738..542e9f6 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -611,10 +611,9 @@ def _assertSplitAtFractionConcurrent(
   def read_or_split(test_params):
     if test_params[0]:
       return [val for val in test_params[1]]
-    else:
-      position = test_params[1].position_at_fraction(test_params[2])
-      result = test_params[1].try_split(position)
-      return result
+    position = test_params[1].position_at_fraction(test_params[2])
+    result = test_params[1].try_split(position)
+    return result
 
   inputs = []
   pool = thread_pool if thread_pool else _ThreadPool(2)

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9217e74..b6a24b0 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource):
         if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
-        else:
-          # Found a '\n'. Accepting that as the next separator.
-          return (next_lf, next_lf + 1)
+
+        # Found a '\n'. Accepting that as the next separator.
+        return (next_lf, next_lf + 1)
 
       current_pos = len(read_buffer.data)
 
@@ -256,10 +256,9 @@ class _TextSource(filebasedsource.FileBasedSource):
       # Current record should not contain the separator.
       return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
               sep_bounds[1] - record_start_position_in_buffer)
-    else:
-      # Current record should contain the separator.
-      return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
-              sep_bounds[1] - record_start_position_in_buffer)
+    # Current record should contain the separator.
+    return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
+            sep_bounds[1] - record_start_position_in_buffer)
 
 
 class _TextSink(fileio.FileSink):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index a4cd462..f812832 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -531,27 +531,26 @@ def curry_combine_fn(fn, args, kwargs):
   if not args and not kwargs:
     return fn
 
-  else:
+  # Create CurriedFn class for the combiner
+  class CurriedFn(core.CombineFn):
+    """CombineFn that applies extra arguments."""
 
-    class CurriedFn(core.CombineFn):
-      """CombineFn that applies extra arguments."""
+    def create_accumulator(self):
+      return fn.create_accumulator(*args, **kwargs)
 
-      def create_accumulator(self):
-        return fn.create_accumulator(*args, **kwargs)
+    def add_input(self, accumulator, element):
+      return fn.add_input(accumulator, element, *args, **kwargs)
 
-      def add_input(self, accumulator, element):
-        return fn.add_input(accumulator, element, *args, **kwargs)
+    def merge_accumulators(self, accumulators):
+      return fn.merge_accumulators(accumulators, *args, **kwargs)
 
-      def merge_accumulators(self, accumulators):
-        return fn.merge_accumulators(accumulators, *args, **kwargs)
+    def extract_output(self, accumulator):
+      return fn.extract_output(accumulator, *args, **kwargs)
 
-      def extract_output(self, accumulator):
-        return fn.extract_output(accumulator, *args, **kwargs)
+    def apply(self, elements):
+      return fn.apply(elements, *args, **kwargs)
 
-      def apply(self, elements):
-        return fn.apply(elements, *args, **kwargs)
-
-    return CurriedFn()
+  return CurriedFn()
 
 
 class PhasedCombineFnExecutor(object):


[2/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -2

Posted by al...@apache.org.
[BEAM-1964] Fix lint issues for linter upgrade -2


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf474a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf474a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf474a0b

Branch: refs/heads/master
Commit: bf474a0b72beb2e946be39ce04e3f07800a3b307
Parents: cf9ac45
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu Apr 13 17:19:56 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/datastoreio.py          |  4 +--
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 16 ++++-------
 .../io/gcp/datastore/v1/query_splitter.py       |  2 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  3 +-
 .../io/gcp/tests/bigquery_matcher.py            |  3 +-
 sdks/python/apache_beam/metrics/cells.py        | 28 +++++++++----------
 sdks/python/apache_beam/metrics/execution.py    |  3 +-
 sdks/python/apache_beam/metrics/metric.py       |  9 ++----
 sdks/python/apache_beam/runners/common.py       |  9 ++----
 .../runners/dataflow/dataflow_metrics_test.py   |  3 +-
 .../runners/dataflow/dataflow_runner.py         |  6 ++--
 .../runners/dataflow/internal/apiclient.py      |  8 +++---
 .../runners/dataflow/internal/dependency.py     |  6 ++--
 .../runners/dataflow/test_dataflow_runner.py    |  4 ---
 .../runners/direct/bundle_factory.py            | 14 ++++------
 .../runners/direct/evaluation_context.py        | 10 +++----
 .../apache_beam/runners/direct/executor.py      |  9 +-----
 .../runners/direct/transform_evaluator.py       |  7 -----
 sdks/python/apache_beam/runners/runner.py       |  3 +-
 .../apache_beam/tests/pipeline_verifiers.py     |  7 ++---
 sdks/python/apache_beam/transforms/combiners.py | 29 +++++++-------------
 .../apache_beam/transforms/combiners_test.py    |  4 +--
 sdks/python/apache_beam/typehints/decorators.py |  3 +-
 sdks/python/apache_beam/typehints/typehints.py  |  3 +-
 24 files changed, 68 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index e8ca05d..d9b3598 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -253,7 +253,7 @@ class ReadFromDatastore(PTransform):
     query = helper.make_latest_timestamp_query(namespace)
     req = helper.make_request(project, namespace, query)
     resp = datastore.run_query(req)
-    if len(resp.batch.entity_results) == 0:
+    if not resp.batch.entity_results:
       raise RuntimeError("Datastore total statistics unavailable.")
 
     entity = resp.batch.entity_results[0].entity
@@ -281,7 +281,7 @@ class ReadFromDatastore(PTransform):
 
     req = helper.make_request(project, namespace, kind_stats_query)
     resp = datastore.run_query(req)
-    if len(resp.batch.entity_results) == 0:
+    if not resp.batch.entity_results:
       raise RuntimeError("Datastore statistics for kind %s unavailable" % kind)
 
     entity = resp.batch.entity_results[0].entity

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index b1ef9af..d544226 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -62,8 +62,7 @@ def key_comparator(k1, k2):
   k2_path = next(k2_iter, None)
   if k2_path:
     return -1
-  else:
-    return 0
+  return 0
 
 
 def compare_path(p1, p2):
@@ -99,8 +98,7 @@ def str_compare(s1, s2):
     return 0
   elif s1 < s2:
     return -1
-  else:
-    return 1
+  return 1
 
 
 def get_datastore(project):
@@ -131,13 +129,9 @@ def make_partition(project, namespace):
 def retry_on_rpc_error(exception):
   """A retry filter for Cloud Datastore RPCErrors."""
   if isinstance(exception, RPCError):
-    if exception.code >= 500:
-      return True
-    else:
-      return False
-  else:
-    # TODO(vikasrk): Figure out what other errors should be retried.
-    return False
+    return exception.code >= 500
+  # TODO(vikasrk): Figure out what other errors should be retried.
+  return False
 
 
 def fetch_entities(project, namespace, query, datastore):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
index 8ced170..d5674f9 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
@@ -97,7 +97,7 @@ def _validate_query(query):
   if len(query.kind) != 1:
     raise ValueError('Query must have exactly one kind.')
 
-  if len(query.order) != 0:
+  if query.order:
     raise ValueError('Query cannot have any sort orders.')
 
   if query.HasField('limit'):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b2bc809..a10a3d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -93,8 +93,7 @@ class GCSFileSystem(FileSystem):
     raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
-    else:
-      return CompressedFile(raw_file, compression_type=compression_type)
+    return CompressedFile(raw_file, compression_type=compression_type)
 
   def create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index cc26689..66d99b3 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -38,8 +38,7 @@ MAX_RETRIES = 4
 
 def retry_on_http_and_value_error(exception):
   """Filter allowing retries on Bigquery errors and value error."""
-  return isinstance(exception, GoogleCloudError) or \
-          isinstance(exception, ValueError)
+  return isinstance(exception, (GoogleCloudError, ValueError))
 
 
 class BigqueryMatcher(BaseMatcher):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 5a571f5..c421949 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -97,9 +97,8 @@ class CellCommitState(object):
     with self._lock:
       if self._state == CellCommitState.CLEAN:
         return False
-      else:
-        self._state = CellCommitState.COMMITTING
-        return True
+      self._state = CellCommitState.COMMITTING
+      return True
 
 
 class MetricCell(object):
@@ -218,8 +217,7 @@ class DistributionResult(object):
     """
     if self.data.count == 0:
       return None
-    else:
-      return float(self.data.sum)/self.data.count
+    return float(self.data.sum)/self.data.count
 
 
 class DistributionData(object):
@@ -257,16 +255,16 @@ class DistributionData(object):
   def combine(self, other):
     if other is None:
       return self
-    else:
-      new_min = (None if self.min is None and other.min is None else
-                 min(x for x in (self.min, other.min) if x is not None))
-      new_max = (None if self.max is None and other.max is None else
-                 max(x for x in (self.max, other.max) if x is not None))
-      return DistributionData(
-          self.sum + other.sum,
-          self.count + other.count,
-          new_min,
-          new_max)
+
+    new_min = (None if self.min is None and other.min is None else
+               min(x for x in (self.min, other.min) if x is not None))
+    new_max = (None if self.max is None and other.max is None else
+               max(x for x in (self.max, other.max) if x is not None))
+    return DistributionData(
+        self.sum + other.sum,
+        self.count + other.count,
+        new_min,
+        new_max)
 
   @classmethod
   def singleton(cls, value):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index f6c8990..887423b 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -129,8 +129,7 @@ class _MetricsEnvironment(object):
     index = len(self.PER_THREAD.container) - 1
     if index < 0:
       return None
-    else:
-      return self.PER_THREAD.container[index]
+    return self.PER_THREAD.container[index]
 
   def set_current_container(self, container):
     self.set_container_stack()

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index f6a0923..33db4e1 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -103,8 +103,7 @@ class MetricResults(object):
         (filter.names and
          metric_key.metric.name in filter.names)):
       return True
-    else:
-      return False
+    return False
 
   @staticmethod
   def _matches_sub_path(actual_scope, filter_scope):
@@ -117,8 +116,7 @@ class MetricResults(object):
       return False  # The first entry was not exactly matched
     elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
       return False  # The last entry was not exactly matched
-    else:
-      return True
+    return True
 
   @staticmethod
   def _matches_scope(filter, metric_key):
@@ -139,8 +137,7 @@ class MetricResults(object):
     if (MetricResults._matches_name(filter, metric_key) and
         MetricResults._matches_scope(filter, metric_key)):
       return True
-    else:
-      return False
+    return False
 
   def query(self, filter=None):
     raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 2c1032d..8f86b75 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -414,10 +414,8 @@ def get_logging_context(maybe_logger, **kwargs):
     maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
     if isinstance(maybe_context, LoggingContext):
       return maybe_context
-    else:
-      return _LoggingContextAdapter(maybe_context)
-  else:
-    return LoggingContext()
+    return _LoggingContextAdapter(maybe_context)
+  return LoggingContext()
 
 
 class _ReceiverAdapter(Receiver):
@@ -432,5 +430,4 @@ class _ReceiverAdapter(Receiver):
 def as_receiver(maybe_receiver):
   if isinstance(maybe_receiver, Receiver):
     return maybe_receiver
-  else:
-    return _ReceiverAdapter(maybe_receiver)
+  return _ReceiverAdapter(maybe_receiver)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 95027a3..ffee3e5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -38,8 +38,7 @@ class DictToObject(object):
   def _wrap(self, value):
     if isinstance(value, (tuple, list, set, frozenset)):
       return type(value)([self._wrap(v) for v in value])
-    else:
-      return DictToObject(value) if isinstance(value, dict) else value
+    return DictToObject(value) if isinstance(value, dict) else value
 
 
 class TestDataflowMetrics(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a92c26..2e9fc52 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -92,8 +92,7 @@ class DataflowRunner(PipelineRunner):
         return -1
       elif 'Traceback' in msg:
         return 1
-      else:
-        return 0
+      return 0
 
     job_id = result.job_id()
     while True:
@@ -194,8 +193,7 @@ class DataflowRunner(PipelineRunner):
       return coders.WindowedValueCoder(
           coders.registry.get_coder(typehint),
           window_coder=window_coder)
-    else:
-      return coders.registry.get_coder(typehint)
+    return coders.registry.get_coder(typehint)
 
   def _get_cloud_encoding(self, coder):
     """Returns an encoding based on a coder object."""

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6a8aa93..8d44dff 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -436,10 +436,10 @@ class DataflowApplicationClient(object):
 
     if not template_location:
       return self.submit_job_description(job)
-    else:
-      logging.info('A template was just created at location %s',
-                   template_location)
-      return None
+
+    logging.info('A template was just created at location %s',
+                 template_location)
+    return None
 
   def create_job_description(self, job):
     """Creates a job described by the workflow proto."""

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 22de5c6..1f28b26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -493,8 +493,7 @@ def get_sdk_name_and_version():
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return ('Apache Beam SDK for Python', beam_version.__version__)
-  else:
-    return ('Google Cloud Dataflow SDK for Python', container_version)
+  return ('Google Cloud Dataflow SDK for Python', container_version)
 
 
 def get_sdk_package_name():
@@ -502,8 +501,7 @@ def get_sdk_package_name():
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
-  else:
-    return GOOGLE_PACKAGE_NAME
+  return GOOGLE_PACKAGE_NAME
 
 
 def _download_pypi_sdk_package(temp_dir):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 046313a..4cf4131 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -23,10 +23,6 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 
 
 class TestDataflowRunner(DataflowRunner):
-
-  def __init__(self):
-    super(TestDataflowRunner, self).__init__()
-
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline.options.view_as(TestOptions)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 647b5f2..42c8095 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -127,8 +127,7 @@ class Bundle(object):
     if not self._stacked:
       if self._committed and not make_copy:
         return self._elements
-      else:
-        return list(self._elements)
+      return list(self._elements)
 
     def iterable_stacked_or_elements(elements):
       for e in elements:
@@ -140,9 +139,8 @@ class Bundle(object):
 
     if self._committed and not make_copy:
       return iterable_stacked_or_elements(self._elements)
-    else:
-      # returns a copy.
-      return [e for e in iterable_stacked_or_elements(self._elements)]
+    # returns a copy.
+    return [e for e in iterable_stacked_or_elements(self._elements)]
 
   def has_elements(self):
     return len(self._elements) > 0
@@ -171,9 +169,9 @@ class Bundle(object):
     if not self._stacked:
       self._elements.append(element)
       return
-    if (len(self._elements) > 0 and
-        (isinstance(self._elements[-1], WindowedValue) or
-         isinstance(self._elements[-1], Bundle.StackedWindowedValues)) and
+    if (self._elements and
+        (isinstance(self._elements[-1], (WindowedValue,
+                                         Bundle.StackedWindowedValues))) and
         self._elements[-1].timestamp == element.timestamp and
         self._elements[-1].windows == element.windows):
       if isinstance(self._elements[-1], WindowedValue):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8114104..2169c7c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -281,11 +281,11 @@ class EvaluationContext(object):
     """
     if transform:
       return self._is_transform_done(transform)
-    else:
-      for applied_ptransform in self._step_names:
-        if not self._is_transform_done(applied_ptransform):
-          return False
-      return True
+
+    for applied_ptransform in self._step_names:
+      if not self._is_transform_done(applied_ptransform):
+        return False
+    return True
 
   def _is_transform_done(self, transform):
     tw = self._watermark_manager.get_watermarks(transform)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index ce6356c..f6a1d7f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -240,13 +240,6 @@ class _CompletionCallback(object):
         _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
 
 
-class _TimerCompletionCallback(_CompletionCallback):
-
-  def __init__(self, evaluation_context, all_updates, timers):
-    super(_TimerCompletionCallback, self).__init__(
-        evaluation_context, all_updates, timers)
-
-
 class TransformExecutor(ExecutorService.CallableTask):
   """TransformExecutor will evaluate a bundle using an applied ptransform.
 
@@ -529,7 +522,7 @@ class _ExecutorServiceParallelExecutor(object):
         empty_bundle = (
             self._executor.evaluation_context.create_empty_committed_bundle(
                 applied_ptransform.inputs[0]))
-        timer_completion_callback = _TimerCompletionCallback(
+        timer_completion_callback = _CompletionCallback(
             self._executor.evaluation_context, self._executor.all_updates,
             applied_ptransform)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 662c61d..f34513c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -278,13 +278,6 @@ class _TaggedReceivers(dict):
 
 class _ParDoEvaluator(_TransformEvaluator):
   """TransformEvaluator for ParDo transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    super(_ParDoEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
   def start_bundle(self):
     transform = self._applied_ptransform.transform
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7e7ec24..6c05951 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -111,8 +111,7 @@ def group_by_key_input_visitor():
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam import GroupByKey, GroupByKeyOnly
       from apache_beam import typehints
-      if (isinstance(transform_node.transform, GroupByKey) or
-          isinstance(transform_node.transform, GroupByKeyOnly)):
+      if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
         pcoll = transform_node.inputs[0]
         input_type = pcoll.element_type
         # If input_type is not specified, then treat it as `Any`.

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 3cac658..51302b0 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -66,11 +66,8 @@ class PipelineStateMatcher(BaseMatcher):
 
 def retry_on_io_error_and_server_error(exception):
   """Filter allowing retries on file I/O errors and service error."""
-  if isinstance(exception, IOError) or \
-          (HttpError is not None and isinstance(exception, HttpError)):
-    return True
-  else:
-    return False
+  return isinstance(exception, IOError) or \
+          (HttpError is not None and isinstance(exception, HttpError))
 
 
 class FileChecksumMatcher(BaseMatcher):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index f55d46a..a4cd462 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -95,8 +95,7 @@ class MeanCombineFn(core.CombineFn):
       return cy_combiners.MeanInt64Fn()
     elif input_type is float:
       return cy_combiners.MeanFloatFn()
-    else:
-      return self
+    return self
 
 
 class Count(object):
@@ -310,23 +309,19 @@ class TopCombineFn(core.CombineFn):
     if len(buffer) < self._n:
       if not buffer:
         return element_key, [element]
-      else:
-        buffer.append(element)
-        if lt(element_key, threshold):  # element_key < threshold
-          return element_key, buffer
-        else:
-          return accumulator  # with mutated buffer
+      buffer.append(element)
+      if lt(element_key, threshold):  # element_key < threshold
+        return element_key, buffer
+      return accumulator  # with mutated buffer
     elif lt(threshold, element_key):  # threshold < element_key
       buffer.append(element)
       if len(buffer) < self._buffer_size:
         return accumulator
-      else:
-        self._sort_buffer(buffer, lt)
-        min_element = buffer[-self._n]
-        threshold = self._key_fn(min_element) if self._key_fn else min_element
-        return threshold, buffer[-self._n:]
-    else:
-      return accumulator
+      self._sort_buffer(buffer, lt)
+      min_element = buffer[-self._n]
+      threshold = self._key_fn(min_element) if self._key_fn else min_element
+      return threshold, buffer[-self._n:]
+    return accumulator
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
     accumulators = list(accumulators)
@@ -357,10 +352,6 @@ class TopCombineFn(core.CombineFn):
 
 
 class Largest(TopCombineFn):
-
-  def __init__(self, n):
-    super(Largest, self).__init__(n)
-
   def default_label(self):
     return 'Largest(%s)' % self._n
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 6c101fe..af76889 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -164,10 +164,10 @@ class CombineTest(unittest.TestCase):
             DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
             DisplayDataItemMatcher('combine_fn',
                                    transform.fn.__class__)]
-        if len(args) > 0:
+        if args:
           expected_items.append(
               DisplayDataItemMatcher('args', str(args)))
-        if len(kwargs) > 0:
+        if kwargs:
           expected_items.append(
               DisplayDataItemMatcher('kwargs', str(kwargs)))
         hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index d8f0b1b..af6c499 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -237,8 +237,7 @@ def _unpack_positional_arg_hints(arg, hint):
     if isinstance(hint, typehints.TupleConstraint):
       return tuple(_unpack_positional_arg_hints(a, t)
                    for a, t in zip(arg, hint.tuple_types))
-    else:
-      return (typehints.Any,) * len(arg)
+    return (typehints.Any,) * len(arg)
   return hint
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 1557d85..9b41adb 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1039,8 +1039,7 @@ def is_consistent_with(sub, base):
   if isinstance(base, TypeConstraint):
     if isinstance(sub, UnionConstraint):
       return all(is_consistent_with(c, base) for c in sub.union_types)
-    else:
-      return base._consistent_with_check_(sub)
+    return base._consistent_with_check_(sub)
   elif isinstance(sub, TypeConstraint):
     # Nothing but object lives above any type constraints.
     return base == object


[3/3] beam git commit: This closes #2539

Posted by al...@apache.org.
This closes #2539


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89ff0b14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89ff0b14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89ff0b14

Branch: refs/heads/master
Commit: 89ff0b145fac64ec6f5a38d8df708d226890c7ac
Parents: cf9ac45 bd79f4d
Author: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Authored: Fri Apr 14 13:06:32 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:32 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 74 +++++++++-----------
 .../apache_beam/io/filebasedsource_test.py      |  2 +-
 sdks/python/apache_beam/io/fileio.py            |  6 +-
 sdks/python/apache_beam/io/filesystems_util.py  |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++--
 .../io/gcp/datastore/v1/datastoreio.py          |  4 +-
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 16 ++---
 .../io/gcp/datastore/v1/query_splitter.py       |  2 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  3 +-
 .../io/gcp/tests/bigquery_matcher.py            |  3 +-
 sdks/python/apache_beam/io/iobase.py            |  7 +-
 sdks/python/apache_beam/io/localfilesystem.py   |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 19 +++--
 sdks/python/apache_beam/io/source_test_utils.py |  7 +-
 sdks/python/apache_beam/io/textio.py            | 13 ++--
 sdks/python/apache_beam/metrics/cells.py        | 28 ++++----
 sdks/python/apache_beam/metrics/execution.py    |  3 +-
 sdks/python/apache_beam/metrics/metric.py       |  9 +--
 sdks/python/apache_beam/runners/common.py       |  9 +--
 .../runners/dataflow/dataflow_metrics_test.py   |  3 +-
 .../runners/dataflow/dataflow_runner.py         |  6 +-
 .../runners/dataflow/internal/apiclient.py      |  8 +--
 .../runners/dataflow/internal/dependency.py     |  6 +-
 .../runners/dataflow/test_dataflow_runner.py    |  4 --
 .../runners/direct/bundle_factory.py            | 14 ++--
 .../runners/direct/evaluation_context.py        | 10 +--
 .../apache_beam/runners/direct/executor.py      |  9 +--
 .../runners/direct/transform_evaluator.py       |  7 --
 sdks/python/apache_beam/runners/runner.py       |  3 +-
 .../apache_beam/tests/pipeline_verifiers.py     |  7 +-
 sdks/python/apache_beam/transforms/combiners.py | 58 +++++++--------
 .../apache_beam/transforms/combiners_test.py    |  4 +-
 sdks/python/apache_beam/typehints/decorators.py |  3 +-
 sdks/python/apache_beam/typehints/typehints.py  |  3 +-
 34 files changed, 149 insertions(+), 220 deletions(-)
----------------------------------------------------------------------