You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/30 01:24:14 UTC

[1/2] incubator-beam git commit: Deletes some code that is not used by SDK.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 5541c0305 -> fa302a3be


Deletes some code that is not used by the SDK.

Also deletes the corresponding tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d50d804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d50d804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d50d804

Branch: refs/heads/python-sdk
Commit: 7d50d8040585e0cea5bc02de4cb199f29c1472fc
Parents: 5541c03
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Jul 29 15:40:39 2016 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Fri Jul 29 15:40:39 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 347 -------------------
 .../apache_beam/internal/apiclient_test.py      |  92 +----
 2 files changed, 7 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 137a40b..bc4a4e0 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -29,13 +29,10 @@ from apitools.base.py import encoding
 from apitools.base.py import exceptions
 
 from apache_beam import utils
-from apache_beam.internal import pickler
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.json_value import to_json_value
-from apache_beam.io import iobase
 from apache_beam.transforms import cy_combiners
 from apache_beam.utils import dependency
-from apache_beam.utils import names
 from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.dependency import get_sdk_name_and_version
@@ -53,77 +50,6 @@ COMPUTE_API_SERVICE = 'compute.googleapis.com'
 STORAGE_API_SERVICE = 'storage.googleapis.com'
 
 
-def append_counter(status_object, counter, tentative):
-  """Appends a counter to the status.
-
-  Args:
-    status_object: a work_item_status to which to add this counter
-    counter: a counters.Counter object to append
-    tentative: whether the value should be reported as tentative
-  """
-  logging.debug('Appending counter%s %s',
-                ' (tentative)' if tentative else '',
-                counter)
-  kind, setter = metric_translations[counter.combine_fn.__class__]
-  append_metric(
-      status_object, counter.name, kind, counter.accumulator,
-      setter, tentative=tentative)
-
-
-def append_metric(status_object, metric_name, kind, value, setter=None,
-                  step=None, output_user_name=None, tentative=False,
-                  worker_id=None, cumulative=True):
-  """Creates and adds a MetricUpdate field to the passed-in protobuf.
-
-  Args:
-    status_object: a work_item_status to which to add this metric
-    metric_name: a string naming this metric
-    kind: dataflow counter kind (e.g. 'sum')
-    value: accumulator value to encode
-    setter: if not None, a lambda to use to update metric_update with value
-    step: the name of the associated step
-    output_user_name: the user-visible name to use
-    tentative: whether this should be labeled as a tentative metric
-    worker_id: the id of this worker.  Specifying a worker_id also
-      causes this to be encoded as a metric, not a counter.
-    cumulative: Whether this metric is cumulative, default True.
-      Set to False for a delta value.
-  """
-  # Does this look like a counter or like a metric?
-  is_counter = not worker_id
-
-  metric_update = dataflow.MetricUpdate()
-  metric_update.name = dataflow.MetricStructuredName()
-  metric_update.name.name = metric_name
-  # Handle attributes stored in the name context
-  if step or output_user_name or tentative or worker_id:
-    metric_update.name.context = dataflow.MetricStructuredName.ContextValue()
-
-    def append_to_context(key, value):
-      metric_update.name.context.additionalProperties.append(
-          dataflow.MetricStructuredName.ContextValue.AdditionalProperty(
-              key=key, value=value))
-    if step:
-      append_to_context('step', step)
-    if output_user_name:
-      append_to_context('output_user_name', output_user_name)
-    if tentative:
-      append_to_context('tentative', 'true')
-    if worker_id:
-      append_to_context('workerId', worker_id)
-  if cumulative and is_counter:
-    metric_update.cumulative = cumulative
-  if is_counter:
-    # Counters are distinguished by having a kind; metrics do not.
-    metric_update.kind = kind
-  if setter:
-    setter(value, metric_update)
-  else:
-    metric_update.scalar = to_json_value(value, with_type=True)
-  logging.debug('Appending metric_update: %s', metric_update)
-  status_object.metricUpdates.append(metric_update)
-
-
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
@@ -615,238 +541,6 @@ class DataflowApplicationClient(object):
     return response.jobMessages, response.nextPageToken
 
 
-class DataflowWorkerClient(object):
-  """A Dataflow API client used by worker code to lease work items."""
-
-  def __init__(self, worker, skip_get_credentials=False):
-    """Initializes a Dataflow API client object with worker functionality.
-
-    Args:
-      worker: A Worker instance.
-      skip_get_credentials: If true disables credentials loading logic.
-    """
-    self._client = (
-        dataflow.DataflowV1b3(
-            url=worker.service_path,
-            get_credentials=(not skip_get_credentials)))
-
-  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def lease_work(self, worker_info, desired_lease_duration):
-    """Leases a work item from the service."""
-    work_request = dataflow.LeaseWorkItemRequest()
-    work_request.workerId = worker_info.worker_id
-    work_request.requestedLeaseDuration = desired_lease_duration
-    work_request.currentWorkerTime = worker_info.formatted_current_time
-    work_request.workerCapabilities.append(worker_info.worker_id)
-    for value in worker_info.capabilities:
-      work_request.workerCapabilities.append(value)
-    for value in worker_info.work_types:
-      work_request.workItemTypes.append(value)
-    request = dataflow.DataflowProjectsJobsWorkItemsLeaseRequest()
-    request.jobId = worker_info.job_id
-    request.projectId = worker_info.project_id
-    try:
-      request.leaseWorkItemRequest = work_request
-    except AttributeError:
-      request.lease_work_item_request = work_request
-    logging.debug('lease_work: %s', request)
-    response = self._client.projects_jobs_workItems.Lease(request)
-    logging.debug('lease_work: %s', response)
-    return response
-
-  def report_status(self,
-                    worker_info,
-                    desired_lease_duration,
-                    work_item,
-                    completed,
-                    progress,
-                    dynamic_split_result_to_report=None,
-                    source_operation_response=None,
-                    exception_details=None):
-    """Reports status for a work item (success or failure).
-
-    This is an integration point. The @retry decorator is used on callers
-    of this method defined in apache_beam/worker/worker.py because
-    there are different retry strategies for a completed versus in progress
-    work item.
-
-    Args:
-      worker_info: A batchworker.BatchWorkerInfo that contains
-        information about the Worker instance executing the work
-        item.
-      desired_lease_duration: The duration for which the worker would like to
-        extend the lease of the work item. Should be in seconds formatted as a
-        string.
-      work_item: The work item for which to report status.
-      completed: True if there is no further work to be done on this work item
-        either because it succeeded or because it failed. False if this is a
-        progress report.
-      progress: A SourceReaderProgress that gives the progress of worker
-        handling the work item.
-      dynamic_split_result_to_report: A successful dynamic split result that
-        should be sent to the Dataflow service along with the status report.
-      source_operation_response: Response to a source operation request from
-        the service. This will be sent to the service along with the status
-        report.
-      exception_details: A string representation of the stack trace for an
-        exception raised while executing the work item. The string is the
-        output of the standard traceback.format_exc() function.
-
-    Returns:
-      A protobuf containing the response from the service for the status
-      update (WorkItemServiceState).
-
-    Raises:
-      TypeError: if progress is of an unknown type
-      RuntimeError: if dynamic split request is of an unknown type.
-    """
-    work_item_status = dataflow.WorkItemStatus()
-    work_item_status.completed = completed
-
-    if not completed:
-      work_item_status.requestedLeaseDuration = desired_lease_duration
-
-    if progress is not None:
-      work_item_progress = dataflow.ApproximateProgress()
-      work_item_status.progress = work_item_progress
-
-      if progress.position is not None:
-        work_item_progress.position = (
-            reader_position_to_cloud_position(progress.position))
-      elif progress.percent_complete is not None:
-        work_item_progress.percentComplete = progress.percent_complete
-      elif progress.remaining_time is not None:
-        work_item_progress.remainingTime = progress.remaining_time
-      else:
-        raise TypeError('Unknown type of progress')
-
-    if dynamic_split_result_to_report is not None:
-      assert isinstance(dynamic_split_result_to_report,
-                        iobase.DynamicSplitResult)
-
-      if isinstance(dynamic_split_result_to_report,
-                    iobase.DynamicSplitResultWithPosition):
-        work_item_status.stopPosition = (
-            dynamic_split_result_with_position_to_cloud_stop_position(
-                dynamic_split_result_to_report))
-      else:
-        raise RuntimeError('Unknown type of dynamic split result.')
-
-    # The service keeps track of the report indexes in order to handle lost
-    # and duplicate message.
-    work_item_status.reportIndex = work_item.next_report_index
-    work_item_status.workItemId = str(work_item.proto.id)
-
-    # Add exception information if any.
-    if exception_details is not None:
-      status = dataflow.Status()
-      # TODO(silviuc): Replace Code.UNKNOWN with a generated definition.
-      status.code = 2
-      # TODO(silviuc): Attach the stack trace as exception details.
-      status.message = exception_details
-      work_item_status.errors.append(status)
-
-    if source_operation_response is not None:
-      work_item_status.sourceOperationResponse = source_operation_response
-
-    # Look through the work item for metrics to send.
-    if work_item.map_task:
-      for counter in work_item.map_task.itercounters():
-        append_counter(work_item_status, counter, tentative=not completed)
-
-    report_request = dataflow.ReportWorkItemStatusRequest()
-    report_request.currentWorkerTime = worker_info.formatted_current_time
-    report_request.workerId = worker_info.worker_id
-    report_request.workItemStatuses.append(work_item_status)
-
-    request = dataflow.DataflowProjectsJobsWorkItemsReportStatusRequest()
-    request.jobId = worker_info.job_id
-    request.projectId = worker_info.project_id
-    try:
-      request.reportWorkItemStatusRequest = report_request
-    except AttributeError:
-      request.report_work_item_status_request = report_request
-    logging.debug('report_status: %s', request)
-    response = self._client.projects_jobs_workItems.ReportStatus(request)
-    logging.debug('report_status: %s', response)
-    return response
-
-# Utility functions for translating cloud reader objects to corresponding SDK
-# reader objects and vice versa.
-
-
-def reader_progress_to_cloud_progress(reader_progress):
-  """Converts a given 'ReaderProgress' to corresponding cloud format."""
-
-  cloud_progress = dataflow.ApproximateProgress()
-  if reader_progress.position is not None:
-    cloud_progress.position = reader_position_to_cloud_position(
-        reader_progress.position)
-  if reader_progress.percent_complete is not None:
-    cloud_progress.percentComplete = reader_progress.percent_complete
-  if reader_progress.remaining_time is not None:
-    cloud_progress.remainingTime = reader_progress.remaining_time
-
-  return cloud_progress
-
-
-def reader_position_to_cloud_position(reader_position):
-  """Converts a given 'ReaderPosition' to corresponding cloud format."""
-
-  cloud_position = dataflow.Position()
-  if reader_position.end is not None:
-    cloud_position.end = reader_position.end
-  if reader_position.key is not None:
-    cloud_position.key = reader_position.key
-  if reader_position.byte_offset is not None:
-    cloud_position.byteOffset = reader_position.byte_offset
-  if reader_position.record_index is not None:
-    cloud_position.recordIndex = reader_position.record_index
-  if reader_position.shuffle_position is not None:
-    cloud_position.shufflePosition = reader_position.shuffle_position
-  if reader_position.concat_position is not None:
-    concat_position = dataflow.ConcatPosition()
-    concat_position.index = reader_position.concat_position.index
-    concat_position.position = reader_position_to_cloud_position(
-        reader_position.concat_position.position)
-    cloud_position.concatPosition = concat_position
-
-  return cloud_position
-
-
-def dynamic_split_result_with_position_to_cloud_stop_position(split_result):
-  """Converts a given 'DynamicSplitResultWithPosition' to cloud format."""
-
-  return reader_position_to_cloud_position(split_result.stop_position)
-
-
-def cloud_progress_to_reader_progress(cloud_progress):
-  reader_position = None
-  if cloud_progress.position is not None:
-    reader_position = cloud_position_to_reader_position(cloud_progress.position)
-  return iobase.ReaderProgress(reader_position, cloud_progress.percentComplete,
-                               cloud_progress.remainingTime)
-
-
-def cloud_position_to_reader_position(cloud_position):
-  concat_position = None
-  if cloud_position.concatPosition is not None:
-    inner_position = cloud_position_to_reader_position(
-        cloud_position.concatPosition.position)
-    concat_position = iobase.ConcatPosition(cloud_position.index,
-                                            inner_position)
-
-  return iobase.ReaderPosition(cloud_position.end, cloud_position.key,
-                               cloud_position.byteOffset,
-                               cloud_position.recordIndex,
-                               cloud_position.shufflePosition, concat_position)
-
-
-def approximate_progress_to_dynamic_split_request(approximate_progress):
-  return iobase.DynamicSplitRequest(cloud_progress_to_reader_progress(
-      approximate_progress))
-
-
 def set_scalar(accumulator, metric_update):
   metric_update.scalar = to_json_value(accumulator.value, with_type=True)
 
@@ -875,44 +569,3 @@ metric_translations = {
     cy_combiners.AllCombineFn: ('and', set_scalar),
     cy_combiners.AnyCombineFn: ('or', set_scalar),
 }
-
-
-def splits_to_split_response(bundles):
-  """Generates a response to a custom source split request.
-
-  Args:
-    bundles: a set of bundles generated by a BoundedSource.split() invocation.
-  Returns:
-   a SourceOperationResponse object.
-  """
-  derived_sources = []
-  for bundle in bundles:
-    derived_source = dataflow.DerivedSource()
-    derived_source.derivationMode = (
-        dataflow.DerivedSource.DerivationModeValueValuesEnum
-        .SOURCE_DERIVATION_MODE_INDEPENDENT)
-    derived_source.source = dataflow.Source()
-    derived_source.source.doesNotNeedSplitting = True
-
-    derived_source.source.spec = dataflow.Source.SpecValue()
-    derived_source.source.spec.additionalProperties.append(
-        dataflow.Source.SpecValue.AdditionalProperty(
-            key=names.SERIALIZED_SOURCE_KEY,
-            value=to_json_value(pickler.dumps(
-                (bundle.source, bundle.start_position, bundle.stop_position)),
-                                with_type=True)))
-    derived_source.source.spec.additionalProperties.append(
-        dataflow.Source.SpecValue.AdditionalProperty(key='@type',
-                                                     value=to_json_value(
-                                                         names.SOURCE_TYPE)))
-    derived_sources.append(derived_source)
-
-  split_response = dataflow.SourceSplitResponse()
-  split_response.bundles = derived_sources
-  split_response.outcome = (
-      dataflow.SourceSplitResponse.OutcomeValueValuesEnum
-      .SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED)
-
-  response = dataflow.SourceOperationResponse()
-  response.split = split_response
-  return response

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 68ad842..8fddae7 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,96 +18,18 @@
 
 import unittest
 
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
 from apache_beam.internal import apiclient
-from apache_beam.io import iobase
-
-import apache_beam.internal.clients.dataflow as dataflow
 
 
 class UtilTest(unittest.TestCase):
 
-  def test_reader_progress_to_cloud_progress_position(self):
-    reader_position = iobase.ReaderPosition(byte_offset=9999)
-    reader_progress = iobase.ReaderProgress(position=reader_position)
-
-    cloud_progress = apiclient.reader_progress_to_cloud_progress(
-        reader_progress)
-    self.assertIsNotNone(cloud_progress)
-    self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
-    self.assertIsNotNone(cloud_progress.position)
-    self.assertIsInstance(cloud_progress.position, dataflow.Position)
-    self.assertEquals(9999, cloud_progress.position.byteOffset)
-
-  def test_reader_progress_to_cloud_progress_percent_complete(self):
-    reader_progress = iobase.ReaderProgress(percent_complete=0.123)
-
-    cloud_progress = apiclient.reader_progress_to_cloud_progress(
-        reader_progress)
-    self.assertIsNotNone(cloud_progress)
-    self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
-    self.assertIsNotNone(cloud_progress.percentComplete)
-    self.assertEquals(0.123, cloud_progress.percentComplete)
-
-  def test_reader_position_to_cloud_position(self):
-    reader_position = iobase.ReaderPosition(byte_offset=9999)
-
-    cloud_position = apiclient.reader_position_to_cloud_position(
-        reader_position)
-    self.assertIsNotNone(cloud_position)
-
-  def test_dynamic_split_result_with_position_to_cloud_stop_position(self):
-    position = iobase.ReaderPosition(byte_offset=9999)
-    dynamic_split_result = iobase.DynamicSplitResultWithPosition(position)
-
-    approximate_position = (
-        apiclient.dynamic_split_result_with_position_to_cloud_stop_position(
-            dynamic_split_result))
-    self.assertIsNotNone(approximate_position)
-    self.assertIsInstance(approximate_position, dataflow.Position)
-    self.assertEqual(9999, approximate_position.byteOffset)
-
-  def test_cloud_progress_to_reader_progress_index_position(self):
-    cloud_progress = dataflow.ApproximateProgress()
-    cloud_progress.position = dataflow.Position()
-    cloud_progress.position.byteOffset = 9999
-
-    reader_progress = apiclient.cloud_progress_to_reader_progress(
-        cloud_progress)
-    self.assertIsNotNone(reader_progress.position)
-    self.assertIsInstance(reader_progress.position, iobase.ReaderPosition)
-    self.assertEqual(9999, reader_progress.position.byte_offset)
-
-  def test_cloud_progress_to_reader_progress_percent_complete(self):
-    cloud_progress = dataflow.ApproximateProgress()
-    cloud_progress.percentComplete = 0.123
-
-    reader_progress = apiclient.cloud_progress_to_reader_progress(
-        cloud_progress)
-    self.assertIsNotNone(reader_progress.percent_complete)
-    self.assertEqual(0.123, reader_progress.percent_complete)
-
-  def test_cloud_position_to_reader_position_byte_offset(self):
-    cloud_position = dataflow.Position()
-    cloud_position.byteOffset = 9999
-
-    reader_position = apiclient.cloud_position_to_reader_position(
-        cloud_position)
-    self.assertIsNotNone(reader_position)
-    self.assertIsInstance(reader_position, iobase.ReaderPosition)
-    self.assertEqual(9999, reader_position.byte_offset)
-
-  def test_approximate_progress_to_dynamic_split_request(self):
-    approximate_progress = dataflow.ApproximateProgress()
-    approximate_progress.percentComplete = 0.123
-
-    dynamic_split_request = (
-        apiclient.approximate_progress_to_dynamic_split_request(
-            approximate_progress))
-    self.assertIsNotNone(dynamic_split_request)
-    self.assertIsInstance(dynamic_split_request.progress, iobase.ReaderProgress)
-    self.assertIsNotNone(dynamic_split_request.progress.percent_complete)
-    self.assertEqual(dynamic_split_request.progress.percent_complete, 0.123)
-
+  def test_create_application_client(self):
+    pipeline_options = PipelineOptions()
+    apiclient.DataflowApplicationClient(
+        pipeline_options,
+        DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
 
 if __name__ == '__main__':
   unittest.main()


[2/2] incubator-beam git commit: Closes #763

Posted by dh...@apache.org.
Closes #763


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa302a3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa302a3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa302a3b

Branch: refs/heads/python-sdk
Commit: fa302a3be3171376b80e47da95c2340119ef43a8
Parents: 5541c03 7d50d80
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 18:24:03 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 29 18:24:03 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 347 -------------------
 .../apache_beam/internal/apiclient_test.py      |  92 +----
 2 files changed, 7 insertions(+), 432 deletions(-)
----------------------------------------------------------------------