You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/05/02 17:39:25 UTC
[1/2] beam git commit: Revert clean else-return lint changes.
Repository: beam
Updated Branches:
refs/heads/master 43443c94f -> 97dde95bc
Revert clean else-return lint changes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a907323f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a907323f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a907323f
Branch: refs/heads/master
Commit: a907323ff8c6d4dda61087a5b026703a3c6f43a8
Parents: 43443c9
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon May 1 16:44:55 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 10:39:05 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 74 +++++++++++---------
sdks/python/apache_beam/io/filesystems_util.py | 3 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++--
sdks/python/apache_beam/io/iobase.py | 3 +-
sdks/python/apache_beam/io/localfilesystem.py | 3 +-
sdks/python/apache_beam/io/range_trackers.py | 11 +--
sdks/python/apache_beam/io/source_test_utils.py | 7 +-
sdks/python/apache_beam/io/textio.py | 13 ++--
8 files changed, 70 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index de51f0f..1656180 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -84,7 +84,8 @@ class ConcatSource(iobase.BoundedSource):
# Getting coder from the first sub-sources. This assumes all sub-sources
# to produce the same coder.
return self._source_bundles[0].source.default_output_coder()
- return super(ConcatSource, self).default_output_coder()
+ else:
+ return super(ConcatSource, self).default_output_coder()
class ConcatRangeTracker(iobase.RangeTracker):
@@ -164,12 +165,13 @@ class ConcatRangeTracker(iobase.RangeTracker):
return False
elif source_ix == self._end[0] and self._end[1] is None:
return False
-
- assert source_ix >= self._claimed_source_ix
- self._claimed_source_ix = source_ix
- if source_pos is None:
- return True
- return self.sub_range_tracker(source_ix).try_claim(source_pos)
+ else:
+ assert source_ix >= self._claimed_source_ix
+ self._claimed_source_ix = source_ix
+ if source_pos is None:
+ return True
+ else:
+ return self.sub_range_tracker(source_ix).try_claim(source_pos)
def try_split(self, pos):
source_ix, source_pos = pos
@@ -183,24 +185,24 @@ class ConcatRangeTracker(iobase.RangeTracker):
elif source_ix == self._end[0] and self._end[1] is None:
# At/after end.
return None
-
- if source_ix > self._claimed_source_ix:
- # Prefer to split on even boundary.
- split_pos = None
- ratio = self._cumulative_weights[source_ix]
else:
- # Split the current subsource.
- split = self.sub_range_tracker(source_ix).try_split(
- source_pos)
- if not split:
- return None
- split_pos, frac = split
- ratio = self.local_to_global(source_ix, frac)
-
- self._end = source_ix, split_pos
- self._cumulative_weights = [min(w / ratio, 1)
- for w in self._cumulative_weights]
- return (source_ix, split_pos), ratio
+ if source_ix > self._claimed_source_ix:
+ # Prefer to split on even boundary.
+ split_pos = None
+ ratio = self._cumulative_weights[source_ix]
+ else:
+ # Split the current subsource.
+ split = self.sub_range_tracker(source_ix).try_split(
+ source_pos)
+ if not split:
+ return None
+ split_pos, frac = split
+ ratio = self.local_to_global(source_ix, frac)
+
+ self._end = source_ix, split_pos
+ self._cumulative_weights = [min(w / ratio, 1)
+ for w in self._cumulative_weights]
+ return (source_ix, split_pos), ratio
def set_current_position(self, pos):
raise NotImplementedError('Should only be called on sub-trackers')
@@ -210,9 +212,10 @@ class ConcatRangeTracker(iobase.RangeTracker):
last = self._end[0] if self._end[1] is None else self._end[0] + 1
if source_ix == last:
return (source_ix, None)
- return (source_ix,
- self.sub_range_tracker(source_ix).position_at_fraction(
- source_frac))
+ else:
+ return (source_ix,
+ self.sub_range_tracker(source_ix).position_at_fraction(
+ source_frac))
def fraction_consumed(self):
with self._lock:
@@ -231,14 +234,15 @@ class ConcatRangeTracker(iobase.RangeTracker):
if frac == 1:
last = self._end[0] if self._end[1] is None else self._end[0] + 1
return (last, None)
- cw = self._cumulative_weights
- # Find the last source that starts at or before frac.
- source_ix = bisect.bisect(cw, frac) - 1
- # Return this source, converting what's left of frac after starting
- # this source into a value in [0.0, 1.0) representing how far we are
- # towards the next source.
- return (source_ix,
- (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+ else:
+ cw = self._cumulative_weights
+ # Find the last source that starts at or before frac.
+ source_ix = bisect.bisect(cw, frac) - 1
+ # Return this source, converting what's left of frac after starting
+ # this source into a value in [0.0, 1.0) representing how far we are
+ # towards the next source.
+ return (source_ix,
+ (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
def sub_range_tracker(self, source_ix):
assert self._start[0] <= source_ix <= self._end[0]
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 5034068..6d21298 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -32,4 +32,5 @@ def get_filesystem(path):
'Google Cloud Platform IO not available, '
'please install apache_beam[gcp]')
return GCSFileSystem()
- return LocalFileSystem()
+ else:
+ return LocalFileSystem()
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 0db965f..4e8d61b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -996,12 +996,13 @@ class BigQueryWrapper(object):
% (project_id, dataset_id, table_id))
if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
return found_table
- # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
- # the table before this point.
- return self._create_table(project_id=project_id,
- dataset_id=dataset_id,
- table_id=table_id,
- schema=schema or found_table.schema)
+ else:
+ # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
+ # the table before this point.
+ return self._create_table(project_id=project_id,
+ dataset_id=dataset_id,
+ table_id=table_id,
+ schema=schema or found_table.schema)
def run_query(self, project_id, query, use_legacy_sql, flatten_results,
dry_run=False):
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 2cac67f..312542a 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -805,7 +805,8 @@ class Read(ptransform.PTransform):
def _infer_output_coder(self, input_type=None, input_coder=None):
if isinstance(self.source, BoundedSource):
return self.source.default_output_coder()
- return self.source.coder
+ else:
+ return self.source.coder
def display_data(self):
return {'source': DisplayDataItem(self.source.__class__,
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index fbb65bf..8b2bda9 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -104,7 +104,8 @@ class LocalFileSystem(FileSystem):
raw_file = open(path, mode)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
- return CompressedFile(raw_file, compression_type=compression_type)
+ else:
+ return CompressedFile(raw_file, compression_type=compression_type)
def create(self, path, mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 6e7b84f..000df81 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -354,7 +354,8 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
if self._stop_position is None or position < self._stop_position:
self._last_claim = position
return True
- return False
+ else:
+ return False
def position_at_fraction(self, fraction):
return self.fraction_to_position(
@@ -372,13 +373,15 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
position, start=self._start_position, end=self._stop_position)
self._stop_position = position
return position, fraction
- return None
+ else:
+ return None
def fraction_consumed(self):
if self._last_claim is self.UNSTARTED:
return 0
- return self.position_to_fraction(
- self._last_claim, self._start_position, self._stop_position)
+ else:
+ return self.position_to_fraction(
+ self._last_claim, self._start_position, self._stop_position)
def position_to_fraction(self, pos, start, end):
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 91aae33..edb6409 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -610,9 +610,10 @@ def _assert_split_at_fraction_concurrent(
def read_or_split(test_params):
if test_params[0]:
return [val for val in test_params[1]]
- position = test_params[1].position_at_fraction(test_params[2])
- result = test_params[1].try_split(position)
- return result
+ else:
+ position = test_params[1].position_at_fraction(test_params[2])
+ result = test_params[1].try_split(position)
+ return result
inputs = []
pool = thread_pool if thread_pool else _ThreadPool(2)
http://git-wip-us.apache.org/repos/asf/beam/blob/a907323f/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index f2c3d34..750ec45 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource):
if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
# Found a '\r\n'. Accepting that as the next separator.
return (next_lf - 1, next_lf + 1)
-
- # Found a '\n'. Accepting that as the next separator.
- return (next_lf, next_lf + 1)
+ else:
+ # Found a '\n'. Accepting that as the next separator.
+ return (next_lf, next_lf + 1)
current_pos = len(read_buffer.data)
@@ -256,9 +256,10 @@ class _TextSource(filebasedsource.FileBasedSource):
# Current record should not contain the separator.
return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
sep_bounds[1] - record_start_position_in_buffer)
- # Current record should contain the separator.
- return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
- sep_bounds[1] - record_start_position_in_buffer)
+ else:
+ # Current record should contain the separator.
+ return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
+ sep_bounds[1] - record_start_position_in_buffer)
class _TextSink(fileio.FileSink):
[2/2] beam git commit: Closes #2813
Posted by ro...@apache.org.
Closes #2813
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97dde95b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97dde95b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97dde95b
Branch: refs/heads/master
Commit: 97dde95bca79c4849e9ff1cffea59adb3a846c30
Parents: 43443c9 a907323
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue May 2 10:39:06 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue May 2 10:39:06 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/concat_source.py | 74 +++++++++++---------
sdks/python/apache_beam/io/filesystems_util.py | 3 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++--
sdks/python/apache_beam/io/iobase.py | 3 +-
sdks/python/apache_beam/io/localfilesystem.py | 3 +-
sdks/python/apache_beam/io/range_trackers.py | 11 +--
sdks/python/apache_beam/io/source_test_utils.py | 7 +-
sdks/python/apache_beam/io/textio.py | 13 ++--
8 files changed, 70 insertions(+), 57 deletions(-)
----------------------------------------------------------------------