You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/10/16 20:09:50 UTC

[2/2] beam git commit: Add progress metrics to Python SDK.

Add progress metrics to Python SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ad84791
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ad84791
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ad84791

Branch: refs/heads/master
Commit: 3ad84791d4d85896f46b7956b5bd8045cdc4a0e9
Parents: 014614b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 2 17:20:38 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Oct 16 13:09:31 2017 -0700

----------------------------------------------------------------------
 .../src/main/proto/beam_fn_api.proto            | 10 ++-
 .../runners/portability/fn_api_runner.py        | 15 +++-
 .../runners/portability/fn_api_runner_test.py   | 67 +++++++++++++++
 .../runners/worker/bundle_processor.py          | 90 ++++++++++++++------
 .../apache_beam/runners/worker/opcounters.py    |  9 +-
 .../apache_beam/runners/worker/operations.pxd   |  3 +-
 .../apache_beam/runners/worker/operations.py    | 55 ++++++++----
 .../apache_beam/runners/worker/sdk_worker.py    | 22 +++--
 .../apache_beam/runners/worker/statesampler.pyx |  3 +
 .../runners/worker/statesampler_fake.py         | 15 ++++
 10 files changed, 226 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/model/fn-execution/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 7d3e05b..132d366 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -216,8 +216,10 @@ message Metrics {
   message PTransform {
     // Metrics that are measured for processed and active element groups.
     message Measured {
-      // (Required) Map from local input name to number of elements processed
+      // (Optional) Map from local input name to number of elements processed
       // from this input.
+      // If unset, assumed to be the sum of the outputs of all producers to
+      // this transform (for ProcessedElements) and 0 (for ActiveElements).
       map<string, int64> input_element_counts = 1;
 
       // (Required) Map from local output name to number of elements produced
@@ -225,8 +227,8 @@ message Metrics {
       map<string, int64> output_element_counts = 2;
 
       // (Optional) The total time spent so far in processing the elements in
-      // this group.
-      int64 total_time_spent = 3;
+      // this group, in seconds.
+      double total_time_spent = 3;
 
       // TODO: Add other element group level metrics.
     }
@@ -542,7 +544,7 @@ message StateKey {
     string ptransform_id = 1;
     // (Required) The id of the user state.
     string user_state_id = 2;
-    // (Required) The window encoded in a nested context. 
+    // (Required) The window encoded in a nested context.
     bytes window = 3;
     // (Required) The key of the currently executing element encoded in a
     // nested context.

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 20a4a61..463f78f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -632,16 +632,18 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       controller = FnApiRunner.GrpcController(self._sdk_harness_factory)
     else:
       controller = FnApiRunner.DirectController()
+    metrics_by_stage = {}
 
     try:
       pcoll_buffers = collections.defaultdict(list)
       for stage in stages:
-        self.run_stage(
-            controller, pipeline_components, stage, pcoll_buffers, safe_coders)
+        metrics_by_stage[stage.name] = self.run_stage(
+            controller, pipeline_components, stage,
+            pcoll_buffers, safe_coders).process_bundle.metrics
     finally:
       controller.close()
 
-    return maptask_executor_runner.WorkerRunnerResult(PipelineState.DONE)
+    return RunnerResult(PipelineState.DONE, metrics_by_stage)
 
   def run_stage(
       self, controller, pipeline_components, stage, pcoll_buffers, safe_coders):
@@ -757,6 +759,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
           # These should be the only two identifiers we produce for now,
           # but special side input writes may go here.
           raise NotImplementedError(pcoll_id)
+    return result
 
   # This is the "old" way of executing pipelines.
   # TODO(robertwb): Remove once runner API supports side inputs.
@@ -1071,6 +1074,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       self.data_server.stop(5).wait()
 
 
+class RunnerResult(maptask_executor_runner.WorkerRunnerResult):
+  def __init__(self, state, metrics_by_stage):
+    super(RunnerResult, self).__init__(state)
+    self._metrics_by_stage = metrics_by_stage
+
+
 def only_element(iterable):
   element, = iterable
   return element

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index a564911..31f1b6f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -16,6 +16,7 @@
 #
 
 import logging
+import time
 import unittest
 
 import apache_beam as beam
@@ -24,6 +25,11 @@ from apache_beam.runners.portability import maptask_executor_runner_test
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
+try:
+  from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
+except ImportError:
+  DEFAULT_SAMPLING_PERIOD_MS = 0
+
 
 class FnApiRunnerTest(
     maptask_executor_runner_test.MapTaskExecutorRunnerTest):
@@ -55,6 +61,67 @@ class FnApiRunnerTest(
       with self.create_pipeline() as p:
         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
 
+  def test_progress_metrics(self):
+    p = self.create_pipeline()
+    if not isinstance(p.runner, fn_api_runner.FnApiRunner):
+      # This test is inherited by others that may not support the same
+      # internal way of accessing progress metrics.
+      return
+
+    _ = (p
+         | beam.Create([0, 0, 0, 2.1e-3 * DEFAULT_SAMPLING_PERIOD_MS])
+         | beam.Map(time.sleep)
+         | beam.Map(lambda x: ('key', x))
+         | beam.GroupByKey()
+         | 'm_out' >> beam.FlatMap(lambda x: [
+             1, 2, 3, 4, 5,
+             beam.pvalue.TaggedOutput('once', x),
+             beam.pvalue.TaggedOutput('twice', x),
+             beam.pvalue.TaggedOutput('twice', x)]))
+    res = p.run()
+    res.wait_until_finish()
+    try:
+      self.assertEqual(2, len(res._metrics_by_stage))
+      pregbk_metrics, postgbk_metrics = res._metrics_by_stage.values()
+      if 'Create/Read' not in pregbk_metrics.ptransforms:
+        # The metrics above are actually unordered. Swap.
+        pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
+
+      self.assertEqual(
+          4,
+          pregbk_metrics.ptransforms['Create/Read']
+          .processed_elements.measured.output_element_counts['None'])
+      self.assertEqual(
+          4,
+          pregbk_metrics.ptransforms['Map(sleep)']
+          .processed_elements.measured.output_element_counts['None'])
+      self.assertLessEqual(
+          2e-3 * DEFAULT_SAMPLING_PERIOD_MS,
+          pregbk_metrics.ptransforms['Map(sleep)']
+          .processed_elements.measured.total_time_spent)
+      self.assertEqual(
+          1,
+          postgbk_metrics.ptransforms['GroupByKey/Read']
+          .processed_elements.measured.output_element_counts['None'])
+
+      # The actual stage name ends up being something like 'm_out/lambda...'
+      m_out, = [
+          metrics for name, metrics in postgbk_metrics.ptransforms.items()
+          if name.startswith('m_out')]
+      self.assertEqual(
+          5,
+          m_out.processed_elements.measured.output_element_counts['None'])
+      self.assertEqual(
+          1,
+          m_out.processed_elements.measured.output_element_counts['once'])
+      self.assertEqual(
+          2,
+          m_out.processed_elements.measured.output_element_counts['twice'])
+
+    except:
+      print res._metrics_by_stage
+      raise
+
   # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index a54e6b1..1049ae1 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -187,17 +187,19 @@ class BundleProcessor(object):
     self.process_bundle_descriptor = process_bundle_descriptor
     self.state_handler = state_handler
     self.data_channel_factory = data_channel_factory
-
-  def create_execution_tree(self, descriptor):
     # TODO(robertwb): Figure out the correct prefix to use for output counters
     # from StateSampler.
-    counter_factory = counters.CounterFactory()
-    state_sampler = statesampler.StateSampler(
-        'fnapi-step%s' % descriptor.id, counter_factory)
+    self.counter_factory = counters.CounterFactory()
+    self.state_sampler = statesampler.StateSampler(
+        'fnapi-step-%s' % self.process_bundle_descriptor.id,
+        self.counter_factory)
+    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
+
+  def create_execution_tree(self, descriptor):
 
     transform_factory = BeamTransformFactory(
-        descriptor, self.data_channel_factory, counter_factory, state_sampler,
-        self.state_handler)
+        descriptor, self.data_channel_factory, self.counter_factory,
+        self.state_sampler, self.state_handler)
 
     pcoll_consumers = collections.defaultdict(list)
     for transform_id, transform_proto in descriptor.transforms.items():
@@ -223,15 +225,15 @@ class BundleProcessor(object):
            for pcoll in descriptor.transforms[transform_id].outputs.values()
            for consumer in pcoll_consumers[pcoll]])
 
-    return [get_operation(transform_id)
-            for transform_id in sorted(
-                descriptor.transforms, key=topological_height, reverse=True)]
+    return collections.OrderedDict([
+        (transform_id, get_operation(transform_id))
+        for transform_id in sorted(
+            descriptor.transforms, key=topological_height, reverse=True)])
 
   def process_bundle(self, instruction_id):
-    ops = self.create_execution_tree(self.process_bundle_descriptor)
 
     expected_inputs = []
-    for op in ops:
+    for op in self.ops.values():
       if isinstance(op, DataOutputOperation):
         # TODO(robertwb): Is there a better way to pass the instruction id to
         # the operation?
@@ -241,22 +243,54 @@ class BundleProcessor(object):
         # We must wait until we receive "end of stream" for each of these ops.
         expected_inputs.append(op)
 
-    # Start all operations.
-    for op in reversed(ops):
-      logging.info('start %s', op)
-      op.start()
-
-    # Inject inputs from data plane.
-    for input_op in expected_inputs:
-      for data in input_op.data_channel.input_elements(
-          instruction_id, [input_op.target]):
-        # ignores input name
-        input_op.process_encoded(data.data)
-
-    # Finish all operations.
-    for op in ops:
-      logging.info('finish %s', op)
-      op.finish()
+    try:
+      self.state_sampler.start()
+      # Start all operations.
+      for op in reversed(self.ops.values()):
+        logging.info('start %s', op)
+        op.start()
+
+      # Inject inputs from data plane.
+      for input_op in expected_inputs:
+        for data in input_op.data_channel.input_elements(
+            instruction_id, [input_op.target]):
+          # ignores input name
+          input_op.process_encoded(data.data)
+
+      # Finish all operations.
+      for op in self.ops.values():
+        logging.info('finish %s', op)
+        op.finish()
+    finally:
+      self.state_sampler.stop_if_still_running()
+
+  def metrics(self):
+    return beam_fn_api_pb2.Metrics(
+        # TODO(robertwb): Rename to progress?
+        ptransforms=
+        {transform_id:
+         self._fix_output_tags(transform_id, op.progress_metrics())
+         for transform_id, op in self.ops.items()})
+
+  def _fix_output_tags(self, transform_id, metrics):
+    # Outputs are still referred to by index, not by name, in many Operations.
+    # However, if there is exactly one output, we can fix up the name here.
+    def fix_only_output_tag(actual_output_tag, mapping):
+      if len(mapping) == 1:
+        fake_output_tag, count = only_element(mapping.items())
+        if fake_output_tag != actual_output_tag:
+          del mapping[fake_output_tag]
+          mapping[actual_output_tag] = count
+    actual_output_tags = list(
+        self.process_bundle_descriptor.transforms[transform_id].outputs.keys())
+    if len(actual_output_tags) == 1:
+      fix_only_output_tag(
+          actual_output_tags[0],
+          metrics.processed_elements.measured.output_element_counts)
+      fix_only_output_tag(
+          actual_output_tags[0],
+          metrics.active_elements.measured.output_element_counts)
+    return metrics
 
 
 class BeamTransformFactory(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/opcounters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/opcounters.py b/sdks/python/apache_beam/runners/worker/opcounters.py
index f8f4b51..f4ba6b9 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters.py
@@ -48,10 +48,10 @@ class OperationCounters(object):
   def __init__(self, counter_factory, step_name, coder, output_index):
     self._counter_factory = counter_factory
     self.element_counter = counter_factory.get_counter(
-        '%s-out%d-ElementCount' % (step_name, output_index), Counter.SUM)
+        '%s-out%s-ElementCount' % (step_name, output_index), Counter.SUM)
     self.mean_byte_counter = counter_factory.get_counter(
-        '%s-out%d-MeanByteCount' % (step_name, output_index), Counter.MEAN)
-    self.coder_impl = coder.get_impl()
+        '%s-out%s-MeanByteCount' % (step_name, output_index), Counter.MEAN)
+    self.coder_impl = coder.get_impl() if coder else None
     self.active_accumulator = None
     self._sample_counter = 0
     self._next_sample = 0
@@ -138,7 +138,8 @@ class OperationCounters(object):
     Returns:
       True if it is time to compute another element's size.
     """
-
+    if self.coder_impl is None:
+      return False
     self._sample_counter += 1
     if self._next_sample == 0:
       if random.randint(1, self._sample_counter) <= 10:

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/operations.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 2b4e526..d380a45 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -28,7 +28,7 @@ cdef type _global_window_type
 
 cdef class ConsumerSet(Receiver):
   cdef list consumers
-  cdef opcounters.OperationCounters opcounter
+  cdef readonly opcounters.OperationCounters opcounter
   cdef public step_name
   cdef public output_index
   cdef public coder
@@ -71,6 +71,7 @@ cdef class ReadOperation(Operation):
 cdef class DoOperation(Operation):
   cdef object dofn_runner
   cdef Receiver dofn_receiver
+  cdef object tagged_receivers
 
 cdef class CombineOperation(Operation):
   cdef object phased_combine_fn

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index ed9d84d..ed3f3b8 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -28,6 +28,7 @@ from apache_beam.internal import pickler
 from apache_beam.io import iobase
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.runners import common
 from apache_beam.runners.common import Receiver
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -130,6 +131,7 @@ class Operation(object):
     # TODO(ccy): the '-abort' state can be added when the abort is supported in
     # Operations.
     self.scoped_metrics_container = None
+    self.receivers = []
 
   def start(self):
     """Start operation."""
@@ -157,6 +159,24 @@ class Operation(object):
     """Adds a receiver operation for the specified output."""
     self.consumers[output_index].append(operation)
 
+  def progress_metrics(self):
+    return beam_fn_api_pb2.Metrics.PTransform(
+        processed_elements=beam_fn_api_pb2.Metrics.PTransform.ProcessedElements(
+            measured=beam_fn_api_pb2.Metrics.PTransform.Measured(
+                total_time_spent=(
+                    self.scoped_start_state.sampled_seconds()
+                    + self.scoped_process_state.sampled_seconds()
+                    + self.scoped_finish_state.sampled_seconds()),
+                # Multi-output operations should override this.
+                output_element_counts=(
+                    # If there is exactly one output, we can unambiguously
+                    # fix its name later, which we do.
+                    # TODO(robertwb): Plumb the actual name here.
+                    {'ONLY_OUTPUT': self.receivers[0].opcounter
+                                    .element_counter.value()}
+                    if len(self.receivers) == 1
+                    else None))))
+
   def __str__(self):
     """Generates a useful string for this object.
 
@@ -226,19 +246,14 @@ class InMemoryWriteOperation(Operation):
 
 class _TaggedReceivers(dict):
 
-  class NullReceiver(Receiver):
-
-    def receive(self, element):
-      pass
-
-    # For old SDKs.
-    def output(self, element):
-      pass
+  def __init__(self, counter_factory, step_name):
+    self._counter_factory = counter_factory
+    self._step_name = step_name
 
-  def __missing__(self, unused_key):
-    if not getattr(self, '_null_receiver', None):
-      self._null_receiver = _TaggedReceivers.NullReceiver()
-    return self._null_receiver
+  def __missing__(self, tag):
+    self[tag] = receiver = ConsumerSet(
+        self._counter_factory, self._step_name, tag, [], None)
+    return receiver
 
 
 class DoOperation(Operation):
@@ -308,7 +323,8 @@ class DoOperation(Operation):
       # Tag to output index map used to dispatch the side output values emitted
       # by the DoFn function to the appropriate receivers. The main output is
       # tagged with None and is associated with its corresponding index.
-      tagged_receivers = _TaggedReceivers()
+      self.tagged_receivers = _TaggedReceivers(
+          self.counter_factory, self.step_name)
 
       output_tag_prefix = PropertyNames.OUT + '_'
       for index, tag in enumerate(self.spec.output_tags):
@@ -318,11 +334,11 @@ class DoOperation(Operation):
           original_tag = tag[len(output_tag_prefix):]
         else:
           raise ValueError('Unexpected output name for operation: %s' % tag)
-        tagged_receivers[original_tag] = self.receivers[index]
+        self.tagged_receivers[original_tag] = self.receivers[index]
 
       self.dofn_runner = common.DoFnRunner(
           fn, args, kwargs, self._read_side_inputs(tags_and_types),
-          window_fn, context, tagged_receivers,
+          window_fn, context, self.tagged_receivers,
           logger, self.step_name,
           scoped_metrics_container=self.scoped_metrics_container)
       self.dofn_receiver = (self.dofn_runner
@@ -339,6 +355,15 @@ class DoOperation(Operation):
     with self.scoped_process_state:
       self.dofn_receiver.receive(o)
 
+  def progress_metrics(self):
+    metrics = super(DoOperation, self).progress_metrics()
+    if self.tagged_receivers:
+      metrics.processed_elements.measured.output_element_counts.clear()
+      for tag, receiver in self.tagged_receivers.items():
+        metrics.processed_elements.measured.output_element_counts[
+            str(tag)] = receiver.opcounter.element_counter.value()
+    return metrics
+
 
 class DoFnRunnerReceiver(Receiver):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 1ad65fe..d1b0c0e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -111,6 +111,7 @@ class SdkWorker(object):
     self.fns = {}
     self.state_handler = state_handler
     self.data_channel_factory = data_channel_factory
+    self.bundle_processors = {}
 
   def do_instruction(self, request):
     request_type = request.WhichOneof('request')
@@ -129,16 +130,21 @@ class SdkWorker(object):
         register=beam_fn_api_pb2.RegisterResponse())
 
   def process_bundle(self, request, instruction_id):
-    bundle_processor.BundleProcessor(
-        self.fns[request.process_bundle_descriptor_reference],
-        self.state_handler,
-        self.data_channel_factory).process_bundle(instruction_id)
+    self.bundle_processors[
+        instruction_id] = processor = bundle_processor.BundleProcessor(
+            self.fns[request.process_bundle_descriptor_reference],
+            self.state_handler,
+            self.data_channel_factory)
+    try:
+      processor.process_bundle(instruction_id)
+    finally:
+      del self.bundle_processors[instruction_id]
 
     return beam_fn_api_pb2.InstructionResponse(
         instruction_id=instruction_id,
-        process_bundle=beam_fn_api_pb2.ProcessBundleResponse())
+        process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
+            metrics=processor.metrics()))
 
   def process_bundle_progress(self, request, instruction_id):
-    return beam_fn_api_pb2.InstructionResponse(
-        instruction_id=instruction_id,
-        error='Not Supported')
+    # It is an error to get progress for a not-in-flight bundle.
+    return self.bundle_processors.get(instruction_id).metrics()

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/statesampler.pyx
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler.pyx b/sdks/python/apache_beam/runners/worker/statesampler.pyx
index c562763..f0527c6 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler.pyx
+++ b/sdks/python/apache_beam/runners/worker/statesampler.pyx
@@ -263,3 +263,6 @@ cdef class ScopedState(object):
 
   def __repr__(self):
     return "ScopedState[%s, %s, %s]" % (self.name, self.state_index, self.nsecs)
+
+  def sampled_seconds(self):
+    return 1e-9 * self.nsecs

http://git-wip-us.apache.org/repos/asf/beam/blob/3ad84791/sdks/python/apache_beam/runners/worker/statesampler_fake.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fake.py b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
index 5cd0fd2..bc56021 100644
--- a/sdks/python/apache_beam/runners/worker/statesampler_fake.py
+++ b/sdks/python/apache_beam/runners/worker/statesampler_fake.py
@@ -26,6 +26,18 @@ class StateSampler(object):
   def scoped_state(self, step_name, state_name=None, io_target=None):
     return _FakeScopedState()
 
+  def start(self):
+    pass
+
+  def stop(self):
+    pass
+
+  def stop_if_still_running(self):
+    self.stop()
+
+  def commit_counters(self):
+    pass
+
 
 class _FakeScopedState(object):
 
@@ -34,3 +46,6 @@ class _FakeScopedState(object):
 
   def __exit__(self, *unused_args):
     pass
+
+  def sampled_seconds(self):
+    return 0