You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/07/12 22:48:28 UTC
[beam] branch master updated: Full tracking of Batch side inputs. Performance tests show no regression.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new df1d267 Full tracking of Batch side inputs. Performance tests show no regression
df1d267 is described below
commit df1d26748b91caf893eb14bf95b2a309192ce807
Author: Pablo <pa...@google.com>
AuthorDate: Wed Jul 11 18:21:38 2018 -0700
Full tracking of Batch side inputs. Performance tests show no regression
---
sdks/python/apache_beam/runners/worker/operations.py | 16 ++++++----------
sdks/python/apache_beam/runners/worker/sideinputs.py | 7 +------
.../apache_beam/runners/worker/sideinputs_test.py | 17 -----------------
3 files changed, 7 insertions(+), 33 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 78a67bc..1e561e1 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -33,7 +33,6 @@ from apache_beam.internal import pickler
from apache_beam.io import iobase
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners import common
from apache_beam.runners.common import Receiver
@@ -322,15 +321,12 @@ class DoOperation(Operation):
sources.append(si.source)
# The tracking of time spend reading and bytes read from side inputs is
# behind an experiment flag to test its performance impact.
- if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
- si_counter = opcounters.SideInputReadCounter(
- self.counter_factory,
- self.state_sampler,
- declaring_step=self.name_context.step_name,
- # Inputs are 1-indexed, so we add 1 to i in the side input id
- input_index=i + 1)
- else:
- si_counter = opcounters.NoOpTransformIOCounter()
+ si_counter = opcounters.SideInputReadCounter(
+ self.counter_factory,
+ self.state_sampler,
+ declaring_step=self.name_context.step_name,
+ # Inputs are 1-indexed, so we add 1 to i in the side input id
+ input_index=i + 1)
iterator_fn = sideinputs.get_iterator_fn_for_sources(
sources, read_counter=si_counter)
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index d806f9e..23190a5 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -31,7 +31,6 @@ from future import standard_library
from apache_beam.coders import observable
from apache_beam.io import iobase
-from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.runners.worker import opcounters
from apache_beam.transforms import window
@@ -88,9 +87,6 @@ class PrefetchingSourceSetIterable(object):
def add_byte_counter(self, reader):
"""Adds byte counter observer to a side input reader.
- If the 'sideinput_io_metrics_v2' experiment flag is not passed in, then
- nothing is attached to the reader.
-
Args:
reader: A reader that should inherit from ObservableMixin to have
bytes tracked.
@@ -131,8 +127,7 @@ class PrefetchingSourceSetIterable(object):
# The tracking of time spend reading and bytes read from side
# inputs is kept behind an experiment flag to test performance
# impact.
- if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
- self.add_byte_counter(reader)
+ self.add_byte_counter(reader)
returns_windowed_values = reader.returns_windowed_values
for value in reader:
if self.has_errored:
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 3a5ff38..4a8f7c8 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -28,7 +28,6 @@ from builtins import range
import mock
from apache_beam.coders import observable
-from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.runners.worker import sideinputs
@@ -83,20 +82,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
sources, max_reader_threads=2)
assert list(strip_windows(iterator_fn())) == list(range(6))
- def test_bytes_read_behind_experiment(self):
- mock_read_counter = mock.MagicMock()
- source_records = ['a', 'b', 'c', 'd']
- sources = [
- FakeSource(source_records, notify_observers=True),
- ]
- iterator_fn = sideinputs.get_iterator_fn_for_sources(
- sources, max_reader_threads=3, read_counter=mock_read_counter)
- assert list(strip_windows(iterator_fn())) == source_records
- mock_read_counter.add_bytes_read.assert_not_called()
-
def test_bytes_read_are_reported(self):
- RuntimeValueProvider.set_runtime_options(
- {'experiments': ['sideinput_io_metrics_v2', 'other']})
mock_read_counter = mock.MagicMock()
source_records = ['a', 'b', 'c', 'd']
sources = [
@@ -107,9 +93,6 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
assert list(strip_windows(iterator_fn())) == source_records
mock_read_counter.add_bytes_read.assert_called_with(4)
- # Remove runtime options from the runtime value provider.
- RuntimeValueProvider.set_runtime_options({})
-
def test_multiple_sources_iterator_fn(self):
sources = [
FakeSource([0]),