You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/23 00:07:12 UTC

[2/3] beam git commit: Remove unused (and untested) initial splitting logic.

Remove unused (and untested) initial splitting logic.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a882e8f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a882e8f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a882e8f3

Branch: refs/heads/master
Commit: a882e8f3a33c4a430f55d53b65285123c5a4f50d
Parents: 5d6ad19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:46:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700

----------------------------------------------------------------------
 .../runners/portability/fn_api_runner.py        |  1 -
 .../apache_beam/runners/worker/sdk_worker.py    | 51 -------------
 .../runners/worker/sdk_worker_test.py           | 77 --------------------
 3 files changed, 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index b45ff76..a8e2eb4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -19,7 +19,6 @@
 """
 import base64
 import collections
-import json
 import logging
 import Queue as queue
 import threading

http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d135984..6a366eb 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -28,9 +28,7 @@ import logging
 import Queue as queue
 import threading
 import traceback
-import zlib
 
-import dill
 from google.protobuf import wrappers_pb2
 
 from apache_beam.coders import coder_impl
@@ -165,37 +163,6 @@ class SideInputSource(native_iobase.NativeSource,
       yield self._coder.get_impl().decode_from_stream(input_stream, True)
 
 
-def unpack_and_deserialize_py_fn(function_spec):
-  """Returns unpacked and deserialized object from function spec proto."""
-  return pickler.loads(unpack_function_spec_data(function_spec))
-
-
-def unpack_function_spec_data(function_spec):
-  """Returns unpacked data from function spec proto."""
-  data = wrappers_pb2.BytesValue()
-  function_spec.data.Unpack(data)
-  return data.value
-
-
-# pylint: disable=redefined-builtin
-def serialize_and_pack_py_fn(fn, urn, id=None):
-  """Returns serialized and packed function in a function spec proto."""
-  return pack_function_spec_data(pickler.dumps(fn), urn, id)
-# pylint: enable=redefined-builtin
-
-
-# pylint: disable=redefined-builtin
-def pack_function_spec_data(value, urn, id=None):
-  """Returns packed data in a function spec proto."""
-  data = wrappers_pb2.BytesValue(value=value)
-  fn_proto = beam_fn_api_pb2.FunctionSpec(urn=urn)
-  fn_proto.data.Pack(data)
-  if id:
-    fn_proto.id = id
-  return fn_proto
-# pylint: enable=redefined-builtin
-
-
 def memoize(func):
   cache = {}
   missing = object()
@@ -286,24 +253,6 @@ class SdkWorker(object):
         self.fns[p_transform.function_spec.id] = p_transform.function_spec
     return beam_fn_api_pb2.RegisterResponse()
 
-  def initial_source_split(self, request, unused_instruction_id=None):
-    source_spec = self.fns[request.source_reference]
-    assert source_spec.urn == PYTHON_SOURCE_URN
-    source_bundle = unpack_and_deserialize_py_fn(
-        self.fns[request.source_reference])
-    splits = source_bundle.source.split(request.desired_bundle_size_bytes,
-                                        source_bundle.start_position,
-                                        source_bundle.stop_position)
-    response = beam_fn_api_pb2.InitialSourceSplitResponse()
-    response.splits.extend([
-        beam_fn_api_pb2.SourceSplit(
-            source=serialize_and_pack_py_fn(split, PYTHON_SOURCE_URN),
-            relative_size=split.weight,
-        )
-        for split in splits
-    ])
-    return response
-
   def create_execution_tree(self, descriptor):
     # TODO(robertwb): Figure out the correct prefix to use for output counters
     # from StateSampler.

http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c431bcd..553d5b8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -27,10 +27,7 @@ import unittest
 from concurrent import futures
 import grpc
 
-from apache_beam.io.concat_source_test import RangeSource
-from apache_beam.io.iobase import SourceBundle
 from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import sdk_worker
 
 
@@ -88,80 +85,6 @@ class SdkWorkerTest(unittest.TestCase):
         harness.worker.fns,
         {item.id: item for item in fns + process_bundle_descriptors})
 
-  @unittest.skip("initial splitting not in proto")
-  def test_source_split(self):
-    source = RangeSource(0, 100)
-    expected_splits = list(source.split(30))
-
-    worker = sdk_harness.SdkWorker(
-        None, data_plane.GrpcClientDataChannelFactory())
-    worker.register(
-        beam_fn_api_pb2.RegisterRequest(
-            process_bundle_descriptor=[beam_fn_api_pb2.ProcessBundleDescriptor(
-                primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
-                    function_spec=sdk_harness.serialize_and_pack_py_fn(
-                        SourceBundle(1.0, source, None, None),
-                        sdk_harness.PYTHON_SOURCE_URN,
-                        id="src"))])]))
-    split_response = worker.initial_source_split(
-        beam_fn_api_pb2.InitialSourceSplitRequest(
-            desired_bundle_size_bytes=30,
-            source_reference="src"))
-
-    self.assertEqual(
-        expected_splits,
-        [sdk_harness.unpack_and_deserialize_py_fn(s.source)
-         for s in split_response.splits])
-
-    self.assertEqual(
-        [s.weight for s in expected_splits],
-        [s.relative_size for s in split_response.splits])
-
-  @unittest.skip("initial splitting not in proto")
-  def test_source_split_via_instruction(self):
-
-    source = RangeSource(0, 100)
-    expected_splits = list(source.split(30))
-
-    test_controller = BeamFnControlServicer([
-        beam_fn_api_pb2.InstructionRequest(
-            instruction_id="register_request",
-            register=beam_fn_api_pb2.RegisterRequest(
-                process_bundle_descriptor=[
-                    beam_fn_api_pb2.ProcessBundleDescriptor(
-                        primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
-                            function_spec=sdk_harness.serialize_and_pack_py_fn(
-                                SourceBundle(1.0, source, None, None),
-                                sdk_harness.PYTHON_SOURCE_URN,
-                                id="src"))])])),
-        beam_fn_api_pb2.InstructionRequest(
-            instruction_id="split_request",
-            initial_source_split=beam_fn_api_pb2.InitialSourceSplitRequest(
-                desired_bundle_size_bytes=30,
-                source_reference="src"))
-        ])
-
-    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-    beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server)
-    test_port = server.add_insecure_port("[::]:0")
-    server.start()
-
-    channel = grpc.insecure_channel("localhost:%s" % test_port)
-    harness = sdk_harness.SdkHarness(channel)
-    harness.run()
-
-    split_response = test_controller.responses[
-        "split_request"].initial_source_split
-
-    self.assertEqual(
-        expected_splits,
-        [sdk_harness.unpack_and_deserialize_py_fn(s.source)
-         for s in split_response.splits])
-
-    self.assertEqual(
-        [s.weight for s in expected_splits],
-        [s.relative_size for s in split_response.splits])
-
 
 if __name__ == "__main__":
   logging.getLogger().setLevel(logging.INFO)