You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/14 20:06:51 UTC
[2/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade
-2
[BEAM-1964] Fix lint issues for linter upgrade -2
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf474a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf474a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf474a0b
Branch: refs/heads/master
Commit: bf474a0b72beb2e946be39ce04e3f07800a3b307
Parents: cf9ac45
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu Apr 13 17:19:56 2017 -0700
Committer: Ahmet Altay <al...@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700
----------------------------------------------------------------------
.../io/gcp/datastore/v1/datastoreio.py | 4 +--
.../apache_beam/io/gcp/datastore/v1/helper.py | 16 ++++-------
.../io/gcp/datastore/v1/query_splitter.py | 2 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 3 +-
.../io/gcp/tests/bigquery_matcher.py | 3 +-
sdks/python/apache_beam/metrics/cells.py | 28 +++++++++----------
sdks/python/apache_beam/metrics/execution.py | 3 +-
sdks/python/apache_beam/metrics/metric.py | 9 ++----
sdks/python/apache_beam/runners/common.py | 9 ++----
.../runners/dataflow/dataflow_metrics_test.py | 3 +-
.../runners/dataflow/dataflow_runner.py | 6 ++--
.../runners/dataflow/internal/apiclient.py | 8 +++---
.../runners/dataflow/internal/dependency.py | 6 ++--
.../runners/dataflow/test_dataflow_runner.py | 4 ---
.../runners/direct/bundle_factory.py | 14 ++++------
.../runners/direct/evaluation_context.py | 10 +++----
.../apache_beam/runners/direct/executor.py | 9 +-----
.../runners/direct/transform_evaluator.py | 7 -----
sdks/python/apache_beam/runners/runner.py | 3 +-
.../apache_beam/tests/pipeline_verifiers.py | 7 ++---
sdks/python/apache_beam/transforms/combiners.py | 29 +++++++-------------
.../apache_beam/transforms/combiners_test.py | 4 +--
sdks/python/apache_beam/typehints/decorators.py | 3 +-
sdks/python/apache_beam/typehints/typehints.py | 3 +-
24 files changed, 68 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index e8ca05d..d9b3598 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -253,7 +253,7 @@ class ReadFromDatastore(PTransform):
query = helper.make_latest_timestamp_query(namespace)
req = helper.make_request(project, namespace, query)
resp = datastore.run_query(req)
- if len(resp.batch.entity_results) == 0:
+ if not resp.batch.entity_results:
raise RuntimeError("Datastore total statistics unavailable.")
entity = resp.batch.entity_results[0].entity
@@ -281,7 +281,7 @@ class ReadFromDatastore(PTransform):
req = helper.make_request(project, namespace, kind_stats_query)
resp = datastore.run_query(req)
- if len(resp.batch.entity_results) == 0:
+ if not resp.batch.entity_results:
raise RuntimeError("Datastore statistics for kind %s unavailable" % kind)
entity = resp.batch.entity_results[0].entity
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index b1ef9af..d544226 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -62,8 +62,7 @@ def key_comparator(k1, k2):
k2_path = next(k2_iter, None)
if k2_path:
return -1
- else:
- return 0
+ return 0
def compare_path(p1, p2):
@@ -99,8 +98,7 @@ def str_compare(s1, s2):
return 0
elif s1 < s2:
return -1
- else:
- return 1
+ return 1
def get_datastore(project):
@@ -131,13 +129,9 @@ def make_partition(project, namespace):
def retry_on_rpc_error(exception):
"""A retry filter for Cloud Datastore RPCErrors."""
if isinstance(exception, RPCError):
- if exception.code >= 500:
- return True
- else:
- return False
- else:
- # TODO(vikasrk): Figure out what other errors should be retried.
- return False
+ return exception.code >= 500
+ # TODO(vikasrk): Figure out what other errors should be retried.
+ return False
def fetch_entities(project, namespace, query, datastore):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
index 8ced170..d5674f9 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
@@ -97,7 +97,7 @@ def _validate_query(query):
if len(query.kind) != 1:
raise ValueError('Query must have exactly one kind.')
- if len(query.order) != 0:
+ if query.order:
raise ValueError('Query cannot have any sort orders.')
if query.HasField('limit'):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b2bc809..a10a3d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -93,8 +93,7 @@ class GCSFileSystem(FileSystem):
raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
- else:
- return CompressedFile(raw_file, compression_type=compression_type)
+ return CompressedFile(raw_file, compression_type=compression_type)
def create(self, path, mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index cc26689..66d99b3 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -38,8 +38,7 @@ MAX_RETRIES = 4
def retry_on_http_and_value_error(exception):
"""Filter allowing retries on Bigquery errors and value error."""
- return isinstance(exception, GoogleCloudError) or \
- isinstance(exception, ValueError)
+ return isinstance(exception, (GoogleCloudError, ValueError))
class BigqueryMatcher(BaseMatcher):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 5a571f5..c421949 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -97,9 +97,8 @@ class CellCommitState(object):
with self._lock:
if self._state == CellCommitState.CLEAN:
return False
- else:
- self._state = CellCommitState.COMMITTING
- return True
+ self._state = CellCommitState.COMMITTING
+ return True
class MetricCell(object):
@@ -218,8 +217,7 @@ class DistributionResult(object):
"""
if self.data.count == 0:
return None
- else:
- return float(self.data.sum)/self.data.count
+ return float(self.data.sum)/self.data.count
class DistributionData(object):
@@ -257,16 +255,16 @@ class DistributionData(object):
def combine(self, other):
if other is None:
return self
- else:
- new_min = (None if self.min is None and other.min is None else
- min(x for x in (self.min, other.min) if x is not None))
- new_max = (None if self.max is None and other.max is None else
- max(x for x in (self.max, other.max) if x is not None))
- return DistributionData(
- self.sum + other.sum,
- self.count + other.count,
- new_min,
- new_max)
+
+ new_min = (None if self.min is None and other.min is None else
+ min(x for x in (self.min, other.min) if x is not None))
+ new_max = (None if self.max is None and other.max is None else
+ max(x for x in (self.max, other.max) if x is not None))
+ return DistributionData(
+ self.sum + other.sum,
+ self.count + other.count,
+ new_min,
+ new_max)
@classmethod
def singleton(cls, value):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index f6c8990..887423b 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -129,8 +129,7 @@ class _MetricsEnvironment(object):
index = len(self.PER_THREAD.container) - 1
if index < 0:
return None
- else:
- return self.PER_THREAD.container[index]
+ return self.PER_THREAD.container[index]
def set_current_container(self, container):
self.set_container_stack()
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index f6a0923..33db4e1 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -103,8 +103,7 @@ class MetricResults(object):
(filter.names and
metric_key.metric.name in filter.names)):
return True
- else:
- return False
+ return False
@staticmethod
def _matches_sub_path(actual_scope, filter_scope):
@@ -117,8 +116,7 @@ class MetricResults(object):
return False # The first entry was not exactly matched
elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
return False # The last entry was not exactly matched
- else:
- return True
+ return True
@staticmethod
def _matches_scope(filter, metric_key):
@@ -139,8 +137,7 @@ class MetricResults(object):
if (MetricResults._matches_name(filter, metric_key) and
MetricResults._matches_scope(filter, metric_key)):
return True
- else:
- return False
+ return False
def query(self, filter=None):
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 2c1032d..8f86b75 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -414,10 +414,8 @@ def get_logging_context(maybe_logger, **kwargs):
maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
if isinstance(maybe_context, LoggingContext):
return maybe_context
- else:
- return _LoggingContextAdapter(maybe_context)
- else:
- return LoggingContext()
+ return _LoggingContextAdapter(maybe_context)
+ return LoggingContext()
class _ReceiverAdapter(Receiver):
@@ -432,5 +430,4 @@ class _ReceiverAdapter(Receiver):
def as_receiver(maybe_receiver):
if isinstance(maybe_receiver, Receiver):
return maybe_receiver
- else:
- return _ReceiverAdapter(maybe_receiver)
+ return _ReceiverAdapter(maybe_receiver)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 95027a3..ffee3e5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -38,8 +38,7 @@ class DictToObject(object):
def _wrap(self, value):
if isinstance(value, (tuple, list, set, frozenset)):
return type(value)([self._wrap(v) for v in value])
- else:
- return DictToObject(value) if isinstance(value, dict) else value
+ return DictToObject(value) if isinstance(value, dict) else value
class TestDataflowMetrics(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a92c26..2e9fc52 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -92,8 +92,7 @@ class DataflowRunner(PipelineRunner):
return -1
elif 'Traceback' in msg:
return 1
- else:
- return 0
+ return 0
job_id = result.job_id()
while True:
@@ -194,8 +193,7 @@ class DataflowRunner(PipelineRunner):
return coders.WindowedValueCoder(
coders.registry.get_coder(typehint),
window_coder=window_coder)
- else:
- return coders.registry.get_coder(typehint)
+ return coders.registry.get_coder(typehint)
def _get_cloud_encoding(self, coder):
"""Returns an encoding based on a coder object."""
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6a8aa93..8d44dff 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -436,10 +436,10 @@ class DataflowApplicationClient(object):
if not template_location:
return self.submit_job_description(job)
- else:
- logging.info('A template was just created at location %s',
- template_location)
- return None
+
+ logging.info('A template was just created at location %s',
+ template_location)
+ return None
def create_job_description(self, job):
"""Creates a job described by the workflow proto."""
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 22de5c6..1f28b26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -493,8 +493,7 @@ def get_sdk_name_and_version():
container_version = get_required_container_version()
if container_version == BEAM_CONTAINER_VERSION:
return ('Apache Beam SDK for Python', beam_version.__version__)
- else:
- return ('Google Cloud Dataflow SDK for Python', container_version)
+ return ('Google Cloud Dataflow SDK for Python', container_version)
def get_sdk_package_name():
@@ -502,8 +501,7 @@ def get_sdk_package_name():
container_version = get_required_container_version()
if container_version == BEAM_CONTAINER_VERSION:
return BEAM_PACKAGE_NAME
- else:
- return GOOGLE_PACKAGE_NAME
+ return GOOGLE_PACKAGE_NAME
def _download_pypi_sdk_package(temp_dir):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 046313a..4cf4131 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -23,10 +23,6 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
class TestDataflowRunner(DataflowRunner):
-
- def __init__(self):
- super(TestDataflowRunner, self).__init__()
-
def run(self, pipeline):
"""Execute test pipeline and verify test matcher"""
options = pipeline.options.view_as(TestOptions)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 647b5f2..42c8095 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -127,8 +127,7 @@ class Bundle(object):
if not self._stacked:
if self._committed and not make_copy:
return self._elements
- else:
- return list(self._elements)
+ return list(self._elements)
def iterable_stacked_or_elements(elements):
for e in elements:
@@ -140,9 +139,8 @@ class Bundle(object):
if self._committed and not make_copy:
return iterable_stacked_or_elements(self._elements)
- else:
- # returns a copy.
- return [e for e in iterable_stacked_or_elements(self._elements)]
+ # returns a copy.
+ return [e for e in iterable_stacked_or_elements(self._elements)]
def has_elements(self):
return len(self._elements) > 0
@@ -171,9 +169,9 @@ class Bundle(object):
if not self._stacked:
self._elements.append(element)
return
- if (len(self._elements) > 0 and
- (isinstance(self._elements[-1], WindowedValue) or
- isinstance(self._elements[-1], Bundle.StackedWindowedValues)) and
+ if (self._elements and
+ (isinstance(self._elements[-1], (WindowedValue,
+ Bundle.StackedWindowedValues))) and
self._elements[-1].timestamp == element.timestamp and
self._elements[-1].windows == element.windows):
if isinstance(self._elements[-1], WindowedValue):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8114104..2169c7c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -281,11 +281,11 @@ class EvaluationContext(object):
"""
if transform:
return self._is_transform_done(transform)
- else:
- for applied_ptransform in self._step_names:
- if not self._is_transform_done(applied_ptransform):
- return False
- return True
+
+ for applied_ptransform in self._step_names:
+ if not self._is_transform_done(applied_ptransform):
+ return False
+ return True
def _is_transform_done(self, transform):
tw = self._watermark_manager.get_watermarks(transform)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index ce6356c..f6a1d7f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -240,13 +240,6 @@ class _CompletionCallback(object):
_ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
-class _TimerCompletionCallback(_CompletionCallback):
-
- def __init__(self, evaluation_context, all_updates, timers):
- super(_TimerCompletionCallback, self).__init__(
- evaluation_context, all_updates, timers)
-
-
class TransformExecutor(ExecutorService.CallableTask):
"""TransformExecutor will evaluate a bundle using an applied ptransform.
@@ -529,7 +522,7 @@ class _ExecutorServiceParallelExecutor(object):
empty_bundle = (
self._executor.evaluation_context.create_empty_committed_bundle(
applied_ptransform.inputs[0]))
- timer_completion_callback = _TimerCompletionCallback(
+ timer_completion_callback = _CompletionCallback(
self._executor.evaluation_context, self._executor.all_updates,
applied_ptransform)
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 662c61d..f34513c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -278,13 +278,6 @@ class _TaggedReceivers(dict):
class _ParDoEvaluator(_TransformEvaluator):
"""TransformEvaluator for ParDo transform."""
-
- def __init__(self, evaluation_context, applied_ptransform,
- input_committed_bundle, side_inputs, scoped_metrics_container):
- super(_ParDoEvaluator, self).__init__(
- evaluation_context, applied_ptransform, input_committed_bundle,
- side_inputs, scoped_metrics_container)
-
def start_bundle(self):
transform = self._applied_ptransform.transform
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7e7ec24..6c05951 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -111,8 +111,7 @@ def group_by_key_input_visitor():
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import GroupByKey, GroupByKeyOnly
from apache_beam import typehints
- if (isinstance(transform_node.transform, GroupByKey) or
- isinstance(transform_node.transform, GroupByKeyOnly)):
+ if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
pcoll = transform_node.inputs[0]
input_type = pcoll.element_type
# If input_type is not specified, then treat it as `Any`.
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 3cac658..51302b0 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -66,11 +66,8 @@ class PipelineStateMatcher(BaseMatcher):
def retry_on_io_error_and_server_error(exception):
"""Filter allowing retries on file I/O errors and service error."""
- if isinstance(exception, IOError) or \
- (HttpError is not None and isinstance(exception, HttpError)):
- return True
- else:
- return False
+ return isinstance(exception, IOError) or \
+ (HttpError is not None and isinstance(exception, HttpError))
class FileChecksumMatcher(BaseMatcher):
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index f55d46a..a4cd462 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -95,8 +95,7 @@ class MeanCombineFn(core.CombineFn):
return cy_combiners.MeanInt64Fn()
elif input_type is float:
return cy_combiners.MeanFloatFn()
- else:
- return self
+ return self
class Count(object):
@@ -310,23 +309,19 @@ class TopCombineFn(core.CombineFn):
if len(buffer) < self._n:
if not buffer:
return element_key, [element]
- else:
- buffer.append(element)
- if lt(element_key, threshold): # element_key < threshold
- return element_key, buffer
- else:
- return accumulator # with mutated buffer
+ buffer.append(element)
+ if lt(element_key, threshold): # element_key < threshold
+ return element_key, buffer
+ return accumulator # with mutated buffer
elif lt(threshold, element_key): # threshold < element_key
buffer.append(element)
if len(buffer) < self._buffer_size:
return accumulator
- else:
- self._sort_buffer(buffer, lt)
- min_element = buffer[-self._n]
- threshold = self._key_fn(min_element) if self._key_fn else min_element
- return threshold, buffer[-self._n:]
- else:
- return accumulator
+ self._sort_buffer(buffer, lt)
+ min_element = buffer[-self._n]
+ threshold = self._key_fn(min_element) if self._key_fn else min_element
+ return threshold, buffer[-self._n:]
+ return accumulator
def merge_accumulators(self, accumulators, *args, **kwargs):
accumulators = list(accumulators)
@@ -357,10 +352,6 @@ class TopCombineFn(core.CombineFn):
class Largest(TopCombineFn):
-
- def __init__(self, n):
- super(Largest, self).__init__(n)
-
def default_label(self):
return 'Largest(%s)' % self._n
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 6c101fe..af76889 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -164,10 +164,10 @@ class CombineTest(unittest.TestCase):
DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
DisplayDataItemMatcher('combine_fn',
transform.fn.__class__)]
- if len(args) > 0:
+ if args:
expected_items.append(
DisplayDataItemMatcher('args', str(args)))
- if len(kwargs) > 0:
+ if kwargs:
expected_items.append(
DisplayDataItemMatcher('kwargs', str(kwargs)))
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index d8f0b1b..af6c499 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -237,8 +237,7 @@ def _unpack_positional_arg_hints(arg, hint):
if isinstance(hint, typehints.TupleConstraint):
return tuple(_unpack_positional_arg_hints(a, t)
for a, t in zip(arg, hint.tuple_types))
- else:
- return (typehints.Any,) * len(arg)
+ return (typehints.Any,) * len(arg)
return hint
http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 1557d85..9b41adb 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1039,8 +1039,7 @@ def is_consistent_with(sub, base):
if isinstance(base, TypeConstraint):
if isinstance(sub, UnionConstraint):
return all(is_consistent_with(c, base) for c in sub.union_types)
- else:
- return base._consistent_with_check_(sub)
+ return base._consistent_with_check_(sub)
elif isinstance(sub, TypeConstraint):
# Nothing but object lives above any type constraints.
return base == object