You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 23:40:29 UTC
[GitHub] [beam] kennknowles opened a new issue, #19306: Beam 2.10.0 RC1
kennknowles opened a new issue, #19306:
URL: https://github.com/apache/beam/issues/19306
I was testing beam.io.WriteToText on RC1 of 2.10.
This concerns writing to CMEK (customer-managed encryption key) GCS buckets. Writing currently fails because the copy from the staging location to the CMEK bucket fails; the remedy was to change the "copy" operation to "rewrite".
As per the changes in this commit:
[https://github.com/apache/beam/commit/e6e85edbeade3a4c038aca85821d2b265ac33909#diff-246c5b6386cc533fbabca3b397ca3c17](https://github.com/apache/beam/commit/e6e85edbeade3a4c038aca85821d2b265ac33909#diff-246c5b6386cc533fbabca3b397ca3c17)
`StorageObjectsCopyRequest` was replaced with `StorageObjectsRewriteRequest`.
I downloaded RC1 of 2.10 and tried the following code:
def runPipeline(self):
    options = pipeline.PipelineOptions(self.__pipelineargs).view_as(TestPipelineOptions)
    with (beam.Pipeline(options=options)) as p:
        textPColl = p | \
            'read text from GCS bucket' >> beam.io.ReadFromText(options.testfile)
        textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
But I got the following exception:
INFO:root:2019-02-04T09:17:13.452Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:18.376Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:23.304Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:28.056Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
INFO:root:2019-02-04T09:17:28.086Z: JOB_MESSAGE_DEBUG: Executing failure step failure15
INFO:root:2019-02-04T09:17:28.101Z: JOB_MESSAGE_ERROR: Workflow failed. Causes: S13:write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t,
rajsubtest-02040113-nvra-harness-t62t
INFO:root:2019-02-04T09:17:28.794Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:root:2019-02-04T09:17:28.834Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:root:2019-02-04T09:17:28.847Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:root:2019-02-04T09:18:21.451Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:root:2019-02-04T09:18:21.468Z: JOB_MESSAGE_DETAILED: Autoscaling: Would further reduce the number of workers but reached the minimum number allowed for the job.
INFO:root:2019-02-04T09:18:21.496Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:root:2019-02-04T09:18:21.514Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:root:Job 2019-02-04_01_13_57-9915685959768429617 is in state JOB_STATE_FAILED
Traceback (most recent call last):
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 53, in <module>
main()
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 11, in main
testdf.runPipeline()
File "C:/sandbox/python-workspace/dftest/beampkg/dftest.py", line 29, in runPipeline
textPColl | 'write to bucket' >> beam.io.WriteToText(options.outputfile)
File "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\pipeline.py", line 425, in __exit__
self.run().wait_until_finish()
File "C:\Users\subrdhar\beam_2_10_RC1_env\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 1186, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 172, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
def start(self):
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
with self.spec.source.reader() as reader:
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 89, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 497, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 498, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 680, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 686, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 724, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 535, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_per_window(
File "apache_beam/runners/common.py", line 604, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 755, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 770, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1077, in <genexpr>
window.TimestampedValue(v, timestamp.MAX_TIMESTAMP) for v in outputs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 321, in finalize_write
'Encountered exceptions in finalize_write: %s' % all_exceptions)
Exception: Encountered exceptions in finalize_write: [HttpBadRequestError(), HttpBadRequestError(), HttpBadRequestError()] [while running 'write to bucket/Write/WriteImpl/FinalizeWrite/FinalizeWrite']
I presume this is still a work in progress.
Imported from Jira [BEAM-6580](https://issues.apache.org/jira/browse/BEAM-6580). Original Jira may contain additional context.
Reported by: rajs1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org