Posted to common-commits@hadoop.apache.org by aj...@apache.org on 2019/04/04 21:00:59 UTC

[hadoop] branch ozone-0.4 updated: HDDS-1333. OzoneFileSystem can't work with spark/hadoop2.7 because incompatible security classes. Contributed by Elek, Marton. (#653)

This is an automated email from the ASF dual-hosted git repository.

ajay pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new 7ec6a31  HDDS-1333. OzoneFileSystem can't work with spark/hadoop2.7 because incompatible security classes. Contributed by Elek, Marton. (#653)
7ec6a31 is described below

commit 7ec6a31eb314b9ecc1c2affbcc7d63c07bd33523
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Thu Apr 4 23:00:54 2019 +0200

    HDDS-1333. OzoneFileSystem can't work with spark/hadoop2.7 because incompatible security classes. Contributed by Elek, Marton. (#653)
---
 hadoop-hdds/docs/content/SparkOzoneFSK8S.md        |   6 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |   2 +-
 .../dist/dev-support/bin/dist-layout-stitching     |   3 +-
 hadoop-ozone/dist/pom.xml                          |  22 +
 .../src/main/compose/ozonefs/docker-compose.yaml   |  40 +-
 .../dist/src/main/compose/ozonefs/hadoopo3fs.robot |  56 ++
 .../src/main/compose/ozonesecure/docker-config     |   2 +
 .../dist/src/main/smoketest/commonlib.robot        |   6 +
 .../dist/src/main/smoketest/createbucketenv.robot  |  43 +
 .../src/main/smoketest/env-compose.robot}          |  18 +-
 ...rImpl.java => BasicOzoneClientAdapterImpl.java} |  89 +-
 ...neFileSystem.java => BasicOzoneFileSystem.java} | 177 ++--
 .../hadoop/fs/ozone/FilteredClassLoader.java       |   1 +
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   2 +-
 .../hadoop/fs/ozone/OzoneClientAdapterFactory.java |  30 +-
 .../hadoop/fs/ozone/OzoneClientAdapterImpl.java    | 367 +-------
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    | 922 +--------------------
 .../hadoop/fs/ozone/TestOzoneFileInterfaces.java   |   2 +-
 .../services/org.apache.hadoop.fs.FileSystem       |   0
 19 files changed, 352 insertions(+), 1436 deletions(-)
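
The change follows a simple pattern: OzoneFileSystem and its client adapter are split into a Basic base class that only references APIs already present in Hadoop 2.7, and a full-featured subclass that adds storage statistics and encryption-related token integration on newer Hadoop versions. Below is a rough, simplified sketch of that pattern only; the class and method names are invented for illustration and are not the actual classes in this patch.

    import java.util.HashMap;
    import java.util.Map;

    // Base class: loadable on Hadoop 2.7; statistics are a no-op hook instead of
    // a direct dependency on newer Hadoop classes.
    class BasicClientAdapter {
      public void renameKey(String key, String newKey) {
        incrementCounter("objects_renamed");   // call the hook...
        // ...the real adapter would perform the rename through the Ozone client here.
      }

      protected void incrementCounter(String statistic) {
        // no-op in the basic version
      }
    }

    // Subclass: only used on newer Hadoop, where statistics support is available.
    class FullClientAdapter extends BasicClientAdapter {
      private final Map<String, Long> stats = new HashMap<>();

      @Override
      protected void incrementCounter(String statistic) {
        stats.merge(statistic, 1L, Long::sum); // count the operation
      }
    }

In the patch itself, this hook is the protected incrementCounter method added to BasicOzoneClientAdapterImpl and BasicOzoneFileSystem, and it is overridden in OzoneClientAdapterImpl.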

diff --git a/hadoop-hdds/docs/content/SparkOzoneFSK8S.md b/hadoop-hdds/docs/content/SparkOzoneFSK8S.md
index 3e598d9..fa6cacd 100644
--- a/hadoop-hdds/docs/content/SparkOzoneFSK8S.md
+++ b/hadoop-hdds/docs/content/SparkOzoneFSK8S.md
@@ -78,11 +78,13 @@ And create a custom `core-site.xml`:
 <configuration>
     <property>
         <name>fs.o3fs.impl</name>
-        <value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
+        <value>org.apache.hadoop.fs.ozone.BasicOzoneFileSystem</value>
     </property>
 </configuration>
 ```
 
+_Note_: You may also use `org.apache.hadoop.fs.ozone.OzoneFileSystem` without the `Basic` prefix. The `Basic` version doesn't support FS statistics and encryption zones, but it works with older Hadoop versions.
+
 Copy the `ozonefs.jar` file from an ozone distribution (__use the legacy version!__)
 
 ```
@@ -134,7 +136,7 @@ Write down the ozone filesystem uri as it should be used with the spark-submit c
 
 ```
 kubectl create serviceaccount spark -n yournamespace
-kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=poc:yournamespace --namespace=yournamespace
+kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=yournamespace:spark --namespace=yournamespace
 ```
 ## Execute the job
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index d5baf9d..0e6d483 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -83,7 +83,7 @@ public class OMFailoverProxyProvider implements
   /**
    * Class to store proxy information.
    */
-  public final class OMProxyInfo
+  public class OMProxyInfo
       extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
     private InetSocketAddress address;
     private Text dtService;
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 9a470d5..840abbc 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -114,6 +114,7 @@ run cp "${ROOT}/hadoop-ozone/objectstore-service/target/hadoop-ozone-objectstore
 cp -r "${ROOT}/hadoop-hdds/docs/target/classes/docs" ./
 
 #Copy docker compose files
-run cp -p -R "${ROOT}/hadoop-ozone/dist/src/main/compose" .
+#compose files are preprocessed: properties (e.g. project.version) are replaced first by Maven.
+run cp -p -R "${ROOT}/hadoop-ozone/dist/target/compose" .
 run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/smoketest" .
 run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/blockade" .
diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 5e9cbf5..a9cc6b7 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -121,6 +121,28 @@
         </executions>
       </plugin>
       <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>3.1.0</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/compose</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/compose</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
         <executions>
diff --git a/hadoop-ozone/dist/src/main/compose/ozonefs/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonefs/docker-compose.yaml
index 5d54ecf..ce5de8c 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonefs/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonefs/docker-compose.yaml
@@ -49,21 +49,53 @@ services:
       environment:
          ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
       command: ["/opt/hadoop/bin/ozone","scm"]
-   hadoop3:
+   hadoop32:
       image: flokkr/hadoop:3.1.0
       volumes:
          - ../..:/opt/ozone
       env_file:
          - ./docker-config
       environment:
-         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current*.jar
+         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current-@project.version@.jar
+         CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
       command: ["watch","-n","100000","ls"]
-   hadoop2:
+   hadoop31:
+      image: flokkr/hadoop:3.1.0
+      volumes:
+         - ../..:/opt/ozone
+      env_file:
+         - ./docker-config
+      environment:
+         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
+         CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
+      command: ["watch","-n","100000","ls"]
+   hadoop29:
       image: flokkr/hadoop:2.9.0
       volumes:
          - ../..:/opt/ozone
       env_file:
          - ./docker-config
       environment:
-         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy*.jar
+         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
+         CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
+      command: ["watch","-n","100000","ls"]
+   hadoop27:
+      image: flokkr/hadoop:2.7.3
+      volumes:
+         - ../..:/opt/ozone
+      env_file:
+         - ./docker-config
+      environment:
+         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
+         CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
+      command: ["watch","-n","100000","ls"]
+   spark:
+      image: flokkr/spark
+      volumes:
+         - ../..:/opt/ozone
+      env_file:
+         - ./docker-config
+      environment:
+         HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
+         CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
       command: ["watch","-n","100000","ls"]
diff --git a/hadoop-ozone/dist/src/main/compose/ozonefs/hadoopo3fs.robot b/hadoop-ozone/dist/src/main/compose/ozonefs/hadoopo3fs.robot
new file mode 100644
index 0000000..6da9d60
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/compose/ozonefs/hadoopo3fs.robot
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Test ozone fs usage from HDFS and Spark
+Library             OperatingSystem
+Library             String
+Resource            ../../smoketest/env-compose.robot
+Resource            ../../smoketest/commonlib.robot
+
+*** Variables ***
+${DATANODE_HOST}        datanode
+
+*** Keywords ***
+
+Test hadoop dfs
+    [arguments]        ${prefix}
+    ${random} =        Generate Random String  5  [NUMBERS]
+    ${result} =        Execute on host            ${prefix}       hdfs dfs -put /opt/hadoop/NOTICE.txt o3fs://bucket1.vol1/${prefix}-${random}
+    ${result} =        Execute on host            ${prefix}       hdfs dfs -ls o3fs://bucket1.vol1/
+                       Should contain             ${result}       ${prefix}-${random}
+
+*** Test Cases ***
+
+Create bucket and volume to test
+   ${result} =         Run tests on host      scm            createbucketenv.robot
+
+Test hadoop 3.1
+   Test hadoop dfs     hadoop31
+
+Test hadoop 3.2
+   Test hadoop dfs     hadoop32
+
+Test hadoop 2.9
+   Test hadoop dfs     hadoop29
+
+Test hadoop 2.7
+   Test hadoop dfs     hadoop27
+
+Test spark 2.3
+   ${legacyjar} =      Execute on host        spark         bash -c 'find /opt/ozone/share/ozone/lib/ -name *legacy*.jar'
+   ${postfix} =        Generate Random String  5  [NUMBERS]
+   ${result} =         Execute on host        spark         /opt/spark/bin/spark-submit --jars ${legacyjar} --class org.apache.spark.examples.DFSReadWriteTest /opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar /opt/spark/README.md o3fs://bucket1.vol1/spark-${postfix}
+
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index bc296d0..463af12 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
+
 OZONE-SITE.XML_ozone.om.address=om
 OZONE-SITE.XML_ozone.om.http-address=om:9874
 OZONE-SITE.XML_ozone.scm.names=scm
diff --git a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
index 0db01f9..d0ea344 100644
--- a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
@@ -29,6 +29,12 @@ Execute
     Should Be Equal As Integers     ${rc}                       0
     [return]                        ${output}
 
+Execute And Ignore Error
+    [arguments]                     ${command}
+    ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
+    Log                             ${output}
+    [return]                        ${output}
+
 Execute and checkrc
     [arguments]                     ${command}                  ${expected_error_code}
     ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
diff --git a/hadoop-ozone/dist/src/main/smoketest/createbucketenv.robot b/hadoop-ozone/dist/src/main/smoketest/createbucketenv.robot
new file mode 100644
index 0000000..fc0fda4
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/createbucketenv.robot
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Create bucket and volume for any other tests
+Library             OperatingSystem
+Resource            commonlib.robot
+Test Timeout        2 minutes
+
+
+*** Variables ***
+${volume}       vol1
+${bucket}       bucket1
+
+
+*** Keywords ***
+Create volume
+    ${result} =     Execute             ozone sh volume create /${volume} --user hadoop --quota 100TB --root
+                    Should not contain  ${result}       Failed
+                    Should contain      ${result}       Creating Volume: ${volume}
+Create bucket
+                    Execute             ozone sh bucket create /${volume}/${bucket}
+
+*** Test Cases ***
+Test ozone shell
+    ${result} =     Execute And Ignore Error             ozone sh bucket info /${volume}/${bucket}
+                    Run Keyword if      "VOLUME_NOT_FOUND" in """${result}"""       Create volume
+                    Run Keyword if      "VOLUME_NOT_FOUND" in """${result}"""       Create bucket
+                    Run Keyword if      "BUCKET_NOT_FOUND" in """${result}"""       Create bucket
+    ${result} =     Execute             ozone sh bucket info /${volume}/${bucket}
+                    Should not contain  ${result}  NOT_FOUND
diff --git a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-ozone/dist/src/main/smoketest/env-compose.robot
similarity index 52%
copy from hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
copy to hadoop-ozone/dist/src/main/smoketest/env-compose.robot
index 0368002..d529d7f 100644
--- a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
+++ b/hadoop-ozone/dist/src/main/smoketest/env-compose.robot
@@ -13,4 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.hadoop.fs.ozone.OzoneFileSystem
+*** Settings ***
+Documentation       High-level utilities to execute commands and tests in docker-compose based environments.
+Resource            commonlib.robot
+
+
+*** Keywords ***
+
+Run tests on host
+    [arguments]        ${host}       ${robotfile}
+    ${result} =        Execute       docker-compose exec ${host} robot smoketest/${robotfile}
+
+Execute on host
+    [arguments]                     ${host}     ${command}
+    ${rc}                           ${output} =                 Run And Return Rc And Output           docker-compose exec ${host} ${command}
+    Log                             ${output}
+    Should Be Equal As Integers     ${rc}                       0
+    [return]                        ${output}
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
similarity index 81%
copy from hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
copy to hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 9536fbc..bd1d412 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -17,15 +17,12 @@
  */
 package org.apache.hadoop.fs.ozone;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -45,16 +42,23 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+
+import org.apache.commons.lang3.StringUtils;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of the OzoneFileSystem calls.
+ * Basic implementation of the OzoneFileSystem calls.
+ * <p>
+ * This is the minimal version, which doesn't include any statistics.
+ * <p>
+ * For the full-featured version, use OzoneClientAdapterImpl.
  */
-public class OzoneClientAdapterImpl implements OzoneClientAdapter {
+public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
 
   static final Logger LOG =
-      LoggerFactory.getLogger(OzoneClientAdapterImpl.class);
+      LoggerFactory.getLogger(BasicOzoneClientAdapterImpl.class);
 
   private OzoneClient ozoneClient;
   private ObjectStore objectStore;
@@ -62,35 +66,20 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   private OzoneBucket bucket;
   private ReplicationType replicationType;
   private ReplicationFactor replicationFactor;
-  private OzoneFSStorageStatistics storageStatistics;
   private boolean securityEnabled;
-  /**
-   * Create new OzoneClientAdapter implementation.
-   *
-   * @param volumeStr         Name of the volume to use.
-   * @param bucketStr         Name of the bucket to use
-   * @param storageStatistics Storage statistic (optional, can be null)
-   * @throws IOException In case of a problem.
-   */
-  public OzoneClientAdapterImpl(String volumeStr, String bucketStr,
-      OzoneFSStorageStatistics storageStatistics) throws IOException {
-    this(createConf(), volumeStr, bucketStr, storageStatistics);
-  }
 
   /**
    * Create new OzoneClientAdapter implementation.
    *
-   * @param volumeStr         Name of the volume to use.
-   * @param bucketStr         Name of the bucket to use
+   * @param volumeStr Name of the volume to use.
+   * @param bucketStr Name of the bucket to use
    * @throws IOException In case of a problem.
    */
-  public OzoneClientAdapterImpl(String volumeStr, String bucketStr)
+  public BasicOzoneClientAdapterImpl(String volumeStr, String bucketStr)
       throws IOException {
-    this(createConf(), volumeStr, bucketStr, null);
+    this(createConf(), volumeStr, bucketStr);
   }
 
-
-
   private static OzoneConfiguration createConf() {
     ClassLoader contextClassLoader =
         Thread.currentThread().getContextClassLoader();
@@ -100,15 +89,15 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
     return conf;
   }
 
-  public OzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
-      String bucketStr, OzoneFSStorageStatistics storageStatistics)
+  public BasicOzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
+      String bucketStr)
       throws IOException {
-    this(null, -1, conf, volumeStr, bucketStr, storageStatistics);
+    this(null, -1, conf, volumeStr, bucketStr);
   }
 
-  public OzoneClientAdapterImpl(String omHost, int omPort,
-      Configuration hadoopConf, String volumeStr, String bucketStr,
-      OzoneFSStorageStatistics storageStatistics) throws IOException {
+  public BasicOzoneClientAdapterImpl(String omHost, int omPort,
+      Configuration hadoopConf, String volumeStr, String bucketStr)
+      throws IOException {
 
     ClassLoader contextClassLoader =
         Thread.currentThread().getContextClassLoader();
@@ -146,14 +135,12 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
       this.bucket = volume.getBucket(bucketStr);
       this.replicationType = ReplicationType.valueOf(replicationTypeConf);
       this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
-      this.storageStatistics = storageStatistics;
     } finally {
       Thread.currentThread().setContextClassLoader(contextClassLoader);
     }
 
   }
 
-
   @Override
   public void close() throws IOException {
     ozoneClient.close();
@@ -161,17 +148,17 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
 
   @Override
   public InputStream createInputStream(String key) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_READ, 1);
-    }
+    incrementCounter(Statistic.OBJECTS_READ);
     return bucket.readKey(key).getInputStream();
   }
 
+  protected void incrementCounter(Statistic statistic) {
+    //noop: use OzoneClientAdapterImpl, which supports statistics.
+  }
+
   @Override
   public OzoneFSOutputStream createKey(String key) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
-    }
+    incrementCounter(Statistic.OBJECTS_CREATED);
     OzoneOutputStream ozoneOutputStream =
         bucket.createKey(key, 0, replicationType, replicationFactor,
             new HashMap<>());
@@ -180,9 +167,7 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
 
   @Override
   public void renameKey(String key, String newKeyName) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_RENAMED, 1);
-    }
+    incrementCounter(Statistic.OBJECTS_RENAMED);
     bucket.renameKey(key, newKeyName);
   }
 
@@ -195,9 +180,7 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   @Override
   public BasicKeyInfo getKeyInfo(String keyName) {
     try {
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_QUERY, 1);
-      }
+      incrementCounter(Statistic.OBJECTS_QUERY);
       OzoneKey key = bucket.getKey(keyName);
       return new BasicKeyInfo(
           keyName,
@@ -234,9 +217,7 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   public boolean createDirectory(String keyName) {
     try {
       LOG.trace("creating dir for key:{}", keyName);
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
-      }
+      incrementCounter(Statistic.OBJECTS_CREATED);
       bucket.createKey(keyName, 0, replicationType, replicationFactor,
           new HashMap<>()).close();
       return true;
@@ -256,9 +237,7 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   public boolean deleteObject(String keyName) {
     LOG.trace("issuing delete for key" + keyName);
     try {
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_DELETED, 1);
-      }
+      incrementCounter(Statistic.OBJECTS_DELETED);
       bucket.deleteKey(keyName);
       return true;
     } catch (IOException ioe) {
@@ -274,17 +253,13 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
 
   @Override
   public boolean hasNextKey(String key) {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
-    }
+    incrementCounter(Statistic.OBJECTS_LIST);
     return bucket.listKeys(key).hasNext();
   }
 
   @Override
   public Iterator<BasicKeyInfo> listKeys(String pathKey) {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
-    }
+    incrementCounter(Statistic.OBJECTS_LIST);
     return new IteratorAdapter(bucket.listKeys(pathKey));
   }
 
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
similarity index 87%
copy from hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
copy to hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index 9f425ac..705ccfe 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -34,12 +34,9 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.key.KeyProvider;
-import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -48,15 +45,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
@@ -67,18 +61,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The Ozone Filesystem implementation.
+ * The minimal Ozone Filesystem implementation.
  * <p>
- * This subclass is marked as private as code should not be creating it
- * directly; use {@link FileSystem#get(Configuration)} and variants to create
- * one. If cast to {@link OzoneFileSystem}, extra methods and features may be
- * accessed. Consider those private and unstable.
+ * This is a basic version which doesn't implement
+ * KeyProviderTokenIssuer and doesn't include statistics. It can be used
+ * with older Hadoop versions. For newer Hadoop versions, use the
+ * full-featured OzoneFileSystem.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class OzoneFileSystem extends FileSystem
-    implements KeyProviderTokenIssuer {
-  static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
+public class BasicOzoneFileSystem extends FileSystem {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BasicOzoneFileSystem.class);
 
   /**
    * The Ozone client for connecting to Ozone server.
@@ -90,8 +84,6 @@ public class OzoneFileSystem extends FileSystem
 
   private OzoneClientAdapter adapter;
 
-  private OzoneFSStorageStatistics storageStatistics;
-
   private static final Pattern URL_SCHEMA_PATTERN =
       Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
 
@@ -121,28 +113,29 @@ public class OzoneFileSystem extends FileSystem
 
     String omHost = null;
     String omPort = String.valueOf(-1);
-    if (StringUtils.isNotEmpty(remaining)) {
+    if (!isEmpty(remaining)) {
       String[] parts = remaining.split(":");
       if (parts.length != 2) {
         throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
       }
       omHost = parts[0];
       omPort = parts[1];
-      if (!NumberUtils.isParsable(omPort)) {
+      if (!isNumber(omPort)) {
         throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
       }
     }
 
     try {
       uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
-        .setHost(authority)
-        .build();
+          .setHost(authority)
+          .build();
       LOG.trace("Ozone URI for ozfs initialization is " + uri);
 
       //isolated is the default for ozonefs-lib-legacy which includes the
       // /ozonefs.txt, otherwise the default is false. It could be overridden.
       boolean defaultValue =
-          OzoneFileSystem.class.getClassLoader().getResource("ozonefs.txt")
+          BasicOzoneFileSystem.class.getClassLoader()
+              .getResource("ozonefs.txt")
               != null;
 
       //Use string here instead of the constant as constant may not be available
@@ -150,36 +143,8 @@ public class OzoneFileSystem extends FileSystem
       boolean isolatedClassloader =
           conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
 
-      try {
-        //register only to the GlobalStorageStatistics if the class exists.
-        //This is required to support hadoop versions <2.7
-        Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
-        storageStatistics = (OzoneFSStorageStatistics)
-            GlobalStorageStatistics.INSTANCE
-                .put(OzoneFSStorageStatistics.NAME,
-                    OzoneFSStorageStatistics::new);
-      } catch (ClassNotFoundException e) {
-        //we don't support storage statistics for hadoop2.7 and older
-      }
-
-      if (isolatedClassloader) {
-        try {
-          //register only to the GlobalStorageStatistics if the class exists.
-          //This is required to support hadoop versions <2.7
-          Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
-          this.adapter =
-              OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
-                  storageStatistics);
-        } catch (ClassNotFoundException e) {
-          this.adapter =
-              OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr);
-        }
-      } else {
-
-        this.adapter = new OzoneClientAdapterImpl(omHost,
-            Integer.parseInt(omPort), conf,
-            volumeStr, bucketStr, storageStatistics);
-      }
+      this.adapter = createAdapter(conf, bucketStr, volumeStr, omHost, omPort,
+          isolatedClassloader);
 
       try {
         this.userName =
@@ -196,6 +161,24 @@ public class OzoneFileSystem extends FileSystem
     }
   }
 
+  protected OzoneClientAdapter createAdapter(Configuration conf,
+      String bucketStr,
+      String volumeStr, String omHost, String omPort,
+      boolean isolatedClassloader) throws IOException {
+
+    if (isolatedClassloader) {
+
+      return OzoneClientAdapterFactory
+          .createAdapter(volumeStr, bucketStr);
+
+    } else {
+
+      return new BasicOzoneClientAdapterImpl(omHost,
+          Integer.parseInt(omPort), conf,
+          volumeStr, bucketStr);
+    }
+  }
+
   @Override
   public void close() throws IOException {
     try {
@@ -215,19 +198,9 @@ public class OzoneFileSystem extends FileSystem
     return OZONE_URI_SCHEME;
   }
 
-  Statistics getFsStatistics() {
-    return statistics;
-  }
-
-  OzoneFSStorageStatistics getOzoneFSOpsCountStatistics() {
-    return storageStatistics;
-  }
-
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_OPEN, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_OPEN);
     statistics.incrementWriteOps(1);
     LOG.trace("open() path:{}", f);
     final FileStatus fileStatus = getFileStatus(f);
@@ -240,15 +213,17 @@ public class OzoneFileSystem extends FileSystem
         new OzoneFSInputStream(adapter.createInputStream(key)));
   }
 
+  protected void incrementCounter(Statistic statistic) {
+    //don't do anything in this default implementation.
+  }
+
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-                                   boolean overwrite, int bufferSize,
-                                   short replication, long blockSize,
-                                   Progressable progress) throws IOException {
+      boolean overwrite, int bufferSize,
+      short replication, long blockSize,
+      Progressable progress) throws IOException {
     LOG.trace("create() path:{}", f);
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_CREATE, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_CREATE);
     statistics.incrementWriteOps(1);
     final String key = pathToKey(f);
     final FileStatus status;
@@ -281,10 +256,7 @@ public class OzoneFileSystem extends FileSystem
       short replication,
       long blockSize,
       Progressable progress) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(
-          Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
     statistics.incrementWriteOps(1);
     final Path parent = path.getParent();
     if (parent != null) {
@@ -299,31 +271,11 @@ public class OzoneFileSystem extends FileSystem
 
   @Override
   public FSDataOutputStream append(Path f, int bufferSize,
-                                   Progressable progress) throws IOException {
+      Progressable progress) throws IOException {
     throw new UnsupportedOperationException("append() Not implemented by the "
         + getClass().getSimpleName() + " FileSystem implementation");
   }
 
-  @Override
-  public KeyProvider getKeyProvider() throws IOException {
-    return adapter.getKeyProvider();
-  }
-
-  @Override
-  public URI getKeyProviderUri() throws IOException {
-    return adapter.getKeyProviderUri();
-  }
-
-  @Override
-  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
-      throws IOException {
-    KeyProvider keyProvider = getKeyProvider();
-    if (keyProvider instanceof DelegationTokenIssuer) {
-      return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
-    }
-    return null;
-  }
-
   private class RenameIterator extends OzoneListingIterator {
     private final String srcKey;
     private final String dstKey;
@@ -359,9 +311,7 @@ public class OzoneFileSystem extends FileSystem
    */
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_RENAME, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_RENAME);
     statistics.incrementWriteOps(1);
     if (src.equals(dst)) {
       return true;
@@ -496,9 +446,7 @@ public class OzoneFileSystem extends FileSystem
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_DELETE, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_DELETE);
     statistics.incrementWriteOps(1);
     LOG.debug("Delete path {} - recursive {}", f, recursive);
     FileStatus status;
@@ -690,9 +638,7 @@ public class OzoneFileSystem extends FileSystem
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_LIST_STATUS, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_LIST_STATUS);
     statistics.incrementReadOps(1);
     LOG.trace("listStatus() path:{}", f);
     ListStatusIterator iterator = new ListStatusIterator(f);
@@ -718,6 +664,7 @@ public class OzoneFileSystem extends FileSystem
   /**
    * Get a canonical service name for this file system. If the URI is logical,
    * the hostname part of the URI will be returned.
+   *
    * @return a service string that uniquely identifies this file system.
    */
   @Override
@@ -786,7 +733,7 @@ public class OzoneFileSystem extends FileSystem
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
     LOG.trace("mkdir() path:{} ", f);
     String key = pathToKey(f);
-    if (StringUtils.isEmpty(key)) {
+    if (isEmpty(key)) {
       return false;
     }
     return mkdir(f);
@@ -794,10 +741,7 @@ public class OzoneFileSystem extends FileSystem
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics
-          .incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 1);
-    }
+    incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS);
     statistics.incrementReadOps(1);
     LOG.trace("getFileStatus() path:{}", f);
     Path qualifiedPath = f.makeQualified(uri, workingDir);
@@ -887,7 +831,7 @@ public class OzoneFileSystem extends FileSystem
    * @return delimiter appended key
    */
   private String addTrailingSlashIfNeeded(String key) {
-    if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
+    if (!isEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
       return key + OZONE_URI_DELIMITER;
     } else {
       return key;
@@ -976,4 +920,21 @@ public class OzoneFileSystem extends FileSystem
       return status;
     }
   }
+
+  public OzoneClientAdapter getAdapter() {
+    return adapter;
+  }
+
+  public boolean isEmpty(CharSequence cs) {
+    return cs == null || cs.length() == 0;
+  }
+
+  public boolean isNumber(String number) {
+    try {
+      Integer.parseInt(number);
+    } catch (NumberFormatException ex) {
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/FilteredClassLoader.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/FilteredClassLoader.java
index 96503cb..e9e51de 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/FilteredClassLoader.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/FilteredClassLoader.java
@@ -51,6 +51,7 @@ public class FilteredClassLoader extends URLClassLoader {
   public FilteredClassLoader(URL[] urls, ClassLoader parent) {
     super(urls, null);
     delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneClientAdapter");
+    delegatedClasses.add("org.apache.hadoop.security.token.Token");
     delegatedClasses.add("org.apache.hadoop.fs.ozone.BasicKeyInfo");
     delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSOutputStream");
     delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSStorageStatistics");
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index dab8017..59cd2c1 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -27,7 +27,7 @@ import java.net.URI;
 import java.util.Iterator;
 
 /**
- * Lightweight adapter to separte hadoop/ozone classes.
+ * Lightweight adapter to separate hadoop/ozone classes.
  * <p>
  * This class contains only the bare minimum Ozone classes in the signature.
  * It could be loaded by a different classloader because only the objects in
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
index 8ab470c..fee4298 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterFactory.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 
+import org.apache.hadoop.fs.StorageStatistics;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +46,7 @@ public final class OzoneClientAdapterFactory {
   public static OzoneClientAdapter createAdapter(
       String volumeStr,
       String bucketStr) throws IOException {
-    return createAdapter(volumeStr, bucketStr,
+    return createAdapter(volumeStr, bucketStr, true,
         (aClass) -> (OzoneClientAdapter) aClass
             .getConstructor(String.class, String.class)
             .newInstance(
@@ -56,9 +58,8 @@ public final class OzoneClientAdapterFactory {
   public static OzoneClientAdapter createAdapter(
       String volumeStr,
       String bucketStr,
-      OzoneFSStorageStatistics storageStatistics)
-      throws IOException {
-    return createAdapter(volumeStr, bucketStr,
+      StorageStatistics storageStatistics) throws IOException {
+    return createAdapter(volumeStr, bucketStr, false,
         (aClass) -> (OzoneClientAdapter) aClass
             .getConstructor(String.class, String.class,
                 OzoneFSStorageStatistics.class)
@@ -72,9 +73,11 @@ public final class OzoneClientAdapterFactory {
   public static OzoneClientAdapter createAdapter(
       String volumeStr,
       String bucketStr,
+      boolean basic,
       OzoneClientAdapterCreator creator) throws IOException {
 
-    ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
+    ClassLoader currentClassLoader =
+        OzoneClientAdapterFactory.class.getClassLoader();
     List<URL> urls = new ArrayList<>();
 
     findEmbeddedLibsUrl(urls, currentClassLoader);
@@ -99,10 +102,18 @@ public final class OzoneClientAdapterFactory {
       reflectionUtils.getMethod("getClassByName", String.class)
           .invoke(null, "org.apache.ratis.grpc.GrpcFactory");
 
-      Class<?> aClass = classLoader
-          .loadClass("org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
+      Class<?> adapterClass = null;
+      if (basic) {
+        adapterClass = classLoader
+            .loadClass(
+                "org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl");
+      } else {
+        adapterClass = classLoader
+            .loadClass(
+                "org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
+      }
       OzoneClientAdapter ozoneClientAdapter =
-          creator.createOzoneClientAdapter(aClass);
+          creator.createOzoneClientAdapter(adapterClass);
 
       Thread.currentThread().setContextClassLoader(contextClassLoader);
 
@@ -134,7 +145,8 @@ public final class OzoneClientAdapterFactory {
     //marker file is added to the jar to make it easier to find the URL
     // for the current jar.
     String markerFile = "ozonefs.txt";
-    ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
+    ClassLoader currentClassLoader =
+        OzoneClientAdapterFactory.class.getClassLoader();
 
     URL ozFs = currentClassLoader
         .getResource(markerFile);
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
index 9536fbc..c91f67b 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
@@ -17,380 +17,45 @@
  */
 package org.apache.hadoop.fs.ozone;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
-
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.crypto.key.KeyProvider;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the OzoneFileSystem calls.
  */
-public class OzoneClientAdapterImpl implements OzoneClientAdapter {
-
-  static final Logger LOG =
-      LoggerFactory.getLogger(OzoneClientAdapterImpl.class);
+public class OzoneClientAdapterImpl extends BasicOzoneClientAdapterImpl {
 
-  private OzoneClient ozoneClient;
-  private ObjectStore objectStore;
-  private OzoneVolume volume;
-  private OzoneBucket bucket;
-  private ReplicationType replicationType;
-  private ReplicationFactor replicationFactor;
   private OzoneFSStorageStatistics storageStatistics;
-  private boolean securityEnabled;
-  /**
-   * Create new OzoneClientAdapter implementation.
-   *
-   * @param volumeStr         Name of the volume to use.
-   * @param bucketStr         Name of the bucket to use
-   * @param storageStatistics Storage statistic (optional, can be null)
-   * @throws IOException In case of a problem.
-   */
-  public OzoneClientAdapterImpl(String volumeStr, String bucketStr,
-      OzoneFSStorageStatistics storageStatistics) throws IOException {
-    this(createConf(), volumeStr, bucketStr, storageStatistics);
-  }
 
-  /**
-   * Create new OzoneClientAdapter implementation.
-   *
-   * @param volumeStr         Name of the volume to use.
-   * @param bucketStr         Name of the bucket to use
-   * @throws IOException In case of a problem.
-   */
-  public OzoneClientAdapterImpl(String volumeStr, String bucketStr)
+  public OzoneClientAdapterImpl(String volumeStr, String bucketStr,
+      OzoneFSStorageStatistics storageStatistics)
       throws IOException {
-    this(createConf(), volumeStr, bucketStr, null);
+    super(volumeStr, bucketStr);
+    this.storageStatistics = storageStatistics;
   }
 
-
-
-  private static OzoneConfiguration createConf() {
-    ClassLoader contextClassLoader =
-        Thread.currentThread().getContextClassLoader();
-    Thread.currentThread().setContextClassLoader(null);
-    OzoneConfiguration conf = new OzoneConfiguration();
-    Thread.currentThread().setContextClassLoader(contextClassLoader);
-    return conf;
-  }
-
-  public OzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
-      String bucketStr, OzoneFSStorageStatistics storageStatistics)
+  public OzoneClientAdapterImpl(
+      OzoneConfiguration conf, String volumeStr, String bucketStr,
+      OzoneFSStorageStatistics storageStatistics)
       throws IOException {
-    this(null, -1, conf, volumeStr, bucketStr, storageStatistics);
+    super(conf, volumeStr, bucketStr);
+    this.storageStatistics = storageStatistics;
   }
 
   public OzoneClientAdapterImpl(String omHost, int omPort,
       Configuration hadoopConf, String volumeStr, String bucketStr,
-      OzoneFSStorageStatistics storageStatistics) throws IOException {
-
-    ClassLoader contextClassLoader =
-        Thread.currentThread().getContextClassLoader();
-    Thread.currentThread().setContextClassLoader(null);
-    OzoneConfiguration conf;
-    if (hadoopConf instanceof OzoneConfiguration) {
-      conf = (OzoneConfiguration) hadoopConf;
-    } else {
-      conf = new OzoneConfiguration(hadoopConf);
-    }
-
-    SecurityConfig secConfig = new SecurityConfig(conf);
-
-    if (secConfig.isSecurityEnabled()) {
-      this.securityEnabled = true;
-    }
-
-    try {
-      String replicationTypeConf =
-          conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
-              OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
-
-      int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
-          OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
-
-      if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
-        this.ozoneClient =
-            OzoneClientFactory.getRpcClient(omHost, omPort, conf);
-      } else {
-        this.ozoneClient =
-            OzoneClientFactory.getRpcClient(conf);
-      }
-      objectStore = ozoneClient.getObjectStore();
-      this.volume = objectStore.getVolume(volumeStr);
-      this.bucket = volume.getBucket(bucketStr);
-      this.replicationType = ReplicationType.valueOf(replicationTypeConf);
-      this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
-      this.storageStatistics = storageStatistics;
-    } finally {
-      Thread.currentThread().setContextClassLoader(contextClassLoader);
-    }
-
-  }
-
-
-  @Override
-  public void close() throws IOException {
-    ozoneClient.close();
-  }
-
-  @Override
-  public InputStream createInputStream(String key) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_READ, 1);
-    }
-    return bucket.readKey(key).getInputStream();
-  }
-
-  @Override
-  public OzoneFSOutputStream createKey(String key) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
-    }
-    OzoneOutputStream ozoneOutputStream =
-        bucket.createKey(key, 0, replicationType, replicationFactor,
-            new HashMap<>());
-    return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
-  }
-
-  @Override
-  public void renameKey(String key, String newKeyName) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_RENAMED, 1);
-    }
-    bucket.renameKey(key, newKeyName);
-  }
-
-  /**
-   * Helper method to fetch the key metadata info.
-   *
-   * @param keyName key whose metadata information needs to be fetched
-   * @return metadata info of the key
-   */
-  @Override
-  public BasicKeyInfo getKeyInfo(String keyName) {
-    try {
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_QUERY, 1);
-      }
-      OzoneKey key = bucket.getKey(keyName);
-      return new BasicKeyInfo(
-          keyName,
-          key.getModificationTime(),
-          key.getDataSize()
-      );
-    } catch (IOException e) {
-      LOG.trace("Key:{} does not exist", keyName);
-      return null;
-    }
-  }
-
-  /**
-   * Helper method to check if an Ozone key is representing a directory.
-   *
-   * @param key key to be checked as a directory
-   * @return true if key is a directory, false otherwise
-   */
-  @Override
-  public boolean isDirectory(BasicKeyInfo key) {
-    LOG.trace("key name:{} size:{}", key.getName(),
-        key.getDataSize());
-    return key.getName().endsWith(OZONE_URI_DELIMITER)
-        && (key.getDataSize() == 0);
-  }
-
-  /**
-   * Helper method to create an directory specified by key name in bucket.
-   *
-   * @param keyName key name to be created as directory
-   * @return true if the key is created, false otherwise
-   */
-  @Override
-  public boolean createDirectory(String keyName) {
-    try {
-      LOG.trace("creating dir for key:{}", keyName);
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
-      }
-      bucket.createKey(keyName, 0, replicationType, replicationFactor,
-          new HashMap<>()).close();
-      return true;
-    } catch (IOException ioe) {
-      LOG.error("create key failed for key:{}", keyName, ioe);
-      return false;
-    }
-  }
-
-  /**
-   * Helper method to delete an object specified by key name in bucket.
-   *
-   * @param keyName key name to be deleted
-   * @return true if the key is deleted, false otherwise
-   */
-  @Override
-  public boolean deleteObject(String keyName) {
-    LOG.trace("issuing delete for key" + keyName);
-    try {
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_DELETED, 1);
-      }
-      bucket.deleteKey(keyName);
-      return true;
-    } catch (IOException ioe) {
-      LOG.error("delete key failed " + ioe.getMessage());
-      return false;
-    }
-  }
-
-  @Override
-  public long getCreationTime() {
-    return bucket.getCreationTime();
-  }
-
-  @Override
-  public boolean hasNextKey(String key) {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
-    }
-    return bucket.listKeys(key).hasNext();
-  }
-
-  @Override
-  public Iterator<BasicKeyInfo> listKeys(String pathKey) {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
-    }
-    return new IteratorAdapter(bucket.listKeys(pathKey));
-  }
-
-  @Override
-  public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
+      OzoneFSStorageStatistics storageStatistics)
       throws IOException {
-    if (!securityEnabled) {
-      return null;
-    }
-    Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
-        .getDelegationToken(renewer == null ? null : new Text(renewer));
-    token.setKind(OzoneTokenIdentifier.KIND_NAME);
-    return token;
-
-  }
-
-  @Override
-  public KeyProvider getKeyProvider() throws IOException {
-    return objectStore.getKeyProvider();
-  }
-
-  @Override
-  public URI getKeyProviderUri() throws IOException {
-    return objectStore.getKeyProviderUri();
+    super(omHost, omPort, hadoopConf, volumeStr, bucketStr);
+    this.storageStatistics = storageStatistics;
   }
 
   @Override
-  public String getCanonicalServiceName() {
-    return objectStore.getCanonicalServiceName();
-  }
-
-  /**
-   * Ozone Delegation Token Renewer.
-   */
-  @InterfaceAudience.Private
-  public static class Renewer extends TokenRenewer {
-
-    //Ensure that OzoneConfiguration files are loaded before trying to use
-    // the renewer.
-    static {
-      OzoneConfiguration.activate();
-    }
-
-    public Text getKind() {
-      return OzoneTokenIdentifier.KIND_NAME;
-    }
-
-    @Override
-    public boolean handleKind(Text kind) {
-      return getKind().equals(kind);
-    }
-
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    @Override
-    public long renew(Token<?> token, Configuration conf)
-        throws IOException, InterruptedException {
-      Token<OzoneTokenIdentifier> ozoneDt =
-          (Token<OzoneTokenIdentifier>) token;
-      OzoneClient ozoneClient =
-          OzoneClientFactory.getRpcClient(conf);
-      return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
-    }
-
-    @Override
-    public void cancel(Token<?> token, Configuration conf)
-        throws IOException, InterruptedException {
-      Token<OzoneTokenIdentifier> ozoneDt =
-          (Token<OzoneTokenIdentifier>) token;
-      OzoneClient ozoneClient =
-          OzoneClientFactory.getRpcClient(conf);
-      ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
-    }
-  }
-
-  /**
-   * Adapter to convert OzoneKey to a safe and simple Key implementation.
-   */
-  public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
-
-    private Iterator<? extends OzoneKey> original;
-
-    public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
-      this.original = listKeys;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return original.hasNext();
-    }
-
-    @Override
-    public BasicKeyInfo next() {
-      OzoneKey next = original.next();
-      if (next == null) {
-        return null;
-      } else {
-        return new BasicKeyInfo(
-            next.getName(),
-            next.getModificationTime(),
-            next.getDataSize()
-        );
-      }
+  protected void incrementCounter(Statistic objectsRead) {
+    if (storageStatistics != null) {
+      storageStatistics.incrementCounter(objectsRead, 1);
     }
   }
 }
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 9f425ac..f8e8fb4 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -18,53 +18,18 @@
 
 package org.apache.hadoop.fs.ozone;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progressable;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
-import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
-import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
-import org.apache.http.client.utils.URIBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The Ozone Filesystem implementation.
@@ -76,242 +41,19 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class OzoneFileSystem extends FileSystem
+public class OzoneFileSystem extends BasicOzoneFileSystem
     implements KeyProviderTokenIssuer {
-  static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
-
-  /**
-   * The Ozone client for connecting to Ozone server.
-   */
-
-  private URI uri;
-  private String userName;
-  private Path workingDir;
-
-  private OzoneClientAdapter adapter;
 
   private OzoneFSStorageStatistics storageStatistics;
 
-  private static final Pattern URL_SCHEMA_PATTERN =
-      Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
-
-  private static final String URI_EXCEPTION_TEXT = "Ozone file system url " +
-      "should be either one of the two forms: " +
-      "o3fs://bucket.volume/key  OR " +
-      "o3fs://bucket.volume.om-host.example.com:5678/key";
-
-  @Override
-  public void initialize(URI name, Configuration conf) throws IOException {
-    super.initialize(name, conf);
-    setConf(conf);
-    Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
-    Preconditions.checkArgument(getScheme().equals(name.getScheme()),
-        "Invalid scheme provided in " + name);
-
-    String authority = name.getAuthority();
-
-    Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
-
-    if (!matcher.matches()) {
-      throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
-    }
-    String bucketStr = matcher.group(1);
-    String volumeStr = matcher.group(2);
-    String remaining = matcher.groupCount() == 3 ? matcher.group(3) : null;
-
-    String omHost = null;
-    String omPort = String.valueOf(-1);
-    if (StringUtils.isNotEmpty(remaining)) {
-      String[] parts = remaining.split(":");
-      if (parts.length != 2) {
-        throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
-      }
-      omHost = parts[0];
-      omPort = parts[1];
-      if (!NumberUtils.isParsable(omPort)) {
-        throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
-      }
-    }
-
-    try {
-      uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
-        .setHost(authority)
-        .build();
-      LOG.trace("Ozone URI for ozfs initialization is " + uri);
-
-      //isolated is the default for ozonefs-lib-legacy which includes the
-      // /ozonefs.txt, otherwise the default is false. It could be overridden.
-      boolean defaultValue =
-          OzoneFileSystem.class.getClassLoader().getResource("ozonefs.txt")
-              != null;
-
-      //Use string here instead of the constant as constant may not be available
-      //on the classpath of a hadoop 2.7
-      boolean isolatedClassloader =
-          conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
-
-      try {
-        //register only to the GlobalStorageStatistics if the class exists.
-        //This is required to support hadoop versions <2.7
-        Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
-        storageStatistics = (OzoneFSStorageStatistics)
-            GlobalStorageStatistics.INSTANCE
-                .put(OzoneFSStorageStatistics.NAME,
-                    OzoneFSStorageStatistics::new);
-      } catch (ClassNotFoundException e) {
-        //we don't support storage statistics for hadoop2.7 and older
-      }
-
-      if (isolatedClassloader) {
-        try {
-          //register only to the GlobalStorageStatistics if the class exists.
-          //This is required to support hadoop versions <2.7
-          Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
-          this.adapter =
-              OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
-                  storageStatistics);
-        } catch (ClassNotFoundException e) {
-          this.adapter =
-              OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr);
-        }
-      } else {
-
-        this.adapter = new OzoneClientAdapterImpl(omHost,
-            Integer.parseInt(omPort), conf,
-            volumeStr, bucketStr, storageStatistics);
-      }
-
-      try {
-        this.userName =
-            UserGroupInformation.getCurrentUser().getShortUserName();
-      } catch (IOException e) {
-        this.userName = OZONE_DEFAULT_USER;
-      }
-      this.workingDir = new Path(OZONE_USER_DIR, this.userName)
-          .makeQualified(this.uri, this.workingDir);
-    } catch (URISyntaxException ue) {
-      final String msg = "Invalid Ozone endpoint " + name;
-      LOG.error(msg, ue);
-      throw new IOException(msg, ue);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      adapter.close();
-    } finally {
-      super.close();
-    }
-  }
-
-  @Override
-  public URI getUri() {
-    return uri;
-  }
-
-  @Override
-  public String getScheme() {
-    return OZONE_URI_SCHEME;
-  }
-
-  Statistics getFsStatistics() {
-    return statistics;
-  }
-
-  OzoneFSStorageStatistics getOzoneFSOpsCountStatistics() {
-    return storageStatistics;
-  }
-
-  @Override
-  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_OPEN, 1);
-    }
-    statistics.incrementWriteOps(1);
-    LOG.trace("open() path:{}", f);
-    final FileStatus fileStatus = getFileStatus(f);
-    final String key = pathToKey(f);
-    if (fileStatus.isDirectory()) {
-      throw new FileNotFoundException("Can't open directory " + f + " to read");
-    }
-
-    return new FSDataInputStream(
-        new OzoneFSInputStream(adapter.createInputStream(key)));
-  }
-
-  @Override
-  public FSDataOutputStream create(Path f, FsPermission permission,
-                                   boolean overwrite, int bufferSize,
-                                   short replication, long blockSize,
-                                   Progressable progress) throws IOException {
-    LOG.trace("create() path:{}", f);
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_CREATE, 1);
-    }
-    statistics.incrementWriteOps(1);
-    final String key = pathToKey(f);
-    final FileStatus status;
-    try {
-      status = getFileStatus(f);
-      if (status.isDirectory()) {
-        throw new FileAlreadyExistsException(f + " is a directory");
-      } else {
-        if (!overwrite) {
-          // path references a file and overwrite is disabled
-          throw new FileAlreadyExistsException(f + " already exists");
-        }
-        LOG.trace("Overwriting file {}", f);
-        adapter.deleteObject(key);
-      }
-    } catch (FileNotFoundException ignored) {
-      // this means the file is not found
-    }
-
-    // We pass null to FSDataOutputStream so it won't count writes that
-    // are being buffered to a file
-    return new FSDataOutputStream(adapter.createKey(key), statistics);
-  }
-
-  @Override
-  public FSDataOutputStream createNonRecursive(Path path,
-      FsPermission permission,
-      EnumSet<CreateFlag> flags,
-      int bufferSize,
-      short replication,
-      long blockSize,
-      Progressable progress) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(
-          Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
-    }
-    statistics.incrementWriteOps(1);
-    final Path parent = path.getParent();
-    if (parent != null) {
-      // expect this to raise an exception if there is no parent
-      if (!getFileStatus(parent).isDirectory()) {
-        throw new FileAlreadyExistsException("Not a directory: " + parent);
-      }
-    }
-    return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
-        bufferSize, replication, blockSize, progress);
-  }
-
-  @Override
-  public FSDataOutputStream append(Path f, int bufferSize,
-                                   Progressable progress) throws IOException {
-    throw new UnsupportedOperationException("append() Not implemented by the "
-        + getClass().getSimpleName() + " FileSystem implementation");
-  }
-
   @Override
   public KeyProvider getKeyProvider() throws IOException {
-    return adapter.getKeyProvider();
+    return getAdapter().getKeyProvider();
   }
 
   @Override
   public URI getKeyProviderUri() throws IOException {
-    return adapter.getKeyProviderUri();
+    return getAdapter().getKeyProviderUri();
   }
 
   @Override
@@ -324,656 +66,36 @@ public class OzoneFileSystem extends FileSystem
     return null;
   }
 
-  private class RenameIterator extends OzoneListingIterator {
-    private final String srcKey;
-    private final String dstKey;
-
-    RenameIterator(Path srcPath, Path dstPath)
-        throws IOException {
-      super(srcPath);
-      srcKey = pathToKey(srcPath);
-      dstKey = pathToKey(dstPath);
-      LOG.trace("rename from:{} to:{}", srcKey, dstKey);
-    }
-
-    @Override
-    boolean processKey(String key) throws IOException {
-      String newKeyName = dstKey.concat(key.substring(srcKey.length()));
-      adapter.renameKey(key, newKeyName);
-      return true;
-    }
-  }
-
-  /**
-   * Check whether the source and destination path are valid and then perform
-   * rename from source path to destination path.
-   * <p>
-   * The rename operation is performed by renaming the keys with src as prefix.
-   * For such keys the prefix is changed from src to dst.
-   *
-   * @param src source path for rename
-   * @param dst destination path for rename
-   * @return true if rename operation succeeded or
-   * if the src and dst have the same path and are of the same type
-   * @throws IOException on I/O errors or if the src/dst paths are invalid.
-   */
-  @Override
-  public boolean rename(Path src, Path dst) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_RENAME, 1);
-    }
-    statistics.incrementWriteOps(1);
-    if (src.equals(dst)) {
-      return true;
-    }
-
-    LOG.trace("rename() from:{} to:{}", src, dst);
-    if (src.isRoot()) {
-      // Cannot rename root of file system
-      LOG.trace("Cannot rename the root of a filesystem");
-      return false;
-    }
-
-    // Cannot rename a directory to its own subdirectory
-    Path dstParent = dst.getParent();
-    while (dstParent != null && !src.equals(dstParent)) {
-      dstParent = dstParent.getParent();
-    }
-    Preconditions.checkArgument(dstParent == null,
-        "Cannot rename a directory to its own subdirectory");
-    // Check if the source exists
-    FileStatus srcStatus;
-    try {
-      srcStatus = getFileStatus(src);
-    } catch (FileNotFoundException fnfe) {
-      // source doesn't exist, return
-      return false;
-    }
-
-    // Check if the destination exists
-    FileStatus dstStatus;
-    try {
-      dstStatus = getFileStatus(dst);
-    } catch (FileNotFoundException fnde) {
-      dstStatus = null;
-    }
-
-    if (dstStatus == null) {
-      // If dst doesn't exist, check whether dst parent dir exists or not
-      // if the parent exists, the source can still be renamed to dst path
-      dstStatus = getFileStatus(dst.getParent());
-      if (!dstStatus.isDirectory()) {
-        throw new IOException(String.format(
-            "Failed to rename %s to %s, %s is a file", src, dst,
-            dst.getParent()));
-      }
-    } else {
-      // if dst exists and source and destination are same,
-      // check both the src and dst are of same type
-      if (srcStatus.getPath().equals(dstStatus.getPath())) {
-        return !srcStatus.isDirectory();
-      } else if (dstStatus.isDirectory()) {
-        // If dst is a directory, rename source as subpath of it.
-        // for example rename /source to /dst will lead to /dst/source
-        dst = new Path(dst, src.getName());
-        FileStatus[] statuses;
-        try {
-          statuses = listStatus(dst);
-        } catch (FileNotFoundException fnde) {
-          statuses = null;
-        }
-
-        if (statuses != null && statuses.length > 0) {
-          // If dst exists and not a directory not empty
-          throw new FileAlreadyExistsException(String.format(
-              "Failed to rename %s to %s, file already exists or not empty!",
-              src, dst));
-        }
-      } else {
-        // If dst is not a directory
-        throw new FileAlreadyExistsException(String.format(
-            "Failed to rename %s to %s, file already exists!", src, dst));
-      }
-    }
-
-    if (srcStatus.isDirectory()) {
-      if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
-        LOG.trace("Cannot rename a directory to a subdirectory of self");
-        return false;
-      }
-    }
-    RenameIterator iterator = new RenameIterator(src, dst);
-    return iterator.iterate();
-  }
-
-  private class DeleteIterator extends OzoneListingIterator {
-    private boolean recursive;
-
-    DeleteIterator(Path f, boolean recursive)
-        throws IOException {
-      super(f);
-      this.recursive = recursive;
-      if (getStatus().isDirectory()
-          && !this.recursive
-          && listStatus(f).length != 0) {
-        throw new PathIsNotEmptyDirectoryException(f.toString());
-      }
-    }
-
-    @Override
-    boolean processKey(String key) throws IOException {
-      if (key.equals("")) {
-        LOG.trace("Skipping deleting root directory");
-        return true;
-      } else {
-        LOG.trace("deleting key:" + key);
-        boolean succeed = adapter.deleteObject(key);
-        // if recursive delete is requested ignore the return value of
-        // deleteObject and issue deletes for other keys.
-        return recursive || succeed;
-      }
-    }
-  }
-
-  /**
-   * Deletes the children of the input dir path by iterating though the
-   * DeleteIterator.
-   *
-   * @param f directory path to be deleted
-   * @return true if successfully deletes all required keys, false otherwise
-   * @throws IOException
-   */
-  private boolean innerDelete(Path f, boolean recursive) throws IOException {
-    LOG.trace("delete() path:{} recursive:{}", f, recursive);
-    try {
-      DeleteIterator iterator = new DeleteIterator(f, recursive);
-      return iterator.iterate();
-    } catch (FileNotFoundException e) {
-      LOG.debug("Couldn't delete {} - does not exist", f);
-      return false;
-    }
-  }
-
-  @Override
-  public boolean delete(Path f, boolean recursive) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_DELETE, 1);
-    }
-    statistics.incrementWriteOps(1);
-    LOG.debug("Delete path {} - recursive {}", f, recursive);
-    FileStatus status;
-    try {
-      status = getFileStatus(f);
-    } catch (FileNotFoundException ex) {
-      LOG.warn("delete: Path does not exist: {}", f);
-      return false;
-    }
-
-    String key = pathToKey(f);
-    boolean result;
-
-    if (status.isDirectory()) {
-      LOG.debug("delete: Path is a directory: {}", f);
-      key = addTrailingSlashIfNeeded(key);
-
-      if (key.equals("/")) {
-        LOG.warn("Cannot delete root directory.");
-        return false;
-      }
-
-      result = innerDelete(f, recursive);
-    } else {
-      LOG.debug("delete: Path is a file: {}", f);
-      result = adapter.deleteObject(key);
-    }
-
-    if (result) {
-      // If this delete operation removes all files/directories from the
-      // parent direcotry, then an empty parent directory must be created.
-      Path parent = f.getParent();
-      if (parent != null && !parent.isRoot()) {
-        createFakeDirectoryIfNecessary(parent);
-      }
-    }
-
-    return result;
-  }
-
-  /**
-   * Create a fake parent directory key if it does not already exist and no
-   * other child of this parent directory exists.
-   *
-   * @param f path to the fake parent directory
-   * @throws IOException
-   */
-  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
-    String key = pathToKey(f);
-    if (!key.isEmpty() && !o3Exists(f)) {
-      LOG.debug("Creating new fake directory at {}", f);
-      String dirKey = addTrailingSlashIfNeeded(key);
-      adapter.createDirectory(dirKey);
-    }
-  }
-
-  /**
-   * Check if a file or directory exists corresponding to given path.
-   *
-   * @param f path to file/directory.
-   * @return true if it exists, false otherwise.
-   * @throws IOException
-   */
-  private boolean o3Exists(final Path f) throws IOException {
-    Path path = makeQualified(f);
-    try {
-      getFileStatus(path);
-      return true;
-    } catch (FileNotFoundException ex) {
-      return false;
-    }
-  }
-
-  private class ListStatusIterator extends OzoneListingIterator {
-    // _fileStatuses_ maintains a list of file(s) which is either the input
-    // path itself or a child of the input directory path.
-    private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
-    // _subDirStatuses_ maintains a list of sub-dirs of the input directory
-    // path.
-    private Map<Path, FileStatus> subDirStatuses =
-        new HashMap<>(LISTING_PAGE_SIZE);
-    private Path f; // the input path
-
-    ListStatusIterator(Path f) throws IOException {
-      super(f);
-      this.f = f;
-    }
-
-    /**
-     * Add the key to the listStatus result if the key corresponds to the
-     * input path or is an immediate child of the input path.
-     *
-     * @param key key to be processed
-     * @return always returns true
-     * @throws IOException
-     */
-    @Override
-    boolean processKey(String key) throws IOException {
-      Path keyPath = new Path(OZONE_URI_DELIMITER + key);
-      if (key.equals(getPathKey())) {
-        if (pathIsDirectory()) {
-          // if input path is a directory, we add the sub-directories and
-          // files under this directory.
-          return true;
-        } else {
-          addFileStatus(keyPath);
-          return true;
-        }
-      }
-      // Left with only subkeys now
-      // We add only the immediate child files and sub-dirs i.e. we go only
-      // upto one level down the directory tree structure.
-      if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
-        // This key is an immediate child. Can be file or directory
-        if (key.endsWith(OZONE_URI_DELIMITER)) {
-          // Key is a directory
-          addSubDirStatus(keyPath);
-        } else {
-          addFileStatus(keyPath);
-        }
-      } else {
-        // This key is not the immediate child of the input directory. So we
-        // traverse the parent tree structure of this key until we get the
-        // immediate child of the input directory.
-        Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
-        if (immediateChildPath != null) {
-          addSubDirStatus(immediateChildPath);
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Adds the FileStatus of keyPath to final result of listStatus.
-     *
-     * @param filePath path to the file
-     * @throws FileNotFoundException
-     */
-    void addFileStatus(Path filePath) throws IOException {
-      fileStatuses.add(getFileStatus(filePath));
-    }
-
-    /**
-     * Adds the FileStatus of the subdir to final result of listStatus, if not
-     * already included.
-     *
-     * @param dirPath path to the dir
-     * @throws FileNotFoundException
-     */
-    void addSubDirStatus(Path dirPath) throws FileNotFoundException {
-      // Check if subdir path is already included in statuses.
-      if (!subDirStatuses.containsKey(dirPath)) {
-        subDirStatuses.put(dirPath, innerGetFileStatusForDir(dirPath));
-      }
-    }
-
-    /**
-     * Traverse the parent directory structure of keyPath to determine the
-     * which parent/ grand-parent/.. is the immediate child of the input path f.
-     *
-     * @param keyPath path whose parent directory structure should be traversed.
-     * @return immediate child path of the input path f.
-     */
-    Path getImmediateChildPath(Path keyPath) {
-      Path path = keyPath;
-      Path parent = path.getParent();
-      while (parent != null) {
-        if (pathToKey(parent).equals(pathToKey(f))) {
-          return path;
-        }
-        path = parent;
-        parent = path.getParent();
-      }
-      return null;
-    }
-
-    /**
-     * Return the result of listStatus operation. If the input path is a
-     * file, return the status for only that file. If the input path is a
-     * directory, return the statuses for all the child files and sub-dirs.
-     */
-    FileStatus[] getStatuses() {
-      List<FileStatus> result = Stream.concat(
-          fileStatuses.stream(), subDirStatuses.values().stream())
-          .collect(Collectors.toList());
-      return result.toArray(new FileStatus[result.size()]);
-    }
+  StorageStatistics getOzoneFSOpsCountStatistics() {
+    return storageStatistics;
   }
 
   @Override
-  public FileStatus[] listStatus(Path f) throws IOException {
+  protected void incrementCounter(Statistic statistic) {
     if (storageStatistics != null) {
-      storageStatistics.incrementCounter(Statistic.INVOCATION_LIST_STATUS, 1);
-    }
-    statistics.incrementReadOps(1);
-    LOG.trace("listStatus() path:{}", f);
-    ListStatusIterator iterator = new ListStatusIterator(f);
-    iterator.iterate();
-    return iterator.getStatuses();
-  }
-
-  @Override
-  public void setWorkingDirectory(Path newDir) {
-    workingDir = newDir;
-  }
-
-  @Override
-  public Path getWorkingDirectory() {
-    return workingDir;
-  }
-
-  @Override
-  public Token<?> getDelegationToken(String renewer) throws IOException {
-    return adapter.getDelegationToken(renewer);
-  }
-
-  /**
-   * Get a canonical service name for this file system. If the URI is logical,
-   * the hostname part of the URI will be returned.
-   * @return a service string that uniquely identifies this file system.
-   */
-  @Override
-  public String getCanonicalServiceName() {
-    return adapter.getCanonicalServiceName();
-  }
-
-  /**
-   * Get the username of the FS.
-   *
-   * @return the short name of the user who instantiated the FS
-   */
-  public String getUsername() {
-    return userName;
-  }
-
-  /**
-   * Check whether the path is valid and then create directories.
-   * Directory is represented using a key with no value.
-   * All the non-existent parent directories are also created.
-   *
-   * @param path directory path to be created
-   * @return true if directory exists or created successfully.
-   * @throws IOException
-   */
-  private boolean mkdir(Path path) throws IOException {
-    Path fPart = path;
-    Path prevfPart = null;
-    do {
-      LOG.trace("validating path:{}", fPart);
-      try {
-        FileStatus fileStatus = getFileStatus(fPart);
-        if (fileStatus.isDirectory()) {
-          // If path exists and a directory, exit
-          break;
-        } else {
-          // Found a file here, rollback and delete newly created directories
-          LOG.trace("Found a file with same name as directory, path:{}", fPart);
-          if (prevfPart != null) {
-            delete(prevfPart, true);
-          }
-          throw new FileAlreadyExistsException(String.format(
-              "Can't make directory for path '%s', it is a file.", fPart));
-        }
-      } catch (FileNotFoundException fnfe) {
-        LOG.trace("creating directory for fpart:{}", fPart);
-        String key = pathToKey(fPart);
-        String dirKey = addTrailingSlashIfNeeded(key);
-        if (!adapter.createDirectory(dirKey)) {
-          // Directory creation failed here,
-          // rollback and delete newly created directories
-          LOG.trace("Directory creation failed, path:{}", fPart);
-          if (prevfPart != null) {
-            delete(prevfPart, true);
-          }
-          return false;
-        }
-      }
-      prevfPart = fPart;
-      fPart = fPart.getParent();
-    } while (fPart != null);
-    return true;
-  }
-
-  @Override
-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    LOG.trace("mkdir() path:{} ", f);
-    String key = pathToKey(f);
-    if (StringUtils.isEmpty(key)) {
-      return false;
+      storageStatistics.incrementCounter(statistic, 1);
     }
-    return mkdir(f);
   }
 
   @Override
-  public FileStatus getFileStatus(Path f) throws IOException {
-    if (storageStatistics != null) {
-      storageStatistics
-          .incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 1);
-    }
-    statistics.incrementReadOps(1);
-    LOG.trace("getFileStatus() path:{}", f);
-    Path qualifiedPath = f.makeQualified(uri, workingDir);
-    String key = pathToKey(qualifiedPath);
+  protected OzoneClientAdapter createAdapter(Configuration conf,
+      String bucketStr,
+      String volumeStr, String omHost, String omPort,
+      boolean isolatedClassloader) throws IOException {
 
-    if (key.length() == 0) {
-      return new FileStatus(0, true, 1, 0,
-          adapter.getCreationTime(), qualifiedPath);
-    }
+    this.storageStatistics =
+        (OzoneFSStorageStatistics) GlobalStorageStatistics.INSTANCE
+            .put(OzoneFSStorageStatistics.NAME,
+                OzoneFSStorageStatistics::new);
 
-    // Check if the key exists
-    BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
-    if (ozoneKey != null) {
-      LOG.debug("Found exact file for path {}: normal file", f);
-      return new FileStatus(ozoneKey.getDataSize(), false, 1,
-          getDefaultBlockSize(f), ozoneKey.getModificationTime(), 0,
-          FsPermission.getFileDefault(), getUsername(), getUsername(),
-          qualifiedPath);
-    }
+    if (isolatedClassloader) {
+      return OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
+          storageStatistics);
 
-    return innerGetFileStatusForDir(f);
-  }
-
-  /**
-   * Get the FileStatus for input directory path.
-   * They key corresponding to input path is appended with a trailing slash
-   * to return only the corresponding directory key in the bucket.
-   *
-   * @param f directory path
-   * @return FileStatus for the input directory path
-   * @throws FileNotFoundException
-   */
-  public FileStatus innerGetFileStatusForDir(Path f)
-      throws FileNotFoundException {
-    Path qualifiedPath = f.makeQualified(uri, workingDir);
-    String key = pathToKey(qualifiedPath);
-    key = addTrailingSlashIfNeeded(key);
-
-    BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
-    if (ozoneKey != null) {
-      if (adapter.isDirectory(ozoneKey)) {
-        // Key is a directory
-        LOG.debug("Found file (with /) for path {}: fake directory", f);
-      } else {
-        // Key is a file with trailing slash
-        LOG.warn("Found file (with /) for path {}: real file? should not " +
-            "happen", f, key);
-      }
-      return new FileStatus(0, true, 1, 0,
-          ozoneKey.getModificationTime(), 0,
-          FsPermission.getDirDefault(), getUsername(), getUsername(),
-          qualifiedPath);
-    }
-
-    // File or directory corresponding to input path does not exist.
-    // Check if there exists a key prefixed with this key.
-    boolean hasChildren = adapter.hasNextKey(key);
-    if (hasChildren) {
-      return new FileStatus(0, true, 1, 0, 0, 0, FsPermission.getDirDefault(),
-          getUsername(), getUsername(), qualifiedPath);
-    }
-
-    throw new FileNotFoundException(f + ": No such file or directory!");
-  }
-
-  /**
-   * Turn a path (relative or otherwise) into an Ozone key.
-   *
-   * @param path the path of the file.
-   * @return the key of the object that represents the file.
-   */
-  public String pathToKey(Path path) {
-    Objects.requireNonNull(path, "Path canf not be null!");
-    if (!path.isAbsolute()) {
-      path = new Path(workingDir, path);
-    }
-    // removing leading '/' char
-    String key = path.toUri().getPath().substring(1);
-    LOG.trace("path for key:{} is:{}", key, path);
-    return key;
-  }
-
-  /**
-   * Add trailing delimiter to path if it is already not present.
-   *
-   * @param key the ozone Key which needs to be appended
-   * @return delimiter appended key
-   */
-  private String addTrailingSlashIfNeeded(String key) {
-    if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
-      return key + OZONE_URI_DELIMITER;
     } else {
-      return key;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "OzoneFileSystem{URI=" + uri + ", "
-        + "workingDir=" + workingDir + ", "
-        + "userName=" + userName + ", "
-        + "statistics=" + statistics
-        + "}";
-  }
-
-  /**
-   * This class provides an interface to iterate through all the keys in the
-   * bucket prefixed with the input path key and process them.
-   * <p>
-   * Each implementing class should define how the keys should be processed
-   * through the processKey() function.
-   */
-  private abstract class OzoneListingIterator {
-    private final Path path;
-    private final FileStatus status;
-    private String pathKey;
-    private Iterator<BasicKeyInfo> keyIterator;
-
-    OzoneListingIterator(Path path)
-        throws IOException {
-      this.path = path;
-      this.status = getFileStatus(path);
-      this.pathKey = pathToKey(path);
-      if (status.isDirectory()) {
-        this.pathKey = addTrailingSlashIfNeeded(pathKey);
-      }
-      keyIterator = adapter.listKeys(pathKey);
-    }
-
-    /**
-     * The output of processKey determines if further iteration through the
-     * keys should be done or not.
-     *
-     * @return true if we should continue iteration of keys, false otherwise.
-     * @throws IOException
-     */
-    abstract boolean processKey(String key) throws IOException;
-
-    /**
-     * Iterates thorugh all the keys prefixed with the input path's key and
-     * processes the key though processKey().
-     * If for any key, the processKey() returns false, then the iteration is
-     * stopped and returned with false indicating that all the keys could not
-     * be processed successfully.
-     *
-     * @return true if all keys are processed successfully, false otherwise.
-     * @throws IOException
-     */
-    boolean iterate() throws IOException {
-      LOG.trace("Iterating path {}", path);
-      if (status.isDirectory()) {
-        LOG.trace("Iterating directory:{}", pathKey);
-        while (keyIterator.hasNext()) {
-          BasicKeyInfo key = keyIterator.next();
-          LOG.trace("iterating key:{}", key.getName());
-          if (!processKey(key.getName())) {
-            return false;
-          }
-        }
-        return true;
-      } else {
-        LOG.trace("iterating file:{}", path);
-        return processKey(pathKey);
-      }
-    }
-
-    String getPathKey() {
-      return pathKey;
-    }
-
-    boolean pathIsDirectory() {
-      return status.isDirectory();
-    }
-
-    FileStatus getStatus() {
-      return status;
+      return new OzoneClientAdapterImpl(omHost,
+          Integer.parseInt(omPort), conf,
+          volumeStr, bucketStr, storageStatistics);
     }
   }
 }
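
For readers skimming the patch: the hunk above relies on a small template-method split. The Basic* classes are assumed to declare incrementCounter as a protected no-op hook and createAdapter as an overridable factory, so they never touch StorageStatistics and stay loadable on Hadoop 2.7, while OzoneFileSystem layers the counters on top by overriding the hook, exactly as shown above. A minimal sketch of the pattern with hypothetical class names (the real classes live in hadoop-ozone/ozonefs):

// Sketch only, assuming Statistic and OzoneFSStorageStatistics from the
// ozonefs module are on the classpath; not the committed implementation.
class BasicSketch {
  // No-op hook: safe to call even when StorageStatistics is absent
  // from the classpath (Hadoop 2.7 and older).
  protected void incrementCounter(Statistic statistic) {
  }

  public boolean deleteObject(String keyName) {
    incrementCounter(Statistic.OBJECTS_DELETED);
    // ... delegate to the Ozone bucket, as BasicOzoneClientAdapterImpl does ...
    return true;
  }
}

class StatisticsAwareSketch extends BasicSketch {
  private final OzoneFSStorageStatistics storageStatistics;

  StatisticsAwareSketch(OzoneFSStorageStatistics storageStatistics) {
    this.storageStatistics = storageStatistics;
  }

  @Override
  protected void incrementCounter(Statistic statistic) {
    if (storageStatistics != null) {
      storageStatistics.incrementCounter(statistic, 1);
    }
  }
}
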
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index ef1ae18..b64a650 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -139,7 +139,7 @@ public class TestOzoneFileInterfaces {
       fs = FileSystem.get(new URI(rootPath + "/test.txt"), conf);
     }
     o3fs = (OzoneFileSystem) fs;
-    statistics = o3fs.getOzoneFSOpsCountStatistics();
+    statistics = (OzoneFSStorageStatistics) o3fs.getOzoneFSOpsCountStatistics();
   }
 
   @After
diff --git a/hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-ozone/ozonefs/src/test/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
similarity index 100%
rename from hadoop-ozone/ozonefs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
rename to hadoop-ozone/ozonefs/src/test/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
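
Once OzoneFSStorageStatistics is registered in createAdapter (see the OzoneFileSystem hunk above), the counters are reachable through the standard GlobalStorageStatistics registry, which is what the cast added to TestOzoneFileInterfaces relies on. A hedged example of reading them back, assuming a Hadoop version that ships StorageStatistics and the package names used in this patch:

import java.util.Iterator;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.ozone.OzoneFSStorageStatistics;

public final class PrintOzoneFsStats {
  public static void main(String[] args) {
    // NAME is the key the filesystem uses when registering its statistics.
    StorageStatistics stats =
        GlobalStorageStatistics.INSTANCE.get(OzoneFSStorageStatistics.NAME);
    if (stats == null) {
      return; // nothing registered yet: no o3fs operation has run in this JVM
    }
    Iterator<StorageStatistics.LongStatistic> it = stats.getLongStatistics();
    while (it.hasNext()) {
      StorageStatistics.LongStatistic s = it.next();
      System.out.println(s.getName() + " = " + s.getValue());
    }
  }
}
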

