You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/09 06:12:51 UTC
[1/2] beam git commit: [BEAM-2431] Add experimental python rpc direct
runner
Repository: beam
Updated Branches:
refs/heads/master 9ed2cf41f -> fb85d84dc
[BEAM-2431] Add experimental python rpc direct runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76db0aa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76db0aa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76db0aa3
Branch: refs/heads/master
Commit: 76db0aa30c632296a6a882c012f9da2d21f775b5
Parents: 9ed2cf4
Author: Sourabh Bajaj <so...@google.com>
Authored: Wed Aug 2 10:49:48 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Aug 8 23:12:30 2017 -0700
----------------------------------------------------------------------
.../runners/experimental/__init__.py | 16 +++
.../experimental/python_rpc_direct/__init__.py | 22 ++++
.../python_rpc_direct_runner.py | 111 +++++++++++++++++++
.../experimental/python_rpc_direct/server.py | 111 +++++++++++++++++++
sdks/python/apache_beam/runners/job/__init__.py | 16 +++
sdks/python/apache_beam/runners/job/manager.py | 52 +++++++++
sdks/python/apache_beam/runners/job/utils.py | 32 ++++++
sdks/python/apache_beam/runners/runner.py | 6 +
8 files changed, 366 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
new file mode 100644
index 0000000..5d14030
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This is the experimental direct runner for testing the job api that
+sends a runner API proto over the API and then runs it on the other side.
+"""
+
+from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
new file mode 100644
index 0000000..247ce1f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner implementation that submits a job for remote execution.
+"""
+
+import logging
+import random
+import string
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.runners.job import utils as job_utils
+from apache_beam.runners.job.manager import DockerRPCManager
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineRunner
+
+
+__all__ = ['PythonRPCDirectRunner']
+
+
+class PythonRPCDirectRunner(PipelineRunner):
+  """Runs a pipeline by submitting it to a local Job API co-process.
+
+  The runner starts a DockerRPCManager, sends the pipeline's runner API proto
+  and its non-None options through the manager's service stub, and returns a
+  PythonRPCDirectPipelineResult that keeps a reference to that manager.
+  """
+
+  # A list of PTransformOverride objects to be applied before running a pipeline
+  # using DirectRunner.
+  # Currently this only works for overrides where the input and output types do
+  # not change.
+  # For internal SDK use only. This should not be updated by Beam pipeline
+  # authors.
+  _PTRANSFORM_OVERRIDES = []
+
+  def __init__(self):
+    self._cache = None
+
+  def run(self, pipeline):
+    """Submits the pipeline to the RPC co-process and returns a result handle.
+
+    Args:
+      pipeline: the pipeline to execute. Its configured PTransform overrides
+        are applied in place before it is converted to a runner API proto.
+
+    Returns:
+      A PythonRPCDirectPipelineResult for the job id reported by the service.
+
+    Raises:
+      grpc.RpcError: re-raised, after an error is logged, when the submit
+        call fails.
+    """
+    # Performing configured PTransform overrides.
+    pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)
+
+    # Start the RPC co-process
+    manager = DockerRPCManager()
+
+    # Submit the job to the RPC co-process
+    job_name = 'Job-' + ''.join(
+        random.choice(string.ascii_uppercase) for _ in range(6))
+    # items() works on both Python 2 and 3; iteritems() is Python 2 only.
+    # Options whose value is None are not sent to the service.
+    options = {k: v for k, v in pipeline._options.get_all_options().items()
+               if v is not None}
+
+    try:
+      response = manager.service.run(beam_job_api_pb2.SubmitJobRequest(
+          pipeline=pipeline.to_runner_api(),
+          pipelineOptions=job_utils.dict_to_struct(options),
+          jobName=job_name))
+    except grpc.RpcError:
+      logging.error('Failed to run the job with name: %s', job_name)
+      raise
+
+    logging.info('Submitted a job with id: %s', response.jobId)
+
+    # Return the result object that references the manager instance
+    return PythonRPCDirectPipelineResult(response.jobId, manager)
+
+
+class PythonRPCDirectPipelineResult(PipelineResult):
+  """Handle to a job submitted through the Job API service stub.
+
+  Every operation is forwarded to job_manager.service using the job id that
+  was returned when the job was submitted.
+  """
+
+  def __init__(self, job_id, job_manager):
+    self.manager = job_manager
+    self.job_id = job_id
+
+  @property
+  def state(self):
+    """Asks the service for this job's state and returns its response."""
+    state_request = beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id)
+    return self.manager.service.getState(state_request)
+
+  def wait_until_finish(self):
+    """Logs every message streamed for this job until the stream ends."""
+    stream_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id)
+    for update in self.manager.service.getMessageStream(stream_request):
+      if not update.HasField('stateResponse'):
+        logging.info('Message %s', update.messageResponse)
+        continue
+      # State updates are logged by enum name rather than by number.
+      state_name = beam_job_api_pb2.JobState.JobStateType.Name(
+          update.stateResponse.state)
+      logging.info('Current state of job: %s', state_name)
+    logging.info('Job with id: %s in terminal state now.', self.job_id)
+
+  def cancel(self):
+    """Requests cancellation of this job and returns the service response."""
+    cancel_request = beam_job_api_pb2.CancelJobRequest(jobId=self.job_id)
+    return self.manager.service.cancel(cancel_request)
+
+  def metrics(self):
+    """Not supported by this result type; always raises."""
+    raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
new file mode 100644
index 0000000..3addf92
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A Job API server that runs submitted pipelines with the DirectRunner.
+"""
+from concurrent import futures
+import time
+import uuid
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.pipeline import Pipeline
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
+
+class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
+  """Job API servicer that runs submitted pipelines with the DirectRunner.
+
+  Pipeline results are kept in an in-memory dict keyed by a generated job id,
+  so they live only as long as this servicer instance.
+  """
+
+  # State names shared by PipelineState and beam_job_api_pb2.JobState, in the
+  # order in which _map_state_to_jobState compares them.
+  _STATE_NAMES = (
+      'UNKNOWN', 'STOPPED', 'RUNNING', 'DONE', 'FAILED', 'CANCELLED',
+      'UPDATED', 'DRAINING', 'DRAINED')
+
+  def __init__(self):
+    self.jobs = {}
+
+  def run(self, request, context):
+    """Runs request.pipeline on the DirectRunner and registers its result.
+
+    Returns:
+      A SubmitJobResponse carrying the newly generated job id.
+    """
+    # UUID.hex exists on both Python 2 and 3; get_hex() is Python 2 only.
+    job_id = uuid.uuid4().hex
+    # NOTE(review): request.pipelineOptions is not read here; the pipeline is
+    # rebuilt with a default PipelineOptions() -- confirm this is intended.
+    pipeline_result = Pipeline.from_runner_api(
+        request.pipeline,
+        'DirectRunner',
+        PipelineOptions()).run()
+    self.jobs[job_id] = pipeline_result
+    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)
+
+  def getState(self, request, context):
+    """Returns the current state of the job named by request.jobId.
+
+    Raises:
+      KeyError: if request.jobId is not a registered job.
+    """
+    pipeline_result = self.jobs[request.jobId]
+    return beam_job_api_pb2.GetJobStateResponse(
+        state=self._map_state_to_jobState(pipeline_result.state))
+
+  def cancel(self, request, context):
+    """Cancels the job named by request.jobId and reports its state.
+
+    Raises:
+      KeyError: if request.jobId is not a registered job.
+    """
+    pipeline_result = self.jobs[request.jobId]
+    pipeline_result.cancel()
+    return beam_job_api_pb2.CancelJobResponse(
+        state=self._map_state_to_jobState(pipeline_result.state))
+
+  def getMessageStream(self, request, context):
+    """Waits for the job to finish, then yields one state response.
+
+    Raises:
+      KeyError: if request.jobId is not a registered job.
+    """
+    pipeline_result = self.jobs[request.jobId]
+    pipeline_result.wait_until_finish()
+    yield beam_job_api_pb2.JobMessagesResponse(
+        stateResponse=beam_job_api_pb2.GetJobStateResponse(
+            state=self._map_state_to_jobState(pipeline_result.state)))
+
+  def getStateStream(self, request, context):
+    """Marks the call as UNIMPLEMENTED; no states are streamed."""
+    context.set_details('Not Implemented for direct runner!')
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    # NOTE(review): this returns None; if the RPC is server-streaming gRPC may
+    # expect an iterator here -- confirm against the service definition.
+    return
+
+  @staticmethod
+  def _map_state_to_jobState(state):
+    """Translates a PipelineState value into the same-named JobState value.
+
+    Raises:
+      ValueError: if state equals none of the known PipelineState values.
+    """
+    # Attributes are looked up lazily and compared with ==, one name at a
+    # time, so the first matching state wins.
+    for name in JobService._STATE_NAMES:
+      if state == getattr(PipelineState, name):
+        return getattr(beam_job_api_pb2.JobState, name)
+    raise ValueError('Unknown pipeline state')
+
+
+def serve():
+  """Starts a gRPC server hosting JobService on port 50051 and blocks.
+
+  The function only returns after a KeyboardInterrupt, at which point
+  stop(0) is called on the server.
+  """
+  worker_pool = futures.ThreadPoolExecutor(max_workers=10)
+  rpc_server = grpc.server(worker_pool)
+  job_service = JobService()
+  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(
+      job_service, rpc_server)
+
+  rpc_server.add_insecure_port('[::]:50051')
+  rpc_server.start()
+
+  # Sleep in a loop so this thread stays alive after start() until the
+  # process is interrupted.
+  try:
+    while True:
+      time.sleep(_ONE_DAY_IN_SECONDS)
+  except KeyboardInterrupt:
+    rpc_server.stop(0)
+
+
+# Allows the module to be launched with "python -m".
+if __name__ == '__main__':
+  serve()
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/__init__.py b/sdks/python/apache_beam/runners/job/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/manager.py b/sdks/python/apache_beam/runners/job/manager.py
new file mode 100644
index 0000000..4d88a11
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/manager.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An object to control the Job API co-process.
+"""
+
+import logging
+import subprocess
+import time
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+
+
+class DockerRPCManager(object):
+  """Starts a native co-process that speaks the Job API and connects to it.
+
+  Set by __init__:
+    process: subprocess.Popen handle of the server co-process.
+    channel: insecure gRPC channel to localhost:50051.
+    service: JobServiceStub bound to that channel.
+  """
+
+  def __init__(self, run_command=None):
+    """Launches the server module and opens a channel to it.
+
+    Args:
+      run_command: currently unused; kept so existing callers keep working.
+    """
+    # Imported locally so this class does not depend on a new module-level
+    # import.
+    import sys
+
+    # TODO(BEAM-2431): Change this to a docker container from a command.
+    # Start the server with the interpreter running this process so the
+    # co-process uses the same installation; fall back to whatever 'python'
+    # resolves to when the interpreter path is not available.
+    self.process = subprocess.Popen(
+        [sys.executable or 'python',
+         '-m',
+         'apache_beam.runners.experimental.python_rpc_direct.server'])
+
+    self.channel = grpc.insecure_channel('localhost:50051')
+    self.service = beam_job_api_pb2_grpc.JobServiceStub(self.channel)
+
+    # Sleep for 2 seconds for process to start completely
+    # This is just for the co-process and would be removed
+    # once we migrate to docker.
+    time.sleep(2)
+
+  def __del__(self):
+    """Terminate the co-process when the manager is GC'ed."""
+    # __init__ may have failed before the process attribute was assigned.
+    process = getattr(self, 'process', None)
+    if process is None:
+      return
+    # Nothing to terminate if the co-process has already exited.
+    if process.poll() is not None:
+      return
+    logging.info('Shutting the co-process')
+    process.terminate()
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py
new file mode 100644
index 0000000..84c727f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility functions for converting data exchanged with the job API
+"""
+
+import json
+
+from google.protobuf import json_format
+from google.protobuf import struct_pb2
+
+
+def dict_to_struct(dict_obj):
+  """Converts dict_obj to a protobuf Struct by going through JSON text."""
+  serialized = json.dumps(dict_obj)
+  target = struct_pb2.Struct()
+  return json_format.Parse(serialized, target)
+
+
+def struct_to_dict(struct_obj):
+  """Converts struct_obj to plain Python data by going through JSON text."""
+  as_json = json_format.MessageToJson(struct_obj)
+  return json.loads(as_json)
http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index af00d8f..7ce9a03 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,7 +41,11 @@ _DIRECT_RUNNER_PATH = 'apache_beam.runners.direct.direct_runner.'
_DATAFLOW_RUNNER_PATH = (
'apache_beam.runners.dataflow.dataflow_runner.')
_TEST_RUNNER_PATH = 'apache_beam.runners.test.'
+_PYTHON_RPC_DIRECT_RUNNER = (
+ 'apache_beam.runners.experimental.python_rpc_direct.'
+ 'python_rpc_direct_runner.')
+_KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',)
_KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',)
_KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
@@ -51,6 +55,8 @@ _RUNNER_MAP.update(_get_runner_map(_KNOWN_DIRECT_RUNNERS,
_DIRECT_RUNNER_PATH))
_RUNNER_MAP.update(_get_runner_map(_KNOWN_DATAFLOW_RUNNERS,
_DATAFLOW_RUNNER_PATH))
+_RUNNER_MAP.update(_get_runner_map(_KNOWN_PYTHON_RPC_DIRECT_RUNNER,
+ _PYTHON_RPC_DIRECT_RUNNER))
_RUNNER_MAP.update(_get_runner_map(_KNOWN_TEST_RUNNERS,
_TEST_RUNNER_PATH))
[2/2] beam git commit: This closes #3667
Posted by al...@apache.org.
This closes #3667
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb85d84d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb85d84d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb85d84d
Branch: refs/heads/master
Commit: fb85d84dcb6cdbf87e881117a359eab27d8fd935
Parents: 9ed2cf4 76db0aa
Author: Ahmet Altay <al...@google.com>
Authored: Tue Aug 8 23:12:37 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Aug 8 23:12:37 2017 -0700
----------------------------------------------------------------------
.../runners/experimental/__init__.py | 16 +++
.../experimental/python_rpc_direct/__init__.py | 22 ++++
.../python_rpc_direct_runner.py | 111 +++++++++++++++++++
.../experimental/python_rpc_direct/server.py | 111 +++++++++++++++++++
sdks/python/apache_beam/runners/job/__init__.py | 16 +++
sdks/python/apache_beam/runners/job/manager.py | 52 +++++++++
sdks/python/apache_beam/runners/job/utils.py | 32 ++++++
sdks/python/apache_beam/runners/runner.py | 6 +
8 files changed, 366 insertions(+)
----------------------------------------------------------------------