Posted to issues@beam.apache.org by "Adam Pearce (Jira)" <ji...@apache.org> on 2021/09/09 14:31:00 UTC

[jira] [Updated] (BEAM-12864) FlinkRunner (DOCKER) changes HDFS HOST to a name resolvable to host, rather than Docker

     [ https://issues.apache.org/jira/browse/BEAM-12864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adam Pearce updated BEAM-12864:
-------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Triage Needed)

The source of this issue was that the Hadoop HDFS core-site.xml configuration was set to "localhost", which appended "localhost:9000" to the access URL when Beam made the connection.

Reconfiguring HDFS to use the IP of my Docker Host allowed Beam to access HDFS from within the Pipeline on the FlinkRunner.
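
For reference, a minimal sketch of the core-site.xml change, assuming the NameNode listens on the default port 9000 (the port seen in the failing URL above) and using the Docker Host IP from the report below; substitute your own address. The key point is that the advertised address must be resolvable from both the Host and the SDK harness containers:

{code:xml}
<configuration>
  <!-- Advertise an address that both the Docker Host and the Beam SDK
       harness containers can resolve, instead of "localhost". -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://192.168.1.11:9000</value>
  </property>
</configuration>
{code}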

 
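For completeness, a standalone connectivity check, a sketch reusing the Beam APIs and values from the report below. Note that a passing check only proves the address resolves from the Host; the SDK harness containers must be able to resolve the same address for the pipeline itself to work:

{code:python}
from apache_beam.io.hadoopfilesystem import HadoopFileSystem
from apache_beam.options.pipeline_options import HadoopFileSystemOptions

# Values taken from the report below; substitute your own.
options = HadoopFileSystemOptions(
    hdfs_host='192.168.1.11', hdfs_port=9870, hdfs_user='apearce')
fs = HadoopFileSystem(options)

# True if the NameNode is reachable and the path exists.
print(fs.exists('hdfs://user/apearce/testdata/001.csv'))
{code}
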

> FlinkRunner (DOCKER) changes HDFS HOST to a name resolvable to host, rather than Docker
> ---------------------------------------------------------------------------------------
>
>                 Key: BEAM-12864
>                 URL: https://issues.apache.org/jira/browse/BEAM-12864
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.31.0
>         Environment: macOS 11.5.2, Flink 1.13.2, Beam 2.31.0, Docker for Mac 20.10.8
>            Reporter: Adam Pearce
>            Priority: P2
>
> I'm attempting to create a simple example of reading from an HDFS FileSystem using the Python SDK. I am able to do this with the direct runner, and am even able to read the filesystem directly in a simple Python file outside of a Beam pipeline (but using the Beam IO FileSystem class).
> When I create a Beam pipeline and submit it to Flink, the pipeline is unable to resolve the hostname of the Docker Host because that hostname is set to 'localhost'. I've tried setting `hdfs_host` in the pipeline options to the typical value of `host.docker.internal` to reach the Host's network, and even to the IP address of my Docker Host (macOS), which usually works and is resolvable when testing with dummy containers. Using `host.docker.internal` fails because it is not resolvable by the Host itself. This creates a situation where the Host _and_ the container both need to be able to resolve the `hdfs_host` hostname.
> When using the IP, both can resolve it, but I believe that in preparation for the Flink run, Beam replaces the HDFS Host entry with "localhost", because that is what the IP resolves to on the Docker Host, and "localhost" is then not resolvable by the Docker container.
> Users need to be able to explicitly set the HDFS Host parameter with respect to the Docker environment in which the FlinkRunner executes the pipeline, regardless of whether the Host can resolve that hostname. In some cases, this could be another Docker container on the Docker network that is resolvable by the Docker container but not by the Docker Host. Setting "hdfs_host" to an IP address should not result in its replacement with "localhost".
> To summarize, a Beam pipeline run with the FlinkRunner (using the Docker environment) is unable to reach the Docker Host via networking, and therefore cannot connect to an HDFS FileSystem located on the Docker Host.
>  
> Code example:
>  
> {code:python}
> import logging
>
> import apache_beam as beam
> from apache_beam.io.hadoopfilesystem import HadoopFileSystem
> from apache_beam.options.pipeline_options import (HadoopFileSystemOptions,
>                                                   PipelineOptions)
>
> HDFS_HOSTNAME = '192.168.1.11'  # Docker Host IP address (macOS)
> HDFS_PORT = 9870                # WebHDFS port
>
> hdfs_client_options = HadoopFileSystemOptions(
>     hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user="apearce")
> hdfs_filesystem = HadoopFileSystem(hdfs_client_options)
>
> input_file_hdfs = "hdfs://user/apearce/testdata/001.csv"
>
> # This works when run directly on the Host, outside a pipeline:
> # for x in hdfs_filesystem.open(input_file_hdfs).readlines():
> #     print(x)
>
>
> def run(argv=None, save_main_session=True):
>     config = {
>         "runner": "FlinkRunner",
>         "flink_master": "localhost:8081",
>         "environment_type": "DOCKER",
>         "save_main_session": save_main_session,
>         "hdfs_host": HDFS_HOSTNAME,
>         "hdfs_port": HDFS_PORT,
>         "hdfs_user": "apearce",
>     }
>     pipeline_options = PipelineOptions.from_dictionary(config)
>     with beam.Pipeline(options=pipeline_options) as p:
>         (
>             p
>             | 'ReadFromHDFS' >> beam.io.ReadFromText(input_file_hdfs)
>             | 'Print' >> beam.Map(print)
>         )
>
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run()
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)