Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:08 UTC

[08/56] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager to OzoneManager. Contributed by Arpit Agarwal.
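
The rename is mechanical: KeySpaceManager becomes OzoneManager, the ozone.ksm packages and Ksm* helper classes become ozone.om and Om*, and KSM metrics become OM metrics. A condensed before/after sketch of the test-facing API, drawn only from the diffs below:

    // Before (KSM naming), as in the deleted tests:
    KeySpaceManager ksm = cluster.getKeySpaceManager();
    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
        .setVolumeName(volumeName)
        .setBucketName(bucketName)
        .setKeyName(keyName)
        .setDataSize(1000)
        .build();

    // After (OM naming), as in the added tests:
    OzoneManager om = cluster.getOzoneManager();
    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
        .setVolumeName(volumeName)
        .setBucketName(bucketName)
        .setKeyName(keyName)
        .setDataSize(1000)
        .build();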

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java
deleted file mode 100644
index 15c3fd3..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.web.handlers.BucketArgs;
-import org.apache.hadoop.ozone.web.handlers.KeyArgs;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class tests the versioning of blocks from the KSM side.
- */
-public class TestKsmBlockVersioning {
-  private static MiniOzoneCluster cluster = null;
-  private static UserArgs userArgs;
-  private static OzoneConfiguration conf;
-  private static KeySpaceManager keySpaceManager;
-  private static StorageHandler storageHandler;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  /**
-   * Create a MiniOzoneCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "distributed"
-   *
-   * @throws IOException
-   */
-  @BeforeClass
-  public static void init() throws Exception {
-    conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
-    cluster.waitForClusterToBeReady();
-    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
-        null, null, null, null);
-    keySpaceManager = cluster.getKeySpaceManager();
-  }
-
-  /**
-   * Shutdown MiniOzoneCluster.
-   */
-  @AfterClass
-  public static void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testAllocateCommit() throws Exception {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .setDataSize(1000)
-        .build();
-
-    // 1st update, version 0
-    OpenKeySession openKey = keySpaceManager.openKey(keyArgs);
-    keySpaceManager.commitKey(keyArgs, openKey.getId());
-
-    KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
-    KsmKeyLocationInfoGroup highestVersion =
-        checkVersions(keyInfo.getKeyLocationVersions());
-    assertEquals(0, highestVersion.getVersion());
-    assertEquals(1, highestVersion.getLocationList().size());
-
-    // 2nd update, version 1
-    openKey = keySpaceManager.openKey(keyArgs);
-    //KsmKeyLocationInfo locationInfo =
-    //    keySpaceManager.allocateBlock(keyArgs, openKey.getId());
-    keySpaceManager.commitKey(keyArgs, openKey.getId());
-
-    keyInfo = keySpaceManager.lookupKey(keyArgs);
-    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
-    assertEquals(1, highestVersion.getVersion());
-    assertEquals(2, highestVersion.getLocationList().size());
-
-    // 3rd update, version 2
-    openKey = keySpaceManager.openKey(keyArgs);
-    // this block will be appended to the latest version, version 2.
-    keySpaceManager.allocateBlock(keyArgs, openKey.getId());
-    keySpaceManager.commitKey(keyArgs, openKey.getId());
-
-    keyInfo = keySpaceManager.lookupKey(keyArgs);
-    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
-    assertEquals(2, highestVersion.getVersion());
-    assertEquals(4, highestVersion.getLocationList().size());
-  }
-
-  private KsmKeyLocationInfoGroup checkVersions(
-      List<KsmKeyLocationInfoGroup> versions) {
-    KsmKeyLocationInfoGroup currentVersion = null;
-    for (KsmKeyLocationInfoGroup version : versions) {
-      if (currentVersion != null) {
-        assertEquals(currentVersion.getVersion() + 1, version.getVersion());
-        for (KsmKeyLocationInfo info : currentVersion.getLocationList()) {
-          boolean found = false;
-          // all the blocks from the previous version must be present in the
-          // next version
-          for (KsmKeyLocationInfo info2 : version.getLocationList()) {
-            if (info.getLocalID() == info2.getLocalID()) {
-              found = true;
-              break;
-            }
-          }
-          assertTrue(found);
-        }
-      }
-      currentVersion = version;
-    }
-    return currentVersion;
-  }
-
-  @Test
-  public void testReadLatestVersion() throws Exception {
-
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .setDataSize(1000)
-        .build();
-
-    String dataString = RandomStringUtils.randomAlphabetic(100);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    // this write will create the 1st version with one block
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    byte[] data = new byte[dataString.length()];
-    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
-      in.read(data);
-    }
-    KsmKeyInfo keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
-    assertEquals(dataString, DFSUtil.bytes2String(data));
-    assertEquals(0, keyInfo.getLatestVersionLocations().getVersion());
-    assertEquals(1,
-        keyInfo.getLatestVersionLocations().getLocationList().size());
-
-    // this write will create the 2nd version, which will contain the block
-    // from version 1 plus one new block
-    dataString = RandomStringUtils.randomAlphabetic(10);
-    data = new byte[dataString.length()];
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
-      in.read(data);
-    }
-    keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
-    assertEquals(dataString, DFSUtil.bytes2String(data));
-    assertEquals(1, keyInfo.getLatestVersionLocations().getVersion());
-    assertEquals(2,
-        keyInfo.getLatestVersionLocations().getLocationList().size());
-
-    dataString = RandomStringUtils.randomAlphabetic(200);
-    data = new byte[dataString.length()];
-    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
-      stream.write(dataString.getBytes());
-    }
-    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
-      in.read(data);
-    }
-    keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
-    assertEquals(dataString, DFSUtil.bytes2String(data));
-    assertEquals(2, keyInfo.getLatestVersionLocations().getVersion());
-    assertEquals(3,
-        keyInfo.getLatestVersionLocations().getLocationList().size());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
deleted file mode 100644
index 1cb6e82..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.web.handlers.BucketArgs;
-import org.apache.hadoop.ozone.web.handlers.KeyArgs;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.LinkedList;
-
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test key write/read where a key can span multiple containers.
- */
-public class TestMultipleContainerReadWrite {
-  private static MiniOzoneCluster cluster = null;
-  private static StorageHandler storageHandler;
-  private static UserArgs userArgs;
-  private static OzoneConfiguration conf;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
-  /**
-   * Create a MiniOzoneCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "distributed"
-   *
-   * @throws IOException
-   */
-  @BeforeClass
-  public static void init() throws Exception {
-    conf = new OzoneConfiguration();
-    // set the block size to 1 MB so that a key spans multiple blocks.
-    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 1);
-    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
-    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
-        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
-    cluster.waitForClusterToBeReady();
-    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
-        null, null, null, null);
-  }
-
-  /**
-   * Shutdown MiniOzoneCluster.
-   */
-  @AfterClass
-  public static void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  @Test
-  public void testWriteRead() throws Exception {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String dataString = RandomStringUtils.randomAscii(3 * (int)OzoneConsts.MB);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(3 * (int)OzoneConsts.MB);
-
-    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
-      outputStream.write(dataString.getBytes());
-    }
-
-    byte[] data = new byte[dataString.length()];
-    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
-      inputStream.read(data, 0, data.length);
-    }
-    assertEquals(dataString, new String(data));
-    // check via container metrics that the chunk files were persisted.
-    MetricsRecordBuilder containerMetrics = getMetrics(
-        "StorageContainerMetrics");
-    assertCounter("numWriteChunk", 3L, containerMetrics);
-    assertCounter("numReadChunk", 3L, containerMetrics);
-  }
-
-  // This test is disabled because it assumes that writing beyond a specific
-  // size is not allowed, which is not true for now. It is kept in case we
-  // add this restriction in the future.
-  @Ignore
-  @Test
-  public void testErrorWrite() throws Exception {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String dataString1 = RandomStringUtils.randomAscii(100);
-    String dataString2 = RandomStringUtils.randomAscii(500);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(500);
-
-    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
-      // first write will succeed
-      outputStream.write(dataString1.getBytes());
-      // second write exceeds the remaining space and should fail
-      exception.expect(IOException.class);
-      exception.expectMessage(
-          "Can not write 500 bytes with only 400 byte space");
-      outputStream.write(dataString2.getBytes());
-    }
-  }
-
-  @Test
-  public void testPartialRead() throws Exception {
-    String userName = "user" + RandomStringUtils.randomNumeric(5);
-    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
-    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
-    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
-    String keyName = "key" + RandomStringUtils.randomNumeric(5);
-
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
-    storageHandler.createVolume(createVolumeArgs);
-
-    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
-    bucketArgs.setAddAcls(new LinkedList<>());
-    bucketArgs.setRemoveAcls(new LinkedList<>());
-    bucketArgs.setStorageType(StorageType.DISK);
-    storageHandler.createBucket(bucketArgs);
-
-    String dataString = RandomStringUtils.randomAscii(500);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(500);
-
-    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
-      outputStream.write(dataString.getBytes());
-    }
-
-    byte[] data = new byte[600];
-    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
-      int readLen = inputStream.read(data, 0, 340);
-      assertEquals(340, readLen);
-      assertEquals(dataString.substring(0, 340),
-          new String(data).substring(0, 340));
-
-      readLen = inputStream.read(data, 340, 260);
-      assertEquals(160, readLen);
-      assertEquals(dataString, new String(data).substring(0, 500));
-
-      readLen = inputStream.read(data, 500, 1);
-      assertEquals(-1, readLen);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java
new file mode 100644
index 0000000..5481506
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This class tests container reports with DN container state info.
+ */
+public class TestContainerReportWithKeys {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestContainerReportWithKeys.class);
+  private static MiniOzoneCluster cluster = null;
+  private static OzoneConfiguration conf;
+  private static StorageContainerManager scm;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    scm = cluster.getStorageContainerManager();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testContainerReportKeyWrite() throws Exception {
+    final String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    final String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    final String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    final int keySize = 100;
+
+    OzoneClient client = OzoneClientFactory.getClient(conf);
+    ObjectStore objectStore = client.getObjectStore();
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey(keyName, keySize, ReplicationType.STAND_ALONE,
+                ReplicationFactor.ONE);
+    String dataString = RandomStringUtils.randomAlphabetic(keySize);
+    key.write(dataString.getBytes());
+    key.close();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(keySize)
+        .build();
+
+
+    OmKeyLocationInfo keyInfo =
+        cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
+            .get(0).getBlocksLatestVersionOnly().get(0);
+
+    ContainerData cd = getContainerData(keyInfo.getContainerID());
+
+    LOG.info("DN Container Data:  keyCount: {} used: {} ",
+        cd.getKeyCount(), cd.getBytesUsed());
+
+    ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID());
+
+    LOG.info("SCM Container Info keyCount: {} usedBytes: {}",
+        cinfo.getNumberOfKeys(), cinfo.getUsedBytes());
+  }
+
+
+  private static ContainerData getContainerData(long containerID) {
+    ContainerData containerData;
+    try {
+      ContainerManager containerManager = cluster.getHddsDatanodes().get(0)
+          .getDatanodeStateMachine().getContainer().getContainerManager();
+      containerData = containerManager.readContainer(containerID);
+    } catch (StorageContainerException e) {
+      throw new AssertionError(e);
+    }
+    return containerData;
+  }
+}
\ No newline at end of file
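
The test above only logs the DN-side and SCM-side views of the container. As a sketch (not part of this commit), one could additionally assert that the two views agree, assuming the datanode's container report has already propagated to SCM:

    // Hypothetical extra check, assuming the DN report has reached SCM:
    ContainerData cd = getContainerData(keyInfo.getContainerID());
    ContainerInfo cinfo = scm.getContainerInfo(keyInfo.getContainerID());
    assertEquals(cd.getKeyCount(), cinfo.getNumberOfKeys());
    assertEquals(cd.getBytesUsed(), cinfo.getUsedBytes());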

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java
new file mode 100644
index 0000000..1389cba
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestMultipleContainerReadWrite.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test key write/read where a key can span multiple containers.
+ */
+public class TestMultipleContainerReadWrite {
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs userArgs;
+  private static OzoneConfiguration conf;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    // set the block size to 1 MB so that a key spans multiple blocks.
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 1);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testWriteRead() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(3 * (int)OzoneConsts.MB);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(3 * (int)OzoneConsts.MB);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[dataString.length()];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      inputStream.read(data, 0, data.length);
+    }
+    assertEquals(dataString, new String(data));
+    // check via container metrics that the chunk files were persisted.
+    MetricsRecordBuilder containerMetrics = getMetrics(
+        "StorageContainerMetrics");
+    assertCounter("numWriteChunk", 3L, containerMetrics);
+    assertCounter("numReadChunk", 3L, containerMetrics);
+  }
+
+  // This test is disabled because it assumes that writing beyond a specific
+  // size is not allowed, which is not true for now. It is kept in case we
+  // add this restriction in the future.
+  @Ignore
+  @Test
+  public void testErrorWrite() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString1 = RandomStringUtils.randomAscii(100);
+    String dataString2 = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      // first write will succeed
+      outputStream.write(dataString1.getBytes());
+      // second write exceeds the remaining space and should fail
+      exception.expect(IOException.class);
+      exception.expectMessage(
+          "Can not write 500 bytes with only 400 byte space");
+      outputStream.write(dataString2.getBytes());
+    }
+  }
+
+  @Test
+  public void testPartialRead() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[600];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      int readLen = inputStream.read(data, 0, 340);
+      assertEquals(340, readLen);
+      assertEquals(dataString.substring(0, 340),
+          new String(data).substring(0, 340));
+
+      readLen = inputStream.read(data, 340, 260);
+      assertEquals(160, readLen);
+      assertEquals(dataString, new String(data).substring(0, 500));
+
+      readLen = inputStream.read(data, 500, 1);
+      assertEquals(-1, readLen);
+    }
+  }
+}
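
For reference, the counters asserted in testWriteRead follow from the configuration in init(): with OZONE_SCM_BLOCK_SIZE_IN_MB set to 1, the 3 MB key spans three 1 MB blocks, and the assertions rely on each block being written and read as a single chunk. A sketch of that arithmetic, under the one-chunk-per-block assumption:

    long blockSize = 1 * OzoneConsts.MB;         // OZONE_SCM_BLOCK_SIZE_IN_MB = 1
    long dataSize = 3 * OzoneConsts.MB;          // size of the random test key
    long expectedChunks = dataSize / blockSize;  // = 3, matching numWriteChunk/numReadChunk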

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
new file mode 100644
index 0000000..15122b9
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the versioning of blocks from the OM side.
+ */
+public class TestOmBlockVersioning {
+  private static MiniOzoneCluster cluster = null;
+  private static UserArgs userArgs;
+  private static OzoneConfiguration conf;
+  private static OzoneManager ozoneManager;
+  private static StorageHandler storageHandler;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    ozoneManager = cluster.getOzoneManager();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllocateCommit() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(1000)
+        .build();
+
+    // 1st update, version 0
+    OpenKeySession openKey = ozoneManager.openKey(keyArgs);
+    ozoneManager.commitKey(keyArgs, openKey.getId());
+
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
+    OmKeyLocationInfoGroup highestVersion =
+        checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(0, highestVersion.getVersion());
+    assertEquals(1, highestVersion.getLocationList().size());
+
+    // 2nd update, version 1
+    openKey = ozoneManager.openKey(keyArgs);
+    //OmKeyLocationInfo locationInfo =
+    //    ozoneManager.allocateBlock(keyArgs, openKey.getId());
+    ozoneManager.commitKey(keyArgs, openKey.getId());
+
+    keyInfo = ozoneManager.lookupKey(keyArgs);
+    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(1, highestVersion.getVersion());
+    assertEquals(2, highestVersion.getLocationList().size());
+
+    // 3rd update, version 2
+    openKey = ozoneManager.openKey(keyArgs);
+    // this block will be appended to the latest version, version 2.
+    ozoneManager.allocateBlock(keyArgs, openKey.getId());
+    ozoneManager.commitKey(keyArgs, openKey.getId());
+
+    keyInfo = ozoneManager.lookupKey(keyArgs);
+    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(2, highestVersion.getVersion());
+    assertEquals(4, highestVersion.getLocationList().size());
+  }
+
+  private OmKeyLocationInfoGroup checkVersions(
+      List<OmKeyLocationInfoGroup> versions) {
+    OmKeyLocationInfoGroup currentVersion = null;
+    for (OmKeyLocationInfoGroup version : versions) {
+      if (currentVersion != null) {
+        assertEquals(currentVersion.getVersion() + 1, version.getVersion());
+        for (OmKeyLocationInfo info : currentVersion.getLocationList()) {
+          boolean found = false;
+          // all the blocks from the previous version must be present in the
+          // next version
+          for (OmKeyLocationInfo info2 : version.getLocationList()) {
+            if (info.getLocalID() == info2.getLocalID()) {
+              found = true;
+              break;
+            }
+          }
+          assertTrue(found);
+        }
+      }
+      currentVersion = version;
+    }
+    return currentVersion;
+  }
+
+  @Test
+  public void testReadLatestVersion() throws Exception {
+
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(1000)
+        .build();
+
+    String dataString = RandomStringUtils.randomAlphabetic(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    // this write will create the 1st version with one block
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    OmKeyInfo keyInfo = ozoneManager.lookupKey(omKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(0, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(1,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+
+    // this write will create the 2nd version, which will contain the block
+    // from version 1 plus one new block
+    dataString = RandomStringUtils.randomAlphabetic(10);
+    data = new byte[dataString.length()];
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    keyInfo = ozoneManager.lookupKey(omKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(1, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(2,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+
+    dataString = RandomStringUtils.randomAlphabetic(200);
+    data = new byte[dataString.length()];
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    keyInfo = ozoneManager.lookupKey(omKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(2, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(3,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+  }
+}
\ No newline at end of file
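
The invariant that checkVersions walks above can be stated compactly: consecutive versions differ by exactly one, and every block of a version carries over into the next. A set-based restatement of the same check (a sketch, not part of this commit):

    import java.util.Set;
    import java.util.stream.Collectors;

    private static void assertNextVersionKeepsBlocks(OmKeyLocationInfoGroup prev,
        OmKeyLocationInfoGroup next) {
      assertEquals(prev.getVersion() + 1, next.getVersion());
      Set<Long> nextIds = next.getLocationList().stream()
          .map(OmKeyLocationInfo::getLocalID)
          .collect(Collectors.toSet());
      for (OmKeyLocationInfo info : prev.getLocationList()) {
        assertTrue(nextIds.contains(info.getLocalID()));
      }
    }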

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
new file mode 100644
index 0000000..8d0f4b21
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test for OM metrics.
+ */
+@SuppressWarnings("deprecation")
+public class TestOmMetrics {
+  private MiniOzoneCluster cluster;
+  private OzoneManager ozoneManager;
+
+  /**
+   * The exception used for testing failure metrics.
+   */
+  private IOException exception = new IOException();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    ozoneManager = cluster.getOzoneManager();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testVolumeOps() throws IOException {
+    VolumeManager volumeManager =
+        (VolumeManager) org.apache.hadoop.test.Whitebox
+            .getInternalState(ozoneManager, "volumeManager");
+    VolumeManager mockVm = Mockito.spy(volumeManager);
+
+    Mockito.doNothing().when(mockVm).createVolume(null);
+    Mockito.doNothing().when(mockVm).deleteVolume(null);
+    Mockito.doReturn(null).when(mockVm).getVolumeInfo(null);
+    Mockito.doReturn(true).when(mockVm).checkVolumeAccess(null, null);
+    Mockito.doNothing().when(mockVm).setOwner(null, null);
+    Mockito.doReturn(null).when(mockVm).listVolumes(null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "volumeManager", mockVm);
+    doVolumeOps();
+
+    MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumVolumeOps", 6L, omMetrics);
+    assertCounter("NumVolumeCreates", 1L, omMetrics);
+    assertCounter("NumVolumeUpdates", 1L, omMetrics);
+    assertCounter("NumVolumeInfos", 1L, omMetrics);
+    assertCounter("NumVolumeCheckAccesses", 1L, omMetrics);
+    assertCounter("NumVolumeDeletes", 1L, omMetrics);
+    assertCounter("NumVolumeLists", 1L, omMetrics);
+
+    // inject exceptions to test the failure metrics
+    Mockito.doThrow(exception).when(mockVm).createVolume(null);
+    Mockito.doThrow(exception).when(mockVm).deleteVolume(null);
+    Mockito.doThrow(exception).when(mockVm).getVolumeInfo(null);
+    Mockito.doThrow(exception).when(mockVm).checkVolumeAccess(null, null);
+    Mockito.doThrow(exception).when(mockVm).setOwner(null, null);
+    Mockito.doThrow(exception).when(mockVm).listVolumes(null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "volumeManager", mockVm);
+    doVolumeOps();
+
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumVolumeOps", 12L, omMetrics);
+    assertCounter("NumVolumeCreates", 2L, omMetrics);
+    assertCounter("NumVolumeUpdates", 2L, omMetrics);
+    assertCounter("NumVolumeInfos", 2L, omMetrics);
+    assertCounter("NumVolumeCheckAccesses", 2L, omMetrics);
+    assertCounter("NumVolumeDeletes", 2L, omMetrics);
+    assertCounter("NumVolumeLists", 2L, omMetrics);
+
+    assertCounter("NumVolumeCreateFails", 1L, omMetrics);
+    assertCounter("NumVolumeUpdateFails", 1L, omMetrics);
+    assertCounter("NumVolumeInfoFails", 1L, omMetrics);
+    assertCounter("NumVolumeCheckAccessFails", 1L, omMetrics);
+    assertCounter("NumVolumeDeleteFails", 1L, omMetrics);
+    assertCounter("NumVolumeListFails", 1L, omMetrics);
+  }
+
+  @Test
+  public void testBucketOps() throws IOException {
+    BucketManager bucketManager =
+        (BucketManager) org.apache.hadoop.test.Whitebox
+            .getInternalState(ozoneManager, "bucketManager");
+    BucketManager mockBm = Mockito.spy(bucketManager);
+
+    Mockito.doNothing().when(mockBm).createBucket(null);
+    Mockito.doNothing().when(mockBm).deleteBucket(null, null);
+    Mockito.doReturn(null).when(mockBm).getBucketInfo(null, null);
+    Mockito.doNothing().when(mockBm).setBucketProperty(null);
+    Mockito.doReturn(null).when(mockBm).listBuckets(null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "bucketManager", mockBm);
+    doBucketOps();
+
+    MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumBucketOps", 5L, omMetrics);
+    assertCounter("NumBucketCreates", 1L, omMetrics);
+    assertCounter("NumBucketUpdates", 1L, omMetrics);
+    assertCounter("NumBucketInfos", 1L, omMetrics);
+    assertCounter("NumBucketDeletes", 1L, omMetrics);
+    assertCounter("NumBucketLists", 1L, omMetrics);
+
+    // inject exceptions to test the failure metrics
+    Mockito.doThrow(exception).when(mockBm).createBucket(null);
+    Mockito.doThrow(exception).when(mockBm).deleteBucket(null, null);
+    Mockito.doThrow(exception).when(mockBm).getBucketInfo(null, null);
+    Mockito.doThrow(exception).when(mockBm).setBucketProperty(null);
+    Mockito.doThrow(exception).when(mockBm).listBuckets(null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "bucketManager", mockBm);
+    doBucketOps();
+
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumBucketOps", 10L, omMetrics);
+    assertCounter("NumBucketCreates", 2L, omMetrics);
+    assertCounter("NumBucketUpdates", 2L, omMetrics);
+    assertCounter("NumBucketInfos", 2L, omMetrics);
+    assertCounter("NumBucketDeletes", 2L, omMetrics);
+    assertCounter("NumBucketLists", 2L, omMetrics);
+
+    assertCounter("NumBucketCreateFails", 1L, omMetrics);
+    assertCounter("NumBucketUpdateFails", 1L, omMetrics);
+    assertCounter("NumBucketInfoFails", 1L, omMetrics);
+    assertCounter("NumBucketDeleteFails", 1L, omMetrics);
+    assertCounter("NumBucketListFails", 1L, omMetrics);
+  }
+
+  @Test
+  public void testKeyOps() throws IOException {
+    KeyManager keyManager = (KeyManager) org.apache.hadoop.test.Whitebox
+        .getInternalState(ozoneManager, "keyManager");
+    KeyManager mockKm = Mockito.spy(keyManager);
+
+    Mockito.doReturn(null).when(mockKm).openKey(null);
+    Mockito.doNothing().when(mockKm).deleteKey(null);
+    Mockito.doReturn(null).when(mockKm).lookupKey(null);
+    Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "keyManager", mockKm);
+    doKeyOps();
+
+    MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumKeyOps", 4L, omMetrics);
+    assertCounter("NumKeyAllocate", 1L, omMetrics);
+    assertCounter("NumKeyLookup", 1L, omMetrics);
+    assertCounter("NumKeyDeletes", 1L, omMetrics);
+    assertCounter("NumKeyLists", 1L, omMetrics);
+
+    // inject exceptions to test the failure metrics
+    Mockito.doThrow(exception).when(mockKm).openKey(null);
+    Mockito.doThrow(exception).when(mockKm).deleteKey(null);
+    Mockito.doThrow(exception).when(mockKm).lookupKey(null);
+    Mockito.doThrow(exception).when(mockKm).listKeys(
+        null, null, null, null, 0);
+
+    org.apache.hadoop.test.Whitebox.setInternalState(
+        ozoneManager, "keyManager", mockKm);
+    doKeyOps();
+
+    omMetrics = getMetrics("OMMetrics");
+    assertCounter("NumKeyOps", 8L, omMetrics);
+    assertCounter("NumKeyAllocate", 2L, omMetrics);
+    assertCounter("NumKeyLookup", 2L, omMetrics);
+    assertCounter("NumKeyDeletes", 2L, omMetrics);
+    assertCounter("NumKeyLists", 2L, omMetrics);
+
+    assertCounter("NumKeyAllocateFails", 1L, omMetrics);
+    assertCounter("NumKeyLookupFails", 1L, omMetrics);
+    assertCounter("NumKeyDeleteFails", 1L, omMetrics);
+    assertCounter("NumKeyListFails", 1L, omMetrics);
+  }
+
+  /**
+   * Invokes the volume operations, ignoring any thrown exceptions.
+   */
+  private void doVolumeOps() {
+    try {
+      ozoneManager.createVolume(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.deleteVolume(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.getVolumeInfo(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.checkVolumeAccess(null, null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.setOwner(null, null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.listAllVolumes(null, null, 0);
+    } catch (IOException ignored) {
+    }
+  }
+
+  /**
+   * Invokes the bucket operations, ignoring any thrown exceptions.
+   */
+  private void doBucketOps() {
+    try {
+      ozoneManager.createBucket(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.deleteBucket(null, null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.getBucketInfo(null, null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.setBucketProperty(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.listBuckets(null, null, null, 0);
+    } catch (IOException ignored) {
+    }
+  }
+
+  /**
+   * Invokes the key operations, ignoring any thrown exceptions.
+   */
+  private void doKeyOps() {
+    try {
+      ozoneManager.openKey(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.deleteKey(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.lookupKey(null);
+    } catch (IOException ignored) {
+    }
+
+    try {
+      ozoneManager.listKeys(null, null, null, null, 0);
+    } catch (IOException ignored) {
+    }
+  }
+}

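For reference, the failure-injection pattern above boils down to a Mockito spy whose methods are re-stubbed to throw. A minimal self-contained sketch follows; the Manager class and its create() method are illustrative stand-ins, not part of this patch:

    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.spy;

    import java.io.IOException;

    public class FailureInjectionSketch {

      // Stand-in for BucketManager/KeyManager; illustrative only.
      public static class Manager {
        public void create(String name) throws IOException {
          // Pretend to persist metadata; succeeds on the real object.
        }
      }

      public static void main(String[] args) {
        Manager failing = spy(new Manager());
        try {
          // Re-stub the spied method to throw, mirroring the doThrow()
          // calls installed on mockBm/mockKm in the tests above.
          doThrow(new IOException("injected")).when(failing).create("vol");
          failing.create("vol");
        } catch (IOException expected) {
          // This is the point where the tests increment a Num*Fails counter.
          System.out.println("injected failure observed");
        }
      }
    }
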
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java
new file mode 100644
index 0000000..005a012
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSQLCli.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.scm.cli.SQLCLI;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the CLI that transforms om.db into SQLite DB files.
+ */
+@RunWith(Parameterized.class)
+public class TestOmSQLCli {
+  private MiniOzoneCluster cluster = null;
+  private StorageHandler storageHandler;
+  private UserArgs userArgs;
+  private OzoneConfiguration conf;
+  private SQLCLI cli;
+
+  private String userName = "userTest";
+  private String adminName = "adminTest";
+  private String volumeName0 = "volumeTest0";
+  private String volumeName1 = "volumeTest1";
+  private String bucketName0 = "bucketTest0";
+  private String bucketName1 = "bucketTest1";
+  private String bucketName2 = "bucketTest2";
+  private String keyName0 = "key0";
+  private String keyName1 = "key1";
+  private String keyName2 = "key2";
+  private String keyName3 = "key3";
+
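+  // Run the whole suite once per metadata store implementation.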
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+    });
+  }
+
+  private String metaStoreType;
+
+  public TestOmSQLCli(String type) {
+    metaStoreType = type;
+  }
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws Exception if the cluster fails to start
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+
+    VolumeArgs createVolumeArgs0 = new VolumeArgs(volumeName0, userArgs);
+    createVolumeArgs0.setUserName(userName);
+    createVolumeArgs0.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs0);
+    VolumeArgs createVolumeArgs1 = new VolumeArgs(volumeName1, userArgs);
+    createVolumeArgs1.setUserName(userName);
+    createVolumeArgs1.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs1);
+
+    BucketArgs bucketArgs0 = new BucketArgs(volumeName0, bucketName0, userArgs);
+    storageHandler.createBucket(bucketArgs0);
+    BucketArgs bucketArgs1 = new BucketArgs(volumeName1, bucketName1, userArgs);
+    storageHandler.createBucket(bucketArgs1);
+    BucketArgs bucketArgs2 = new BucketArgs(volumeName0, bucketName2, userArgs);
+    storageHandler.createBucket(bucketArgs2);
+
+    KeyArgs keyArgs0 =
+        new KeyArgs(volumeName0, bucketName0, keyName0, userArgs);
+    keyArgs0.setSize(100);
+    KeyArgs keyArgs1 =
+        new KeyArgs(volumeName1, bucketName1, keyName1, userArgs);
+    keyArgs1.setSize(200);
+    KeyArgs keyArgs2 =
+        new KeyArgs(volumeName0, bucketName2, keyName2, userArgs);
+    keyArgs2.setSize(300);
+    KeyArgs keyArgs3 =
+        new KeyArgs(volumeName0, bucketName2, keyName3, userArgs);
+    keyArgs3.setSize(400);
+
+    OutputStream stream = storageHandler.newKeyWriter(keyArgs0);
+    stream.close();
+    stream = storageHandler.newKeyWriter(keyArgs1);
+    stream.close();
+    stream = storageHandler.newKeyWriter(keyArgs2);
+    stream.close();
+    stream = storageHandler.newKeyWriter(keyArgs3);
+    stream.close();
+
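+    // Stop OM and SCM so their metadata DBs are closed before the CLI reads them.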
+    cluster.getOzoneManager().stop();
+    cluster.getStorageContainerManager().stop();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, metaStoreType);
+    cli = new SQLCLI(conf);
+  }
+
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testOmDB() throws Exception {
+    String dbOutPath =  GenericTestUtils.getTempPath(
+        UUID.randomUUID() + "/out_sql.db");
+
+    String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS);
+    String dbPath = dbRootPath + "/" + OM_DB_NAME;
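+    // -p points at the source om.db; -o names the SQLite file to produce.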
+    String[] args = {"-p", dbPath, "-o", dbOutPath};
+
+    cli.run(args);
+
+    Connection conn = connectDB(dbOutPath);
+    String sql = "SELECT * FROM volumeList";
+    ResultSet rs = executeQuery(conn, sql);
+    List<String> expectedValues =
+        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
+    while (rs.next()) {
+      String userNameRs = rs.getString("userName");
+      String volumeNameRs = rs.getString("volumeName");
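+      // The stored user name carries a one-character prefix; strip it.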
+      assertEquals(userName, userNameRs.substring(1));
+      assertTrue(expectedValues.remove(volumeNameRs));
+    }
+    assertEquals(0, expectedValues.size());
+
+    sql = "SELECT * FROM volumeInfo";
+    rs = executeQuery(conn, sql);
+    expectedValues =
+        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
+    while (rs.next()) {
+      String adName = rs.getString("adminName");
+      String ownerName = rs.getString("ownerName");
+      String volumeName = rs.getString("volumeName");
+      assertEquals(adminName, adName);
+      assertEquals(userName, ownerName);
+      assertTrue(expectedValues.remove(volumeName));
+    }
+    assertEquals(0, expectedValues.size());
+
+    sql = "SELECT * FROM aclInfo";
+    rs = executeQuery(conn, sql);
+    expectedValues =
+        new LinkedList<>(Arrays.asList(volumeName0, volumeName1));
+    while (rs.next()) {
+      String adName = rs.getString("adminName");
+      String ownerName = rs.getString("ownerName");
+      String volumeName = rs.getString("volumeName");
+      String type = rs.getString("type");
+      String uName = rs.getString("userName");
+      String rights = rs.getString("rights");
+      assertEquals(adminName, adName);
+      assertEquals(userName, ownerName);
+      assertEquals("USER", type);
+      assertEquals(userName, uName);
+      assertEquals("READ_WRITE", rights);
+      assertTrue(expectedValues.remove(volumeName));
+    }
+    assertEquals(0, expectedValues.size());
+
+    sql = "SELECT * FROM bucketInfo";
+    rs = executeQuery(conn, sql);
+    HashMap<String, String> expectedMap = new HashMap<>();
+    expectedMap.put(bucketName0, volumeName0);
+    expectedMap.put(bucketName2, volumeName0);
+    expectedMap.put(bucketName1, volumeName1);
+    while (rs.next()) {
+      String volumeName = rs.getString("volumeName");
+      String bucketName = rs.getString("bucketName");
+      boolean versionEnabled = rs.getBoolean("versionEnabled");
+      String storageType = rs.getString("storageType");
+      assertEquals(volumeName, expectedMap.remove(bucketName));
+      assertFalse(versionEnabled);
+      assertEquals("DISK", storageType);
+    }
+    assertEquals(0, expectedMap.size());
+
+    sql = "SELECT * FROM keyInfo";
+    rs = executeQuery(conn, sql);
+    HashMap<String, List<String>> expectedMap2 = new HashMap<>();
+    // No data was written, so every key's data size is 0.
+    expectedMap2.put(keyName0,
+        Arrays.asList(volumeName0, bucketName0, "0"));
+    expectedMap2.put(keyName1,
+        Arrays.asList(volumeName1, bucketName1, "0"));
+    expectedMap2.put(keyName2,
+        Arrays.asList(volumeName0, bucketName2, "0"));
+    expectedMap2.put(keyName3,
+        Arrays.asList(volumeName0, bucketName2, "0"));
+    while (rs.next()) {
+      String volumeName = rs.getString("volumeName");
+      String bucketName = rs.getString("bucketName");
+      String keyName = rs.getString("keyName");
+      int dataSize = rs.getInt("dataSize");
+      List<String> vals = expectedMap2.remove(keyName);
+      assertNotNull(vals);
+      assertEquals(vals.get(0), volumeName);
+      assertEquals(vals.get(1), bucketName);
+      assertEquals(vals.get(2), Integer.toString(dataSize));
+    }
+    assertEquals(0, expectedMap2.size());
+
+    conn.close();
+    Files.delete(Paths.get(dbOutPath));
+  }
+
+  private ResultSet executeQuery(Connection conn, String sql)
+      throws SQLException {
+    Statement stmt = conn.createStatement();
+    return stmt.executeQuery(sql);
+  }
+
+  private Connection connectDB(String dbPath) throws Exception {
+    Class.forName("org.sqlite.JDBC");
+    String connectPath =
+        String.format("jdbc:sqlite:%s", dbPath);
+    return DriverManager.getConnection(connectPath);
+  }
+}

