Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/30 09:49:58 UTC
[5/7] flink git commit: [FLINK-8432] Add support for openstack's swift filesystem
[FLINK-8432] Add support for openstack's swift filesystem
This closes #5296.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03d042a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03d042a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03d042a2
Branch: refs/heads/master
Commit: 03d042a21a948cb5b9bee534b07e382a5b5a6738
Parents: f53f846
Author: Jelmer Kuperus <jk...@marktplaats.nl>
Authored: Sun Jan 14 10:42:02 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 29 16:23:40 2018 +0100
----------------------------------------------------------------------
docs/ops/filesystems.md | 5 +
flink-dist/pom.xml | 7 +
flink-dist/src/main/assemblies/opt.xml | 7 +
.../flink-swift-fs-hadoop/README.md | 36 +
flink-filesystems/flink-swift-fs-hadoop/pom.xml | 292 ++
.../openstackhadoop/SwiftFileSystemFactory.java | 135 +
.../org/apache/hadoop/conf/Configuration.java | 2968 ++++++++++++++++++
.../apache/hadoop/util/NativeCodeLoader.java | 94 +
.../resources/META-INF/core-default-shaded.xml | 2312 ++++++++++++++
.../org.apache.flink.core.fs.FileSystemFactory | 16 +
.../HadoopSwiftFileSystemITCase.java | 208 ++
.../src/test/resources/core-site.xml | 2312 ++++++++++++++
.../src/test/resources/log4j-test.properties | 27 +
flink-filesystems/pom.xml | 1 +
14 files changed, 8420 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/docs/ops/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
index 4773639..75ddf6d 100644
--- a/docs/ops/filesystems.md
+++ b/docs/ops/filesystems.md
@@ -51,6 +51,11 @@ When starting a Flink application from the Flink binaries, copy or move the resp
See [AWS setup](deployment/aws.html) for details.
- **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
+
+ - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*.
+ The implementation `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
+ To use it when using Flink as a library, add the respective Maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`).
+ When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
### HDFS and Hadoop File System support
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 5d63b42..c85aad2 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -317,6 +317,13 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-swift-fs-hadoop</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 14ec174..3f724e3 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -139,6 +139,13 @@
<fileMode>0644</fileMode>
</file>
+ <file>
+ <source>../flink-filesystems/flink-swift-fs-hadoop/target/flink-swift-fs-hadoop-${project.version}.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-swift-fs-hadoop-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
+
<!-- Queryable State -->
<file>
<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/flink-filesystems/flink-swift-fs-hadoop/README.md
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-swift-fs-hadoop/README.md b/flink-filesystems/flink-swift-fs-hadoop/README.md
new file mode 100644
index 0000000..0b6d091
--- /dev/null
+++ b/flink-filesystems/flink-swift-fs-hadoop/README.md
@@ -0,0 +1,36 @@
+This project is a wrapper around Hadoop's swift native file system. By pulling a smaller dependency tree and
+shading all dependencies away, this keeps the appearance of Flink being Hadoop-free,
+from a dependency perspective.
+
+We also relocate the shaded Hadoop version to allow running in a different
+setup. For this to work, however, we needed to adapt Hadoop's `Configuration`
+class to load a (shaded) `core-default-shaded.xml` configuration with the
+relocated class names of classes loaded via reflection
+(in the future, we may need to extend this to `mapred-default.xml` and `hdfs-defaults.xml` and their respective configuration classes).
+
+# Changing the Hadoop Version
+
+If you want to change the Hadoop version this project depends on, the following
+steps are required to keep the shading correct:
+
+1. from the respective Hadoop jar (currently 2.8.1 as of the `openstackhadoop.hadoop.version` property in our `pom.xml`),
+ - copy `org/apache/hadoop/conf/Configuration.java` to `src/main/java/org/apache/hadoop/conf/` and
+ - replace `core-default.xml` with `core-default-shaded.xml`.
+ - copy `org/apache/hadoop/util/NativeCodeLoader.java` to `src/main/java/org/apache/hadoop/util/` and
+ - replace the static initializer with
+ ```
+ static {
+ LOG.info("Skipping native-hadoop library for flink-openstack-fs-hadoop's relocated Hadoop... " +
+ "using builtin-java classes where applicable");
+ }
+ ```
+ - copy `core-default.xml` to `src/main/resources/core-default-shaded.xml` and
+ - change every occurrence of `org.apache.hadoop` into `org.apache.flink.fs.openstackhadoop.shaded.org.apache.hadoop`
+ - copy `core-site.xml` to `src/test/resources/core-site.xml` (as is)
+2. verify the shaded jar:
+ - does not contain any unshaded classes except for `org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem`
+ - all other classes should be under `org.apache.flink.fs.openstackhadoop.shaded`
+ - there should be a `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` file pointing to the `org.apache.flink.fs.openstackhadoop.SwiftFileSystemFactory` class
+ - other service files under `META-INF/services` should have their names and contents in the relocated `org.apache.flink.fs.openstackhadoop.shaded` package
+ - contains a `core-default-shaded.xml` file
+ - does not contain a `core-default.xml` or `core-site.xml` file
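
The entry-name checks in step 2 of the README above can be scripted. The following is a minimal Java sketch and not part of this commit: the class `ShadedJarCheckSketch` and its methods are hypothetical names, and it assumes that everything under `org/apache/flink/fs/openstackhadoop/` may stay in place, because `SwiftFileSystemFactory` itself lives there. It only inspects entry names, so the contents of the `META-INF/services` files still need a manual look.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of the commit: flags jar entries that break the README rules. */
final class ShadedJarCheckSketch {

    // Assumption: the factory package (which also contains the "shaded" subtree) is allowed.
    static final String FACTORY_PACKAGE = "org/apache/flink/fs/openstackhadoop/";

    // The single unshaded class the README tolerates (plus its inner classes).
    static final String ALLOWED_UNSHADED = "org/apache/hadoop/fs/swift/snative/SwiftNativeFileSystem";

    /** Returns the entry names that should not be present in the shaded jar. */
    static List<String> findViolations(List<String> entryNames) {
        List<String> violations = new ArrayList<>();
        for (String name : entryNames) {
            if (name.equals("core-default.xml") || name.equals("core-site.xml")) {
                violations.add(name); // unshaded Hadoop config files must be filtered out
            } else if (name.endsWith(".class") && !isAllowedClass(name)) {
                violations.add(name); // class outside the relocated namespace
            }
        }
        return violations;
    }

    private static boolean isAllowedClass(String name) {
        return name.startsWith(FACTORY_PACKAGE)
            || name.equals(ALLOWED_UNSHADED + ".class")
            || name.startsWith(ALLOWED_UNSHADED + "$");
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ShadedJarCheckSketch <path-to-shaded-jar>");
            return;
        }
        try (JarFile jar = new JarFile(args[0])) {
            List<String> names = jar.stream().map(JarEntry::getName).collect(Collectors.toList());
            if (!names.contains("core-default-shaded.xml")) {
                System.out.println("missing: core-default-shaded.xml");
            }
            findViolations(names).forEach(n -> System.out.println("unexpected: " + n));
        }
    }
}
```

Run against the jar produced in `target/`, the sketch prints nothing when the entry names satisfy the list above.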
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/flink-filesystems/flink-swift-fs-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-swift-fs-hadoop/pom.xml b/flink-filesystems/flink-swift-fs-hadoop/pom.xml
new file mode 100644
index 0000000..66ee6d7
--- /dev/null
+++ b/flink-filesystems/flink-swift-fs-hadoop/pom.xml
@@ -0,0 +1,292 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-filesystems</artifactId>
+ <version>1.5-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-swift-fs-hadoop</artifactId>
+ <name>flink-swift-fs-hadoop</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- Do not change this without updating the copied Configuration class! -->
+ <openstackhadoop.hadoop.version>2.8.1</openstackhadoop.hadoop.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- Flink core -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- File system builds on the Hadoop file system support -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-fs</artifactId>
+ <version>${project.version}</version>
+
+ <!-- this exclusion is only needed to run tests in the IDE, pre shading,
+ because the optional Hadoop dependency is also pulled in for tests -->
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Hadoop's openstack file system -->
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${openstackhadoop.hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.directory.server</groupId>
+ <artifactId>apacheds-kerberos-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-openstack</artifactId>
+ <version>${openstackhadoop.hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- make sure that also logger and JSR is provided -->
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+
+ <!-- we need to explicitly override this version, because the -->
+ <!-- earlier versions of the shade plugin have a bug relocating services -->
+ <version>3.0.0</version>
+
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.fasterxml</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.com.fasterxml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.com.google</shadedPattern>
+ <excludes>
+ <!-- provided -->
+ <exclude>com.google.code.findbugs.**</exclude>
+ </excludes>
+ </relocation>
+ <relocation>
+ <pattern>com.nimbusds</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.com.nimbusds</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.squareup</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.com.squareup</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.jcip</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.net.jcip</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.minidev</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.net.minidev</shadedPattern>
+ </relocation>
+
+ <!-- relocate everything from the flink-hadoop-fs project -->
+ <relocation>
+ <pattern>org.apache.flink.runtime.fs.hdfs</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.apache.flink.runtime.fs.hdfs</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.flink.runtime.util</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.apache.flink.runtime.util</shadedPattern>
+ <includes>
+ <include>org.apache.flink.runtime.util.**Hadoop*</include>
+ </includes>
+ </relocation>
+
+ <relocation>
+ <pattern>org.apache</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.apache</shadedPattern>
+ <excludes>
+ <!-- keep all other classes of flink as they are (exceptions above) -->
+ <exclude>org.apache.flink.**</exclude>
+ <exclude>org.apache.log4j.**</exclude> <!-- provided -->
+ </excludes>
+ </relocation>
+ <relocation>
+ <pattern>org.codehaus</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.codehaus</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.joda</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.joda</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.mortbay</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.mortbay</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.tukaani</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.tukaani</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.znerd</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.org.znerd</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>okio</pattern>
+ <shadedPattern>org.apache.flink.fs.openstackhadoop.shaded.okio</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*</artifact>
+ <excludes>
+ <exclude>log4j.properties</exclude>
+ <exclude>mime.types</exclude>
+ <exclude>properties.dtd</exclude>
+ <exclude>PropertyList-1.0.dtd</exclude>
+ <exclude>models/**</exclude>
+ <exclude>mozilla/**</exclude>
+ <exclude>META-INF/maven/com*/**</exclude>
+ <exclude>META-INF/maven/net*/**</exclude>
+ <exclude>META-INF/maven/software*/**</exclude>
+ <exclude>META-INF/maven/joda*/**</exclude>
+ <exclude>META-INF/maven/org.mortbay.jetty/**</exclude>
+ <exclude>META-INF/maven/org.apache.h*/**</exclude>
+ <exclude>META-INF/maven/org.apache.commons/**</exclude>
+ <exclude>META-INF/maven/org.apache.flink/flink-hadoop-fs/**</exclude>
+ <exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
+ <!-- we use our own "shaded" core-default.xml: core-default-shaded.xml -->
+ <exclude>core-default.xml</exclude>
+ <!-- we only add a core-site.xml with unshaded classnames for the unit tests -->
+ <exclude>core-site.xml</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/03d042a2/flink-filesystems/flink-swift-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-swift-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java b/flink-filesystems/flink-swift-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
new file mode 100644
index 0000000..1a04049
--- /dev/null
+++ b/flink-filesystems/flink-swift-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+ /** Prefix of the Flink config keys that are forwarded to the Hadoop config under 'fs.swift.'. */
+ private static final String CONFIG_PREFIX = "swift.";
+
+ /** Flink's configuration object. */
+ private Configuration flinkConfig;
+
+ /** Hadoop's configuration for the file systems, lazily initialized. */
+ private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+ @Override
+ public String getScheme() {
+ return "swift";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+ flinkConfig = config;
+ hadoopConfig = null;
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ LOG.debug("Creating swift file system (backed by a Hadoop native swift file system)");
+
+ try {
+ // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
+
+ org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
+ if (hadoopConfig == null) {
+ if (flinkConfig != null) {
+ LOG.debug("Loading Hadoop configuration for swift native file system");
+ hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+ // hadoop.tmp.dir needs to be defined because it is used as buffer directory
+ if (hadoopConfig.get("hadoop.tmp.dir") == null) {
+ String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(flinkConfig);
+ File tmpDir = new File(tmpDirPaths[0], "hadoop-" + System.getProperty("user.name"));
+ hadoopConfig.set("hadoop.tmp.dir", tmpDir.getPath());
+ }
+
+ // add additional config entries from the Flink config to the Hadoop config
+ for (String key : flinkConfig.keySet()) {
+ if (key.startsWith(CONFIG_PREFIX)) {
+ String value = flinkConfig.getString(key, null);
+ String newKey = "fs.swift." + key.substring(CONFIG_PREFIX.length());
+ hadoopConfig.set(newKey, value);
+
+ LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
+ "Swift native File System", key, newKey, value);
+ }
+ }
+
+ this.hadoopConfig = hadoopConfig;
+ }
+ else {
+ LOG.warn("The factory has not been configured prior to loading the Swift native file system."
+ + " Using Hadoop configuration from the classpath.");
+
+ hadoopConfig = new org.apache.hadoop.conf.Configuration();
+ this.hadoopConfig = hadoopConfig;
+ }
+ }
+
+ // -- (2) Instantiate the Hadoop file system class for that scheme
+
+ final String scheme = fsUri.getScheme();
+ final String authority = fsUri.getAuthority();
+
+ if (scheme == null && authority == null) {
+ fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+ }
+ else if (scheme != null && authority == null) {
+ URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+ if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+ fsUri = defaultUri;
+ }
+ }
+
+ LOG.debug("Using URI {} for the Hadoop native Swift file system backing the Flink swift file system", fsUri);
+
+ final SwiftNativeFileSystem fs = new SwiftNativeFileSystem();
+ fs.initialize(fsUri, hadoopConfig);
+
+ return new HadoopFileSystem(fs);
+ }
+ catch (IOException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+}
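
To make the key forwarding in `SwiftFileSystemFactory#create` easier to follow, here is a minimal standalone sketch of the same translation, using plain maps instead of the Flink and Hadoop `Configuration` classes. It is not part of this commit: the class `SwiftConfigKeySketch`, the method `toHadoopKeys`, and the service name `example` are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: mirrors the "swift." to "fs.swift." key translation with plain maps. */
final class SwiftConfigKeySketch {

    private static final String FLINK_PREFIX = "swift.";
    private static final String HADOOP_PREFIX = "fs.swift.";

    /** Copies every entry whose key starts with "swift." and re-keys it under "fs.swift.". */
    static Map<String, String> toHadoopKeys(Map<String, String> flinkConfig) {
        Map<String, String> hadoopEntries = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(FLINK_PREFIX)) {
                String newKey = HADOOP_PREFIX + key.substring(FLINK_PREFIX.length());
                hadoopEntries.put(newKey, entry.getValue());
            }
        }
        return hadoopEntries;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConfig = new LinkedHashMap<>();
        // "example" is a made-up service name used only for this sketch
        flinkConfig.put("swift.service.example.username", "alice");
        flinkConfig.put("taskmanager.numberOfTaskSlots", "4"); // ignored: no "swift." prefix
        System.out.println(toHadoopKeys(flinkConfig));
        // prints {fs.swift.service.example.username=alice}
    }
}
```

Keys without the `swift.` prefix are left out, which matches the loop in the factory: only prefixed entries are added on top of the Hadoop configuration that `HadoopUtils.getHadoopConfiguration` has already loaded.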