You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/23 00:07:12 UTC
[2/3] beam git commit: Remove unused (and untested) initial
splitting logic.
Remove unused (and untested) initial splitting logic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a882e8f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a882e8f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a882e8f3
Branch: refs/heads/master
Commit: a882e8f3a33c4a430f55d53b65285123c5a4f50d
Parents: 5d6ad19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:46:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 1 -
.../apache_beam/runners/worker/sdk_worker.py | 51 -------------
.../runners/worker/sdk_worker_test.py | 77 --------------------
3 files changed, 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index b45ff76..a8e2eb4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -19,7 +19,6 @@
"""
import base64
import collections
-import json
import logging
import Queue as queue
import threading
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d135984..6a366eb 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -28,9 +28,7 @@ import logging
import Queue as queue
import threading
import traceback
-import zlib
-import dill
from google.protobuf import wrappers_pb2
from apache_beam.coders import coder_impl
@@ -165,37 +163,6 @@ class SideInputSource(native_iobase.NativeSource,
yield self._coder.get_impl().decode_from_stream(input_stream, True)
-def unpack_and_deserialize_py_fn(function_spec):
- """Returns unpacked and deserialized object from function spec proto."""
- return pickler.loads(unpack_function_spec_data(function_spec))
-
-
-def unpack_function_spec_data(function_spec):
- """Returns unpacked data from function spec proto."""
- data = wrappers_pb2.BytesValue()
- function_spec.data.Unpack(data)
- return data.value
-
-
-# pylint: disable=redefined-builtin
-def serialize_and_pack_py_fn(fn, urn, id=None):
- """Returns serialized and packed function in a function spec proto."""
- return pack_function_spec_data(pickler.dumps(fn), urn, id)
-# pylint: enable=redefined-builtin
-
-
-# pylint: disable=redefined-builtin
-def pack_function_spec_data(value, urn, id=None):
- """Returns packed data in a function spec proto."""
- data = wrappers_pb2.BytesValue(value=value)
- fn_proto = beam_fn_api_pb2.FunctionSpec(urn=urn)
- fn_proto.data.Pack(data)
- if id:
- fn_proto.id = id
- return fn_proto
-# pylint: enable=redefined-builtin
-
-
def memoize(func):
cache = {}
missing = object()
@@ -286,24 +253,6 @@ class SdkWorker(object):
self.fns[p_transform.function_spec.id] = p_transform.function_spec
return beam_fn_api_pb2.RegisterResponse()
- def initial_source_split(self, request, unused_instruction_id=None):
- source_spec = self.fns[request.source_reference]
- assert source_spec.urn == PYTHON_SOURCE_URN
- source_bundle = unpack_and_deserialize_py_fn(
- self.fns[request.source_reference])
- splits = source_bundle.source.split(request.desired_bundle_size_bytes,
- source_bundle.start_position,
- source_bundle.stop_position)
- response = beam_fn_api_pb2.InitialSourceSplitResponse()
- response.splits.extend([
- beam_fn_api_pb2.SourceSplit(
- source=serialize_and_pack_py_fn(split, PYTHON_SOURCE_URN),
- relative_size=split.weight,
- )
- for split in splits
- ])
- return response
-
def create_execution_tree(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
# from StateSampler.
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c431bcd..553d5b8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -27,10 +27,7 @@ import unittest
from concurrent import futures
import grpc
-from apache_beam.io.concat_source_test import RangeSource
-from apache_beam.io.iobase import SourceBundle
from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker
@@ -88,80 +85,6 @@ class SdkWorkerTest(unittest.TestCase):
harness.worker.fns,
{item.id: item for item in fns + process_bundle_descriptors})
- @unittest.skip("initial splitting not in proto")
- def test_source_split(self):
- source = RangeSource(0, 100)
- expected_splits = list(source.split(30))
-
- worker = sdk_harness.SdkWorker(
- None, data_plane.GrpcClientDataChannelFactory())
- worker.register(
- beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[beam_fn_api_pb2.ProcessBundleDescriptor(
- primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
- function_spec=sdk_harness.serialize_and_pack_py_fn(
- SourceBundle(1.0, source, None, None),
- sdk_harness.PYTHON_SOURCE_URN,
- id="src"))])]))
- split_response = worker.initial_source_split(
- beam_fn_api_pb2.InitialSourceSplitRequest(
- desired_bundle_size_bytes=30,
- source_reference="src"))
-
- self.assertEqual(
- expected_splits,
- [sdk_harness.unpack_and_deserialize_py_fn(s.source)
- for s in split_response.splits])
-
- self.assertEqual(
- [s.weight for s in expected_splits],
- [s.relative_size for s in split_response.splits])
-
- @unittest.skip("initial splitting not in proto")
- def test_source_split_via_instruction(self):
-
- source = RangeSource(0, 100)
- expected_splits = list(source.split(30))
-
- test_controller = BeamFnControlServicer([
- beam_fn_api_pb2.InstructionRequest(
- instruction_id="register_request",
- register=beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[
- beam_fn_api_pb2.ProcessBundleDescriptor(
- primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
- function_spec=sdk_harness.serialize_and_pack_py_fn(
- SourceBundle(1.0, source, None, None),
- sdk_harness.PYTHON_SOURCE_URN,
- id="src"))])])),
- beam_fn_api_pb2.InstructionRequest(
- instruction_id="split_request",
- initial_source_split=beam_fn_api_pb2.InitialSourceSplitRequest(
- desired_bundle_size_bytes=30,
- source_reference="src"))
- ])
-
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server)
- test_port = server.add_insecure_port("[::]:0")
- server.start()
-
- channel = grpc.insecure_channel("localhost:%s" % test_port)
- harness = sdk_harness.SdkHarness(channel)
- harness.run()
-
- split_response = test_controller.responses[
- "split_request"].initial_source_split
-
- self.assertEqual(
- expected_splits,
- [sdk_harness.unpack_and_deserialize_py_fn(s.source)
- for s in split_response.splits])
-
- self.assertEqual(
- [s.weight for s in expected_splits],
- [s.relative_size for s in split_response.splits])
-
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)