You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/02 17:39:25 UTC

[1/2] beam git commit: Revert clean else-return lint changes.

Repository: beam
Updated Branches:
  refs/heads/master 43443c94f -> 97dde95bc


Revert clean else-return lint changes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a907323f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a907323f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a907323f

Branch: refs/heads/master
Commit: a907323ff8c6d4dda61087a5b026703a3c6f43a8
Parents: 43443c9
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon May 1 16:44:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 10:39:05 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 74 +++++++++++---------
 sdks/python/apache_beam/io/filesystems_util.py  |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++--
 sdks/python/apache_beam/io/iobase.py            |  3 +-
 sdks/python/apache_beam/io/localfilesystem.py   |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 11 +--
 sdks/python/apache_beam/io/source_test_utils.py |  7 +-
 sdks/python/apache_beam/io/textio.py            | 13 ++--
 8 files changed, 70 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index de51f0f..1656180 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -84,7 +84,8 @@ class ConcatSource(iobase.BoundedSource):
       # Getting coder from the first sub-sources. This assumes all sub-sources
       # to produce the same coder.
       return self._source_bundles[0].source.default_output_coder()
-    return super(ConcatSource, self).default_output_coder()
+    else:
+      return super(ConcatSource, self).default_output_coder()
 
 
 class ConcatRangeTracker(iobase.RangeTracker):
@@ -164,12 +165,13 @@ class ConcatRangeTracker(iobase.RangeTracker):
         return False
       elif source_ix == self._end[0] and self._end[1] is None:
         return False
-
-      assert source_ix >= self._claimed_source_ix
-      self._claimed_source_ix = source_ix
-      if source_pos is None:
-        return True
-      return self.sub_range_tracker(source_ix).try_claim(source_pos)
+      else:
+        assert source_ix >= self._claimed_source_ix
+        self._claimed_source_ix = source_ix
+        if source_pos is None:
+          return True
+        else:
+          return self.sub_range_tracker(source_ix).try_claim(source_pos)
 
   def try_split(self, pos):
     source_ix, source_pos = pos
@@ -183,24 +185,24 @@ class ConcatRangeTracker(iobase.RangeTracker):
       elif source_ix == self._end[0] and self._end[1] is None:
         # At/after end.
         return None
-
-      if source_ix > self._claimed_source_ix:
-        # Prefer to split on even boundary.
-        split_pos = None
-        ratio = self._cumulative_weights[source_ix]
       else:
-        # Split the current subsource.
-        split = self.sub_range_tracker(source_ix).try_split(
-            source_pos)
-        if not split:
-          return None
-        split_pos, frac = split
-        ratio = self.local_to_global(source_ix, frac)
-
-      self._end = source_ix, split_pos
-      self._cumulative_weights = [min(w / ratio, 1)
-                                  for w in self._cumulative_weights]
-      return (source_ix, split_pos), ratio
+        if source_ix > self._claimed_source_ix:
+          # Prefer to split on even boundary.
+          split_pos = None
+          ratio = self._cumulative_weights[source_ix]
+        else:
+          # Split the current subsource.
+          split = self.sub_range_tracker(source_ix).try_split(
+              source_pos)
+          if not split:
+            return None
+          split_pos, frac = split
+          ratio = self.local_to_global(source_ix, frac)
+
+        self._end = source_ix, split_pos
+        self._cumulative_weights = [min(w / ratio, 1)
+                                    for w in self._cumulative_weights]
+        return (source_ix, split_pos), ratio
 
   def set_current_position(self, pos):
     raise NotImplementedError('Should only be called on sub-trackers')
@@ -210,9 +212,10 @@ class ConcatRangeTracker(iobase.RangeTracker):
     last = self._end[0] if self._end[1] is None else self._end[0] + 1
     if source_ix == last:
       return (source_ix, None)
-    return (source_ix,
-            self.sub_range_tracker(source_ix).position_at_fraction(
-                source_frac))
+    else:
+      return (source_ix,
+              self.sub_range_tracker(source_ix).position_at_fraction(
+                  source_frac))
 
   def fraction_consumed(self):
     with self._lock:
@@ -231,14 +234,15 @@ class ConcatRangeTracker(iobase.RangeTracker):
     if frac == 1:
       last = self._end[0] if self._end[1] is None else self._end[0] + 1
       return (last, None)
-    cw = self._cumulative_weights
-    # Find the last source that starts at or before frac.
-    source_ix = bisect.bisect(cw, frac) - 1
-    # Return this source, converting what's left of frac after starting
-    # this source into a value in [0.0, 1.0) representing how far we are
-    # towards the next source.
-    return (source_ix,
-            (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+    else:
+      cw = self._cumulative_weights
+      # Find the last source that starts at or before frac.
+      source_ix = bisect.bisect(cw, frac) - 1
+      # Return this source, converting what's left of frac after starting
+      # this source into a value in [0.0, 1.0) representing how far we are
+      # towards the next source.
+      return (source_ix,
+              (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
 
   def sub_range_tracker(self, source_ix):
     assert self._start[0] <= source_ix <= self._end[0]

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 5034068..6d21298 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -32,4 +32,5 @@ def get_filesystem(path):
           'Google Cloud Platform IO not available, '
           'please install apache_beam[gcp]')
     return GCSFileSystem()
-  return LocalFileSystem()
+  else:
+    return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 0db965f..4e8d61b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -996,12 +996,13 @@ class BigQueryWrapper(object):
           % (project_id, dataset_id, table_id))
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
-    # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
-    # the table before this point.
-    return self._create_table(project_id=project_id,
-                              dataset_id=dataset_id,
-                              table_id=table_id,
-                              schema=schema or found_table.schema)
+    else:
+      # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
+      # the table before this point.
+      return self._create_table(project_id=project_id,
+                                dataset_id=dataset_id,
+                                table_id=table_id,
+                                schema=schema or found_table.schema)
 
   def run_query(self, project_id, query, use_legacy_sql, flatten_results,
                 dry_run=False):

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 2cac67f..312542a 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -805,7 +805,8 @@ class Read(ptransform.PTransform):
   def _infer_output_coder(self, input_type=None, input_coder=None):
     if isinstance(self.source, BoundedSource):
       return self.source.default_output_coder()
-    return self.source.coder
+    else:
+      return self.source.coder
 
   def display_data(self):
     return {'source': DisplayDataItem(self.source.__class__,

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index fbb65bf..8b2bda9 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -104,7 +104,8 @@ class LocalFileSystem(FileSystem):
     raw_file = open(path, mode)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
-    return CompressedFile(raw_file, compression_type=compression_type)
+    else:
+      return CompressedFile(raw_file, compression_type=compression_type)
 
   def create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 6e7b84f..000df81 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -354,7 +354,8 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
       if self._stop_position is None or position < self._stop_position:
         self._last_claim = position
         return True
-      return False
+      else:
+        return False
 
   def position_at_fraction(self, fraction):
     return self.fraction_to_position(
@@ -372,13 +373,15 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
             position, start=self._start_position, end=self._stop_position)
         self._stop_position = position
         return position, fraction
-      return None
+      else:
+        return None
 
   def fraction_consumed(self):
     if self._last_claim is self.UNSTARTED:
       return 0
-    return self.position_to_fraction(
-        self._last_claim, self._start_position, self._stop_position)
+    else:
+      return self.position_to_fraction(
+          self._last_claim, self._start_position, self._stop_position)
 
   def position_to_fraction(self, pos, start, end):
     """

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 91aae33..edb6409 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -610,9 +610,10 @@ def _assert_split_at_fraction_concurrent(
   def read_or_split(test_params):
     if test_params[0]:
       return [val for val in test_params[1]]
-    position = test_params[1].position_at_fraction(test_params[2])
-    result = test_params[1].try_split(position)
-    return result
+    else:
+      position = test_params[1].position_at_fraction(test_params[2])
+      result = test_params[1].try_split(position)
+      return result
 
   inputs = []
   pool = thread_pool if thread_pool else _ThreadPool(2)

http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index f2c3d34..750ec45 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource):
         if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
-
-        # Found a '\n'. Accepting that as the next separator.
-        return (next_lf, next_lf + 1)
+        else:
+          # Found a '\n'. Accepting that as the next separator.
+          return (next_lf, next_lf + 1)
 
       current_pos = len(read_buffer.data)
 
@@ -256,9 +256,10 @@ class _TextSource(filebasedsource.FileBasedSource):
       # Current record should not contain the separator.
       return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
               sep_bounds[1] - record_start_position_in_buffer)
-    # Current record should contain the separator.
-    return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
-            sep_bounds[1] - record_start_position_in_buffer)
+    else:
+      # Current record should contain the separator.
+      return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
+              sep_bounds[1] - record_start_position_in_buffer)
 
 
 class _TextSink(fileio.FileSink):


[2/2] beam git commit: Closes #2813

Posted by ro...@apache.org.
Closes #2813


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97dde95b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97dde95b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97dde95b

Branch: refs/heads/master
Commit: 97dde95bca79c4849e9ff1cffea59adb3a846c30
Parents: 43443c9 a907323
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 2 10:39:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 10:39:06 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 74 +++++++++++---------
 sdks/python/apache_beam/io/filesystems_util.py  |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++--
 sdks/python/apache_beam/io/iobase.py            |  3 +-
 sdks/python/apache_beam/io/localfilesystem.py   |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 11 +--
 sdks/python/apache_beam/io/source_test_utils.py |  7 +-
 sdks/python/apache_beam/io/textio.py            | 13 ++--
 8 files changed, 70 insertions(+), 57 deletions(-)
----------------------------------------------------------------------