Posted to issues@flink.apache.org by "Victor Wong (Jira)" <ji...@apache.org> on 2019/11/07 03:37:01 UTC

[jira] [Closed] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()

     [ https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victor Wong closed FLINK-12648.
-------------------------------
    Resolution: Won't Fix

> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -----------------------------------------------------------------
>
>                 Key: FLINK-12648
>                 URL: https://issues.apache.org/jira/browse/FLINK-12648
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Victor Wong
>            Assignee: Victor Wong
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I think there is some duplicated code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that replicates logic already provided by the Apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.
>  
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>    fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
> }
> catch (IOException e) {
>    throw new UnsupportedFileSystemSchemeException(
>          "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
>                "Either no file system implementation exists for that scheme, " +
>                "or the relevant classes are missing from the classpath.", e);
> }
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>    initUri = fsUri;
> }
> else {
>    LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
>    String configEntry = hadoopConfig.get("fs.defaultFS", null);
>    if (configEntry == null) {
>       // fs.default.name deprecated as of hadoop 2.2.0 - see
>       // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>       configEntry = hadoopConfig.get("fs.default.name", null);
>    }
>    if (LOG.isDebugEnabled()) {
>       LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>    }
>    if (configEntry == null) {
>       throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>             "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
>    }
>    else {
>       try {
>          initUri = URI.create(configEntry);
>       }
>       catch (IllegalArgumentException e) {
>          throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                "The configuration contains an invalid file system default name " +
>                "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>       }
>       if (initUri.getAuthority() == null) {
>          throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
>                "contains no valid authority component (like hdfs namenode, S3 host, etc)");
>       }
>    }
> }
> // -- (5) configure the Hadoop file system
> try {
>    hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>    String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
>          "), specified by either the file URI or the configuration, cannot be resolved.";
>    throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}
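> For reference, here is a minimal standalone sketch (not part of the Flink code base) of how _FileSystem.get(URI, Configuration)_ behaves: it resolves the scheme, falls back to _fs.defaultFS_ when the URI carries no scheme or authority, and initializes the file system, which is essentially what steps (2)-(5) above do by hand. The _fs.defaultFS_ value below is a hypothetical namenode address.
> {code:java}
> import java.net.URI;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
>
> public class HadoopFsGetSketch {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // Hypothetical default file system; in a real deployment this comes from core-site.xml.
>         conf.set("fs.defaultFS", "hdfs://namenode:8020");
>
>         // A URI without a scheme or authority falls back to fs.defaultFS inside FileSystem.get().
>         FileSystem fs = FileSystem.get(URI.create("/user/flink/data"), conf);
>         System.out.println("Resolved file system: " + fs.getUri());
>     }
> }
> {code}
> One behavioral difference to keep in mind: _FileSystem.get()_ returns a cached instance shared per (scheme, authority, user), whereas the code above creates a fresh instance via _fsClass.newInstance()_; the cache can be bypassed by setting _fs.<scheme>.impl.disable.cache_ to true.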



--
This message was sent by Atlassian Jira
(v8.3.4#803005)