Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/16 20:09:50 UTC
[2/2] beam git commit: Add progress metrics to Python SDK.
Add progress metrics to Python SDK.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ad84791
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ad84791
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ad84791
Branch: refs/heads/master
Commit: 3ad84791d4d85896f46b7956b5bd8045cdc4a0e9
Parents: 014614b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 2 17:20:38 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Oct 16 13:09:31 2017 -0700
----------------------------------------------------------------------
.../src/main/proto/beam_fn_api.proto | 10 ++-
.../runners/portability/fn_api_runner.py | 15 +++-
.../runners/portability/fn_api_runner_test.py | 67 +++++++++++++++
.../runners/worker/bundle_processor.py | 90 ++++++++++++++------
.../apache_beam/runners/worker/opcounters.py | 9 +-
.../apache_beam/runners/worker/operations.pxd | 3 +-
.../apache_beam/runners/worker/operations.py | 55 ++++++++----
.../apache_beam/runners/worker/sdk_worker.py | 22 +++--
.../apache_beam/runners/worker/statesampler.pyx | 3 +
.../runners/worker/statesampler_fake.py | 15 ++++
10 files changed, 226 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/model/fn-execution/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 7d3e05b..132d366 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -216,8 +216,10 @@ message Metrics {
message PTransform {
// Metrics that are measured for processed and active element groups.
message Measured {
- // (Required) Map from local input name to number of elements processed
+ // (Optional) Map from local input name to number of elements processed
// from this input.
+ // If unset, assumed to be the sum of the outputs of all producers to
+ // this transform (for ProcessedElements) and 0 (for ActiveElements).
map<string, int64> input_element_counts = 1;
// (Required) Map from local output name to number of elements produced
@@ -225,8 +227,8 @@ message Metrics {
map<string, int64> output_element_counts = 2;
// (Optional) The total time spent so far in processing the elements in
- // this group.
- int64 total_time_spent = 3;
+ // this group, in seconds.
+ double total_time_spent = 3;
// TODO: Add other element group level metrics.
}
@@ -542,7 +544,7 @@ message StateKey {
string ptransform_id = 1;
// (Required) The id of the user state.
string user_state_id = 2;
- // (Required) The window encoded in a nested context.
+ // (Required) The window encoded in a nested context.
bytes window = 3;
// (Required) The key of the currently executing element encoded in a
// nested context.
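
For reference, a minimal sketch of populating the updated Measured message through the generated Python bindings; the names and numbers below are made-up illustration, not part of this commit:

    from apache_beam.portability.api import beam_fn_api_pb2

    measured = beam_fn_api_pb2.Metrics.PTransform.Measured(
        # Optional: if unset, readers assume the sum of all producers'
        # output counts (ProcessedElements) or 0 (ActiveElements).
        input_element_counts={'in': 4},
        # (Required) one entry per local output name.
        output_element_counts={'None': 4},
        # Now a double, in seconds (previously an int64).
        total_time_spent=0.25)
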
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 20a4a61..463f78f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -632,16 +632,18 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
controller = FnApiRunner.GrpcController(self._sdk_harness_factory)
else:
controller = FnApiRunner.DirectController()
+ metrics_by_stage = {}
try:
pcoll_buffers = collections.defaultdict(list)
for stage in stages:
- self.run_stage(
- controller, pipeline_components, stage, pcoll_buffers, safe_coders)
+ metrics_by_stage[stage.name] = self.run_stage(
+ controller, pipeline_components, stage,
+ pcoll_buffers, safe_coders).process_bundle.metrics
finally:
controller.close()
- return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE)
+ return RunnerResult(PipelineState.DONE, metrics_by_stage)
def run_stage(
self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -757,6 +759,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
# These should be the only two identifiers we produce for now,
# but special side input writes may go here.
raise NotImplementedError(pcoll_id)
+ return result
# This is the "old" way of executing pipelines.
# TODO(robertwb): Remove once runner API supports side inputs.
@@ -1071,6 +1074,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
self.data_server.stop(5).wait()
+class RunnerResult(maptask_executor_runner.WorkerRunnerResult):
+ def __init__(self, state, metrics_by_stage):
+ super(RunnerResult, self).__init__(state)
+ self._metrics_by_stage = metrics_by_stage
+
+
def only_element(iterable):
element, = iterable
return element
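
A short usage sketch of the new RunnerResult; per-stage metrics are reachable (for now) only through the internal _metrics_by_stage attribute, just as the test below does:

    result = p.run()
    result.wait_until_finish()
    # Values are beam_fn_api_pb2.Metrics protos, keyed by stage name.
    for stage_name, metrics in result._metrics_by_stage.items():
      for transform_id, transform in metrics.ptransforms.items():
        measured = transform.processed_elements.measured
        print('%s/%s: counts=%s time=%ss' % (
            stage_name, transform_id,
            dict(measured.output_element_counts),
            measured.total_time_spent))
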
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index a564911..31f1b6f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -16,6 +16,7 @@
#
import logging
+import time
import unittest
import apache_beam as beam
@@ -24,6 +25,11 @@ from apache_beam.runners.portability import maptask_executor_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+try:
+ from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
+except ImportError:
+ DEFAULT_SAMPLING_PERIOD_MS = 0
+
class FnApiRunnerTest(
maptask_executor_runner_test.MapTaskExecutorRunnerTest):
@@ -55,6 +61,67 @@ class FnApiRunnerTest(
with self.create_pipeline() as p:
assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+ def test_progress_metrics(self):
+ p = self.create_pipeline()
+ if not isinstance(p.runner, fn_api_runner.FnApiRunner):
+ # This test is inherited by others that may not support the same
+ # internal way of accessing progress metrics.
+ return
+
+ _ = (p
+ | beam.Create([0, 0, 0, 2.1e-3 * DEFAULT_SAMPLING_PERIOD_MS])
+ | beam.Map(time.sleep)
+ | beam.Map(lambda x: ('key', x))
+ | beam.GroupByKey()
+ | 'm_out' >> beam.FlatMap(lambda x: [
+ 1, 2, 3, 4, 5,
+ beam.pvalue.TaggedOutput('once', x),
+ beam.pvalue.TaggedOutput('twice', x),
+ beam.pvalue.TaggedOutput('twice', x)]))
+ res = p.run()
+ res.wait_until_finish()
+ try:
+ self.assertEqual(2, len(res._metrics_by_stage))
+ pregbk_metrics, postgbk_metrics = res._metrics_by_stage.values()
+ if 'Create/Read' not in pregbk_metrics.ptransforms:
+ # The metrics above are actually unordered. Swap.
+ pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
+
+ self.assertEqual(
+ 4,
+ pregbk_metrics.ptransforms['Create/Read']
+ .processed_elements.measured.output_element_counts['None'])
+ self.assertEqual(
+ 4,
+ pregbk_metrics.ptransforms['Map(sleep)']
+ .processed_elements.measured.output_element_counts['None'])
+ self.assertLessEqual(
+ 2e-3 * DEFAULT_SAMPLING_PERIOD_MS,
+ pregbk_metrics.ptransforms['Map(sleep)']
+ .processed_elements.measured.total_time_spent)
+ self.assertEqual(
+ 1,
+ postgbk_metrics.ptransforms['GroupByKey/Read']
+ .processed_elements.measured.output_element_counts['None'])
+
+ # The actual stage name ends up being something like 'm_out/lambda...'
+ m_out, = [
+ metrics for name, metrics in postgbk_metrics.ptransforms.items()
+ if name.startswith('m_out')]
+ self.assertEqual(
+ 5,
+ m_out.processed_elements.measured.output_element_counts['None'])
+ self.assertEqual(
+ 1,
+ m_out.processed_elements.measured.output_element_counts['once'])
+ self.assertEqual(
+ 2,
+ m_out.processed_elements.measured.output_element_counts['twice'])
+
+ except Exception:
+ print(res._metrics_by_stage)
+ raise
+
# Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index a54e6b1..1049ae1 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -187,17 +187,19 @@ class BundleProcessor(object):
self.process_bundle_descriptor = process_bundle_descriptor
self.state_handler = state_handler
self.data_channel_factory = data_channel_factory
-
- def create_execution_tree(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
# from StateSampler.
- counter_factory = counters.CounterFactory()
- state_sampler = statesampler.StateSampler(
- 'fnapi-step%s' % descriptor.id, counter_factory)
+ self.counter_factory = counters.CounterFactory()
+ self.state_sampler = statesampler.StateSampler(
+ 'fnapi-step-%s' % self.process_bundle_descriptor.id,
+ self.counter_factory)
+ self.ops = self.create_execution_tree(self.process_bundle_descriptor)
+
+ def create_execution_tree(self, descriptor):
transform_factory = BeamTransformFactory(
- descriptor, self.data_channel_factory, counter_factory, state_sampler,
- self.state_handler)
+ descriptor, self.data_channel_factory, self.counter_factory,
+ self.state_sampler, self.state_handler)
pcoll_consumers = collections.defaultdict(list)
for transform_id, transform_proto in descriptor.transforms.items():
@@ -223,15 +225,15 @@ class BundleProcessor(object):
for pcoll in descriptor.transforms[transform_id].outputs.values()
for consumer in pcoll_consumers[pcoll]])
- return [get_operation(transform_id)
- for transform_id in sorted(
- descriptor.transforms, key=topological_height, reverse=True)]
+ return collections.OrderedDict([
+ (transform_id, get_operation(transform_id))
+ for transform_id in sorted(
+ descriptor.transforms, key=topological_height, reverse=True)])
def process_bundle(self, instruction_id):
- ops = self.create_execution_tree(self.process_bundle_descriptor)
expected_inputs = []
- for op in ops:
+ for op in self.ops.values():
if isinstance(op, DataOutputOperation):
# TODO(robertwb): Is there a better way to pass the instruction id to
# the operation?
@@ -241,22 +243,54 @@ class BundleProcessor(object):
# We must wait until we receive "end of stream" for each of these ops.
expected_inputs.append(op)
- # Start all operations.
- for op in reversed(ops):
- logging.info('start %s', op)
- op.start()
-
- # Inject inputs from data plane.
- for input_op in expected_inputs:
- for data in input_op.data_channel.input_elements(
- instruction_id, [input_op.target]):
- # ignores input name
- input_op.process_encoded(data.data)
-
- # Finish all operations.
- for op in ops:
- logging.info('finish %s', op)
- op.finish()
+ try:
+ self.state_sampler.start()
+ # Start all operations.
+ for op in reversed(self.ops.values()):
+ logging.info('start %s', op)
+ op.start()
+
+ # Inject inputs from data plane.
+ for input_op in expected_inputs:
+ for data in input_op.data_channel.input_elements(
+ instruction_id, [input_op.target]):
+ # ignores input name
+ input_op.process_encoded(data.data)
+
+ # Finish all operations.
+ for op in self.ops.values():
+ logging.info('finish %s', op)
+ op.finish()
+ finally:
+ self.state_sampler.stop_if_still_running()
+
+ def metrics(self):
+ return beam_fn_api_pb2.Metrics(
+ # TODO(robertwb): Rename to progress?
+ ptransforms=
+ {transform_id:
+ self._fix_output_tags(transform_id, op.progress_metrics())
+ for transform_id, op in self.ops.items()})
+
+ def _fix_output_tags(self, transform_id, metrics):
+ # Outputs are still referred to by index, not by name, in many Operations.
+ # However, if there is exactly one output, we can fix up the name here.
+ def fix_only_output_tag(actual_output_tag, mapping):
+ if len(mapping) == 1:
+ fake_output_tag, count = only_element(mapping.items())
+ if fake_output_tag != actual_output_tag:
+ del mapping[fake_output_tag]
+ mapping[actual_output_tag] = count
+ actual_output_tags = list(
+ self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
+ if len(actual_output_tags) == 1:
+ fix_only_output_tag(
+ actual_output_tags[0],
+ metrics.processed_elements.measured.output_element_counts)
+ fix_only_output_tag(
+ actual_output_tags[0],
+ metrics.active_elements.measured.output_element_counts)
+ return metrics
class BeamTransformFactory(object):
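
The fix-up above only fires when a transform has exactly one declared output; the same renaming logic on a plain dict, as a standalone illustration:

    def fix_only_output_tag(actual_output_tag, counts):
      # A lone count under a placeholder tag (e.g. 'ONLY_OUTPUT') is
      # renamed to the transform's actual local output name.
      if len(counts) == 1:
        fake_tag, count = list(counts.items())[0]
        if fake_tag != actual_output_tag:
          del counts[fake_tag]
          counts[actual_output_tag] = count

    counts = {'ONLY_OUTPUT': 4}
    fix_only_output_tag('out', counts)
    assert counts == {'out': 4}
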
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/opcounters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index f8f4b51..f4ba6b9 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -48,10 +48,10 @@ class OperationCounters(object):
def __init__(self, counter_factory, step_name, coder, output_index):
self._counter_factory = counter_factory
self.element_counter = counter_factory.get_counter(
- '%s-out%d-ElementCount' % (step_name, output_index), Counter.SUM)
+ '%s-out%s-ElementCount' % (step_name, output_index), Counter.SUM)
self.mean_byte_counter = counter_factory.get_counter(
- '%s-out%d-MeanByteCount' % (step_name, output_index), Counter.MEAN)
- self.coder_impl = coder.get_impl()
+ '%s-out%s-MeanByteCount' % (step_name, output_index), Counter.MEAN)
+ self.coder_impl = coder.get_impl() if coder else None
self.active_accumulator = None
self._sample_counter = 0
self._next_sample = 0
@@ -138,7 +138,8 @@ class OperationCounters(object):
Returns:
True if it is time to compute another element's size.
"""
-
+ if self.coder_impl is None:
+ return False
self._sample_counter += 1
if self._next_sample == 0:
if random.randint(1, self._sample_counter) <= 10:
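
The new guard simply turns size sampling off when there is no coder to measure with. For context, the probabilistic rule it short-circuits samples element n with probability min(1, 10/n); a simplified standalone simulation (the real code also batches draws via _next_sample rather than drawing per element):

    import random

    def sampled(n):
      # Element n is sampled with probability min(1, 10/n).
      return random.randint(1, n) <= 10

    # Expected samples over N elements is about 10 + 10 * ln(N / 10),
    # i.e. roughly 80 for N = 10000.
    print(sum(1 for n in range(1, 10001) if sampled(n)))
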
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/operations.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 2b4e526..d380a45 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -28,7 +28,7 @@ cdef type _global_window_type
cdef class ConsumerSet(Receiver):
cdef list consumers
- cdef opcounters.OperationCounters opcounter
+ cdef readonly opcounters.OperationCounters opcounter
cdef public step_name
cdef public output_index
cdef public coder
@@ -71,6 +71,7 @@ cdef class ReadOperation(Operation):
cdef class DoOperation(Operation):
cdef object dofn_runner
cdef Receiver dofn_receiver
+ cdef object tagged_receivers
cdef class CombineOperation(Operation):
cdef object phased_combine_fn
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index ed9d84d..ed3f3b8 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -28,6 +28,7 @@ from apache_beam.internal import pickler
from apache_beam.io import iobase
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners import common
from apache_beam.runners.common import Receiver
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -130,6 +131,7 @@ class Operation(object):
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
self.scoped_metrics_container = None
+ self.receivers = []
def start(self):
"""Start operation."""
@@ -157,6 +159,24 @@ class Operation(object):
"""Adds a receiver operation for the specified output."""
self.consumers[output_index].append(operation)
+ def progress_metrics(self):
+ return beam_fn_api_pb2.Metrics.PTransform(
+ processed_elements=beam_fn_api_pb2.Metrics.PTransform.ProcessedElements(
+ measured=beam_fn_api_pb2.Metrics.PTransform.Measured(
+ total_time_spent=(
+ self.scoped_start_state.sampled_seconds()
+ + self.scoped_process_state.sampled_seconds()
+ + self.scoped_finish_state.sampled_seconds()),
+ # Multi-output operations should override this.
+ output_element_counts=(
+ # If there is exactly one output, we can unambiguously
+ # fix its name later, which we do.
+ # TODO(robertwb): Plumb the actual name here.
+ {'ONLY_OUTPUT': self.receivers[0].opcounter
+ .element_counter.value()}
+ if len(self.receivers) == 1
+ else None))))
+
def __str__(self):
"""Generates a useful string for this object.
@@ -226,19 +246,14 @@ class InMemoryWriteOperation(Operation):
class _TaggedReceivers(dict):
- class NullReceiver(Receiver):
-
- def receive(self, element):
- pass
-
- # For old SDKs.
- def output(self, element):
- pass
+ def __init__(self, counter_factory, step_name):
+ self._counter_factory = counter_factory
+ self._step_name = step_name
- def __missing__(self, unused_key):
- if not getattr(self, '_null_receiver', None):
- self._null_receiver = _TaggedReceivers.NullReceiver()
- return self._null_receiver
+ def __missing__(self, tag):
+ self[tag] = receiver = ConsumerSet(
+ self._counter_factory, self._step_name, tag, [], None)
+ return receiver
class DoOperation(Operation):
@@ -308,7 +323,8 @@ class DoOperation(Operation):
# Tag to output index map used to dispatch the side output values emitted
# by the DoFn function to the appropriate receivers. The main output is
# tagged with None and is associated with its corresponding index.
- tagged_receivers = _TaggedReceivers()
+ self.tagged_receivers = _TaggedReceivers(
+ self.counter_factory, self.step_name)
output_tag_prefix = PropertyNames.OUT + '_'
for index, tag in enumerate(self.spec.output_tags):
@@ -318,11 +334,11 @@ class DoOperation(Operation):
original_tag = tag[len(output_tag_prefix):]
else:
raise ValueError('Unexpected output name for operation: %s' % tag)
- tagged_receivers[original_tag] = self.receivers[index]
+ self.tagged_receivers[original_tag] = self.receivers[index]
self.dofn_runner = common.DoFnRunner(
fn, args, kwargs, self._read_side_inputs(tags_and_types),
- window_fn, context, tagged_receivers,
+ window_fn, context, self.tagged_receivers,
logger, self.step_name,
scoped_metrics_container=self.scoped_metrics_container)
self.dofn_receiver = (self.dofn_runner
@@ -339,6 +355,15 @@ class DoOperation(Operation):
with self.scoped_process_state:
self.dofn_receiver.receive(o)
+ def progress_metrics(self):
+ metrics = super(DoOperation, self).progress_metrics()
+ if self.tagged_receivers:
+ metrics.processed_elements.measured.output_element_counts.clear()
+ for tag, receiver in self.tagged_receivers.items():
+ metrics.processed_elements.measured.output_element_counts[
+ str(tag)] = receiver.opcounter.element_counter.value()
+ return metrics
+
class DoFnRunnerReceiver(Receiver):
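
With a counting ConsumerSet behind every tag, DoOperation.progress_metrics can report real per-output counts even for outputs nothing consumes. For the 'm_out' transform in the test above, the reported Metrics.PTransform would render roughly as follows in proto text format (the timing value is illustrative):

    processed_elements {
      measured {
        output_element_counts { key: "None" value: 5 }
        output_element_counts { key: "once" value: 1 }
        output_element_counts { key: "twice" value: 2 }
        total_time_spent: 0.002
      }
    }
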
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 1ad65fe..d1b0c0e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -111,6 +111,7 @@ class SdkWorker(object):
self.fns = {}
self.state_handler = state_handler
self.data_channel_factory = data_channel_factory
+ self.bundle_processors = {}
def do_instruction(self, request):
request_type = request.WhichOneof('request')
@@ -129,16 +130,21 @@ class SdkWorker(object):
register=beam_fn_api_pb2.RegisterResponse())
def process_bundle(self, request, instruction_id):
- bundle_processor.BundleProcessor(
- self.fns[request.process_bundle_descriptor_reference],
- self.state_handler,
- self.data_channel_factory).process_bundle(instruction_id)
+ self.bundle_processors[
+ instruction_id] = processor = bundle_processor.BundleProcessor(
+ self.fns[request.process_bundle_descriptor_reference],
+ self.state_handler,
+ self.data_channel_factory)
+ try:
+ processor.process_bundle(instruction_id)
+ finally:
+ del self.bundle_processors[instruction_id]
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
- process_bundle=beam_fn_api_pb2.ProcessBundleResponse())
+ process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
+ metrics=processor.metrics()))
def process_bundle_progress(self, request, instruction_id):
- return beam_fn_api_pb2.InstructionResponse(
- instruction_id=instruction_id,
- error='Not Supported')
+ # It is an error to get progress for a not-in-flight bundle.
+ processor = self.bundle_processors.get(request.instruction_reference)
+ return beam_fn_api_pb2.InstructionResponse(
+ instruction_id=instruction_id,
+ process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressResponse(
+ metrics=processor.metrics()))
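
A sketch of the runner-side round trip this enables while a bundle is still in flight; the ids are hypothetical, and the field names instruction_reference and ProcessBundleProgressResponse.metrics are assumed from the proto of this era:

    from apache_beam.portability.api import beam_fn_api_pb2

    request = beam_fn_api_pb2.InstructionRequest(
        instruction_id='progress-1',
        process_bundle_progress=beam_fn_api_pb2.ProcessBundleProgressRequest(
            instruction_reference='bundle-7'))
    # worker is the SdkWorker servicing the bundle.
    response = worker.do_instruction(request)
    metrics = response.process_bundle_progress.metrics
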
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/statesampler.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index c562763..f0527c6 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -263,3 +263,6 @@ cdef class ScopedState(object):
def __repr__(self):
return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, self.nsecs)
+
+ def sampled_seconds(self):
+ return 1e-9 * self.nsecs
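
A minimal sketch of the sampler lifecycle feeding sampled_seconds, mirroring its use in bundle_processor above; the step and state names are made up, and the result is only accurate to the sampling period:

    import time

    from apache_beam.runners.worker import statesampler
    from apache_beam.utils import counters

    sampler = statesampler.StateSampler(
        'fnapi-step-x', counters.CounterFactory())
    scoped = sampler.scoped_state('m_out', 'process')
    sampler.start()
    with scoped:
      time.sleep(0.5)  # work attributed to the 'process' state
    sampler.stop_if_still_running()
    print(scoped.sampled_seconds())  # ~0.5, i.e. 1e-9 * sampled nsecs
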
http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/statesampler_fake.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index 5cd0fd2..bc56021 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -26,6 +26,18 @@ class StateSampler(object):
def scoped_state(self, step_name, state_name=None, io_target=None):
return _FakeScopedState()
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def stop_if_still_running(self):
+ self.stop()
+
+ def commit_counters(self):
+ pass
+
class _FakeScopedState(object):
@@ -34,3 +46,6 @@ class _FakeScopedState(object):
def __exit__(self, *unused_args):
pass
+
+ def sampled_seconds(self):
+ return 0