Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2018/07/09 18:26:26 UTC

[34/50] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager to OzoneManager. Contributed by Arpit Agarwal.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
new file mode 100644
index 0000000..7c8595c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
@@ -0,0 +1,1349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.Status;
+import org.apache.hadoop.ozone.web.handlers.ListArgs;
+import org.apache.hadoop.ozone.web.response.ListBuckets;
+import org.apache.hadoop.ozone.web.response.ListKeys;
+import org.apache.hadoop.ozone.web.response.ListVolumes;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.net.InetSocketAddress;
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CLIENT_ADDRESS_KEY;
+
+/**
+ * Test Ozone Manager operations in the distributed handler scenario.
+ */
+public class TestOzoneManager {
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs userArgs;
+  private static OMMetrics omMetrics;
+  private static OzoneConfiguration conf;
+  private static String clusterId;
+  private static String scmId;
+  private static String omId;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omId = UUID.randomUUID().toString();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    cluster =  MiniOzoneCluster.newBuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOmId(omId)
+        .build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    omMetrics = cluster.getOzoneManager().getMetrics();
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // Create a volume and test its attributes after creating it
+  @Test(timeout = 60000)
+  public void testCreateVolume() throws IOException, OzoneException {
+    long volumeCreateFailCount = omMetrics.getNumVolumeCreateFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
+    Assert.assertEquals(volumeCreateFailCount,
+        omMetrics.getNumVolumeCreateFails());
+  }
+
+  // Create a volume and modify the volume owner and then test its attributes
+  @Test(timeout = 60000)
+  public void testChangeVolumeOwner() throws IOException, OzoneException {
+    long volumeCreateFailCount = omMetrics.getNumVolumeCreateFails();
+    long volumeInfoFailCount = omMetrics.getNumVolumeInfoFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    String newUserName = "user" + RandomStringUtils.randomNumeric(5);
+    createVolumeArgs.setUserName(newUserName);
+    storageHandler.setVolumeOwner(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+
+    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
+    Assert.assertFalse(retVolumeInfo.getOwner().getName().equals(userName));
+    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(newUserName));
+    Assert.assertEquals(volumeCreateFailCount,
+        omMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(volumeInfoFailCount,
+        omMetrics.getNumVolumeInfoFails());
+  }
+
+  // Create a volume and modify the volume quota and then test its attributes
+  @Test(timeout = 60000)
+  public void testChangeVolumeQuota() throws IOException, OzoneException {
+    long numVolumeCreateFail = omMetrics.getNumVolumeCreateFails();
+    long numVolumeInfoFail = omMetrics.getNumVolumeInfoFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    Random rand = new Random();
+
+    // Create a new volume with a quota
+    OzoneQuota createQuota =
+        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    createVolumeArgs.setQuota(createQuota);
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(createQuota.sizeInBytes(),
+        retVolumeInfo.getQuota().sizeInBytes());
+
+    // Set a new quota and test it
+    OzoneQuota setQuota =
+        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
+    createVolumeArgs.setQuota(setQuota);
+    storageHandler.setVolumeQuota(createVolumeArgs, false);
+    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(setQuota.sizeInBytes(),
+        retVolumeInfo.getQuota().sizeInBytes());
+
+    // Remove the quota and test it again
+    storageHandler.setVolumeQuota(createVolumeArgs, true);
+    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertEquals(OzoneConsts.MAX_QUOTA_IN_BYTES,
+        retVolumeInfo.getQuota().sizeInBytes());
+    Assert.assertEquals(numVolumeCreateFail,
+        omMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(numVolumeInfoFail,
+        omMetrics.getNumVolumeInfoFails());
+  }
+
+  // Create a volume and then delete it and then check for deletion
+  @Test(timeout = 60000)
+  public void testDeleteVolume() throws IOException, OzoneException {
+    long volumeCreateFailCount = omMetrics.getNumVolumeCreateFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String volumeName1 = volumeName + "_A";
+    String volumeName2 = volumeName + "_AA";
+    VolumeArgs volumeArgs = null;
+    VolumeInfo volumeInfo = null;
+
+    // Create 2 empty volumes with same prefix.
+    volumeArgs = new VolumeArgs(volumeName1, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+
+    volumeArgs = new VolumeArgs(volumeName2, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+
+    volumeArgs  = new VolumeArgs(volumeName1, userArgs);
+    volumeInfo = storageHandler.getVolumeInfo(volumeArgs);
+    Assert.assertTrue(volumeInfo.getVolumeName().equals(volumeName1));
+    Assert.assertTrue(volumeInfo.getOwner().getName().equals(userName));
+    Assert.assertEquals(volumeCreateFailCount,
+        omMetrics.getNumVolumeCreateFails());
+
+    // Volume with the _A suffix should be deletable as it is empty.
+    storageHandler.deleteVolume(volumeArgs);
+
+    // Make sure volume with _AA suffix still exists.
+    volumeArgs = new VolumeArgs(volumeName2, userArgs);
+    volumeInfo = storageHandler.getVolumeInfo(volumeArgs);
+    Assert.assertTrue(volumeInfo.getVolumeName().equals(volumeName2));
+
+    // Make sure volume with _A suffix is successfully deleted.
+    exception.expect(IOException.class);
+    exception.expectMessage("Info Volume failed, error:VOLUME_NOT_FOUND");
+    volumeArgs = new VolumeArgs(volumeName1, userArgs);
+    storageHandler.getVolumeInfo(volumeArgs);
+  }
+
+  // Create a volume and a bucket inside the volume,
+  // then try to delete the volume and check that the deletion fails
+  @Test(timeout = 60000)
+  public void testFailedDeleteVolume() throws IOException, OzoneException {
+    long numVolumeCreateFails = omMetrics.getNumVolumeCreateFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
+    Assert.assertEquals(numVolumeCreateFails,
+        omMetrics.getNumVolumeCreateFails());
+
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+
+    try {
+      storageHandler.deleteVolume(createVolumeArgs);
+      Assert.fail("Expecting deletion should fail "
+          + "because volume is not empty");
+    } catch (IOException ex) {
+      Assert.assertEquals(ex.getMessage(),
+          "Delete Volume failed, error:VOLUME_NOT_EMPTY");
+    }
+    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
+    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
+  }
+
+  // Create a volume and test Volume access for a different user
+  @Test(timeout = 60000)
+  public void testAccessVolume() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String[] groupName =
+        {"group" + RandomStringUtils.randomNumeric(5)};
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    createVolumeArgs.setGroups(groupName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, userName,
+        OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, userAcl));
+    OzoneAcl group = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, groupName[0],
+        OzoneAcl.OzoneACLRights.READ);
+    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, group));
+
+    // Create a different user and access should fail
+    String falseUserName = "user" + RandomStringUtils.randomNumeric(5);
+    OzoneAcl falseUserAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER, falseUserName,
+            OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertFalse(storageHandler
+        .checkVolumeAccess(volumeName, falseUserAcl));
+    // Checking access with a user name but ACL type GROUP should fail
+    OzoneAcl falseGroupAcl = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, userName,
+        OzoneAcl.OzoneACLRights.READ_WRITE);
+    Assert.assertFalse(storageHandler
+        .checkVolumeAccess(volumeName, falseGroupAcl));
+
+    // Access for acl type world should also fail
+    OzoneAcl worldAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.WORLD, "",
+            OzoneAcl.OzoneACLRights.READ);
+    Assert.assertFalse(storageHandler.checkVolumeAccess(volumeName, worldAcl));
+
+    Assert.assertEquals(0, omMetrics.getNumVolumeCheckAccessFails());
+    Assert.assertEquals(0, omMetrics.getNumVolumeCreateFails());
+  }
+
+  @Test(timeout = 60000)
+  public void testCreateBucket() throws IOException, OzoneException {
+    long numVolumeCreateFail = omMetrics.getNumVolumeCreateFails();
+    long numBucketCreateFail = omMetrics.getNumBucketCreateFails();
+    long numBucketInfoFail = omMetrics.getNumBucketInfoFails();
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    Assert.assertEquals(numVolumeCreateFail,
+        omMetrics.getNumVolumeCreateFails());
+    Assert.assertEquals(numBucketCreateFail,
+        omMetrics.getNumBucketCreateFails());
+    Assert.assertEquals(numBucketInfoFail,
+        omMetrics.getNumBucketInfoFails());
+  }
+
+  @Test(timeout = 60000)
+  public void testDeleteBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    storageHandler.deleteBucket(bucketArgs);
+    exception.expect(IOException.class);
+    exception.expectMessage("Info Bucket failed, error: BUCKET_NOT_FOUND");
+    storageHandler.getBucketInfo(getBucketArgs);
+  }
+
+  @Test(timeout = 60000)
+  public void testDeleteNonExistingBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    BucketArgs newBucketArgs = new BucketArgs(
+        volumeName, bucketName + "_invalid", userArgs);
+    exception.expect(IOException.class);
+    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_FOUND");
+    storageHandler.deleteBucket(newBucketArgs);
+  }
+
+
+  @Test(timeout = 60000)
+  public void testDeleteNonEmptyBucket() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
+    volumeArgs.setUserName(userName);
+    volumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(volumeArgs);
+    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    storageHandler.createBucket(bucketArgs);
+    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
+        userArgs);
+    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
+    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
+    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
+    String dataString = RandomStringUtils.randomAscii(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    exception.expect(IOException.class);
+    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_EMPTY");
+    storageHandler.deleteBucket(bucketArgs);
+  }
+
+  /**
+ * Basic test of both putKey and getKey from OM, as one cannot be tested
+   * without the other.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testGetKeyWriterReader() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    long numKeyAllocates = omMetrics.getNumKeyAllocates();
+    long numKeyLookups = omMetrics.getNumKeyLookups();
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    Assert.assertEquals(1 + numKeyAllocates, omMetrics.getNumKeyAllocates());
+
+    byte[] data = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
+    Assert.assertEquals(1 + numKeyLookups, omMetrics.getNumKeyLookups());
+  }
+
+  /**
+   * Test writing the same key twice. The second write currently succeeds,
+   * as key overwrite is allowed; see the note in the test body (HDFS-11922).
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testKeyOverwrite() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    long numKeyAllocateFails = omMetrics.getNumKeyAllocateFails();
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(100);
+    String dataString = RandomStringUtils.randomAscii(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+
+    // We allow the key overwrite to succeed. Please note: until HDFS-11922
+    // is fixed this causes a data block leak on the datanode side. That is,
+    // the overwrite only replaces the key on the OM; the orphaned blocks
+    // still need to be garbage collected from the datanodes.
+    KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    storageHandler.newKeyWriter(keyArgs2);
+    Assert
+        .assertEquals(numKeyAllocateFails, omMetrics.getNumKeyAllocateFails());
+  }
+
+  /**
+   * Test getting a non-existing key.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testGetNonExistKey() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    long numKeyLookupFails = omMetrics.getNumKeyLookupFails();
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    // Try to get the key; this should fail as it hasn't been created
+    exception.expect(IOException.class);
+    exception.expectMessage("KEY_NOT_FOUND");
+    storageHandler.newKeyReader(keyArgs);
+    Assert.assertEquals(1 + numKeyLookupFails,
+        omMetrics.getNumKeyLookupFails());
+  }
+
+  /**
+   * Test deleting keys on the OM.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testDeleteKey() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    long numKeyDeletes = omMetrics.getNumKeyDeletes();
+    long numKeyDeleteFails = omMetrics.getNumKeyDeletesFails();
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    storageHandler.createBucket(bucketArgs);
+
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    keyArgs.setSize(100);
+    String dataString = RandomStringUtils.randomAscii(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+
+    storageHandler.deleteKey(keyArgs);
+    Assert.assertEquals(1 + numKeyDeletes, omMetrics.getNumKeyDeletes());
+
+    // Make sure the deleted key has been renamed with the deleting prefix.
+    MetadataStore store = cluster.getOzoneManager().
+        getMetadataManager().getStore();
+    List<Map.Entry<byte[], byte[]>> list = store.getRangeKVs(null, 10,
+        new MetadataKeyFilters.KeyPrefixFilter()
+            .addFilter(DELETING_KEY_PREFIX));
+    Assert.assertEquals(1, list.size());
+
+    // Delete the key again to test deleting non-existing key.
+    try {
+      storageHandler.deleteKey(keyArgs);
+      Assert.fail("Expected exception not thrown.");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains("KEY_NOT_FOUND"));
+    }
+    Assert.assertEquals(1 + numKeyDeleteFails,
+        omMetrics.getNumKeyDeletesFails());
+  }
+
+  /**
+   * Test renaming keys on the OM.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testRenameKey() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    long numKeyRenames = omMetrics.getNumKeyRenames();
+    long numKeyRenameFails = omMetrics.getNumKeyRenameFails();
+    int testRenameFails = 0;
+    int testRenames = 0;
+    IOException ioe = null;
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    storageHandler.createBucket(bucketArgs);
+
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    keyArgs.setSize(100);
+    String toKeyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    // Rename from non-existent key should fail
+    try {
+      testRenames++;
+      storageHandler.renameKey(keyArgs, toKeyName);
+    } catch (IOException e) {
+      testRenameFails++;
+      ioe = e;
+    }
+    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
+
+    // Write the contents of the key to be renamed
+    String dataString = RandomStringUtils.randomAscii(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+
+    // Rename the key
+    toKeyName = "key" + RandomStringUtils.randomNumeric(5);
+    testRenames++;
+    storageHandler.renameKey(keyArgs, toKeyName);
+    Assert.assertEquals(numKeyRenames + testRenames,
+        omMetrics.getNumKeyRenames());
+    Assert.assertEquals(numKeyRenameFails + testRenameFails,
+        omMetrics.getNumKeyRenameFails());
+
+    // Try to get the original key; this should fail as it has been renamed
+    try {
+      storageHandler.newKeyReader(keyArgs);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    Assert.assertTrue(ioe.getMessage().contains("KEY_NOT_FOUND"));
+
+    // Verify the contents of the renamed key
+    keyArgs = new KeyArgs(toKeyName, bucketArgs);
+    InputStream in = storageHandler.newKeyReader(keyArgs);
+    byte[] b = new byte[dataString.getBytes().length];
+    in.read(b);
+    Assert.assertEquals(new String(b), dataString);
+
+    // Recreate the original key. Renaming to a key which already exists should fail.
+    keyArgs = new KeyArgs(keyName, bucketArgs);
+    keyArgs.setSize(100);
+    dataString = RandomStringUtils.randomAscii(100);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+      stream.close();
+      testRenames++;
+      storageHandler.renameKey(keyArgs, toKeyName);
+    } catch (IOException e) {
+      testRenameFails++;
+      ioe = e;
+    }
+    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
+
+    // Rename to empty string should fail
+    toKeyName = "";
+    try {
+      testRenames++;
+      storageHandler.renameKey(keyArgs, toKeyName);
+    } catch (IOException e) {
+      testRenameFails++;
+      ioe = e;
+    }
+    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
+
+    // Rename from empty string should fail
+    keyArgs = new KeyArgs("", bucketArgs);
+    toKeyName = "key" + RandomStringUtils.randomNumeric(5);
+    try {
+      testRenames++;
+      storageHandler.renameKey(keyArgs, toKeyName);
+    } catch (IOException e) {
+      testRenameFails++;
+      ioe = e;
+    }
+    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
+
+    Assert.assertEquals(numKeyRenames + testRenames,
+        omMetrics.getNumKeyRenames());
+    Assert.assertEquals(numKeyRenameFails + testRenameFails,
+        omMetrics.getNumKeyRenameFails());
+  }
+
+  @Test(timeout = 60000)
+  public void testListBuckets() throws IOException, OzoneException {
+    ListBuckets result = null;
+    ListArgs listBucketArgs = null;
+
+    // Create volume - volA.
+    final String volAname = "volA";
+    VolumeArgs volAArgs = new VolumeArgs(volAname, userArgs);
+    volAArgs.setUserName("userA");
+    volAArgs.setAdminName("adminA");
+    storageHandler.createVolume(volAArgs);
+
+    // Create 20 buckets in volA for tests.
+    for (int i=0; i<10; i++) {
+      // Create "/volA/aBucket_0" to "/volA/aBucket_9" buckets in volA volume.
+      BucketArgs aBuckets = new BucketArgs(volAname,
+          "aBucket_" + i, userArgs);
+      if(i % 3 == 0) {
+        aBuckets.setStorageType(StorageType.ARCHIVE);
+      } else {
+        aBuckets.setStorageType(StorageType.DISK);
+      }
+      storageHandler.createBucket(aBuckets);
+
+      // Create "/volA/bBucket_0" to "/volA/bBucket_9" buckets in volA volume.
+      BucketArgs bBuckets = new BucketArgs(volAname,
+          "bBucket_" + i, userArgs);
+      if(i % 3 == 0) {
+        bBuckets.setStorageType(StorageType.RAM_DISK);
+      } else {
+        bBuckets.setStorageType(StorageType.SSD);
+      }
+      storageHandler.createBucket(bBuckets);
+    }
+
+    VolumeArgs volArgs = new VolumeArgs(volAname, userArgs);
+
+    // List all buckets in volA.
+    listBucketArgs = new ListArgs(volArgs, null, 100, null);
+    result = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(20, result.getBuckets().size());
+    List<BucketInfo> archiveBuckets = result.getBuckets().stream()
+        .filter(item -> item.getStorageType() == StorageType.ARCHIVE)
+        .collect(Collectors.toList());
+    Assert.assertEquals(4, archiveBuckets.size());
+
+    // List buckets with prefix "aBucket".
+    listBucketArgs = new ListArgs(volArgs, "aBucket", 100, null);
+    result = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(10, result.getBuckets().size());
+    Assert.assertTrue(result.getBuckets().stream()
+        .allMatch(entry -> entry.getBucketName().startsWith("aBucket")));
+
+    // List a certain number of buckets.
+    listBucketArgs = new ListArgs(volArgs, null, 3, null);
+    result = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(3, result.getBuckets().size());
+    Assert.assertEquals("aBucket_0",
+        result.getBuckets().get(0).getBucketName());
+    Assert.assertEquals("aBucket_1",
+        result.getBuckets().get(1).getBucketName());
+    Assert.assertEquals("aBucket_2",
+        result.getBuckets().get(2).getBucketName());
+
+    // List a certain number of buckets from the startKey.
+    listBucketArgs = new ListArgs(volArgs, null, 2, "bBucket_3");
+    result = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(2, result.getBuckets().size());
+    Assert.assertEquals("bBucket_4",
+        result.getBuckets().get(0).getBucketName());
+    Assert.assertEquals("bBucket_5",
+        result.getBuckets().get(1).getBucketName());
+
+    // Provide an invalid bucket name as start key.
+    listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
+    ListBuckets buckets = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(buckets.getBuckets().size(), 0);
+
+    // Use all arguments.
+    listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_7");
+    result = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(2, result.getBuckets().size());
+    Assert.assertEquals("bBucket_8",
+        result.getBuckets().get(0).getBucketName());
+    Assert.assertEquals("bBucket_9",
+        result.getBuckets().get(1).getBucketName());
+
+    // Provide an invalid maxKeys argument.
+    try {
+      listBucketArgs = new ListArgs(volArgs, null, -1, null);
+      storageHandler.listBuckets(listBucketArgs);
+      Assert.fail("Expecting an error when the given"
+          + " maxKeys argument is invalid.");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage()
+          .contains(String.format("the value must be in range (0, %d]",
+              OzoneConsts.MAX_LISTBUCKETS_SIZE)));
+    }
+
+    // Provide an invalid volume name.
+    VolumeArgs invalidVolArgs = new VolumeArgs("invalid_name", userArgs);
+    try {
+      listBucketArgs = new ListArgs(invalidVolArgs, null, 100, null);
+      storageHandler.listBuckets(listBucketArgs);
+      Assert.fail("Expecting an error when the given volume name is invalid.");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof IOException);
+      Assert.assertTrue(e.getMessage()
+          .contains(Status.VOLUME_NOT_FOUND.name()));
+    }
+  }
+
+  /**
+   * Test list keys.
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testListKeys() throws IOException, OzoneException {
+    ListKeys result = null;
+    ListArgs listKeyArgs = null;
+
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // Write 20 keys in bucket.
+    int numKeys = 20;
+    String keyName = "Key";
+    KeyArgs keyArgs = null;
+    for (int i = 0; i < numKeys; i++) {
+      if (i % 2 == 0) {
+        // Create /volume/bucket/aKey[0,2,4,...,18] in bucket.
+        keyArgs = new KeyArgs("a" + keyName + i, bucketArgs);
+      } else {
+        // Create /volume/bucket/bKey[1,3,5,...,19] in bucket.
+        keyArgs = new KeyArgs("b" + keyName + i, bucketArgs);
+      }
+      keyArgs.setSize(4096);
+
+      // Just for testing list keys call, so no need to write real data.
+      OutputStream stream = storageHandler.newKeyWriter(keyArgs);
+      stream.close();
+    }
+
+    // List all keys in bucket.
+    bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
+    listKeyArgs = new ListArgs(bucketArgs, null, 100, null);
+    result = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(numKeys, result.getKeyList().size());
+
+    // List keys with prefix "aKey".
+    listKeyArgs = new ListArgs(bucketArgs, "aKey", 100, null);
+    result = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(numKeys / 2, result.getKeyList().size());
+    Assert.assertTrue(result.getKeyList().stream()
+        .allMatch(entry -> entry.getKeyName().startsWith("aKey")));
+
+    // List a certain number of keys.
+    listKeyArgs = new ListArgs(bucketArgs, null, 3, null);
+    result = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(3, result.getKeyList().size());
+    Assert.assertEquals("aKey0",
+        result.getKeyList().get(0).getKeyName());
+    Assert.assertEquals("aKey10",
+        result.getKeyList().get(1).getKeyName());
+    Assert.assertEquals("aKey12",
+        result.getKeyList().get(2).getKeyName());
+
+    // List a certain number of keys from the startKey.
+    listKeyArgs = new ListArgs(bucketArgs, null, 2, "bKey1");
+    result = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(2, result.getKeyList().size());
+    Assert.assertEquals("bKey11",
+        result.getKeyList().get(0).getKeyName());
+    Assert.assertEquals("bKey13",
+        result.getKeyList().get(1).getKeyName());
+
+    // Provide an invalid key name as start key.
+    listKeyArgs = new ListArgs(bucketArgs, null, 100, "invalid_start_key");
+    ListKeys keys = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(keys.getKeyList().size(), 0);
+
+    // Provide an invalid maxKeys argument.
+    try {
+      listKeyArgs = new ListArgs(bucketArgs, null, -1, null);
+      storageHandler.listBuckets(listKeyArgs);
+      Assert.fail("Expecting an error when the given"
+          + " maxKeys argument is invalid.");
+    } catch (Exception e) {
+      GenericTestUtils.assertExceptionContains(
+          String.format("the value must be in range (0, %d]",
+              OzoneConsts.MAX_LISTKEYS_SIZE), e);
+    }
+
+    // Provide an invalid bucket name.
+    bucketArgs = new BucketArgs("invalid_bucket", createVolumeArgs);
+    try {
+      listKeyArgs = new ListArgs(bucketArgs, null, numKeys, null);
+      storageHandler.listKeys(listKeyArgs);
+      Assert.fail(
+          "Expecting an error when the given bucket name is invalid.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          Status.BUCKET_NOT_FOUND.name(), e);
+    }
+  }
+
+  @Test
+  public void testListVolumes() throws IOException, OzoneException {
+
+    String user0 = "testListVolumes-user-0";
+    String user1 = "testListVolumes-user-1";
+    String adminUser = "testListVolumes-admin";
+    ListArgs listVolumeArgs;
+    ListVolumes volumes;
+
+    // Create 10 volumes each for user0 and user1
+    String[] user0vols = new String[10];
+    String[] user1vols = new String[10];
+    for (int i =0; i<10; i++) {
+      VolumeArgs createVolumeArgs;
+      String user0VolName = "Vol-" + user0 + "-" + i;
+      user0vols[i] = user0VolName;
+      createVolumeArgs = new VolumeArgs(user0VolName, userArgs);
+      createVolumeArgs.setUserName(user0);
+      createVolumeArgs.setAdminName(adminUser);
+      createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
+      storageHandler.createVolume(createVolumeArgs);
+
+      String user1VolName = "Vol-" + user1 + "-" + i;
+      user1vols[i] = user1VolName;
+      createVolumeArgs = new VolumeArgs(user1VolName, userArgs);
+      createVolumeArgs.setUserName(user1);
+      createVolumeArgs.setAdminName(adminUser);
+      createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
+      storageHandler.createVolume(createVolumeArgs);
+    }
+
+    // Test list all volumes
+    UserArgs userArgs0 = new UserArgs(user0, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    listVolumeArgs = new ListArgs(userArgs0, "Vol-testListVolumes", 100, null);
+    listVolumeArgs.setRootScan(true);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(20, volumes.getVolumes().size());
+
+    // Test listing all volumes belonging to a user
+    listVolumeArgs = new ListArgs(userArgs0, null, 100, null);
+    listVolumeArgs.setRootScan(false);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(10, volumes.getVolumes().size());
+
+    // Test prefix
+    listVolumeArgs = new ListArgs(userArgs0,
+        "Vol-" + user0 + "-3", 100, null);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(1, volumes.getVolumes().size());
+    Assert.assertEquals(user0vols[3],
+        volumes.getVolumes().get(0).getVolumeName());
+    Assert.assertEquals(user0,
+        volumes.getVolumes().get(0).getOwner().getName());
+
+    // Test list volumes by user
+    UserArgs userArgs1 = new UserArgs(user1, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    listVolumeArgs = new ListArgs(userArgs1, null, 100, null);
+    listVolumeArgs.setRootScan(false);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(10, volumes.getVolumes().size());
+    Assert.assertEquals(user1,
+        volumes.getVolumes().get(3).getOwner().getName());
+
+    // Make sure all available fields are returned
+    final String user0vol4 = "Vol-" + user0 + "-4";
+    final String user0vol5 = "Vol-" + user0 + "-5";
+    listVolumeArgs = new ListArgs(userArgs0, null, 1, user0vol4);
+    listVolumeArgs.setRootScan(false);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(1, volumes.getVolumes().size());
+    Assert.assertEquals(user0,
+        volumes.getVolumes().get(0).getOwner().getName());
+    Assert.assertEquals(user0vol5,
+        volumes.getVolumes().get(0).getVolumeName());
+    Assert.assertEquals(5,
+        volumes.getVolumes().get(0).getQuota().getSize());
+    Assert.assertEquals(OzoneQuota.Units.GB,
+        volumes.getVolumes().get(0).getQuota().getUnit());
+
+    // A user that doesn't own any volumes
+    UserArgs userArgsX = new UserArgs("unknownUser", OzoneUtils.getRequestID(),
+        null, null, null, null);
+    listVolumeArgs = new ListArgs(userArgsX, null, 100, null);
+    listVolumeArgs.setRootScan(false);
+    volumes = storageHandler.listVolumes(listVolumeArgs);
+    Assert.assertEquals(0, volumes.getVolumes().size());
+  }
+
+  /**
+   * Test get key information.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testGetKeyInfo() throws IOException,
+      OzoneException, ParseException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    long currentTime = Time.now();
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String keyName = "testKey";
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    keyArgs.setSize(4096);
+
+
+    OutputStream stream = storageHandler.newKeyWriter(keyArgs);
+    stream.close();
+
+    KeyInfo keyInfo = storageHandler.getKeyInfo(keyArgs);
+    // Compare the times at second granularity, since reparsing the date
+    // string to milliseconds loses precision.
+    Assert.assertTrue(
+        (HddsClientUtils.formatDateTime(keyInfo.getCreatedOn()) / 1000) >= (
+            currentTime / 1000));
+    Assert.assertTrue(
+        (HddsClientUtils.formatDateTime(keyInfo.getModifiedOn()) / 1000) >= (
+            currentTime / 1000));
+    Assert.assertEquals(keyName, keyInfo.getKeyName());
+    // Without data written, the size should be 0
+    Assert.assertEquals(0, keyInfo.getSize());
+  }
+
+  /**
+   * Test that the write can proceed without having to set the right size.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testWriteSize() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(100);
+    // write a key without specifying size at all
+    String keyName = "testKey";
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
+
+    // Write a key with a declared size, but write more data than that size.
+    String keyName1 = "testKey1";
+    KeyArgs keyArgs1 = new KeyArgs(keyName1, bucketArgs);
+    keyArgs1.setSize(30);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs1)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data1 = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs1)) {
+      in.read(data1);
+    }
+    Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
+  }
+
+  /**
+   * Tests the RPC call for getting scmId and clusterId from SCM.
+   * @throws IOException
+   */
+  @Test
+  public void testGetScmInfo() throws IOException {
+    ScmInfo info = cluster.getOzoneManager().getScmInfo();
+    Assert.assertEquals(clusterId, info.getClusterId());
+    Assert.assertEquals(scmId, info.getScmId());
+  }
+
+
+  @Test
+  public void testExpiredOpenKey() throws Exception {
+    BackgroundService openKeyCleanUpService = ((KeyManagerImpl)cluster
+        .getOzoneManager().getKeyManager()).getOpenKeyCleanupService();
+
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // open some keys.
+
+    KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
+    KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
+    KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
+    KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
+    List<BlockGroup> openKeys;
+    storageHandler.newKeyWriter(keyArgs1);
+    storageHandler.newKeyWriter(keyArgs2);
+    storageHandler.newKeyWriter(keyArgs3);
+    storageHandler.newKeyWriter(keyArgs4);
+
+    Set<String> expected = Stream.of(
+        "testKey1", "testKey2", "testKey3", "testKey4")
+        .collect(Collectors.toSet());
+
+    // Now all k1-k4 should be in open state, so ExpiredOpenKeys should not
+    // contain these values.
+    openKeys = cluster.getOzoneManager()
+        .getMetadataManager().getExpiredOpenKeys();
+
+    for (BlockGroup bg : openKeys) {
+      String[] subs = bg.getGroupID().split("/");
+      String keyName = subs[subs.length - 1];
+      Assert.assertFalse(expected.contains(keyName));
+    }
+
+    Thread.sleep(2000);
+    // Now all k1-k4 should be in ExpiredOpenKeys
+    openKeys = cluster.getOzoneManager()
+        .getMetadataManager().getExpiredOpenKeys();
+    for (BlockGroup bg : openKeys) {
+      String[] subs = bg.getGroupID().split("/");
+      String keyName = subs[subs.length - 1];
+      if (expected.contains(keyName)) {
+        expected.remove(keyName);
+      }
+    }
+    Assert.assertEquals(0, expected.size());
+
+    KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
+    storageHandler.newKeyWriter(keyArgs5);
+
+    openKeyCleanUpService.triggerBackgroundTaskForTesting();
+    Thread.sleep(2000);
+    // now all k1-k4 should have been removed by the clean-up task, only k5
+    // should be present in ExpiredOpenKeys.
+    openKeys =
+        cluster.getOzoneManager().getMetadataManager().getExpiredOpenKeys();
+    System.out.println(openKeys);
+    boolean key5found = false;
+    Set<String> removed = Stream.of(
+        "testKey1", "testKey2", "testKey3", "testKey4")
+        .collect(Collectors.toSet());
+    for (BlockGroup bg : openKeys) {
+      String[] subs = bg.getGroupID().split("/");
+      String keyName = subs[subs.length - 1];
+      Assert.assertFalse(removed.contains(keyName));
+      if (keyName.equals("testKey5")) {
+        key5found = true;
+      }
+    }
+    Assert.assertTrue(key5found);
+  }
+
+  /**
+   * Tests the OM Initialization.
+   * @throws IOException
+   */
+  @Test
+  public void testOmInitialization() throws IOException {
+    // Read the version file info from OM version file
+    OMStorage omStorage = cluster.getOzoneManager().getOmStorage();
+    SCMStorage scmStorage = new SCMStorage(conf);
+    // asserts whether cluster Id and SCM ID are properly set in SCM Version
+    // file.
+    Assert.assertEquals(clusterId, scmStorage.getClusterID());
+    Assert.assertEquals(scmId, scmStorage.getScmId());
+    // asserts whether OM Id is properly set in OM Version file.
+    Assert.assertEquals(omId, omStorage.getOmId());
+    // asserts whether the SCM info is correct in OM Version file.
+    Assert.assertEquals(clusterId, omStorage.getClusterID());
+    Assert.assertEquals(scmId, omStorage.getScmId());
+  }
+
+  /**
+   * Tests the OM Initialization Failure.
+   * @throws IOException
+   */
+  @Test
+  public void testOmInitializationFailure() throws Exception {
+    OzoneConfiguration config = new OzoneConfiguration();
+    final String path =
+        GenericTestUtils.getTempPath(UUID.randomUUID().toString());
+    Path metaDirPath = Paths.get(path, "om-meta");
+    config.set(OzoneConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+    config.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    config.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+    config.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+        conf.get(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY));
+    exception.expect(OMException.class);
+    exception.expectMessage("OM not initialized.");
+    OzoneManager.createOm(null, config);
+    OMStorage omStore = new OMStorage(config);
+    omStore.setClusterId("testClusterId");
+    omStore.setScmId("testScmId");
+    // writes the version file properties
+    omStore.initialize();
+    exception.expect(OMException.class);
+    exception.expectMessage("SCM version info mismatch.");
+    OzoneManager.createOm(null, conf);
+  }
+
+  @Test
+  public void testGetServiceList() throws IOException {
+    long numGetServiceListCalls = omMetrics.getNumGetServiceLists();
+    List<ServiceInfo> services = cluster.getOzoneManager().getServiceList();
+
+    Assert.assertEquals(numGetServiceListCalls + 1,
+        omMetrics.getNumGetServiceLists());
+
+    ServiceInfo omInfo = services.stream().filter(
+        a -> a.getNodeType().equals(HddsProtos.NodeType.OM))
+        .collect(Collectors.toList()).get(0);
+    InetSocketAddress omAddress = new InetSocketAddress(omInfo.getHostname(),
+        omInfo.getPort(ServicePort.Type.RPC));
+    Assert.assertEquals(NetUtils.createSocketAddr(
+        conf.get(OZONE_OM_ADDRESS_KEY)), omAddress);
+
+    ServiceInfo scmInfo = services.stream().filter(
+        a -> a.getNodeType().equals(HddsProtos.NodeType.SCM))
+        .collect(Collectors.toList()).get(0);
+    InetSocketAddress scmAddress = new InetSocketAddress(scmInfo.getHostname(),
+        scmInfo.getPort(ServicePort.Type.RPC));
+    Assert.assertEquals(NetUtils.createSocketAddr(
+        conf.get(OZONE_SCM_CLIENT_ADDRESS_KEY)), scmAddress);
+  }
+}
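
A note on the expired-open-key assertions above: each loop repeats the same group-ID parsing to recover a key name from a BlockGroup. A minimal helper sketch of that idiom (the method name is illustrative and not part of the patch; it only assumes, as the test does, that getGroupID() returns a '/'-separated path whose last segment is the key name):

    // Extract the key name from a BlockGroup's group ID ("vol/bucket/key" style path).
    private static String extractKeyName(BlockGroup bg) {
      String[] parts = bg.getGroupID().split("/");
      return parts[parts.length - 1];
    }

With such a helper, each assertion loop reduces to a call like
Assert.assertFalse(expected.contains(extractKeyName(bg))).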

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestInterface.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestInterface.java
new file mode 100644
index 0000000..8168d27
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRestInterface.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
+import static org.apache.hadoop.ozone.OmUtils.getOmAddressForClients;
+
+/**
+ * Tests the REST interface exposed by the OzoneManager.
+ */
+public class TestOzoneManagerRestInterface {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGetServiceList() throws Exception {
+    OzoneManagerHttpServer server =
+        cluster.getOzoneManager().getHttpServer();
+    HttpClient client = HttpClients.createDefault();
+    String connectionUri = "http://" +
+        NetUtils.getHostPortString(server.getHttpAddress());
+    HttpGet httpGet = new HttpGet(connectionUri + "/serviceList");
+    HttpResponse response = client.execute(httpGet);
+    String serviceListJson = EntityUtils.toString(response.getEntity());
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TypeReference<List<ServiceInfo>> serviceInfoReference =
+        new TypeReference<List<ServiceInfo>>() {};
+    List<ServiceInfo> serviceInfos = objectMapper.readValue(
+        serviceListJson, serviceInfoReference);
+    Map<HddsProtos.NodeType, ServiceInfo> serviceMap = new HashMap<>();
+    for (ServiceInfo serviceInfo : serviceInfos) {
+      serviceMap.put(serviceInfo.getNodeType(), serviceInfo);
+    }
+
+    InetSocketAddress omAddress =
+        getOmAddressForClients(conf);
+    ServiceInfo omInfo = serviceMap.get(HddsProtos.NodeType.OM);
+
+    Assert.assertEquals(omAddress.getHostName(), omInfo.getHostname());
+    Assert.assertEquals(omAddress.getPort(),
+        omInfo.getPort(ServicePort.Type.RPC));
+    Assert.assertEquals(server.getHttpAddress().getPort(),
+        omInfo.getPort(ServicePort.Type.HTTP));
+
+    InetSocketAddress scmAddress =
+        getScmAddressForClients(conf);
+    ServiceInfo scmInfo = serviceMap.get(HddsProtos.NodeType.SCM);
+
+    Assert.assertEquals(scmAddress.getHostName(), scmInfo.getHostname());
+    Assert.assertEquals(scmAddress.getPort(),
+        scmInfo.getPort(ServicePort.Type.RPC));
+
+    ServiceInfo datanodeInfo = serviceMap.get(HddsProtos.NodeType.DATANODE);
+    DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
+        .getDatanodeDetails();
+    Assert.assertEquals(datanodeDetails.getHostName(),
+        datanodeInfo.getHostname());
+
+    Map<ServicePort.Type, Integer> ports = datanodeInfo.getPorts();
+    for (ServicePort.Type type : ports.keySet()) {
+      switch (type) {
+      case HTTP:
+      case HTTPS:
+        Assert.assertEquals(
+            datanodeDetails.getPort(DatanodeDetails.Port.Name.REST).getValue(),
+            ports.get(type));
+        break;
+      default:
+        // The OM only reports the datanode's REST port (HTTP or HTTPS);
+        // no other port types are expected here.
+        Assert.fail();
+        break;
+      }
+    }
+  }
+
+}
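
The test above deserializes the /serviceList response with Jackson. For a quick manual check against a running OM, the same endpoint can be queried with the calls the test already uses; a sketch, assuming an OM HTTP server reachable at localhost:9874 (the address is illustrative, the test derives the real one from OzoneManagerHttpServer#getHttpAddress):

    // Fetch the OM service list over HTTP and decode it into ServiceInfo objects.
    HttpClient client = HttpClients.createDefault();
    HttpGet get = new HttpGet("http://localhost:9874/serviceList");
    String json = EntityUtils.toString(client.execute(get).getEntity());
    List<ServiceInfo> services = new ObjectMapper()
        .readValue(json, new TypeReference<List<ServiceInfo>>() { });
    services.forEach(s -> System.out.println(
        s.getNodeType() + " -> " + s.getHostname()));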

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java
index ed8f0d5..5082870 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ozShell/TestOzoneShell.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.client.rest.RestClient;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.web.ozShell.Shell;
 import org.apache.hadoop.ozone.web.request.OzoneQuota;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
@@ -167,23 +167,23 @@ public class TestOzoneShell {
     System.setOut(new PrintStream(out));
     System.setErr(new PrintStream(err));
     if(clientProtocol.equals(RestClient.class)) {
-      String hostName = cluster.getKeySpaceManager().getHttpServer()
+      String hostName = cluster.getOzoneManager().getHttpServer()
           .getHttpAddress().getHostName();
       int port = cluster
-          .getKeySpaceManager().getHttpServer().getHttpAddress().getPort();
+          .getOzoneManager().getHttpServer().getHttpAddress().getPort();
       url = String.format("http://" + hostName + ":" + port);
     } else {
       List<ServiceInfo> services = null;
       try {
-        services = cluster.getKeySpaceManager().getServiceList();
+        services = cluster.getOzoneManager().getServiceList();
       } catch (IOException e) {
-        LOG.error("Could not get service list from KSM");
+        LOG.error("Could not get service list from OM");
       }
       String hostName = services.stream().filter(
-          a -> a.getNodeType().equals(HddsProtos.NodeType.KSM))
+          a -> a.getNodeType().equals(HddsProtos.NodeType.OM))
           .collect(Collectors.toList()).get(0).getHostname();
 
-      String port = cluster.getKeySpaceManager().getRpcPort();
+      String port = cluster.getOzoneManager().getRpcPort();
       url = String.format("o3://" + hostName + ":" + port);
     }
   }
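
With the rename, the RpcClient branch above resolves the OM endpoint from the service list instead of a KSM-specific accessor. Compressed into a sketch (error handling omitted; getServiceList, getRpcPort and the NodeType filter are exactly what the hunk exercises):

    // Resolve the OM host from the service list and build the o3:// URL used by the shell.
    String omHost = cluster.getOzoneManager().getServiceList().stream()
        .filter(s -> s.getNodeType().equals(HddsProtos.NodeType.OM))
        .findFirst().get().getHostname();
    String url = "o3://" + omHost + ":" + cluster.getOzoneManager().getRpcPort();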

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
index b4ed2b1..1a1f37c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.scm.cli.SQLCLI;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -113,7 +112,7 @@ public class TestContainerSQLCli {
     cluster.waitForClusterToBeReady();
     datanodeIpAddress = cluster.getHddsDatanodes().get(0)
         .getDatanodeDetails().getIpAddress();
-    cluster.getKeySpaceManager().stop();
+    cluster.getOzoneManager().stop();
     cluster.getStorageContainerManager().stop();
 
     nodeManager = cluster.getStorageContainerManager().getScmNodeManager();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
index 0e61391..e592d56 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
@@ -90,7 +90,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumes() throws IOException {
     super.testCreateVolumes(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
 
@@ -102,7 +102,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumesWithQuota() throws IOException {
     super.testCreateVolumesWithQuota(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
 
@@ -114,7 +114,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumesWithInvalidQuota() throws IOException {
     super.testCreateVolumesWithInvalidQuota(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
 
@@ -128,7 +128,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumesWithInvalidUser() throws IOException {
     super.testCreateVolumesWithInvalidUser(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
 
@@ -143,7 +143,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumesWithOutAdminRights() throws IOException {
     super.testCreateVolumesWithOutAdminRights(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
 
@@ -155,7 +155,7 @@ public class TestDistributedOzoneVolumes extends TestOzoneHelper {
   @Test
   public void testCreateVolumesInLoop() throws IOException {
     super.testCreateVolumesInLoop(port);
-    Assert.assertEquals(0, cluster.getKeySpaceManager()
+    Assert.assertEquals(0, cluster.getOzoneManager()
         .getMetrics().getNumVolumeCreateFails());
   }
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index b86c577..a95bd0e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -48,13 +48,13 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.ksm.KeySpaceManager;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .Status;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -644,15 +644,15 @@ public class TestKeys {
     }
   }
 
-  private int countKsmKeys(KeySpaceManager ksm) throws IOException {
+  private int countOmKeys(OzoneManager om) throws IOException {
     int totalCount = 0;
-    List<KsmVolumeArgs> volumes =
-        ksm.listAllVolumes(null, null, Integer.MAX_VALUE);
-    for (KsmVolumeArgs volume : volumes) {
-      List<KsmBucketInfo> buckets =
-          ksm.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
-      for (KsmBucketInfo bucket : buckets) {
-        List<KsmKeyInfo> keys = ksm.listKeys(bucket.getVolumeName(),
+    List<OmVolumeArgs> volumes =
+        om.listAllVolumes(null, null, Integer.MAX_VALUE);
+    for (OmVolumeArgs volume : volumes) {
+      List<OmBucketInfo> buckets =
+          om.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
+      for (OmBucketInfo bucket : buckets) {
+        List<OmKeyInfo> keys = om.listKeys(bucket.getVolumeName(),
             bucket.getBucketName(), null, null, Integer.MAX_VALUE);
         totalCount += keys.size();
       }
@@ -662,10 +662,10 @@ public class TestKeys {
 
   @Test
   public void testDeleteKey() throws Exception {
-    KeySpaceManager ksm = ozoneCluster.getKeySpaceManager();
+    OzoneManager ozoneManager = ozoneCluster.getOzoneManager();
     // To avoid interference from other test cases,
     // we collect number of existing keys at the beginning
-    int numOfExistedKeys = countKsmKeys(ksm);
+    int numOfExistedKeys = countOmKeys(ozoneManager);
 
     // Keep tracking bucket keys info while creating them
     PutHelper helper = new PutHelper(client, path);
@@ -689,15 +689,15 @@ public class TestKeys {
     // count the total number of created keys.
     Set<Pair<String, String>> buckets = bucketKeys.getAllBuckets();
     for (Pair<String, String> buk : buckets) {
-      List<KsmKeyInfo> createdKeys =
-          ksm.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
+      List<OmKeyInfo> createdKeys =
+          ozoneManager.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
 
       // Memorize chunks that have been created,
       // so we can verify actual deletions at DN side later.
-      for (KsmKeyInfo keyInfo : createdKeys) {
-        List<KsmKeyLocationInfo> locations =
+      for (OmKeyInfo keyInfo : createdKeys) {
+        List<OmKeyLocationInfo> locations =
             keyInfo.getLatestVersionLocations().getLocationList();
-        for (KsmKeyLocationInfo location : locations) {
+        for (OmKeyLocationInfo location : locations) {
           KeyData keyData = new KeyData(location.getBlockID());
           KeyData blockInfo = cm.getContainerManager()
               .getKeyManager().getKey(keyData);
@@ -721,9 +721,9 @@ public class TestKeys {
     // Ensure all keys are created.
     Assert.assertEquals(20, numOfCreatedKeys);
 
-    // Ensure all keys are visible from KSM.
+    // Ensure all keys are visible from OM.
     // Total number should be numOfCreated + numOfExisted
-    Assert.assertEquals(20 + numOfExistedKeys, countKsmKeys(ksm));
+    Assert.assertEquals(20 + numOfExistedKeys, countOmKeys(ozoneManager));
 
     // Delete 10 keys
     int delCount = 20;
@@ -732,21 +732,21 @@ public class TestKeys {
       List<String> bks = bucketKeys.getBucketKeys(bucketInfo.getValue());
       for (String keyName : bks) {
         if (delCount > 0) {
-          KsmKeyArgs arg =
-              new KsmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
+          OmKeyArgs arg =
+              new OmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
                   .setBucketName(bucketInfo.getValue()).setKeyName(keyName)
                   .build();
-          ksm.deleteKey(arg);
+          ozoneManager.deleteKey(arg);
           delCount--;
         }
       }
     }
 
-    // It should be pretty quick that keys are removed from KSM namespace,
+    // Keys should be removed from the OM namespace quickly,
     // because actual deletion happens in async mode.
     GenericTestUtils.waitFor(() -> {
       try {
-        int num = countKsmKeys(ksm);
+        int num = countOmKeys(ozoneManager);
         return num == (numOfExistedKeys);
       } catch (IOException e) {
         return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/resources/webapps/ksm/.gitkeep
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/resources/webapps/ksm/.gitkeep b/hadoop-ozone/integration-test/src/test/resources/webapps/ksm/.gitkeep
deleted file mode 100644
index 09697dc..0000000
--- a/hadoop-ozone/integration-test/src/test/resources/webapps/ksm/.gitkeep
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/resources/webapps/ozoneManager/.gitkeep
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/resources/webapps/ozoneManager/.gitkeep b/hadoop-ozone/integration-test/src/test/resources/webapps/ozoneManager/.gitkeep
new file mode 100644
index 0000000..09697dc
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/resources/webapps/ozoneManager/.gitkeep
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index 3128d31..2200cd8 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.ozone.KsmUtils.getKsmAddress;
+import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
 import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
 import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
@@ -34,9 +34,8 @@ import com.sun.jersey.api.container.ContainerFactory;
 import com.sun.jersey.api.core.ApplicationAdapter;
 
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.ksm.protocolPB
-    .KeySpaceManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.web.ObjectStoreApplication;
 import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
@@ -72,8 +71,8 @@ public final class ObjectStoreHandler implements Closeable {
       LoggerFactory.getLogger(ObjectStoreHandler.class);
 
   private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
-  private final KeySpaceManagerProtocolClientSideTranslatorPB
-      keySpaceManagerClient;
+  private final OzoneManagerProtocolClientSideTranslatorPB
+      ozoneManagerClient;
   private final StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
   private final ScmBlockLocationProtocolClientSideTranslatorPB
@@ -119,28 +118,28 @@ public final class ObjectStoreHandler implements Closeable {
                   NetUtils.getDefaultSocketFactory(conf),
                   Client.getRpcTimeout(conf)));
 
-      RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
+      RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
           ProtobufRpcEngine.class);
-      long ksmVersion =
-          RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
-      InetSocketAddress ksmAddress = getKsmAddress(conf);
-      this.keySpaceManagerClient =
-          new KeySpaceManagerProtocolClientSideTranslatorPB(
-              RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
-              ksmAddress, UserGroupInformation.getCurrentUser(), conf,
+      long omVersion =
+          RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+      InetSocketAddress omAddress = getOmAddress(conf);
+      this.ozoneManagerClient =
+          new OzoneManagerProtocolClientSideTranslatorPB(
+              RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+                  omAddress, UserGroupInformation.getCurrentUser(), conf,
               NetUtils.getDefaultSocketFactory(conf),
               Client.getRpcTimeout(conf)));
 
       storageHandler = new DistributedStorageHandler(
           new OzoneConfiguration(conf),
           this.storageContainerLocationClient,
-          this.keySpaceManagerClient);
+          this.ozoneManagerClient);
     } else {
       if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
         storageHandler = new LocalStorageHandler(conf);
         this.storageContainerLocationClient = null;
         this.scmBlockLocationClient = null;
-        this.keySpaceManagerClient = null;
+        this.ozoneManagerClient = null;
       } else {
         throw new IllegalArgumentException(
             String.format("Unrecognized value for %s: %s,"
@@ -186,6 +185,6 @@ public final class ObjectStoreHandler implements Closeable {
     storageHandler.close();
     IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
     IOUtils.cleanupWithLogger(LOG, scmBlockLocationClient);
-    IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
+    IOUtils.cleanupWithLogger(LOG, ozoneManagerClient);
   }
 }
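
The substance of this hunk is the client wiring: ObjectStoreHandler now talks to the OM through an OzoneManagerProtocolClientSideTranslatorPB wrapped around an RPC proxy resolved via OmUtils.getOmAddress. Pulled out of the surrounding handler setup, the wiring is roughly (a sketch assembled only from the calls visible above, not a drop-in replacement):

    // Register the protobuf engine for the OM protocol and build the client-side translator.
    RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class);
    long omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
    InetSocketAddress omAddress = getOmAddress(conf);
    OzoneManagerProtocolClientSideTranslatorPB omClient =
        new OzoneManagerProtocolClientSideTranslatorPB(
            RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress,
                UserGroupInformation.getCurrentUser(), conf,
                NetUtils.getDefaultSocketFactory(conf),
                Client.getRpcTimeout(conf)));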

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java
index ef0293e..ad48787 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.ozone.web.handlers;
 import org.apache.commons.codec.binary.Base64;
 
 import org.apache.hadoop.ozone.OzoneRestUtils;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.client.rest.headers.Header;
@@ -102,7 +102,7 @@ public abstract class KeyProcessTemplate {
       LOG.error("IOException:", fsExp);
       // Map KEY_NOT_FOUND to INVALID_KEY
       if (fsExp.getMessage().endsWith(
-          KeySpaceManagerProtocolProtos.Status.KEY_NOT_FOUND.name())) {
+          OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND.name())) {
         throw ErrorTable.newError(ErrorTable.INVALID_KEY, userArgs, fsExp);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeProcessTemplate.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeProcessTemplate.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeProcessTemplate.java
index 1d98400..fb95bb9 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeProcessTemplate.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/handlers/VolumeProcessTemplate.java
@@ -30,7 +30,7 @@ import java.nio.file.NoSuchFileException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ozone.OzoneRestUtils;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.interfaces.UserAuth;
@@ -135,7 +135,7 @@ public abstract class VolumeProcessTemplate {
     OzoneException exp = null;
 
     if ((fsExp != null && fsExp.getMessage().endsWith(
-        KeySpaceManagerProtocolProtos.Status.VOLUME_ALREADY_EXISTS.name()))
+        OzoneManagerProtocolProtos.Status.VOLUME_ALREADY_EXISTS.name()))
         || fsExp instanceof FileAlreadyExistsException) {
       exp = ErrorTable
           .newError(ErrorTable.VOLUME_ALREADY_EXISTS, reqID, volume, hostName);
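
Both REST handler templates map OM-side failures to web errors by suffix-matching the protobuf status name embedded in the IOException message, which is why every reference to the Status enum moves from KeySpaceManagerProtocolProtos to OzoneManagerProtocolProtos. The idiom, reduced to a sketch (variable names illustrative):

    // Translate an OM KEY_NOT_FOUND failure into the REST layer's INVALID_KEY error.
    if (ioEx.getMessage().endsWith(
        OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND.name())) {
      throw ErrorTable.newError(ErrorTable.INVALID_KEY, userArgs, ioEx);
    }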

