Posted to issues@flink.apache.org by "Victor Wong (Jira)" <ji...@apache.org> on 2019/11/07 03:37:01 UTC
[jira] [Closed] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
[ https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Wong closed FLINK-12648.
-------------------------------
Resolution: Won't Fix
> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -----------------------------------------------------------------
>
> Key: FLINK-12648
> URL: https://issues.apache.org/jira/browse/FLINK-12648
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Victor Wong
> Assignee: Victor Wong
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I think _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ contains code that duplicates logic already present in the Apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.
>
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>     fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
> }
> catch (IOException e) {
>     throw new UnsupportedFileSystemSchemeException(
>             "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
>             "Either no file system implementation exists for that scheme, " +
>             "or the relevant classes are missing from the classpath.", e);
> }
>
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());
>
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
>
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>     initUri = fsUri;
> }
> else {
>     // note: the original call was missing the 'fsUri' argument for the '{}' placeholder
>     LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)", fsUri);
>
>     String configEntry = hadoopConfig.get("fs.defaultFS", null);
>     if (configEntry == null) {
>         // fs.default.name deprecated as of hadoop 2.2.0 - see
>         // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>         configEntry = hadoopConfig.get("fs.default.name", null);
>     }
>
>     if (LOG.isDebugEnabled()) {
>         LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>     }
>
>     if (configEntry == null) {
>         throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                 "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
>     }
>     else {
>         try {
>             initUri = URI.create(configEntry);
>         }
>         catch (IllegalArgumentException e) {
>             throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                     "The configuration contains an invalid file system default name " +
>                     "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>         }
>
>         if (initUri.getAuthority() == null) {
>             throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>                     "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
>                     "contains no valid authority component (like hdfs namenode, S3 host, etc)");
>         }
>     }
> }
>
> // -- (5) configure the Hadoop file system
> try {
>     hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>     String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
>             "), specified by either the file URI or the configuration, cannot be resolved.";
>     throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}
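> The authority-fallback logic in step (4) above hinges on how _java.net.URI_ parses the authority component; _FileSystem.get_ performs an equivalent default-URI fallback internally. A minimal stdlib-only sketch of that check, with hypothetical URIs and no Hadoop on the classpath:
> {code:java}
> import java.net.URI;
>
> public class AuthorityCheck {
>     public static void main(String[] args) {
>         // URI with an explicit authority (e.g. a namenode host): used as-is.
>         URI withAuthority = URI.create("hdfs://namenode:8020/data");
>         System.out.println(withAuthority.getAuthority()); // prints "namenode:8020"
>
>         // URI without an authority: this is the case where the code above
>         // (and FileSystem.get internally) falls back to 'fs.defaultFS'.
>         URI noAuthority = URI.create("hdfs:///data");
>         System.out.println(noAuthority.getAuthority()); // prints "null"
>     }
> }
> {code}
> One behavioral difference worth noting: _FileSystem.get_ returns a cached instance (keyed by scheme, authority, and user), whereas _fsClass.newInstance()_ in the block above always creates a fresh one.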
--
This message was sent by Atlassian Jira
(v8.3.4#803005)