You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2018/07/12 22:48:28 UTC

[beam] branch master updated: Full tracking of Batch side inputs. Performance tests show no regression

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new df1d267  Full tracking of Batch side inputs. Performance tests show no regression
df1d267 is described below

commit df1d26748b91caf893eb14bf95b2a309192ce807
Author: Pablo <pa...@google.com>
AuthorDate: Wed Jul 11 18:21:38 2018 -0700

    Full tracking of Batch side inputs. Performance tests show no regression
---
 sdks/python/apache_beam/runners/worker/operations.py    | 16 ++++++----------
 sdks/python/apache_beam/runners/worker/sideinputs.py    |  7 +------
 .../apache_beam/runners/worker/sideinputs_test.py       | 17 -----------------
 3 files changed, 7 insertions(+), 33 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 78a67bc..1e561e1 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -33,7 +33,6 @@ from apache_beam.internal import pickler
 from apache_beam.io import iobase
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
@@ -322,15 +321,12 @@ class DoOperation(Operation):
         sources.append(si.source)
         # The tracking of time spend reading and bytes read from side inputs is
         # behind an experiment flag to test its performance impact.
-        if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
-          si_counter = opcounters.SideInputReadCounter(
-              self.counter_factory,
-              self.state_sampler,
-              declaring_step=self.name_context.step_name,
-              # Inputs are 1-indexed, so we add 1 to i in the side input id
-              input_index=i + 1)
-        else:
-          si_counter = opcounters.NoOpTransformIOCounter()
+        si_counter = opcounters.SideInputReadCounter(
+            self.counter_factory,
+            self.state_sampler,
+            declaring_step=self.name_context.step_name,
+            # Inputs are 1-indexed, so we add 1 to i in the side input id
+            input_index=i + 1)
       iterator_fn = sideinputs.get_iterator_fn_for_sources(
           sources, read_counter=si_counter)
 
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index d806f9e..23190a5 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -31,7 +31,6 @@ from future import standard_library
 
 from apache_beam.coders import observable
 from apache_beam.io import iobase
-from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.runners.worker import opcounters
 from apache_beam.transforms import window
 
@@ -88,9 +87,6 @@ class PrefetchingSourceSetIterable(object):
   def add_byte_counter(self, reader):
     """Adds byte counter observer to a side input reader.
 
-    If the 'sideinput_io_metrics_v2' experiment flag is not passed in, then
-    nothing is attached to the reader.
-
     Args:
       reader: A reader that should inherit from ObservableMixin to have
         bytes tracked.
@@ -131,8 +127,7 @@ class PrefetchingSourceSetIterable(object):
               # The tracking of time spend reading and bytes read from side
               # inputs is kept behind an experiment flag to test performance
               # impact.
-              if 'sideinput_io_metrics_v2' in RuntimeValueProvider.experiments:
-                self.add_byte_counter(reader)
+              self.add_byte_counter(reader)
               returns_windowed_values = reader.returns_windowed_values
               for value in reader:
                 if self.has_errored:
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index 3a5ff38..4a8f7c8 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -28,7 +28,6 @@ from builtins import range
 import mock
 
 from apache_beam.coders import observable
-from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.runners.worker import sideinputs
 
 
@@ -83,20 +82,7 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=2)
     assert list(strip_windows(iterator_fn())) == list(range(6))
 
-  def test_bytes_read_behind_experiment(self):
-    mock_read_counter = mock.MagicMock()
-    source_records = ['a', 'b', 'c', 'd']
-    sources = [
-        FakeSource(source_records, notify_observers=True),
-    ]
-    iterator_fn = sideinputs.get_iterator_fn_for_sources(
-        sources, max_reader_threads=3, read_counter=mock_read_counter)
-    assert list(strip_windows(iterator_fn())) == source_records
-    mock_read_counter.add_bytes_read.assert_not_called()
-
   def test_bytes_read_are_reported(self):
-    RuntimeValueProvider.set_runtime_options(
-        {'experiments': ['sideinput_io_metrics_v2', 'other']})
     mock_read_counter = mock.MagicMock()
     source_records = ['a', 'b', 'c', 'd']
     sources = [
@@ -107,9 +93,6 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
     assert list(strip_windows(iterator_fn())) == source_records
     mock_read_counter.add_bytes_read.assert_called_with(4)
 
-    # Remove runtime options from the runtime value provider.
-    RuntimeValueProvider.set_runtime_options({})
-
   def test_multiple_sources_iterator_fn(self):
     sources = [
         FakeSource([0]),