You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:19:57 UTC

[21/50] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager to OzoneManager. Contributed by Arpit Agarwal.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java
deleted file mode 100644
index bf7d870..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-
-import java.io.IOException;
-
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.test.Whitebox;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test for KSM metrics.
- */
-public class TestKSMMetrcis {
-  private MiniOzoneCluster cluster;
-  private KeySpaceManager ksmManager;
-
-  /**
-   * The exception used for testing failure metrics.
-   */
-  private IOException exception = new IOException();
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   *
-   * @throws IOException
-   */
-  @Before
-  public void setup() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
-    cluster.waitForClusterToBeReady();
-    ksmManager = cluster.getKeySpaceManager();
-  }
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @After
-  public void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testVolumeOps() throws IOException {
-    VolumeManager volumeManager = (VolumeManager) Whitebox
-        .getInternalState(ksmManager, "volumeManager");
-    VolumeManager mockVm = Mockito.spy(volumeManager);
-
-    Mockito.doNothing().when(mockVm).createVolume(null);
-    Mockito.doNothing().when(mockVm).deleteVolume(null);
-    Mockito.doReturn(null).when(mockVm).getVolumeInfo(null);
-    Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null);
-    Mockito.doNothing().when(mockVm).setOwner(null, null);
-    Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "volumeManager", mockVm);
-    doVolumeOps();
-
-    MetricsRecordBuilder ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumVolumeOps", 6L, ksmMetrics);
-    assertCounter("NumVolumeCreates", 1L, ksmMetrics);
-    assertCounter("NumVolumeUpdates", 1L, ksmMetrics);
-    assertCounter("NumVolumeInfos", 1L, ksmMetrics);
-    assertCounter("NumVolumeCheckAccesses", 1L, ksmMetrics);
-    assertCounter("NumVolumeDeletes", 1L, ksmMetrics);
-    assertCounter("NumVolumeLists", 1L, ksmMetrics);
-
-    // inject exception to test for Failure Metrics
-    Mockito.doThrow(exception).when(mockVm).createVolume(null);
-    Mockito.doThrow(exception).when(mockVm).deleteVolume(null);
-    Mockito.doThrow(exception).when(mockVm).getVolumeInfo(null);
-    Mockito.doThrow(exception).when(mockVm).checkVolumeAccess(null, null);
-    Mockito.doThrow(exception).when(mockVm).setOwner(null, null);
-    Mockito.doThrow(exception).when(mockVm).listVolumes(null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "volumeManager", mockVm);
-    doVolumeOps();
-
-    ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumVolumeOps", 12L, ksmMetrics);
-    assertCounter("NumVolumeCreates", 2L, ksmMetrics);
-    assertCounter("NumVolumeUpdates", 2L, ksmMetrics);
-    assertCounter("NumVolumeInfos", 2L, ksmMetrics);
-    assertCounter("NumVolumeCheckAccesses", 2L, ksmMetrics);
-    assertCounter("NumVolumeDeletes", 2L, ksmMetrics);
-    assertCounter("NumVolumeLists", 2L, ksmMetrics);
-
-    assertCounter("NumVolumeCreateFails", 1L, ksmMetrics);
-    assertCounter("NumVolumeUpdateFails", 1L, ksmMetrics);
-    assertCounter("NumVolumeInfoFails", 1L, ksmMetrics);
-    assertCounter("NumVolumeCheckAccessFails", 1L, ksmMetrics);
-    assertCounter("NumVolumeDeleteFails", 1L, ksmMetrics);
-    assertCounter("NumVolumeListFails", 1L, ksmMetrics);
-  }
-
-  @Test
-  public void testBucketOps() throws IOException {
-    BucketManager bucketManager = (BucketManager) Whitebox
-        .getInternalState(ksmManager, "bucketManager");
-    BucketManager mockBm = Mockito.spy(bucketManager);
-
-    Mockito.doNothing().when(mockBm).createBucket(null);
-    Mockito.doNothing().when(mockBm).deleteBucket(null, null);
-    Mockito.doReturn(null).when(mockBm).getBucketInfo(null, null);
-    Mockito.doNothing().when(mockBm).setBucketProperty(null);
-    Mockito.doReturn(null).when(mockBm).listBuckets(null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "bucketManager", mockBm);
-    doBucketOps();
-
-    MetricsRecordBuilder ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumBucketOps", 5L, ksmMetrics);
-    assertCounter("NumBucketCreates", 1L, ksmMetrics);
-    assertCounter("NumBucketUpdates", 1L, ksmMetrics);
-    assertCounter("NumBucketInfos", 1L, ksmMetrics);
-    assertCounter("NumBucketDeletes", 1L, ksmMetrics);
-    assertCounter("NumBucketLists", 1L, ksmMetrics);
-
-    // inject exception to test for Failure Metrics
-    Mockito.doThrow(exception).when(mockBm).createBucket(null);
-    Mockito.doThrow(exception).when(mockBm).deleteBucket(null, null);
-    Mockito.doThrow(exception).when(mockBm).getBucketInfo(null, null);
-    Mockito.doThrow(exception).when(mockBm).setBucketProperty(null);
-    Mockito.doThrow(exception).when(mockBm).listBuckets(null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "bucketManager", mockBm);
-    doBucketOps();
-
-    ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumBucketOps", 10L, ksmMetrics);
-    assertCounter("NumBucketCreates", 2L, ksmMetrics);
-    assertCounter("NumBucketUpdates", 2L, ksmMetrics);
-    assertCounter("NumBucketInfos", 2L, ksmMetrics);
-    assertCounter("NumBucketDeletes", 2L, ksmMetrics);
-    assertCounter("NumBucketLists", 2L, ksmMetrics);
-
-    assertCounter("NumBucketCreateFails", 1L, ksmMetrics);
-    assertCounter("NumBucketUpdateFails", 1L, ksmMetrics);
-    assertCounter("NumBucketInfoFails", 1L, ksmMetrics);
-    assertCounter("NumBucketDeleteFails", 1L, ksmMetrics);
-    assertCounter("NumBucketListFails", 1L, ksmMetrics);
-  }
-
-  @Test
-  public void testKeyOps() throws IOException {
-    KeyManager bucketManager = (KeyManager) Whitebox
-        .getInternalState(ksmManager, "keyManager");
-    KeyManager mockKm = Mockito.spy(bucketManager);
-
-    Mockito.doReturn(null).when(mockKm).openKey(null);
-    Mockito.doNothing().when(mockKm).deleteKey(null);
-    Mockito.doReturn(null).when(mockKm).lookupKey(null);
-    Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "keyManager", mockKm);
-    doKeyOps();
-
-    MetricsRecordBuilder ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumKeyOps", 4L, ksmMetrics);
-    assertCounter("NumKeyAllocate", 1L, ksmMetrics);
-    assertCounter("NumKeyLookup", 1L, ksmMetrics);
-    assertCounter("NumKeyDeletes", 1L, ksmMetrics);
-    assertCounter("NumKeyLists", 1L, ksmMetrics);
-
-    // inject exception to test for Failure Metrics
-    Mockito.doThrow(exception).when(mockKm).openKey(null);
-    Mockito.doThrow(exception).when(mockKm).deleteKey(null);
-    Mockito.doThrow(exception).when(mockKm).lookupKey(null);
-    Mockito.doThrow(exception).when(mockKm).listKeys(
-        null, null, null, null, 0);
-
-    Whitebox.setInternalState(ksmManager, "keyManager", mockKm);
-    doKeyOps();
-
-    ksmMetrics = getMetrics("KSMMetrics");
-    assertCounter("NumKeyOps", 8L, ksmMetrics);
-    assertCounter("NumKeyAllocate", 2L, ksmMetrics);
-    assertCounter("NumKeyLookup", 2L, ksmMetrics);
-    assertCounter("NumKeyDeletes", 2L, ksmMetrics);
-    assertCounter("NumKeyLists", 2L, ksmMetrics);
-
-    assertCounter("NumKeyAllocateFails", 1L, ksmMetrics);
-    assertCounter("NumKeyLookupFails", 1L, ksmMetrics);
-    assertCounter("NumKeyDeleteFails", 1L, ksmMetrics);
-    assertCounter("NumKeyListFails", 1L, ksmMetrics);
-  }
-
-  /**
-   * Test volume operations with ignoring thrown exception.
-   */
-  private void doVolumeOps() {
-    try {
-      ksmManager.createVolume(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.deleteVolume(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.getVolumeInfo(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.checkVolumeAccess(null, null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.setOwner(null, null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.listAllVolumes(null, null, 0);
-    } catch (IOException ignored) {
-    }
-  }
-
-  /**
-   * Test bucket operations with ignoring thrown exception.
-   */
-  private void doBucketOps() {
-    try {
-      ksmManager.createBucket(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.deleteBucket(null, null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.getBucketInfo(null, null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.setBucketProperty(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.listBuckets(null, null, null, 0);
-    } catch (IOException ignored) {
-    }
-  }
-
-  /**
-   * Test key operations with ignoring thrown exception.
-   */
-  private void doKeyOps() {
-    try {
-      ksmManager.openKey(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.deleteKey(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.lookupKey(null);
-    } catch (IOException ignored) {
-    }
-
-    try {
-      ksmManager.listKeys(null, null, null, null, 0);
-    } catch (IOException ignored) {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java
deleted file mode 100644
index 7b92ec7..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.scm.cli.SQLCLI;
-import org.apache.hadoop.ozone.web.handlers.BucketArgs;
-import org.apache.hadoop.ozone.web.handlers.KeyArgs;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-
-import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class tests the CLI that transforms ksm.db into SQLite DB files.
- */
-@RunWith(Parameterized.class)
-public class TestKSMSQLCli {
-  private MiniOzoneCluster cluster = null;
-  private StorageHandler storageHandler;
-  private UserArgs userArgs;
-  private OzoneConfiguration conf;
-  private SQLCLI cli;
-
-  private String userName = "userTest";
-  private String adminName = "adminTest";
-  private String volumeName0 = "volumeTest0";
-  private String volumeName1 = "volumeTest1";
-  private String bucketName0 = "bucketTest0";
-  private String bucketName1 = "bucketTest1";
-  private String bucketName2 = "bucketTest2";
-  private String keyName0 = "key0";
-  private String keyName1 = "key1";
-  private String keyName2 = "key2";
-  private String keyName3 = "key3";
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] {
-        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
-        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
-    });
-  }
-
-  private String metaStoreType;
-
-  public TestKSMSQLCli(String type) {
-    metaStoreType = type;
-  }
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "distributed"
-   *
-   * @throws IOException
-   */
-  @Before
-  public void setup() throws Exception {
-    conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
-    cluster.waitForClusterToBeReady();
-    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
-        null, null, null, null);
-    cluster.waitForClusterToBeReady();
-
-    VolumeArgs createVolumeArgs0 = new VolumeArgs(volumeName0, userArgs);
-    createVolumeArgs0.setUserName(userName);
-    createVolumeArgs0.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs0);
-    VolumeArgs createVolumeArgs1 = new VolumeArgs(volumeName1, userArgs);
-    createVolumeArgs1.setUserName(userName);
-    createVolumeArgs1.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs1);
-
-    BucketArgs bucketArgs0 = new BucketArgs(volumeName0, bucketName0, userArgs);
-    storageHandler.createBucket(bucketArgs0);
-    BucketArgs bucketArgs1 = new BucketArgs(volumeName1, bucketName1, userArgs);
-    storageHandler.createBucket(bucketArgs1);
-    BucketArgs bucketArgs2 = new BucketArgs(volumeName0, bucketName2, userArgs);
-    storageHandler.createBucket(bucketArgs2);
-
-    KeyArgs keyArgs0 =
-        new KeyArgs(volumeName0, bucketName0, keyName0, userArgs);
-    keyArgs0.setSize(100);
-    KeyArgs keyArgs1 =
-        new KeyArgs(volumeName1, bucketName1, keyName1, userArgs);
-    keyArgs1.setSize(200);
-    KeyArgs keyArgs2 =
-        new KeyArgs(volumeName0, bucketName2, keyName2, userArgs);
-    keyArgs2.setSize(300);
-    KeyArgs keyArgs3 =
-        new KeyArgs(volumeName0, bucketName2, keyName3, userArgs);
-    keyArgs3.setSize(400);
-
-    OutputStream stream = storageHandler.newKeyWriter(keyArgs0);
-    stream.close();
-    stream = storageHandler.newKeyWriter(keyArgs1);
-    stream.close();
-    stream = storageHandler.newKeyWriter(keyArgs2);
-    stream.close();
-    stream = storageHandler.newKeyWriter(keyArgs3);
-    stream.close();
-
-    cluster.getKeySpaceManager().stop();
-    cluster.getStorageContainerManager().stop();
-    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, metaStoreType);
-    cli = new SQLCLI(conf);
-  }
-
-  @After
-  public void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testKSMDB() throws Exception {
-    String dbOutPath =  GenericTestUtils.getTempPath(
-        UUID.randomUUID() + "/out_sql.db");
-
-    String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS);
-    String dbPath = dbRootPath + "/" + KSM_DB_NAME;
-    String[] args = {"-p", dbPath, "-o", dbOutPath};
-
-    cli.run(args);
-
-    Connection conn = connectDB(dbOutPath);
-    String sql = "SELECT * FROM volumeList";
-    ResultSet rs = executeQuery(conn, sql);
-    List<String> expectedValues =
-        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
-    while (rs.next()) {
-      String userNameRs = rs.getString("userName");
-      String volumeNameRs = rs.getString("volumeName");
-      assertEquals(userName,  userNameRs.substring(1));
-      assertTrue(expectedValues.remove(volumeNameRs));
-    }
-    assertEquals(0, expectedValues.size());
-
-    sql = "SELECT * FROM volumeInfo";
-    rs = executeQuery(conn, sql);
-    expectedValues =
-        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
-    while (rs.next()) {
-      String adName = rs.getString("adminName");
-      String ownerName = rs.getString("ownerName");
-      String volumeName = rs.getString("volumeName");
-      assertEquals(adminName, adName);
-      assertEquals(userName, ownerName);
-      assertTrue(expectedValues.remove(volumeName));
-    }
-    assertEquals(0, expectedValues.size());
-
-    sql = "SELECT * FROM aclInfo";
-    rs = executeQuery(conn, sql);
-    expectedValues =
-        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
-    while (rs.next()) {
-      String adName = rs.getString("adminName");
-      String ownerName = rs.getString("ownerName");
-      String volumeName = rs.getString("volumeName");
-      String type = rs.getString("type");
-      String uName = rs.getString("userName");
-      String rights = rs.getString("rights");
-      assertEquals(adminName, adName);
-      assertEquals(userName, ownerName);
-      assertEquals("USER", type);
-      assertEquals(userName, uName);
-      assertEquals("READ_WRITE", rights);
-      assertTrue(expectedValues.remove(volumeName));
-    }
-    assertEquals(0, expectedValues.size());
-
-    sql = "SELECT * FROM bucketInfo";
-    rs = executeQuery(conn, sql);
-    HashMap<String, String> expectedMap = new HashMap<>();
-    expectedMap.put(bucketName0, volumeName0);
-    expectedMap.put(bucketName2, volumeName0);
-    expectedMap.put(bucketName1, volumeName1);
-    while (rs.next()) {
-      String volumeName = rs.getString("volumeName");
-      String bucketName = rs.getString("bucketName");
-      boolean versionEnabled = rs.getBoolean("versionEnabled");
-      String storegeType = rs.getString("storageType");
-      assertEquals(volumeName, expectedMap.remove(bucketName));
-      assertFalse(versionEnabled);
-      assertEquals("DISK", storegeType);
-    }
-    assertEquals(0, expectedMap.size());
-
-    sql = "SELECT * FROM keyInfo";
-    rs = executeQuery(conn, sql);
-    HashMap<String, List<String>> expectedMap2 = new HashMap<>();
-    // no data written, data size will be 0
-    expectedMap2.put(keyName0,
-        Arrays.asList(volumeName0, bucketName0, "0"));
-    expectedMap2.put(keyName1,
-        Arrays.asList(volumeName1, bucketName1, "0"));
-    expectedMap2.put(keyName2,
-        Arrays.asList(volumeName0, bucketName2, "0"));
-    expectedMap2.put(keyName3,
-        Arrays.asList(volumeName0, bucketName2, "0"));
-    while (rs.next()) {
-      String volumeName = rs.getString("volumeName");
-      String bucketName = rs.getString("bucketName");
-      String keyName = rs.getString("keyName");
-      int dataSize = rs.getInt("dataSize");
-      List<String> vals = expectedMap2.remove(keyName);
-      assertNotNull(vals);
-      assertEquals(vals.get(0), volumeName);
-      assertEquals(vals.get(1), bucketName);
-      assertEquals(vals.get(2), Integer.toString(dataSize));
-    }
-    assertEquals(0, expectedMap2.size());
-
-    conn.close();
-    Files.delete(Paths.get(dbOutPath));
-  }
-
-  private ResultSet executeQuery(Connection conn, String sql)
-      throws SQLException {
-    Statement stmt = conn.createStatement();
-    return stmt.executeQuery(sql);
-  }
-
-  private Connection connectDB(String dbPath) throws Exception {
-    Class.forName("org.sqlite.JDBC");
-    String connectPath =
-        String.format("jdbc:sqlite:%s", dbPath);
-    return DriverManager.getConnection(connectPath);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
deleted file mode 100644
index 8a16bfe..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ /dev/null
@@ -1,1350 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
-import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ServicePort;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.web.handlers.BucketArgs;
-import org.apache.hadoop.ozone.web.handlers.KeyArgs;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.web.request.OzoneQuota;
-import org.apache.hadoop.ozone.web.response.BucketInfo;
-import org.apache.hadoop.ozone.web.response.KeyInfo;
-import org.apache.hadoop.ozone.web.response.VolumeInfo;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.Status;
-import org.apache.hadoop.ozone.web.handlers.ListArgs;
-import org.apache.hadoop.ozone.web.response.ListBuckets;
-import org.apache.hadoop.ozone.web.response.ListKeys;
-import org.apache.hadoop.ozone.web.response.ListVolumes;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.net.InetSocketAddress;
-import java.text.ParseException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CLIENT_ADDRESS_KEY;
-
-/**
- * Test Key Space Manager operation in distributed handler scenario.
- */
-public class TestKeySpaceManager {
-  private static MiniOzoneCluster cluster = null;
-  private static StorageHandler storageHandler;
-  private static UserArgs userArgs;
-  private static KSMMetrics ksmMetrics;
-  private static OzoneConfiguration conf;
-  private static String clusterId;
-  private static String scmId;
-  private static String ksmId;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "distributed"
-   *
-   * @throws IOException
-   */
-  @BeforeClass
-  public static void init() throws Exception {
-    conf = new OzoneConfiguration();
-    clusterId = UUID.randomUUID().toString();
-    scmId = UUID.randomUUID().toString();
-    ksmId = UUID.randomUUID().toString();
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
-    cluster =  MiniOzoneCluster.newBuilder(conf)
-        .setClusterId(clusterId)
-        .setScmId(scmId)
-        .setKsmId(ksmId)
-        .build();
-    cluster.waitForClusterToBeReady();
-    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
-        null, null, null, null);
-    ksmMetrics = cluster.getKeySpaceManager().getMetrics();
-  }
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @AfterClass
-  public static void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  // Create a volume and test its attribute after creating them
-  @Test(timeout = 60000)
-  public void testCreateVolume() throws IOException, OzoneException {
-    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(volumeCreateFailCount,
-        ksmMetrics.getNumVolumeCreateFails());
-  }
-
-  // Create a volume and modify the volume owner and then test its attributes
-  @Test(timeout = 60000)
-  public void testChangeVolumeOwner() throws IOException, OzoneException {
-    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
-    long volumeInfoFailCount = ksmMetrics.getNumVolumeInfoFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    String newUserName = "user" + RandomStringUtils.randomNumeric(5);
-    createVolumeArgs.setUserName(newUserName);
-    storageHandler.setVolumeOwner(createVolumeArgs);
-
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-
-    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
-    Assert.assertFalse(retVolumeInfo.getOwner().getName().equals(userName));
-    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(newUserName));
-    Assert.assertEquals(volumeCreateFailCount,
-        ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(volumeInfoFailCount,
-        ksmMetrics.getNumVolumeInfoFails());
-  }
-
-  // Create a volume and modify the volume owner and then test its attributes
-  @Test(timeout = 60000)
-  public void testChangeVolumeQuota() throws IOException, OzoneException {
-    long numVolumeCreateFail = ksmMetrics.getNumVolumeCreateFails();
-    long numVolumeInfoFail = ksmMetrics.getNumVolumeInfoFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    Random rand = new Random();
-
-    // Create a new volume with a quota
-    OzoneQuota createQuota =
-        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    createVolumeArgs.setQuota(createQuota);
-    storageHandler.createVolume(createVolumeArgs);
-
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertEquals(createQuota.sizeInBytes(),
-        retVolumeInfo.getQuota().sizeInBytes());
-
-    // Set a new quota and test it
-    OzoneQuota setQuota =
-        new OzoneQuota(rand.nextInt(100), OzoneQuota.Units.GB);
-    createVolumeArgs.setQuota(setQuota);
-    storageHandler.setVolumeQuota(createVolumeArgs, false);
-    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertEquals(setQuota.sizeInBytes(),
-        retVolumeInfo.getQuota().sizeInBytes());
-
-    // Remove the quota and test it again
-    storageHandler.setVolumeQuota(createVolumeArgs, true);
-    getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertEquals(OzoneConsts.MAX_QUOTA_IN_BYTES,
-        retVolumeInfo.getQuota().sizeInBytes());
-    Assert.assertEquals(numVolumeCreateFail,
-        ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(numVolumeInfoFail,
-        ksmMetrics.getNumVolumeInfoFails());
-  }
-
-  // Create a volume and then delete it and then check for deletion
-  @Test(timeout = 60000)
-  public void testDeleteVolume() throws IOException, OzoneException {
-    long volumeCreateFailCount = ksmMetrics.getNumVolumeCreateFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String volumeName1 = volumeName + "_A";
-    String volumeName2 = volumeName + "_AA";
-    VolumeArgs volumeArgs = null;
-    VolumeInfo volumeInfo = null;
-
-    // Create 2 empty volumes with same prefix.
-    volumeArgs = new VolumeArgs(volumeName1, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-
-    volumeArgs = new VolumeArgs(volumeName2, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-
-    volumeArgs  = new VolumeArgs(volumeName1, userArgs);
-    volumeInfo = storageHandler.getVolumeInfo(volumeArgs);
-    Assert.assertTrue(volumeInfo.getVolumeName().equals(volumeName1));
-    Assert.assertTrue(volumeInfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(volumeCreateFailCount,
-        ksmMetrics.getNumVolumeCreateFails());
-
-    // Volume with _A should be able to delete as it is empty.
-    storageHandler.deleteVolume(volumeArgs);
-
-    // Make sure volume with _AA suffix still exists.
-    volumeArgs = new VolumeArgs(volumeName2, userArgs);
-    volumeInfo = storageHandler.getVolumeInfo(volumeArgs);
-    Assert.assertTrue(volumeInfo.getVolumeName().equals(volumeName2));
-
-    // Make sure volume with _A suffix is successfully deleted.
-    exception.expect(IOException.class);
-    exception.expectMessage("Info Volume failed, error:VOLUME_NOT_FOUND");
-    volumeArgs = new VolumeArgs(volumeName1, userArgs);
-    storageHandler.getVolumeInfo(volumeArgs);
-  }
-
-  // Create a volume and a bucket inside the volume,
-  // then delete it and then check for deletion failure
-  @Test(timeout = 60000)
-  public void testFailedDeleteVolume() throws IOException, OzoneException {
-    long numVolumeCreateFails = ksmMetrics.getNumVolumeCreateFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    VolumeInfo retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
-    Assert.assertEquals(numVolumeCreateFails,
-        ksmMetrics.getNumVolumeCreateFails());
-
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    storageHandler.createBucket(bucketArgs);
-
-    try {
-      storageHandler.deleteVolume(createVolumeArgs);
-      Assert.fail("Expecting deletion should fail "
-          + "because volume is not empty");
-    } catch (IOException ex) {
-      Assert.assertEquals(ex.getMessage(),
-          "Delete Volume failed, error:VOLUME_NOT_EMPTY");
-    }
-    retVolumeInfo = storageHandler.getVolumeInfo(getVolumeArgs);
-    Assert.assertTrue(retVolumeInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(retVolumeInfo.getOwner().getName().equals(userName));
-  }
-
-  // Create a volume and test Volume access for a different user
-  @Test(timeout = 60000)
-  public void testAccessVolume() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String[] groupName =
-        {"group" + RandomStringUtils.randomNumeric(5)};
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    createVolumeArgs.setGroups(groupName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, userName,
-        OzoneAcl.OzoneACLRights.READ_WRITE);
-    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, userAcl));
-    OzoneAcl group = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, groupName[0],
-        OzoneAcl.OzoneACLRights.READ);
-    Assert.assertTrue(storageHandler.checkVolumeAccess(volumeName, group));
-
-    // Create a different user and access should fail
-    String falseUserName = "user" + RandomStringUtils.randomNumeric(5);
-    OzoneAcl falseUserAcl =
-        new OzoneAcl(OzoneAcl.OzoneACLType.USER, falseUserName,
-            OzoneAcl.OzoneACLRights.READ_WRITE);
-    Assert.assertFalse(storageHandler
-        .checkVolumeAccess(volumeName, falseUserAcl));
-    // Checking access with user name and Group Type should fail
-    OzoneAcl falseGroupAcl = new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, userName,
-        OzoneAcl.OzoneACLRights.READ_WRITE);
-    Assert.assertFalse(storageHandler
-        .checkVolumeAccess(volumeName, falseGroupAcl));
-
-    // Access for acl type world should also fail
-    OzoneAcl worldAcl =
-        new OzoneAcl(OzoneAcl.OzoneACLType.WORLD, "",
-            OzoneAcl.OzoneACLRights.READ);
-    Assert.assertFalse(storageHandler.checkVolumeAccess(volumeName, worldAcl));
-
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCheckAccessFails());
-    Assert.assertEquals(0, ksmMetrics.getNumVolumeCreateFails());
-  }
-
-  @Test(timeout = 60000)
-  public void testCreateBucket() throws IOException, OzoneException {
-    long numVolumeCreateFail = ksmMetrics.getNumVolumeCreateFails();
-    long numBucketCreateFail = ksmMetrics.getNumBucketCreateFails();
-    long numBucketInfoFail = ksmMetrics.getNumBucketInfoFails();
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    storageHandler.createBucket(bucketArgs);
-
-    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
-        userArgs);
-    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
-    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
-    Assert.assertEquals(numVolumeCreateFail,
-        ksmMetrics.getNumVolumeCreateFails());
-    Assert.assertEquals(numBucketCreateFail,
-        ksmMetrics.getNumBucketCreateFails());
-    Assert.assertEquals(numBucketInfoFail,
-        ksmMetrics.getNumBucketInfoFails());
-  }
-
-  @Test(timeout = 60000)
-  public void testDeleteBucket() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    storageHandler.createBucket(bucketArgs);
-    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
-        userArgs);
-    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
-    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
-    storageHandler.deleteBucket(bucketArgs);
-    exception.expect(IOException.class);
-    exception.expectMessage("Info Bucket failed, error: BUCKET_NOT_FOUND");
-    storageHandler.getBucketInfo(getBucketArgs);
-  }
-
-  @Test(timeout = 60000)
-  public void testDeleteNonExistingBucket() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    storageHandler.createBucket(bucketArgs);
-    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
-        userArgs);
-    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
-    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
-    BucketArgs newBucketArgs = new BucketArgs(
-        volumeName, bucketName + "_invalid", userArgs);
-    exception.expect(IOException.class);
-    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_FOUND");
-    storageHandler.deleteBucket(newBucketArgs);
-  }
-
-
-  @Test(timeout = 60000)
-  public void testDeleteNonEmptyBucket() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
-    volumeArgs.setUserName(userName);
-    volumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(volumeArgs);
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    storageHandler.createBucket(bucketArgs);
-    BucketArgs getBucketArgs = new BucketArgs(volumeName, bucketName,
-        userArgs);
-    BucketInfo bucketInfo = storageHandler.getBucketInfo(getBucketArgs);
-    Assert.assertTrue(bucketInfo.getVolumeName().equals(volumeName));
-    Assert.assertTrue(bucketInfo.getBucketName().equals(bucketName));
-    String dataString = RandomStringUtils.randomAscii(100);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    exception.expect(IOException.class);
-    exception.expectMessage("Delete Bucket failed, error:BUCKET_NOT_EMPTY");
-    storageHandler.deleteBucket(bucketArgs);
-  }
-
-  /**
-   * Basic test of both putKey and getKey from KSM, as one can not be tested
-   * without the other.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testGetKeyWriterReader() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    long numKeyAllocates = ksmMetrics.getNumKeyAllocates();
-    long numKeyLookups = ksmMetrics.getNumKeyLookups();
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String dataString = RandomStringUtils.randomAscii(100);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    Assert.assertEquals(1 + numKeyAllocates, ksmMetrics.getNumKeyAllocates());
-
-    byte[] data = new byte[dataString.length()];
-    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
-      in.read(data);
-    }
-    Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
-    Assert.assertEquals(1 + numKeyLookups, ksmMetrics.getNumKeyLookups());
-  }
-
-  /**
-   * Test write the same key twice, the second write should fail, as currently
-   * key overwrite is not supported.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testKeyOverwrite() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    long numKeyAllocateFails = ksmMetrics.getNumKeyAllocateFails();
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(100);
-    String dataString = RandomStringUtils.randomAscii(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-
-    // We allow the key overwrite to be successful. Please note : Till
-    // HDFS-11922 is fixed this causes a data block leak on the data node side.
-    // That is this overwrite only overwrites the keys on KSM. We need to
-    // garbage collect those blocks from datanode.
-    KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    storageHandler.newKeyWriter(keyArgs2);
-    Assert
-        .assertEquals(numKeyAllocateFails, ksmMetrics.getNumKeyAllocateFails());
-  }
-
-  /**
-   * Test get a non-exiting key.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testGetNonExistKey() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    long numKeyLookupFails = ksmMetrics.getNumKeyLookupFails();
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    // try to get the key, should fail as it hasn't been created
-    exception.expect(IOException.class);
-    exception.expectMessage("KEY_NOT_FOUND");
-    storageHandler.newKeyReader(keyArgs);
-    Assert.assertEquals(1 + numKeyLookupFails,
-        ksmMetrics.getNumKeyLookupFails());
-  }
-
-  /**
-   * Test delete keys for ksm.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testDeleteKey() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    long numKeyDeletes = ksmMetrics.getNumKeyDeletes();
-    long numKeyDeleteFails = ksmMetrics.getNumKeyDeletesFails();
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    storageHandler.createBucket(bucketArgs);
-
-    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
-    keyArgs.setSize(100);
-    String dataString = RandomStringUtils.randomAscii(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-
-    storageHandler.deleteKey(keyArgs);
-    Assert.assertEquals(1 + numKeyDeletes, ksmMetrics.getNumKeyDeletes());
-
-    // Make sure the deleted key has been renamed.
-    MetadataStore store = cluster.getKeySpaceManager().
-        getMetadataManager().getStore();
-    List<Map.Entry<byte[], byte[]>> list = store.getRangeKVs(null, 10,
-        new MetadataKeyFilters.KeyPrefixFilter()
-            .addFilter(DELETING_KEY_PREFIX));
-    Assert.assertEquals(1, list.size());
-
-    // Delete the key again to test deleting non-existing key.
-    try {
-      storageHandler.deleteKey(keyArgs);
-      Assert.fail("Expected exception not thrown.");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains("KEY_NOT_FOUND"));
-    }
-    Assert.assertEquals(1 + numKeyDeleteFails,
-        ksmMetrics.getNumKeyDeletesFails());
-  }
-
-  /**
-   * Test rename key for ksm.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testRenameKey() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-    long numKeyRenames = ksmMetrics.getNumKeyRenames();
-    long numKeyRenameFails = ksmMetrics.getNumKeyRenameFails();
-    int testRenameFails = 0;
-    int testRenames = 0;
-    IOException ioe = null;
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    storageHandler.createBucket(bucketArgs);
-
-    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
-    keyArgs.setSize(100);
-    String toKeyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    // Rename from non-existent key should fail
-    try {
-      testRenames++;
-      storageHandler.renameKey(keyArgs, toKeyName);
-    } catch (IOException e) {
-      testRenameFails++;
-      ioe = e;
-    }
-    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
-
-    // Write the contents of the key to be renamed
-    String dataString = RandomStringUtils.randomAscii(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-
-    // Rename the key
-    toKeyName = "key" + RandomStringUtils.randomNumeric(5);
-    testRenames++;
-    storageHandler.renameKey(keyArgs, toKeyName);
-    Assert.assertEquals(numKeyRenames + testRenames,
-        ksmMetrics.getNumKeyRenames());
-    Assert.assertEquals(numKeyRenameFails + testRenameFails,
-        ksmMetrics.getNumKeyRenameFails());
-
-    // Try to get the key, should fail as it has been renamed
-    try {
-      storageHandler.newKeyReader(keyArgs);
-    } catch (IOException e) {
-      ioe = e;
-    }
-    Assert.assertTrue(ioe.getMessage().contains("KEY_NOT_FOUND"));
-
-    // Verify the contents of the renamed key
-    keyArgs = new KeyArgs(toKeyName, bucketArgs);
-    InputStream in = storageHandler.newKeyReader(keyArgs);
-    byte[] b = new byte[dataString.getBytes().length];
-    in.read(b);
-    Assert.assertEquals(new String(b), dataString);
-
-    // Rewrite the renamed key. Rename to key which already exists should fail.
-    keyArgs = new KeyArgs(keyName, bucketArgs);
-    keyArgs.setSize(100);
-    dataString = RandomStringUtils.randomAscii(100);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-      stream.close();
-      testRenames++;
-      storageHandler.renameKey(keyArgs, toKeyName);
-    } catch (IOException e) {
-      testRenameFails++;
-      ioe = e;
-    }
-    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
-
-    // Rename to empty string should fail
-    toKeyName = "";
-    try {
-      testRenames++;
-      storageHandler.renameKey(keyArgs, toKeyName);
-    } catch (IOException e) {
-      testRenameFails++;
-      ioe = e;
-    }
-    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
-
-    // Rename from empty string should fail
-    keyArgs = new KeyArgs("", bucketArgs);
-    toKeyName = "key" + RandomStringUtils.randomNumeric(5);
-    try {
-      testRenames++;
-      storageHandler.renameKey(keyArgs, toKeyName);
-    } catch (IOException e) {
-      testRenameFails++;
-      ioe = e;
-    }
-    Assert.assertTrue(ioe.getMessage().contains("Rename key failed, error"));
-
-    Assert.assertEquals(numKeyRenames + testRenames,
-        ksmMetrics.getNumKeyRenames());
-    Assert.assertEquals(numKeyRenameFails + testRenameFails,
-        ksmMetrics.getNumKeyRenameFails());
-  }
-
-  @Test(timeout = 60000)
-  public void testListBuckets() throws IOException, OzoneException {
-    ListBuckets result = null;
-    ListArgs listBucketArgs = null;
-
-    // Create volume - volA.
-    final String volAname = "volA";
-    VolumeArgs volAArgs = new VolumeArgs(volAname, userArgs);
-    volAArgs.setUserName("userA");
-    volAArgs.setAdminName("adminA");
-    storageHandler.createVolume(volAArgs);
-
-    // Create 20 buckets in volA for tests.
-    for (int i=0; i<10; i++) {
-      // Create "/volA/aBucket_0" to "/volA/aBucket_9" buckets in volA volume.
-      BucketArgs aBuckets = new BucketArgs(volAname,
-          "aBucket_" + i, userArgs);
-      if(i % 3 == 0) {
-        aBuckets.setStorageType(StorageType.ARCHIVE);
-      } else {
-        aBuckets.setStorageType(StorageType.DISK);
-      }
-      storageHandler.createBucket(aBuckets);
-
-      // Create "/volA/bBucket_0" to "/volA/bBucket_9" buckets in volA volume.
-      BucketArgs bBuckets = new BucketArgs(volAname,
-          "bBucket_" + i, userArgs);
-      if(i % 3 == 0) {
-        bBuckets.setStorageType(StorageType.RAM_DISK);
-      } else {
-        bBuckets.setStorageType(StorageType.SSD);
-      }
-      storageHandler.createBucket(bBuckets);
-    }
-
-    VolumeArgs volArgs = new VolumeArgs(volAname, userArgs);
-
-    // List all buckets in volA.
-    listBucketArgs = new ListArgs(volArgs, null, 100, null);
-    result = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(20, result.getBuckets().size());
-    List<BucketInfo> archiveBuckets = result.getBuckets().stream()
-        .filter(item -> item.getStorageType() == StorageType.ARCHIVE)
-        .collect(Collectors.toList());
-    Assert.assertEquals(4, archiveBuckets.size());
-
-    // List buckets with prefix "aBucket".
-    listBucketArgs = new ListArgs(volArgs, "aBucket", 100, null);
-    result = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(10, result.getBuckets().size());
-    Assert.assertTrue(result.getBuckets().stream()
-        .allMatch(entry -> entry.getBucketName().startsWith("aBucket")));
-
-    // List a certain number of buckets.
-    listBucketArgs = new ListArgs(volArgs, null, 3, null);
-    result = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(3, result.getBuckets().size());
-    Assert.assertEquals("aBucket_0",
-        result.getBuckets().get(0).getBucketName());
-    Assert.assertEquals("aBucket_1",
-        result.getBuckets().get(1).getBucketName());
-    Assert.assertEquals("aBucket_2",
-        result.getBuckets().get(2).getBucketName());
-
-    // List a certain number of buckets from the startKey.
-    listBucketArgs = new ListArgs(volArgs, null, 2, "bBucket_3");
-    result = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(2, result.getBuckets().size());
-    Assert.assertEquals("bBucket_4",
-        result.getBuckets().get(0).getBucketName());
-    Assert.assertEquals("bBucket_5",
-        result.getBuckets().get(1).getBucketName());
-
-    // Provide an invalid bucket name as start key.
-    listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
-    ListBuckets buckets = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(buckets.getBuckets().size(), 0);
-
-    // Use all arguments.
-    listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_7");
-    result = storageHandler.listBuckets(listBucketArgs);
-    Assert.assertEquals(2, result.getBuckets().size());
-    Assert.assertEquals("bBucket_8",
-        result.getBuckets().get(0).getBucketName());
-    Assert.assertEquals("bBucket_9",
-        result.getBuckets().get(1).getBucketName());
-
-    // Provide an invalid maxKeys argument.
-    try {
-      listBucketArgs = new ListArgs(volArgs, null, -1, null);
-      storageHandler.listBuckets(listBucketArgs);
-      Assert.fail("Expecting an error when the given"
-          + " maxKeys argument is invalid.");
-    } catch (Exception e) {
-      Assert.assertTrue(e.getMessage()
-          .contains(String.format("the value must be in range (0, %d]",
-              OzoneConsts.MAX_LISTBUCKETS_SIZE)));
-    }
-
-    // Provide an invalid volume name.
-    VolumeArgs invalidVolArgs = new VolumeArgs("invalid_name", userArgs);
-    try {
-      listBucketArgs = new ListArgs(invalidVolArgs, null, 100, null);
-      storageHandler.listBuckets(listBucketArgs);
-      Assert.fail("Expecting an error when the given volume name is invalid.");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof IOException);
-      Assert.assertTrue(e.getMessage()
-          .contains(Status.VOLUME_NOT_FOUND.name()));
-    }
-  }
-
-  /**
-   * Test list keys.
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testListKeys() throws IOException, OzoneException {
-    ListKeys result = null;
-    ListArgs listKeyArgs = null;
-
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    // Write 20 keys in bucket.
-    int numKeys = 20;
-    String keyName = "Key";
-    KeyArgs keyArgs = null;
-    for (int i = 0; i < numKeys; i++) {
-      if (i % 2 == 0) {
-        // Create /volume/bucket/aKey[0,2,4,...,18] in bucket.
-        keyArgs = new KeyArgs("a" + keyName + i, bucketArgs);
-      } else {
-        // Create /volume/bucket/bKey[1,3,5,...,19] in bucket.
-        keyArgs = new KeyArgs("b" + keyName + i, bucketArgs);
-      }
-      keyArgs.setSize(4096);
-
-      // Just for testing list keys call, so no need to write real data.
-      OutputStream stream = storageHandler.newKeyWriter(keyArgs);
-      stream.close();
-    }
-
-    // List all keys in bucket.
-    bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    listKeyArgs = new ListArgs(bucketArgs, null, 100, null);
-    result = storageHandler.listKeys(listKeyArgs);
-    Assert.assertEquals(numKeys, result.getKeyList().size());
-
-    // List keys with prefix "aKey".
-    listKeyArgs = new ListArgs(bucketArgs, "aKey", 100, null);
-    result = storageHandler.listKeys(listKeyArgs);
-    Assert.assertEquals(numKeys / 2, result.getKeyList().size());
-    Assert.assertTrue(result.getKeyList().stream()
-        .allMatch(entry -> entry.getKeyName().startsWith("aKey")));
-
-    // List a certain number of keys.
-    listKeyArgs = new ListArgs(bucketArgs, null, 3, null);
-    result = storageHandler.listKeys(listKeyArgs);
-    Assert.assertEquals(3, result.getKeyList().size());
-    Assert.assertEquals("aKey0",
-        result.getKeyList().get(0).getKeyName());
-    Assert.assertEquals("aKey10",
-        result.getKeyList().get(1).getKeyName());
-    Assert.assertEquals("aKey12",
-        result.getKeyList().get(2).getKeyName());
-
-    // List a certain number of keys from the startKey.
-    listKeyArgs = new ListArgs(bucketArgs, null, 2, "bKey1");
-    result = storageHandler.listKeys(listKeyArgs);
-    Assert.assertEquals(2, result.getKeyList().size());
-    Assert.assertEquals("bKey11",
-        result.getKeyList().get(0).getKeyName());
-    Assert.assertEquals("bKey13",
-        result.getKeyList().get(1).getKeyName());
-
-    // Provide an invalid key name as start key.
-    listKeyArgs = new ListArgs(bucketArgs, null, 100, "invalid_start_key");
-    ListKeys keys = storageHandler.listKeys(listKeyArgs);
-    Assert.assertEquals(keys.getKeyList().size(), 0);
-
-    // Provide an invalid maxKeys argument.
-    try {
-      listKeyArgs = new ListArgs(bucketArgs, null, -1, null);
-      storageHandler.listBuckets(listKeyArgs);
-      Assert.fail("Expecting an error when the given"
-          + " maxKeys argument is invalid.");
-    } catch (Exception e) {
-      GenericTestUtils.assertExceptionContains(
-          String.format("the value must be in range (0, %d]",
-              OzoneConsts.MAX_LISTKEYS_SIZE), e);
-    }
-
-    // Provide an invalid bucket name.
-    bucketArgs = new BucketArgs("invalid_bucket", createVolumeArgs);
-    try {
-      listKeyArgs = new ListArgs(bucketArgs, null, numKeys, null);
-      storageHandler.listKeys(listKeyArgs);
-      Assert.fail(
-          "Expecting an error when the given bucket name is invalid.");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          Status.BUCKET_NOT_FOUND.name(), e);
-    }
-  }
-
-  @Test
-  public void testListVolumes() throws IOException, OzoneException {
-
-    String user0 = "testListVolumes-user-0";
-    String user1 = "testListVolumes-user-1";
-    String adminUser = "testListVolumes-admin";
-    ListArgs listVolumeArgs;
-    ListVolumes volumes;
-
-    // Create 10 volumes by user0 and user1
-    String[] user0vols = new String[10];
-    String[] user1vols = new String[10];
-    for (int i =0; i<10; i++) {
-      VolumeArgs createVolumeArgs;
-      String user0VolName = "Vol-" + user0 + "-" + i;
-      user0vols[i] = user0VolName;
-      createVolumeArgs = new VolumeArgs(user0VolName, userArgs);
-      createVolumeArgs.setUserName(user0);
-      createVolumeArgs.setAdminName(adminUser);
-      createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
-      storageHandler.createVolume(createVolumeArgs);
-
-      String user1VolName = "Vol-" + user1 + "-" + i;
-      user1vols[i] = user1VolName;
-      createVolumeArgs = new VolumeArgs(user1VolName, userArgs);
-      createVolumeArgs.setUserName(user1);
-      createVolumeArgs.setAdminName(adminUser);
-      createVolumeArgs.setQuota(new OzoneQuota(i, OzoneQuota.Units.GB));
-      storageHandler.createVolume(createVolumeArgs);
-    }
-
-    // Test list all volumes
-    UserArgs userArgs0 = new UserArgs(user0, OzoneUtils.getRequestID(),
-        null, null, null, null);
-    listVolumeArgs = new ListArgs(userArgs0, "Vol-testListVolumes", 100, null);
-    listVolumeArgs.setRootScan(true);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(20, volumes.getVolumes().size());
-
-    // Test list all volumes belongs to an user
-    listVolumeArgs = new ListArgs(userArgs0, null, 100, null);
-    listVolumeArgs.setRootScan(false);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(10, volumes.getVolumes().size());
-
-    // Test prefix
-    listVolumeArgs = new ListArgs(userArgs0,
-        "Vol-" + user0 + "-3", 100, null);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(1, volumes.getVolumes().size());
-    Assert.assertEquals(user0vols[3],
-        volumes.getVolumes().get(0).getVolumeName());
-    Assert.assertEquals(user0,
-        volumes.getVolumes().get(0).getOwner().getName());
-
-    // Test list volumes by user
-    UserArgs userArgs1 = new UserArgs(user1, OzoneUtils.getRequestID(),
-        null, null, null, null);
-    listVolumeArgs = new ListArgs(userArgs1, null, 100, null);
-    listVolumeArgs.setRootScan(false);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(10, volumes.getVolumes().size());
-    Assert.assertEquals(user1,
-        volumes.getVolumes().get(3).getOwner().getName());
-
-    // Make sure all available fields are returned
-    final String user0vol4 = "Vol-" + user0 + "-4";
-    final String user0vol5 = "Vol-" + user0 + "-5";
-    listVolumeArgs = new ListArgs(userArgs0, null, 1, user0vol4);
-    listVolumeArgs.setRootScan(false);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(1, volumes.getVolumes().size());
-    Assert.assertEquals(user0,
-        volumes.getVolumes().get(0).getOwner().getName());
-    Assert.assertEquals(user0vol5,
-        volumes.getVolumes().get(0).getVolumeName());
-    Assert.assertEquals(5,
-        volumes.getVolumes().get(0).getQuota().getSize());
-    Assert.assertEquals(OzoneQuota.Units.GB,
-        volumes.getVolumes().get(0).getQuota().getUnit());
-
-    // User doesn't have volumes
-    UserArgs userArgsX = new UserArgs("unknwonUser", OzoneUtils.getRequestID(),
-        null, null, null, null);
-    listVolumeArgs = new ListArgs(userArgsX, null, 100, null);
-    listVolumeArgs.setRootScan(false);
-    volumes = storageHandler.listVolumes(listVolumeArgs);
-    Assert.assertEquals(0, volumes.getVolumes().size());
-  }
-
-  /**
-   * Test get key information.
-   *
-   * @throws IOException
-   * @throws OzoneException
-   */
-  @Test
-  public void testGetKeyInfo() throws IOException,
-      OzoneException, ParseException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    long currentTime = Time.now();
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String keyName = "testKey";
-    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
-    keyArgs.setSize(4096);
-
-
-    OutputStream stream = storageHandler.newKeyWriter(keyArgs);
-    stream.close();
-
-    KeyInfo keyInfo = storageHandler.getKeyInfo(keyArgs);
-    // Compare the time in second unit since the date string reparsed to
-    // millisecond will lose precision.
-    Assert.assertTrue(
-        (HddsClientUtils.formatDateTime(keyInfo.getCreatedOn()) / 1000) >= (
-            currentTime / 1000));
-    Assert.assertTrue(
-        (HddsClientUtils.formatDateTime(keyInfo.getModifiedOn()) / 1000) >= (
-            currentTime / 1000));
-    Assert.assertEquals(keyName, keyInfo.getKeyName());
-    // with out data written, the size would be 0
-    Assert.assertEquals(0, keyInfo.getSize());
-  }
-
-  /**
-   * Test that the write can proceed without having to set the right size.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testWriteSize() throws IOException, OzoneException {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String dataString = RandomStringUtils.randomAscii(100);
-    // write a key without specifying size at all
-    String keyName = "testKey";
-    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    byte[] data = new byte[dataString.length()];
-    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
-      in.read(data);
-    }
-    Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
-
-    // write a key with a size, but write above it.
-    String keyName1 = "testKey1";
-    KeyArgs keyArgs1 = new KeyArgs(keyName1, bucketArgs);
-    keyArgs1.setSize(30);
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs1)) {
-      stream.write(dataString.getBytes());
-    }
-    byte[] data1 = new byte[dataString.length()];
-    try (InputStream in = storageHandler.newKeyReader(keyArgs1)) {
-      in.read(data1);
-    }
-    Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
-  }
-
-  /**
-   * Tests the RPC call for getting scmId and clusterId from SCM.
-   * @throws IOException
-   */
-  @Test
-  public void testGetScmInfo() throws IOException {
-    ScmInfo info = cluster.getKeySpaceManager().getScmInfo();
-    Assert.assertEquals(clusterId, info.getClusterId());
-    Assert.assertEquals(scmId, info.getScmId());
-  }
-
-
-  @Test
-  public void testExpiredOpenKey() throws Exception {
-    BackgroundService openKeyCleanUpService = ((KeyManagerImpl)cluster
-        .getKeySpaceManager().getKeyManager()).getOpenKeyCleanupService();
-
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    // open some keys.
-
-    KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
-    KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
-    KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
-    KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
-    List<BlockGroup> openKeys;
-    storageHandler.newKeyWriter(keyArgs1);
-    storageHandler.newKeyWriter(keyArgs2);
-    storageHandler.newKeyWriter(keyArgs3);
-    storageHandler.newKeyWriter(keyArgs4);
-
-    Set<String> expected = Stream.of(
-        "testKey1", "testKey2", "testKey3", "testKey4")
-        .collect(Collectors.toSet());
-
-    // Now all k1-k4 should be in open state, so ExpiredOpenKeys should not
-    // contain these values.
-    openKeys = cluster.getKeySpaceManager()
-        .getMetadataManager().getExpiredOpenKeys();
-
-    for (BlockGroup bg : openKeys) {
-      String[] subs = bg.getGroupID().split("/");
-      String keyName = subs[subs.length - 1];
-      Assert.assertFalse(expected.contains(keyName));
-    }
-
-    Thread.sleep(2000);
-    // Now all k1-k4 should be in ExpiredOpenKeys
-    openKeys = cluster.getKeySpaceManager()
-        .getMetadataManager().getExpiredOpenKeys();
-    for (BlockGroup bg : openKeys) {
-      String[] subs = bg.getGroupID().split("/");
-      String keyName = subs[subs.length - 1];
-      if (expected.contains(keyName)) {
-        expected.remove(keyName);
-      }
-    }
-    Assert.assertEquals(0, expected.size());
-
-    KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
-    storageHandler.newKeyWriter(keyArgs5);
-
-    openKeyCleanUpService.triggerBackgroundTaskForTesting();
-    Thread.sleep(2000);
-    // now all k1-k4 should have been removed by the clean-up task, only k5
-    // should be present in ExpiredOpenKeys.
-    openKeys =
-        cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
-    System.out.println(openKeys);
-    boolean key5found = false;
-    Set<String> removed = Stream.of(
-        "testKey1", "testKey2", "testKey3", "testKey4")
-        .collect(Collectors.toSet());
-    for (BlockGroup bg : openKeys) {
-      String[] subs = bg.getGroupID().split("/");
-      String keyName = subs[subs.length - 1];
-      Assert.assertFalse(removed.contains(keyName));
-      if (keyName.equals("testKey5")) {
-        key5found = true;
-      }
-    }
-    Assert.assertTrue(key5found);
-  }
-
-  /**
-   * Tests the KSM Initialization.
-   * @throws IOException
-   */
-  @Test
-  public void testKSMInitialization() throws IOException {
-    // Read the version file info from KSM version file
-    KSMStorage ksmStorage = cluster.getKeySpaceManager().getKsmStorage();
-    SCMStorage scmStorage = new SCMStorage(conf);
-    // asserts whether cluster Id and SCM ID are properly set in SCM Version
-    // file.
-    Assert.assertEquals(clusterId, scmStorage.getClusterID());
-    Assert.assertEquals(scmId, scmStorage.getScmId());
-    // asserts whether KSM Id is properly set in KSM Version file.
-    Assert.assertEquals(ksmId, ksmStorage.getKsmId());
-    // asserts whether the SCM info is correct in KSM Version file.
-    Assert.assertEquals(clusterId, ksmStorage.getClusterID());
-    Assert.assertEquals(scmId, ksmStorage.getScmId());
-  }
-
-  /**
-   * Tests the KSM Initialization Failure.
-   * @throws IOException
-   */
-  @Test
-  public void testKSMInitializationFailure() throws Exception {
-    OzoneConfiguration config = new OzoneConfiguration();
-    final String path =
-        GenericTestUtils.getTempPath(UUID.randomUUID().toString());
-    Path metaDirPath = Paths.get(path, "ksm-meta");
-    config.set(OzoneConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
-    config.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
-    config.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
-    config.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
-        conf.get(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY));
-    exception.expect(KSMException.class);
-    exception.expectMessage("KSM not initialized.");
-    KeySpaceManager.createKSM(null, config);
-    KSMStorage ksmStore = new KSMStorage(config);
-    ksmStore.setClusterId("testClusterId");
-    ksmStore.setScmId("testScmId");
-    // writes the version file properties
-    ksmStore.initialize();
-    exception.expect(KSMException.class);
-    exception.expectMessage("SCM version info mismatch.");
-    KeySpaceManager.createKSM(null, conf);
-  }
-
-  @Test
-  public void testGetServiceList() throws IOException {
-    long numGetServiceListCalls = ksmMetrics.getNumGetServiceLists();
-    List<ServiceInfo> services = cluster.getKeySpaceManager().getServiceList();
-
-    Assert.assertEquals(numGetServiceListCalls + 1,
-        ksmMetrics.getNumGetServiceLists());
-
-    ServiceInfo ksmInfo = services.stream().filter(
-        a -> a.getNodeType().equals(HddsProtos.NodeType.KSM))
-        .collect(Collectors.toList()).get(0);
-    InetSocketAddress ksmAddress = new InetSocketAddress(ksmInfo.getHostname(),
-        ksmInfo.getPort(ServicePort.Type.RPC));
-    Assert.assertEquals(NetUtils.createSocketAddr(
-        conf.get(OZONE_KSM_ADDRESS_KEY)), ksmAddress);
-
-    ServiceInfo scmInfo = services.stream().filter(
-        a -> a.getNodeType().equals(HddsProtos.NodeType.SCM))
-        .collect(Collectors.toList()).get(0);
-    InetSocketAddress scmAddress = new InetSocketAddress(scmInfo.getHostname(),
-        scmInfo.getPort(ServicePort.Type.RPC));
-    Assert.assertEquals(NetUtils.createSocketAddr(
-        conf.get(OZONE_SCM_CLIENT_ADDRESS_KEY)), scmAddress);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManagerRestInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManagerRestInterface.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManagerRestInterface.java
deleted file mode 100644
index feb83d3..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManagerRestInterface.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.ServicePort;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.ozone.KsmUtils.getKsmAddressForClients;
-
-/**
- * This class is to test the REST interface exposed by KeySpaceManager.
- */
-public class TestKeySpaceManagerRestInterface {
-
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
-    cluster.waitForClusterToBeReady();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testGetServiceList() throws Exception {
-    KeySpaceManagerHttpServer server =
-        cluster.getKeySpaceManager().getHttpServer();
-    HttpClient client = HttpClients.createDefault();
-    String connectionUri = "http://" +
-        NetUtils.getHostPortString(server.getHttpAddress());
-    HttpGet httpGet = new HttpGet(connectionUri + "/serviceList");
-    HttpResponse response = client.execute(httpGet);
-    String serviceListJson = EntityUtils.toString(response.getEntity());
-
-    ObjectMapper objectMapper = new ObjectMapper();
-    TypeReference<List<ServiceInfo>> serviceInfoReference =
-        new TypeReference<List<ServiceInfo>>() {};
-    List<ServiceInfo> serviceInfos = objectMapper.readValue(
-        serviceListJson, serviceInfoReference);
-    Map<HddsProtos.NodeType, ServiceInfo> serviceMap = new HashMap<>();
-    for (ServiceInfo serviceInfo : serviceInfos) {
-      serviceMap.put(serviceInfo.getNodeType(), serviceInfo);
-    }
-
-    InetSocketAddress ksmAddress =
-        getKsmAddressForClients(conf);
-    ServiceInfo ksmInfo = serviceMap.get(HddsProtos.NodeType.KSM);
-
-    Assert.assertEquals(ksmAddress.getHostName(), ksmInfo.getHostname());
-    Assert.assertEquals(ksmAddress.getPort(),
-        ksmInfo.getPort(ServicePort.Type.RPC));
-    Assert.assertEquals(server.getHttpAddress().getPort(),
-        ksmInfo.getPort(ServicePort.Type.HTTP));
-
-    InetSocketAddress scmAddress =
-        getScmAddressForClients(conf);
-    ServiceInfo scmInfo = serviceMap.get(HddsProtos.NodeType.SCM);
-
-    Assert.assertEquals(scmAddress.getHostName(), scmInfo.getHostname());
-    Assert.assertEquals(scmAddress.getPort(),
-        scmInfo.getPort(ServicePort.Type.RPC));
-
-    ServiceInfo datanodeInfo = serviceMap.get(HddsProtos.NodeType.DATANODE);
-    DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
-        .getDatanodeDetails();
-    Assert.assertEquals(datanodeDetails.getHostName(),
-        datanodeInfo.getHostname());
-
-    Map<ServicePort.Type, Integer> ports = datanodeInfo.getPorts();
-    for(ServicePort.Type type : ports.keySet()) {
-      switch (type) {
-      case HTTP:
-      case HTTPS:
-        Assert.assertEquals(
-            datanodeDetails.getPort(DatanodeDetails.Port.Name.REST).getValue(),
-            ports.get(type));
-        break;
-      default:
-        // KSM only sends Datanode's info port details
-        // i.e. HTTP or HTTPS
-        // Other ports are not expected as of now.
-        Assert.fail();
-        break;
-      }
-    }
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org