Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 16:22:45 UTC

[GitHub] [beam] damccorm opened a new issue, #20248: save_main_session not working in DoFn.setup() in Dataflow with Python

damccorm opened a new issue, #20248:
URL: https://github.com/apache/beam/issues/20248

   I have a Dataflow pipeline that calls the DLP API in GCP. The relevant pieces of my code are:
   
   
   ```python
   import apache_beam as beam
   from apache_beam.options.pipeline_options import SetupOptions, StandardOptions
   from google.cloud.dlp import DlpServiceClient


   def run(argv):
     """Main entry point; defines and runs the pipeline."""
     # HashPipelineOptions is a custom PipelineOptions subclass defined
     # elsewhere in the pipeline code (not shown).
     opts = HashPipelineOptions()
     opts.view_as(SetupOptions).save_main_session = True
     std_opts = opts.view_as(StandardOptions)
     std_opts.streaming = True
     ...


   class DlpFindingDoFn(beam.DoFn):
     """Fetch DLP Findings as a PCollection"""

     def __init__(self, project, runner):
       super().__init__()
       self.project = project
       self.runner = runner
       self.inspect_config = {
           'info_types': [{'name': 'US_SOCIAL_SECURITY_NUMBER'}],
           'min_likelihood': 'VERY_UNLIKELY',
           'include_quote': True  # We need the output to match against the KV Store
       }

     def setup(self):
       # On Dataflow this line raises NameError: name 'DlpServiceClient' is not defined.
       self.dlp_client = DlpServiceClient()

     def process(self, element):
       # TODO: Remove when version 2.22.0 is released BEAM-7885
       if self.runner == 'DirectRunner':
         self.setup()
       # Convert the project id into a full resource id.
       parent = self.dlp_client.project_path(self.project)
       filename, chunk = element
       # Call the API.
       response = self.dlp_client.inspect_content(
           parent, self.inspect_config, {'value': chunk})
       if response.result.findings:
         for f in response.result.findings:
           yield (filename, f.quote)
   ```

   This runs fine locally, but when I run it on Dataflow, `setup` fails with the following `NameError` because `DlpServiceClient` cannot be found:
   ```
   
   2020-05-29 06:29:19.799 PDT Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -2659: Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 368, in get
       processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
   IndexError: pop from empty list

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "hashpipeline.py", line 108, in setup
   NameError: name 'DlpServiceClient' is not defined

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
       response = task()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 500, in process_bundle
       instruction_id, request.process_bundle_descriptor_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 374, in get
       self.data_channel_factory)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 843, in __init__
       op.setup()
     File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup
     File "apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
       raise exc.with_traceback(traceback)
     File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "hashpipeline.py", line 108, in setup
   NameError: name 'DlpServiceClient' is not defined [while running 'generatedPtransform-2651']

       java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
       java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
       org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
       org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
       org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
       org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1363)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:153)
       org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1086)
       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -2659: Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 368, in get
       processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
   IndexError: pop from empty list

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "hashpipeline.py", line 108, in setup
   NameError: name 'DlpServiceClient' is not defined

   During handling of the above exception, another exception occurred:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
       response = task()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 500, in process_bundle
       instruction_id, request.process_bundle_descriptor_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 374, in get
       self.data_channel_factory)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 843, in __init__
       op.setup()
     File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/worker/operations.py", line 660, in apache_beam.runners.worker.operations.DoOperation.setup
     File "apache_beam/runners/common.py", line 1010, in apache_beam.runners.common.DoFnRunner.setup
     File "apache_beam/runners/common.py", line 1006, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
       raise exc.with_traceback(traceback)
     File "apache_beam/runners/common.py", line 1004, in apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
     File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnInvoker.invoke_setup
     File "hashpipeline.py", line 108, in setup
   NameError: name 'DlpServiceClient' is not defined [while running 'generatedPtransform-2651']

       org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:178)
       org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:158)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
       org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       java.lang.Thread.run(Thread.java:748)
   
   ```
   
   However, when I move the import statement inside the `setup` method, everything works fine.
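The traceback places the failure in `setup` (hashpipeline.py, line 108), at the lookup of `DlpServiceClient`. In Python, a name that a function does not assign is resolved in the function's module globals (then builtins) when the function is called, not when it is defined. If the worker-side globals lack the module-level import, that lookup fails. An `import` inside the function instead binds the name locally on every call. The stdlib-only sketch below illustrates that difference under stated assumptions: `hashlib` stands in for the DLP client, the function names are hypothetical, and `run_with_fresh_globals` only simulates a worker whose module-level imports were not restored. It is not how Beam actually rebuilds a DoFn.

```python
import builtins
import hashlib  # module-level import, analogous to the DLP import at the top of the pipeline file
import types


def setup_with_module_import():
    # 'hashlib' is not assigned in this function, so Python resolves it in the
    # function's globals (then builtins) when the function is called.
    return hashlib.sha256(b"chunk").hexdigest()


def setup_with_local_import():
    # The import statement binds 'hashlib' as a local name on every call,
    # so the result does not depend on the module's globals.
    import hashlib
    return hashlib.sha256(b"chunk").hexdigest()


def run_with_fresh_globals(func):
    """Call a copy of `func` whose globals contain only the builtins.

    Hypothetical stand-in for a worker on which the launcher's module-level
    imports were not restored; this is not how Beam rebuilds a DoFn.
    """
    clone = types.FunctionType(func.__code__, {"__builtins__": builtins})
    try:
        return clone()
    except NameError as exc:
        return f"NameError: {exc}"


if __name__ == "__main__":
    print(run_with_fresh_globals(setup_with_module_import))
    print(run_with_fresh_globals(setup_with_local_import))
```

Run as a script, the first call reports a `NameError` for `hashlib` and the second returns the SHA-256 hex digest. The function-local import avoids depending on restored globals, which is why it works around the problem without making `save_main_session` itself behave differently.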
   
   Imported from Jira [BEAM-10150](https://issues.apache.org/jira/browse/BEAM-10150). Original Jira may contain additional context.
   Reported by: onetwopunch.

