Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/30 01:24:14 UTC
[1/2] incubator-beam git commit: Deletes some code that is not used by the SDK.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 5541c0305 -> fa302a3be
Deletes some code that is not used by the SDK.
Also deletes corresponding tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d50d804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d50d804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d50d804
Branch: refs/heads/python-sdk
Commit: 7d50d8040585e0cea5bc02de4cb199f29c1472fc
Parents: 5541c03
Author: Chamikara Jayalath <ch...@google.com>
Authored: Fri Jul 29 15:40:39 2016 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Fri Jul 29 15:40:39 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 347 -------------------
.../apache_beam/internal/apiclient_test.py | 92 +----
2 files changed, 7 insertions(+), 432 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 137a40b..bc4a4e0 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -29,13 +29,10 @@ from apitools.base.py import encoding
from apitools.base.py import exceptions
from apache_beam import utils
-from apache_beam.internal import pickler
from apache_beam.internal.auth import get_service_credentials
from apache_beam.internal.json_value import to_json_value
-from apache_beam.io import iobase
from apache_beam.transforms import cy_combiners
from apache_beam.utils import dependency
-from apache_beam.utils import names
from apache_beam.utils import retry
from apache_beam.utils.dependency import get_required_container_version
from apache_beam.utils.dependency import get_sdk_name_and_version
@@ -53,77 +50,6 @@ COMPUTE_API_SERVICE = 'compute.googleapis.com'
STORAGE_API_SERVICE = 'storage.googleapis.com'
-def append_counter(status_object, counter, tentative):
- """Appends a counter to the status.
-
- Args:
- status_object: a work_item_status to which to add this counter
- counter: a counters.Counter object to append
- tentative: whether the value should be reported as tentative
- """
- logging.debug('Appending counter%s %s',
- ' (tentative)' if tentative else '',
- counter)
- kind, setter = metric_translations[counter.combine_fn.__class__]
- append_metric(
- status_object, counter.name, kind, counter.accumulator,
- setter, tentative=tentative)
-
-
-def append_metric(status_object, metric_name, kind, value, setter=None,
- step=None, output_user_name=None, tentative=False,
- worker_id=None, cumulative=True):
- """Creates and adds a MetricUpdate field to the passed-in protobuf.
-
- Args:
- status_object: a work_item_status to which to add this metric
- metric_name: a string naming this metric
- kind: dataflow counter kind (e.g. 'sum')
- value: accumulator value to encode
- setter: if not None, a lambda to use to update metric_update with value
- step: the name of the associated step
- output_user_name: the user-visible name to use
- tentative: whether this should be labeled as a tentative metric
- worker_id: the id of this worker. Specifying a worker_id also
- causes this to be encoded as a metric, not a counter.
- cumulative: Whether this metric is cumulative, default True.
- Set to False for a delta value.
- """
- # Does this look like a counter or like a metric?
- is_counter = not worker_id
-
- metric_update = dataflow.MetricUpdate()
- metric_update.name = dataflow.MetricStructuredName()
- metric_update.name.name = metric_name
- # Handle attributes stored in the name context
- if step or output_user_name or tentative or worker_id:
- metric_update.name.context = dataflow.MetricStructuredName.ContextValue()
-
- def append_to_context(key, value):
- metric_update.name.context.additionalProperties.append(
- dataflow.MetricStructuredName.ContextValue.AdditionalProperty(
- key=key, value=value))
- if step:
- append_to_context('step', step)
- if output_user_name:
- append_to_context('output_user_name', output_user_name)
- if tentative:
- append_to_context('tentative', 'true')
- if worker_id:
- append_to_context('workerId', worker_id)
- if cumulative and is_counter:
- metric_update.cumulative = cumulative
- if is_counter:
- # Counters are distinguished by having a kind; metrics do not.
- metric_update.kind = kind
- if setter:
- setter(value, metric_update)
- else:
- metric_update.scalar = to_json_value(value, with_type=True)
- logging.debug('Appending metric_update: %s', metric_update)
- status_object.metricUpdates.append(metric_update)
-
-
class Step(object):
"""Wrapper for a dataflow Step protobuf."""
@@ -615,238 +541,6 @@ class DataflowApplicationClient(object):
return response.jobMessages, response.nextPageToken
-class DataflowWorkerClient(object):
- """A Dataflow API client used by worker code to lease work items."""
-
- def __init__(self, worker, skip_get_credentials=False):
- """Initializes a Dataflow API client object with worker functionality.
-
- Args:
- worker: A Worker instance.
- skip_get_credentials: If true disables credentials loading logic.
- """
- self._client = (
- dataflow.DataflowV1b3(
- url=worker.service_path,
- get_credentials=(not skip_get_credentials)))
-
- @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
- def lease_work(self, worker_info, desired_lease_duration):
- """Leases a work item from the service."""
- work_request = dataflow.LeaseWorkItemRequest()
- work_request.workerId = worker_info.worker_id
- work_request.requestedLeaseDuration = desired_lease_duration
- work_request.currentWorkerTime = worker_info.formatted_current_time
- work_request.workerCapabilities.append(worker_info.worker_id)
- for value in worker_info.capabilities:
- work_request.workerCapabilities.append(value)
- for value in worker_info.work_types:
- work_request.workItemTypes.append(value)
- request = dataflow.DataflowProjectsJobsWorkItemsLeaseRequest()
- request.jobId = worker_info.job_id
- request.projectId = worker_info.project_id
- try:
- request.leaseWorkItemRequest = work_request
- except AttributeError:
- request.lease_work_item_request = work_request
- logging.debug('lease_work: %s', request)
- response = self._client.projects_jobs_workItems.Lease(request)
- logging.debug('lease_work: %s', response)
- return response
-
- def report_status(self,
- worker_info,
- desired_lease_duration,
- work_item,
- completed,
- progress,
- dynamic_split_result_to_report=None,
- source_operation_response=None,
- exception_details=None):
- """Reports status for a work item (success or failure).
-
- This is an integration point. The @retry decorator is used on callers
- of this method defined in apache_beam/worker/worker.py because
- there are different retry strategies for a completed versus in progress
- work item.
-
- Args:
- worker_info: A batchworker.BatchWorkerInfo that contains
- information about the Worker instance executing the work
- item.
- desired_lease_duration: The duration for which the worker would like to
- extend the lease of the work item. Should be in seconds formatted as a
- string.
- work_item: The work item for which to report status.
- completed: True if there is no further work to be done on this work item
- either because it succeeded or because it failed. False if this is a
- progress report.
- progress: A SourceReaderProgress that gives the progress of worker
- handling the work item.
- dynamic_split_result_to_report: A successful dynamic split result that
- should be sent to the Dataflow service along with the status report.
- source_operation_response: Response to a source operation request from
- the service. This will be sent to the service along with the status
- report.
- exception_details: A string representation of the stack trace for an
- exception raised while executing the work item. The string is the
- output of the standard traceback.format_exc() function.
-
- Returns:
- A protobuf containing the response from the service for the status
- update (WorkItemServiceState).
-
- Raises:
- TypeError: if progress is of an unknown type
- RuntimeError: if dynamic split request is of an unknown type.
- """
- work_item_status = dataflow.WorkItemStatus()
- work_item_status.completed = completed
-
- if not completed:
- work_item_status.requestedLeaseDuration = desired_lease_duration
-
- if progress is not None:
- work_item_progress = dataflow.ApproximateProgress()
- work_item_status.progress = work_item_progress
-
- if progress.position is not None:
- work_item_progress.position = (
- reader_position_to_cloud_position(progress.position))
- elif progress.percent_complete is not None:
- work_item_progress.percentComplete = progress.percent_complete
- elif progress.remaining_time is not None:
- work_item_progress.remainingTime = progress.remaining_time
- else:
- raise TypeError('Unknown type of progress')
-
- if dynamic_split_result_to_report is not None:
- assert isinstance(dynamic_split_result_to_report,
- iobase.DynamicSplitResult)
-
- if isinstance(dynamic_split_result_to_report,
- iobase.DynamicSplitResultWithPosition):
- work_item_status.stopPosition = (
- dynamic_split_result_with_position_to_cloud_stop_position(
- dynamic_split_result_to_report))
- else:
- raise RuntimeError('Unknown type of dynamic split result.')
-
- # The service keeps track of the report indexes in order to handle lost
- # and duplicate message.
- work_item_status.reportIndex = work_item.next_report_index
- work_item_status.workItemId = str(work_item.proto.id)
-
- # Add exception information if any.
- if exception_details is not None:
- status = dataflow.Status()
- # TODO(silviuc): Replace Code.UNKNOWN with a generated definition.
- status.code = 2
- # TODO(silviuc): Attach the stack trace as exception details.
- status.message = exception_details
- work_item_status.errors.append(status)
-
- if source_operation_response is not None:
- work_item_status.sourceOperationResponse = source_operation_response
-
- # Look through the work item for metrics to send.
- if work_item.map_task:
- for counter in work_item.map_task.itercounters():
- append_counter(work_item_status, counter, tentative=not completed)
-
- report_request = dataflow.ReportWorkItemStatusRequest()
- report_request.currentWorkerTime = worker_info.formatted_current_time
- report_request.workerId = worker_info.worker_id
- report_request.workItemStatuses.append(work_item_status)
-
- request = dataflow.DataflowProjectsJobsWorkItemsReportStatusRequest()
- request.jobId = worker_info.job_id
- request.projectId = worker_info.project_id
- try:
- request.reportWorkItemStatusRequest = report_request
- except AttributeError:
- request.report_work_item_status_request = report_request
- logging.debug('report_status: %s', request)
- response = self._client.projects_jobs_workItems.ReportStatus(request)
- logging.debug('report_status: %s', response)
- return response
-
-# Utility functions for translating cloud reader objects to corresponding SDK
-# reader objects and vice versa.
-
-
-def reader_progress_to_cloud_progress(reader_progress):
- """Converts a given 'ReaderProgress' to corresponding cloud format."""
-
- cloud_progress = dataflow.ApproximateProgress()
- if reader_progress.position is not None:
- cloud_progress.position = reader_position_to_cloud_position(
- reader_progress.position)
- if reader_progress.percent_complete is not None:
- cloud_progress.percentComplete = reader_progress.percent_complete
- if reader_progress.remaining_time is not None:
- cloud_progress.remainingTime = reader_progress.remaining_time
-
- return cloud_progress
-
-
-def reader_position_to_cloud_position(reader_position):
- """Converts a given 'ReaderPosition' to corresponding cloud format."""
-
- cloud_position = dataflow.Position()
- if reader_position.end is not None:
- cloud_position.end = reader_position.end
- if reader_position.key is not None:
- cloud_position.key = reader_position.key
- if reader_position.byte_offset is not None:
- cloud_position.byteOffset = reader_position.byte_offset
- if reader_position.record_index is not None:
- cloud_position.recordIndex = reader_position.record_index
- if reader_position.shuffle_position is not None:
- cloud_position.shufflePosition = reader_position.shuffle_position
- if reader_position.concat_position is not None:
- concat_position = dataflow.ConcatPosition()
- concat_position.index = reader_position.concat_position.index
- concat_position.position = reader_position_to_cloud_position(
- reader_position.concat_position.position)
- cloud_position.concatPosition = concat_position
-
- return cloud_position
-
-
-def dynamic_split_result_with_position_to_cloud_stop_position(split_result):
- """Converts a given 'DynamicSplitResultWithPosition' to cloud format."""
-
- return reader_position_to_cloud_position(split_result.stop_position)
-
-
-def cloud_progress_to_reader_progress(cloud_progress):
- reader_position = None
- if cloud_progress.position is not None:
- reader_position = cloud_position_to_reader_position(cloud_progress.position)
- return iobase.ReaderProgress(reader_position, cloud_progress.percentComplete,
- cloud_progress.remainingTime)
-
-
-def cloud_position_to_reader_position(cloud_position):
- concat_position = None
- if cloud_position.concatPosition is not None:
- inner_position = cloud_position_to_reader_position(
- cloud_position.concatPosition.position)
- concat_position = iobase.ConcatPosition(cloud_position.index,
- inner_position)
-
- return iobase.ReaderPosition(cloud_position.end, cloud_position.key,
- cloud_position.byteOffset,
- cloud_position.recordIndex,
- cloud_position.shufflePosition, concat_position)
-
-
-def approximate_progress_to_dynamic_split_request(approximate_progress):
- return iobase.DynamicSplitRequest(cloud_progress_to_reader_progress(
- approximate_progress))
-
-
def set_scalar(accumulator, metric_update):
metric_update.scalar = to_json_value(accumulator.value, with_type=True)
@@ -875,44 +569,3 @@ metric_translations = {
cy_combiners.AllCombineFn: ('and', set_scalar),
cy_combiners.AnyCombineFn: ('or', set_scalar),
}
-
-
-def splits_to_split_response(bundles):
- """Generates a response to a custom source split request.
-
- Args:
- bundles: a set of bundles generated by a BoundedSource.split() invocation.
- Returns:
- a SourceOperationResponse object.
- """
- derived_sources = []
- for bundle in bundles:
- derived_source = dataflow.DerivedSource()
- derived_source.derivationMode = (
- dataflow.DerivedSource.DerivationModeValueValuesEnum
- .SOURCE_DERIVATION_MODE_INDEPENDENT)
- derived_source.source = dataflow.Source()
- derived_source.source.doesNotNeedSplitting = True
-
- derived_source.source.spec = dataflow.Source.SpecValue()
- derived_source.source.spec.additionalProperties.append(
- dataflow.Source.SpecValue.AdditionalProperty(
- key=names.SERIALIZED_SOURCE_KEY,
- value=to_json_value(pickler.dumps(
- (bundle.source, bundle.start_position, bundle.stop_position)),
- with_type=True)))
- derived_source.source.spec.additionalProperties.append(
- dataflow.Source.SpecValue.AdditionalProperty(key='@type',
- value=to_json_value(
- names.SOURCE_TYPE)))
- derived_sources.append(derived_source)
-
- split_response = dataflow.SourceSplitResponse()
- split_response.bundles = derived_sources
- split_response.outcome = (
- dataflow.SourceSplitResponse.OutcomeValueValuesEnum
- .SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED)
-
- response = dataflow.SourceOperationResponse()
- response.split = split_response
- return response
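For context on the code that survives this hunk: set_scalar and the metric_translations table are the dispatch half of the deleted append_counter above. A counter's combine_fn class keys into the table to pick a (kind, setter) pair, and the setter encodes the counter's accumulator onto a MetricUpdate. Below is a minimal sketch of that lookup, using hypothetical stand-ins rather than the generated dataflow messages or the real cy_combiners classes:

# Hypothetical stand-ins for illustration only; the real code uses the
# generated dataflow.MetricUpdate message and cy_combiners CombineFn classes.
class FakeMetricUpdate(object):
    def __init__(self):
        self.kind = None
        self.scalar = None

class FakeSumCombineFn(object):
    pass

class FakeAccumulator(object):
    def __init__(self, value):
        self.value = value

def fake_set_scalar(accumulator, metric_update):
    # Mirrors set_scalar above, minus the to_json_value JSON encoding.
    metric_update.scalar = accumulator.value

fake_translations = {FakeSumCombineFn: ('sum', fake_set_scalar)}

# The deleted append_counter resolved (kind, setter) from the counter's
# combine_fn class, then applied the setter to the accumulator:
kind, setter = fake_translations[FakeSumCombineFn]
update = FakeMetricUpdate()
update.kind = kind
setter(FakeAccumulator(42), update)
assert (update.kind, update.scalar) == ('sum', 42)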
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d50d804/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 68ad842..8fddae7 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,96 +18,18 @@
import unittest
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
from apache_beam.internal import apiclient
-from apache_beam.io import iobase
-
-import apache_beam.internal.clients.dataflow as dataflow
class UtilTest(unittest.TestCase):
- def test_reader_progress_to_cloud_progress_position(self):
- reader_position = iobase.ReaderPosition(byte_offset=9999)
- reader_progress = iobase.ReaderProgress(position=reader_position)
-
- cloud_progress = apiclient.reader_progress_to_cloud_progress(
- reader_progress)
- self.assertIsNotNone(cloud_progress)
- self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
- self.assertIsNotNone(cloud_progress.position)
- self.assertIsInstance(cloud_progress.position, dataflow.Position)
- self.assertEquals(9999, cloud_progress.position.byteOffset)
-
- def test_reader_progress_to_cloud_progress_percent_complete(self):
- reader_progress = iobase.ReaderProgress(percent_complete=0.123)
-
- cloud_progress = apiclient.reader_progress_to_cloud_progress(
- reader_progress)
- self.assertIsNotNone(cloud_progress)
- self.assertIsInstance(cloud_progress, dataflow.ApproximateProgress)
- self.assertIsNotNone(cloud_progress.percentComplete)
- self.assertEquals(0.123, cloud_progress.percentComplete)
-
- def test_reader_position_to_cloud_position(self):
- reader_position = iobase.ReaderPosition(byte_offset=9999)
-
- cloud_position = apiclient.reader_position_to_cloud_position(
- reader_position)
- self.assertIsNotNone(cloud_position)
-
- def test_dynamic_split_result_with_position_to_cloud_stop_position(self):
- position = iobase.ReaderPosition(byte_offset=9999)
- dynamic_split_result = iobase.DynamicSplitResultWithPosition(position)
-
- approximate_position = (
- apiclient.dynamic_split_result_with_position_to_cloud_stop_position(
- dynamic_split_result))
- self.assertIsNotNone(approximate_position)
- self.assertIsInstance(approximate_position, dataflow.Position)
- self.assertEqual(9999, approximate_position.byteOffset)
-
- def test_cloud_progress_to_reader_progress_index_position(self):
- cloud_progress = dataflow.ApproximateProgress()
- cloud_progress.position = dataflow.Position()
- cloud_progress.position.byteOffset = 9999
-
- reader_progress = apiclient.cloud_progress_to_reader_progress(
- cloud_progress)
- self.assertIsNotNone(reader_progress.position)
- self.assertIsInstance(reader_progress.position, iobase.ReaderPosition)
- self.assertEqual(9999, reader_progress.position.byte_offset)
-
- def test_cloud_progress_to_reader_progress_percent_complete(self):
- cloud_progress = dataflow.ApproximateProgress()
- cloud_progress.percentComplete = 0.123
-
- reader_progress = apiclient.cloud_progress_to_reader_progress(
- cloud_progress)
- self.assertIsNotNone(reader_progress.percent_complete)
- self.assertEqual(0.123, reader_progress.percent_complete)
-
- def test_cloud_position_to_reader_position_byte_offset(self):
- cloud_position = dataflow.Position()
- cloud_position.byteOffset = 9999
-
- reader_position = apiclient.cloud_position_to_reader_position(
- cloud_position)
- self.assertIsNotNone(reader_position)
- self.assertIsInstance(reader_position, iobase.ReaderPosition)
- self.assertEqual(9999, reader_position.byte_offset)
-
- def test_approximate_progress_to_dynamic_split_request(self):
- approximate_progress = dataflow.ApproximateProgress()
- approximate_progress.percentComplete = 0.123
-
- dynamic_split_request = (
- apiclient.approximate_progress_to_dynamic_split_request(
- approximate_progress))
- self.assertIsNotNone(dynamic_split_request)
- self.assertIsInstance(dynamic_split_request.progress, iobase.ReaderProgress)
- self.assertIsNotNone(dynamic_split_request.progress.percent_complete)
- self.assertEqual(dynamic_split_request.progress.percent_complete, 0.123)
-
+ def test_create_application_client(self):
+ pipeline_options = PipelineOptions()
+ apiclient.DataflowApplicationClient(
+ pipeline_options,
+ DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
if __name__ == '__main__':
unittest.main()
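One idiom from the deleted worker client is worth noting: both lease_work and report_status assigned the request payload inside a try/except AttributeError, so the same code worked whether the generated apitools message spelled the field leaseWorkItemRequest or lease_work_item_request. Below is a minimal sketch of the idea, using __slots__ classes as hypothetical stand-ins (generated apitools messages likewise reject assignment to fields they do not define):

class NewStyleRequest(object):
    # Stand-in for a client generation exposing the camelCase field.
    __slots__ = ('leaseWorkItemRequest',)

class OldStyleRequest(object):
    # Stand-in for an older generation exposing the snake_case field.
    __slots__ = ('lease_work_item_request',)

def attach_lease_request(request, work_request):
    # Prefer the newer camelCase spelling; fall back to snake_case when
    # the message does not define it (assignment raises AttributeError).
    try:
        request.leaseWorkItemRequest = work_request
    except AttributeError:
        request.lease_work_item_request = work_request

for cls in (NewStyleRequest, OldStyleRequest):
    request = cls()
    attach_lease_request(request, work_request='payload')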
[2/2] incubator-beam git commit: Closes #763
Posted by dh...@apache.org.
Closes #763
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa302a3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa302a3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa302a3b
Branch: refs/heads/python-sdk
Commit: fa302a3be3171376b80e47da95c2340119ef43a8
Parents: 5541c03 7d50d80
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 18:24:03 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 29 18:24:03 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/internal/apiclient.py | 347 -------------------
.../apache_beam/internal/apiclient_test.py | 92 +----
2 files changed, 7 insertions(+), 432 deletions(-)
----------------------------------------------------------------------