Posted to issues@beam.apache.org by "Valentyn Tymofieiev (Jira)" <ji...@apache.org> on 2019/11/02 03:19:00 UTC

[jira] [Updated] (BEAM-8547) Portable Wordcount fails on standalone Flink cluster

     [ https://issues.apache.org/jira/browse/BEAM-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Valentyn Tymofieiev updated BEAM-8547:
--------------------------------------
    Description: 
Repro:
 # git checkout origin/release-2.16.0
 # ./flink-1.8.2/bin/start-cluster.sh
 # gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
 # python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --experiments=worker_threads=100 --parallelism=1 --shutdown_sources_on_final_watermark --sdk_worker_parallelism=1 --environment_cache_millis=60000 --job_endpoint=localhost:8099

This causes the runner to crash with:
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 660, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1042, in process
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 186, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 390, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/textio.py", line 391, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 129, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 203, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 151, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 134, in _path_open
    raw_file = open(path, mode)
RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-py-wordcount-direct-ea951c18fd1211e9ac84a0c589d778c3/d39e13af-277b-437e-89f2-e00249287e1d.py-wordcount-direct' [while running 'write/Write/WriteImpl/WriteBundles'] {noformat}
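
The last frame of the traceback is a plain open(path, 'wb') on a shard file inside the beam-temp-... directory. As a minimal sketch (the directory and file names below are hypothetical stand-ins, not the ones Beam generates), this shows that Python raises exactly this error when the parent directory is absent from the filesystem the writing process sees, and that the same call succeeds once the directory exists there. It does not establish why the directory is missing in the Flink cluster setup.

```python
import os
import tempfile


def try_create(path):
    """Open `path` for binary writing, as the last traceback frame does.

    Returns None on success, or the exception class name on failure.
    """
    try:
        with open(path, "wb"):
            return None
    except OSError as exc:
        return type(exc).__name__


with tempfile.TemporaryDirectory() as base:
    # Hypothetical stand-in for the beam-temp-... directory in the report.
    temp_dir = os.path.join(base, "beam-temp-example")
    shard = os.path.join(temp_dir, "shard.tmp")

    # Parent directory absent: open() fails, as in the traceback.
    missing_result = try_create(shard)

    # Once the directory exists in the writer's filesystem, the call succeeds.
    os.makedirs(temp_dir)
    present_result = try_create(shard)

print(missing_result, present_result)
```

If the process that creates the temp directory and the process that writes the shard see different filesystems, the first case is what the writer would hit; that is only one possible reading of the traceback.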

The error happens with both Flink 1.5 and Flink 1.8.

The error does not happen if we run the SDK harness in LOOPBACK mode (--environment_type=LOOPBACK).

The error does not happen if we launch the Flink job server without pointing it to a Flink cluster (that is, if we remove -PflinkMasterUrl=localhost:8081), or if we use the Spark job server with a Spark cluster, so this seems to be a Flink-specific problem.

Similar error: https://issues.apache.org/jira/browse/BEAM-7859

Note that the default parallelism parameters set in portableWordCountBatch are not compatible with the default configuration of a standalone Flink cluster, which starts with only one available slot.

cc: [~ibzib] [~goenka] [~robertwb]

  was:
Repro:
 # git checkout origin/release-2.16.0
 # ./flink-1.8.2/bin/start-cluster.sh
 # gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
 # python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --experiments=worker_threads=100 --parallelism=1 --shutdown_sources_on_final_watermark --sdk_worker_parallelism=1 --environment_cache_millis=60000 --job_endpoint=localhost:8099

This causes the runner to crash with:
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 660, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1042, in process
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 186, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 390, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/textio.py", line 391, in open
    file_handle = super(_TextSink, self).open(temp_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 129, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 203, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 151, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 134, in _path_open
    raw_file = open(path, mode)
RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-py-wordcount-direct-ea951c18fd1211e9ac84a0c589d778c3/d39e13af-277b-437e-89f2-e00249287e1d.py-wordcount-direct' [while running 'write/Write/WriteImpl/WriteBundles'] {noformat}

The error happens with flink 1.5 and flink 1.8. 

The error does not happen if we run SDK harness in LOOPBACK mode (--environment_type=LOOPBACK) 

The error does not happen if we launch flink jobServer without pointing to a flink cluster, that is if we remove -PflinkMasterUrl=localhost:8081

Similar error: https://issues.apache.org/jira/browse/BEAM-7859

Note that default parallelism parameters set in portableWordCountBatch are not compatible with default configuration of standalone Flink cluster, which starts with only one available slot.  

cc: [~ibzib] [~goenka] [~robertwb]


> Portable Wordcount fails on standalone Flink cluster
> ----------------------------------------------------
>
>                 Key: BEAM-8547
>                 URL: https://issues.apache.org/jira/browse/BEAM-8547
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>            Reporter: Valentyn Tymofieiev
>            Priority: Major
>
> Repro:
>  # git checkout origin/release-2.16.0
>  # ./flink-1.8.2/bin/start-cluster.sh
>  # gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
>  # python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --experiments=worker_threads=100 --parallelism=1 --shutdown_sources_on_final_watermark --sdk_worker_parallelism=1 --environment_cache_millis=60000 --job_endpoint=localhost:8099
> This causes the runner to crash with:
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
>     response = task()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
>   File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 660, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1042, in process
>     self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 186, in open_writer
>     return FileBasedSinkWriter(self, writer_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 390, in __init__
>     self.temp_handle = self.sink.open(temp_shard_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/textio.py", line 391, in open
>     file_handle = super(_TextSink, self).open(temp_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 129, in open
>     return FileSystems.create(temp_path, self.mime_type, self.compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 203, in create
>     return filesystem.create(path, mime_type, compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 151, in create
>     return self._path_open(path, 'wb', mime_type, compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 134, in _path_open
>     raw_file = open(path, mode)
> RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-py-wordcount-direct-ea951c18fd1211e9ac84a0c589d778c3/d39e13af-277b-437e-89f2-e00249287e1d.py-wordcount-direct' [while running 'write/Write/WriteImpl/WriteBundles'] {noformat}
> The error happens with both Flink 1.5 and Flink 1.8.
> The error does not happen if we run the SDK harness in LOOPBACK mode (--environment_type=LOOPBACK).
> The error does not happen if we launch the Flink job server without pointing it to a Flink cluster (that is, if we remove -PflinkMasterUrl=localhost:8081), or if we use the Spark job server with a Spark cluster, so this seems to be a Flink-specific problem.
> Similar error: https://issues.apache.org/jira/browse/BEAM-7859
> Note that the default parallelism parameters set in portableWordCountBatch are not compatible with the default configuration of a standalone Flink cluster, which starts with only one available slot.
> cc: [~ibzib] [~goenka] [~robertwb]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)