You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 16:22:45 UTC
[GitHub] [beam] damccorm opened a new issue, #20248: save_main_session not working in DoFn.setup() in Dataflow with Python
damccorm opened a new issue, #20248:
URL: https://github.com/apache/beam/issues/20248
I have a Dataflow pipeline that calls the DLP API on GCP. The relevant parts of my code are:
```
from google.cloud.dlp import DlpServiceClient
def run(argv):
    """Main entry point; defines and runs the pipeline.

    Only the option setup is shown in this excerpt; the rest of the
    pipeline definition is elided (``...``).

    Args:
      argv: command-line arguments. NOTE(review): not used in the visible
        excerpt; presumably consumed by the elided code — confirm.
    """
    opts = HashPipelineOptions()
    # Intended to make main-module globals (such as the module-level
    # DlpServiceClient import) available on the workers. Per this report,
    # that did not hold inside DoFn.setup() on Dataflow.
    opts.view_as(SetupOptions).save_main_session = True
    std_opts = opts.view_as(StandardOptions)
    # Enable streaming mode.
    std_opts.streaming = True
    ...
class DlpFindingDoFn(beam.DoFn):
    """Fetch DLP findings as a PCollection.

    For each ``(filename, chunk)`` element, calls the DLP
    ``inspect_content`` API on ``chunk`` and emits one
    ``(filename, quote)`` tuple per finding.
    """

    def __init__(self, project, runner):
        """Store configuration; the DLP client itself is created in setup().

        Args:
          project: project id, passed to ``DlpServiceClient.project_path``
            in process().
          runner: runner name; compared against ``'DirectRunner'`` in
            process().
        """
        beam.DoFn.__init__(self)
        self.project = project
        self.runner = runner
        # Inspection settings sent with every inspect_content call.
        self.inspect_config = {
            'info_types': [{'name': 'US_SOCIAL_SECURITY_NUMBER'}],
            'min_likelihood': 'VERY_UNLIKELY',
            'include_quote': True  # We need the output to match against the KV Store
        }

    def setup(self):
        """Create the DLP client used by process()."""
        # NOTE(review): on Dataflow this line raises
        # "NameError: name 'DlpServiceClient' is not defined" even with
        # save_main_session=True, although the name is imported at module
        # level. The reported workaround is to move the import into this
        # method.
        self.dlp_client = DlpServiceClient()

    def process(self, element):
        """Inspect one chunk and yield ``(filename, quote)`` per finding.

        Args:
          element: a ``(filename, chunk)`` pair; ``chunk`` is sent to DLP as
            the content ``value``. NOTE(review): presumably a str — confirm
            against the upstream transform.

        Yields:
          ``(filename, f.quote)`` for each finding in the DLP response.
        """
        # TODO: Remove when version 2.22.0 is released BEAM-7885
        # NOTE(review): this re-runs setup() for every element on the
        # DirectRunner, constructing a new DlpServiceClient each time.
        # Presumably a workaround for setup() not being invoked there
        # before 2.22.0 — confirm against BEAM-7885.
        if self.runner == 'DirectRunner':
            self.setup()
        # Convert the project id into a full resource id.
        parent = self.dlp_client.project_path(self.project)
        filename, chunk = element
        # Call the API.
        response = self.dlp_client.inspect_content(parent, self.inspect_config,
                                                   {'value': chunk})
        if response.result.findings:
            for f in response.result.findings:
                yield (filename, f.quote)
```
This runs fine locally, but when I run it on Dataflow I get the following NameError, because the class DlpServiceClient cannot be found:
```
2020-05-29 06:29:19.799 PDTError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
Error received from SDK harness for instruction -2659: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 368, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
from empty list During handling of the above exception, another exception occurred: Traceback (most
recent call last): File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined During
handling of the above exception, another exception occurred: Traceback (most recent call last): File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 500, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 374, in get self.data_channel_factory) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 843, in __init__ op.setup() File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup File
"apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
running 'generatedPtransform-2651'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK
harness for instruction -2659: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 368, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop
from empty list During handling of the above exception, another exception occurred: Traceback (most
recent call last): File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined During
handling of the above exception, another exception occurred: Traceback (most recent call last): File
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
response = task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 500, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 374, in get self.data_channel_factory) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 843, in __init__ op.setup() File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup File
"apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
File "hashpipeline.py", line 108, in setup NameError: name 'DlpServiceClient' is not defined [while
running 'generatedPtransform-2651'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
```
However, when I move the import line inside the `setup` method of the DoFn, everything works fine.
Imported from Jira [BEAM-10150](https://issues.apache.org/jira/browse/BEAM-10150). Original Jira may contain additional context.
Reported by: onetwopunch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org