Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/16 04:43:27 UTC

[1/2] hadoop git commit: HDFS-11769. Ozone: KSM: Add createVolume API. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 6e8584fc1 -> a8393b4b8


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneHelper.java
new file mode 100644
index 0000000..7395596
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneHelper.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web;
+
+import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
+import org.apache.hadoop.ozone.web.headers.Header;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Helper functions to test Ozone.
+ */
+public class TestOzoneHelper {
+
+  public CloseableHttpClient createHttpClient() {
+    return HttpClientBuilder.create().build();
+  }
+  /**
+   * Creates Volumes on Ozone Store.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumes(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    CloseableHttpClient client = createHttpClient();
+    String volumeName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httppost);
+      assertEquals(response.toString(), HTTP_CREATED,
+          response.getStatusLine().getStatusCode());
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Create Volumes with Quota.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithQuota(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    CloseableHttpClient client = createHttpClient();
+    String volumeName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s?quota=10TB", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httppost);
+      assertEquals(response.toString(), HTTP_CREATED,
+          response.getStatusLine().getStatusCode());
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Create Volumes with Invalid Quota.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithInvalidQuota(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    CloseableHttpClient client = createHttpClient();
+    String volumeName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s?quota=NaN", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httppost);
+      assertEquals(response.toString(), ErrorTable.MALFORMED_QUOTA
+              .getHttpCode(),
+          response.getStatusLine().getStatusCode());
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * To create a volume, a user name must be specified using the OZONE_USER
+   * header. This test verifies that we get an error when the call is made
+   * without an OZONE user name.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithInvalidUser(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    CloseableHttpClient client = createHttpClient();
+    String volumeName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s?quota=1TB", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httppost);
+
+      assertEquals(response.toString(), ErrorTable.USER_NOT_FOUND.getHttpCode(),
+          response.getStatusLine().getStatusCode());
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Only admins can create volumes in Ozone. This test uses the simple
+   * userauth backend, in which hdfs and root are admin users.
+   * <p>
+   * This test tries to create a volume as user bilbo.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithOutAdminRights(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    CloseableHttpClient client = createHttpClient();
+    String volumeName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s?quota=NaN", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              "bilbo"); // This is not a root user in Simple Auth
+      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httppost);
+      assertEquals(response.toString(), ErrorTable.ACCESS_DENIED.getHttpCode(),
+          response.getStatusLine().getStatusCode());
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Create a bunch of volumes in a loop.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesInLoop(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+
+    for (int x = 0; x < 1000; x++) {
+      CloseableHttpClient client = createHttpClient();
+      String volumeName = OzoneUtils.getRequestID().toLowerCase();
+      String userName = OzoneUtils.getRequestID().toLowerCase();
+
+      HttpPost httppost = new HttpPost(
+          String.format("http://localhost:%d/%s?quota=10TB", port, volumeName));
+
+      httppost.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httppost.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+      httppost.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+      httppost.addHeader(Header.OZONE_USER, userName);
+
+      HttpResponse response = client.execute(httppost);
+      assertEquals(response.toString(), HTTP_CREATED,
+          response.getStatusLine().getStatusCode());
+      client.close();
+    }
+  }
+  /**
+   * Get volumes owned by the user.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesByUser(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    // We need to create a volume for this test to succeed.
+    testCreateVolumes(port);
+    CloseableHttpClient client = createHttpClient();
+    try {
+      HttpGet httpget =
+          new HttpGet(String.format("http://localhost:%d/", port));
+
+      httpget.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+
+      httpget.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+
+      httpget.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      httpget.addHeader(Header.OZONE_USER,
+          OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httpget);
+      assertEquals(response.toString(), HTTP_OK,
+          response.getStatusLine().getStatusCode());
+
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Admins can read volumes belonging to other users.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesOfAnotherUser(int port) throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+
+    CloseableHttpClient client = createHttpClient();
+    try {
+      HttpGet httpget =
+          new HttpGet(String.format("http://localhost:%d/", port));
+
+      httpget.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httpget.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+
+      httpget.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              OzoneConsts.OZONE_SIMPLE_ROOT_USER);
+
+      // User Root is getting volumes belonging to user HDFS
+      httpget.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httpget);
+      assertEquals(response.toString(), HTTP_OK,
+          response.getStatusLine().getStatusCode());
+
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * If a non-admin user tries to read volumes belonging to another user,
+   * the server rejects the request.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesOfAnotherUserShouldFail(int port)
+      throws IOException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+
+    CloseableHttpClient client = createHttpClient();
+    String userName = OzoneUtils.getRequestID().toLowerCase();
+    try {
+      HttpGet httpget =
+          new HttpGet(String.format("http://localhost:%d/", port));
+
+      httpget.addHeader(Header.OZONE_VERSION_HEADER,
+          Header.OZONE_V1_VERSION_HEADER);
+      httpget.addHeader(HttpHeaders.DATE,
+          format.format(new Date(Time.monotonicNow())));
+
+      httpget.addHeader(HttpHeaders.AUTHORIZATION,
+          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+              userName);
+
+      // userName is NOT a root user, hence he should NOT be able to read the
+      // volumes of user HDFS
+      httpget.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+      HttpResponse response = client.execute(httpget);
+      // We will get an Error called userNotFound when using Simple Auth Scheme
+      assertEquals(response.toString(), ErrorTable.USER_NOT_FOUND.getHttpCode(),
+          response.getStatusLine().getStatusCode());
+
+    } finally {
+      client.close();
+    }
+  }
+
+}
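
These helpers take the HTTP port as an argument so that concrete JUnit classes can bind them to a running cluster; per the file list in this commit, TestLocalOzoneVolumes and TestDistributedOzoneVolumes play that role. A minimal sketch of such a subclass follows — the class name is illustrative, imports are omitted, and the cluster setup is borrowed from the init() of the deleted TestOzoneVolumes below:

// Illustrative only: a concrete test class that drives the shared helpers.
public class TestLocalOzoneVolumesSketch extends TestOzoneHelper {
  private static MiniOzoneCluster cluster = null;
  private static int port = 0;

  @BeforeClass
  public static void init() throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    cluster = new MiniOzoneCluster.Builder(conf)
        .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
    DataNode dataNode = cluster.getDataNodes().get(0);
    port = dataNode.getInfoPort();
  }

  @AfterClass
  public static void shutdown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test
  public void createVolumes() throws IOException {
    testCreateVolumes(port); // delegates to the shared helper above
  }
}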

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneVolumes.java
deleted file mode 100644
index c339279..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneVolumes.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.web;
-
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
-import org.apache.hadoop.ozone.web.headers.Header;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import javax.ws.rs.core.HttpHeaders;
-import java.io.IOException;
-import java.net.URL;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-
-import static java.net.HttpURLConnection.HTTP_CREATED;
-import static java.net.HttpURLConnection.HTTP_OK;
-import static org.junit.Assert.assertEquals;
-
-public class TestOzoneVolumes {
-  /**
-   * Set the timeout for every test.
-   */
-  @Rule
-  public Timeout testTimeout = new Timeout(300000);
-
-  private static MiniOzoneCluster cluster = null;
-  private static int port = 0;
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true and
-   * OZONE_HANDLER_TYPE_KEY = "local" , which uses a local directory to
-   * emulate Ozone backend.
-   *
-   * @throws IOException
-   */
-  @BeforeClass
-  public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-
-    URL p = conf.getClass().getResource("");
-    String path = p.getPath().concat(TestOzoneVolumes.class.getSimpleName());
-    path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
-        OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
-
-    conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
-    Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
-
-    cluster = new MiniOzoneCluster.Builder(conf)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
-    DataNode dataNode = cluster.getDataNodes().get(0);
-    port = dataNode.getInfoPort();
-  }
-
-  /**
-   * shutdown MiniDFSCluster
-   */
-  @AfterClass
-  public static void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Creates Volumes on Ozone Store.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCreateVolumes() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    HttpClient client = new DefaultHttpClient();
-    String volumeName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httppost);
-      assertEquals(response.toString(), HTTP_CREATED,
-          response.getStatusLine().getStatusCode());
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * Create Volumes with Quota.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCreateVolumesWithQuota() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    HttpClient client = new DefaultHttpClient();
-    String volumeName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s?quota=10TB", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httppost);
-      assertEquals(response.toString(), HTTP_CREATED,
-          response.getStatusLine().getStatusCode());
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * Create Volumes with Invalid Quota.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCreateVolumesWithInvalidQuota() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    HttpClient client = new DefaultHttpClient();
-    String volumeName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s?quota=NaN", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httppost);
-      assertEquals(response.toString(), ErrorTable.MALFORMED_QUOTA
-              .getHttpCode(),
-          response.getStatusLine().getStatusCode());
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * To create a volume a user name must be specified using OZONE_USER header.
-   * This test verifies that we get an error in case we call without a OZONE
-   * user name.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCreateVolumesWithInvalidUser() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    HttpClient client = new DefaultHttpClient();
-    String volumeName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s?quota=1TB", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httppost);
-
-      assertEquals(response.toString(), ErrorTable.USER_NOT_FOUND.getHttpCode(),
-          response.getStatusLine().getStatusCode());
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * Only Admins can create volumes in Ozone. This test uses simple userauth as
-   * backend and hdfs and root are admin users in the simple backend.
-   * <p>
-   * This test tries to create a volume as user bilbo.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testCreateVolumesWithOutAdminRights() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    HttpClient client = new DefaultHttpClient();
-    String volumeName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s?quota=NaN", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              "bilbo"); // This is not a root user in Simple Auth
-      httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httppost);
-      assertEquals(response.toString(), ErrorTable.ACCESS_DENIED.getHttpCode(),
-          response.getStatusLine().getStatusCode());
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * Create a bunch of volumes in a loop.
-   *
-   * @throws IOException
-   */
-  //@Test
-  public void testCreateVolumesInLoop() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-
-    for (int x = 0; x < 1000; x++) {
-      HttpClient client = new DefaultHttpClient();
-      String volumeName = OzoneUtils.getRequestID().toLowerCase();
-      String userName = OzoneUtils.getRequestID().toLowerCase();
-
-      HttpPost httppost = new HttpPost(
-          String.format("http://localhost:%d/%s?quota=10TB", port, volumeName));
-
-      httppost.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httppost.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-      httppost.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-      httppost.addHeader(Header.OZONE_USER, userName);
-
-      HttpResponse response = client.execute(httppost);
-      assertEquals(response.toString(), HTTP_CREATED,
-          response.getStatusLine().getStatusCode());
-      client.getConnectionManager().shutdown();
-    }
-  }
-  /**
-   * Get volumes owned by the user.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testGetVolumesByUser() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-    // We need to create a volume for this test to succeed.
-    testCreateVolumes();
-    HttpClient client = new DefaultHttpClient();
-    try {
-      HttpGet httpget =
-          new HttpGet(String.format("http://localhost:%d/", port));
-
-      httpget.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-
-      httpget.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-
-      httpget.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      httpget.addHeader(Header.OZONE_USER,
-          OzoneConsts.OZONE_SIMPLE_HDFS_USER );
-
-      HttpResponse response = client.execute(httpget);
-      assertEquals(response.toString(), HTTP_OK,
-          response.getStatusLine().getStatusCode());
-
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * Admins can read volumes belonging to other users.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testGetVolumesOfAnotherUser() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-
-    HttpClient client = new DefaultHttpClient();
-    try {
-      HttpGet httpget =
-          new HttpGet(String.format("http://localhost:%d/", port));
-
-      httpget.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httpget.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-
-      httpget.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              OzoneConsts.OZONE_SIMPLE_ROOT_USER);
-
-      // User Root is getting volumes belonging to user HDFS
-      httpget.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httpget);
-      assertEquals(response.toString(), HTTP_OK,
-          response.getStatusLine().getStatusCode());
-
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * if you try to read volumes belonging to another user,
-   * then server always ignores it.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
-    SimpleDateFormat format =
-        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
-
-    HttpClient client = new DefaultHttpClient();
-    String userName = OzoneUtils.getRequestID().toLowerCase();
-    try {
-      HttpGet httpget =
-          new HttpGet(String.format("http://localhost:%d/", port));
-
-      httpget.addHeader(Header.OZONE_VERSION_HEADER,
-          Header.OZONE_V1_VERSION_HEADER);
-      httpget.addHeader(HttpHeaders.DATE,
-          format.format(new Date(Time.monotonicNow())));
-
-      httpget.addHeader(HttpHeaders.AUTHORIZATION,
-          Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
-              userName);
-
-      // userName is NOT a root user, hence he should NOT be able to read the
-      // volumes of user HDFS
-      httpget.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
-
-      HttpResponse response = client.execute(httpget);
-      // We will get an Error called userNotFound when using Simple Auth Scheme
-      assertEquals(response.toString(), ErrorTable.USER_NOT_FOUND.getHttpCode(),
-          response.getStatusLine().getStatusCode());
-
-    } finally {
-      client.getConnectionManager().shutdown();
-    }
-  }
-
-}




[2/2] hadoop git commit: HDFS-11769. Ozone: KSM: Add createVolume API. Contributed by Mukul Kumar Singh.

Posted by ae...@apache.org.
HDFS-11769. Ozone: KSM: Add createVolume API. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8393b4b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8393b4b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8393b4b

Branch: refs/heads/HDFS-7240
Commit: a8393b4b875f8e69d5244ac1ce1f1a5abd474525
Parents: 6e8584f
Author: Anu Engineer <ae...@apache.org>
Authored: Mon May 15 21:38:08 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon May 15 21:38:08 2017 -0700

----------------------------------------------------------------------
 .../hadoop/ksm/helpers/KsmVolumeArgs.java       | 182 ++++++++
 .../ksm/protocol/KeyspaceManagerProtocol.java   |  11 +-
 ...ceManagerProtocolClientSideTranslatorPB.java | 211 ++++++++++
 .../main/proto/KeySpaceManagerProtocol.proto    |  11 +-
 .../server/datanode/ObjectStoreHandler.java     |  30 +-
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   3 +-
 .../apache/hadoop/ozone/ksm/KSMConfigKeys.java  |   9 +
 .../org/apache/hadoop/ozone/ksm/KSMMetrics.java |  63 +++
 .../hadoop/ozone/ksm/KeySpaceManager.java       |  57 ++-
 .../apache/hadoop/ozone/ksm/VolumeManager.java  |  42 ++
 .../hadoop/ozone/ksm/VolumeManagerImpl.java     | 153 +++++++
 .../ozone/ksm/exceptions/KSMException.java      | 103 +++++
 .../ozone/ksm/exceptions/package-info.java      |  19 +
 ...ceManagerProtocolServerSideTranslatorPB.java |  89 +++-
 .../web/storage/DistributedStorageHandler.java  |  54 +--
 .../hadoop/ozone/web/utils/OzoneUtils.java      |  23 ++
 .../org/apache/hadoop/utils/LevelDBStore.java   |  15 +-
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  37 +-
 .../ozone/web/TestDistributedOzoneVolumes.java  | 173 ++++++++
 .../hadoop/ozone/web/TestLocalOzoneVolumes.java | 182 ++++++++
 .../hadoop/ozone/web/TestOzoneHelper.java       | 354 ++++++++++++++++
 .../hadoop/ozone/web/TestOzoneVolumes.java      | 413 -------------------
 22 files changed, 1735 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
new file mode 100644
index 0000000..359c2d5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmVolumeArgs.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+/**
+ * A class that encapsulates the arguments used to create a KSM volume.
+ */
+public final class KsmVolumeArgs {
+  private final String adminName;
+  private final String ownerName;
+  private final String volume;
+  private final long quotaInBytes;
+  private final Map<String, String> keyValueMap;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param adminName  - Administrator's name.
+   * @param ownerName  - Volume owner's name
+   * @param volume - volume name
+   * @param quotaInBytes - Volume Quota in bytes.
+   * @param  keyValueMap - keyValue map.
+   */
+  private KsmVolumeArgs(String adminName, String ownerName, String volume,
+                        long quotaInBytes, Map<String, String> keyValueMap) {
+    this.adminName = adminName;
+    this.ownerName = ownerName;
+    this.volume = volume;
+    this.quotaInBytes = quotaInBytes;
+    this.keyValueMap = keyValueMap;
+  }
+
+  /**
+   * Returns the Admin Name.
+   * @return String.
+   */
+  public String getAdminName() {
+    return adminName;
+  }
+
+  /**
+   * Returns the owner Name.
+   * @return String
+   */
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  /**
+   * Returns the volume Name.
+   * @return String
+   */
+  public String getVolume() {
+    return volume;
+  }
+
+  /**
+   * Returns Quota in Bytes.
+   * @return long, Quota in bytes.
+   */
+  public long getQuotaInBytes() {
+    return quotaInBytes;
+  }
+
+  public Map<String, String> getKeyValueMap() {
+    return keyValueMap;
+  }
+
+  /**
+   * Returns new builder class that builds a KsmVolumeArgs.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for KsmVolumeArgs.
+   */
+  public static class Builder {
+    private String adminName;
+    private String ownerName;
+    private String volume;
+    private long quotaInBytes;
+    private Map<String, String> keyValueMap;
+
+    /**
+     * Constructs a builder.
+     */
+    Builder() {
+      keyValueMap = new HashMap<>();
+    }
+
+    public Builder setAdminName(String adminName) {
+      this.adminName = adminName;
+      return this;
+    }
+
+    public Builder setOwnerName(String ownerName) {
+      this.ownerName = ownerName;
+      return this;
+    }
+
+    public Builder setVolume(String volume) {
+      this.volume = volume;
+      return this;
+    }
+
+    public Builder setQuotaInBytes(long quotaInBytes) {
+      this.quotaInBytes = quotaInBytes;
+      return this;
+    }
+
+    public Builder addMetadata(String key, String value) {
+      keyValueMap.put(key, value); // overwrite if present.
+      return this;
+    }
+
+    /**
+     * Constructs a KsmVolumeArgs instance.
+     * @return KsmVolumeArgs.
+     */
+    public KsmVolumeArgs build() {
+      Preconditions.checkNotNull(adminName);
+      Preconditions.checkNotNull(ownerName);
+      Preconditions.checkNotNull(volume);
+      return new KsmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
+          keyValueMap);
+    }
+  }
+
+  public VolumeInfo getProtobuf() {
+    List<KeyValue> list = new LinkedList<>();
+    for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+      list.add(KeyValue.newBuilder().setKey(entry.getKey()).
+          setValue(entry.getValue()).build());
+    }
+
+    return VolumeInfo.newBuilder()
+        .setAdminName(adminName)
+        .setOwnerName(ownerName)
+        .setVolume(volume)
+        .setQuotaInBytes(quotaInBytes)
+        .addAllMetadata(list)
+        .build();
+  }
+
+  public static KsmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
+    return new KsmVolumeArgs(volInfo.getAdminName(), volInfo.getOwnerName(),
+        volInfo.getVolume(), volInfo.getQuotaInBytes(),
+        volInfo.getMetadataList().stream()
+            .collect(Collectors.toMap(KeyValue::getKey,
+                KeyValue::getValue)));
+  }
+}
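
For illustration, a short sketch of assembling the arguments through the builder and round-tripping them through the protobuf form; the names and quota below are arbitrary, and the types match the imports in the file above:

KsmVolumeArgs args = KsmVolumeArgs.newBuilder()
    .setAdminName("hdfs")
    .setOwnerName("bilbo")
    .setVolume("vol-one")
    .setQuotaInBytes(10L * 1024 * 1024 * 1024 * 1024) // 10 TB
    .addMetadata("createdBy", "example")              // overwrites if present
    .build();

// Convert to the wire format and back; the metadata map survives the trip.
VolumeInfo wire = args.getProtobuf();
KsmVolumeArgs copy = KsmVolumeArgs.getFromProtobuf(wire);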

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
index 3a07d97..546b6c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeyspaceManagerProtocol.java
@@ -17,8 +17,7 @@
  */
 package org.apache.hadoop.ksm.protocol;
 
-import org.apache.hadoop.ksm.helpers.VolumeArgs;
-
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import java.io.IOException;
 import java.util.List;
 
@@ -32,7 +31,7 @@ public interface KeyspaceManagerProtocol {
    * @param args - Arguments to create Volume.
    * @throws IOException
    */
-  void createVolume(VolumeArgs args) throws IOException;
+  void createVolume(KsmVolumeArgs args) throws IOException;
 
   /**
    * Changes the owner of a volume.
@@ -64,7 +63,7 @@ public interface KeyspaceManagerProtocol {
    * @return VolumeArgs or exception is thrown.
    * @throws IOException
    */
-  VolumeArgs getVolumeinfo(String volume) throws IOException;
+  KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
 
   /**
   * Deletes an existing empty volume.
@@ -82,7 +81,7 @@ public interface KeyspaceManagerProtocol {
    * @return List of Volumes.
    * @throws IOException
    */
-  List<VolumeArgs> listVolumeByUser(String userName, String prefix, String
+  List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
       prevKey, long maxKeys) throws IOException;
 
   /**
@@ -93,6 +92,6 @@ public interface KeyspaceManagerProtocol {
    * @return List of Volumes.
    * @throws IOException
    */
-  List<VolumeArgs> listAllVolumes(String prefix, String
+  List<KsmVolumeArgs> listAllVolumes(String prefix, String
       prevKey, long maxKeys) throws IOException;
 }
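
The list methods above share a paging contract (prefix filter, prevKey cursor, maxKeys limit). A hypothetical caller would walk the listing roughly as follows; note that the client-side translator added in this commit still stubs these methods out, so this only illustrates the intended contract:

// "ksm" is any KeyspaceManagerProtocol implementation.
String prevKey = null;
List<KsmVolumeArgs> page;
do {
  page = ksm.listVolumeByUser("bilbo", "vol-", prevKey, 100);
  if (page == null || page.isEmpty()) {
    break;
  }
  for (KsmVolumeArgs v : page) {
    System.out.println(v.getVolume());
  }
  prevKey = page.get(page.size() - 1).getVolume(); // resume after the last entry
} while (page.size() == 100);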

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..beb8b06
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ksm.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.Status;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The client-side implementation of KeyspaceManagerProtocol.
+ */
+
+@InterfaceAudience.Private
+public final class KeySpaceManagerProtocolClientSideTranslatorPB
+    implements KeyspaceManagerProtocol, ProtocolTranslator, Closeable {
+
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+
+  private final KeySpaceManagerProtocolPB rpcProxy;
+
+  /**
+   * Constructor for the KeySpaceManager client.
+   * @param rpcProxy - the RPC proxy to delegate calls to.
+   */
+  public KeySpaceManagerProtocolClientSideTranslatorPB(
+      KeySpaceManagerProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  /**
+   * Creates a volume.
+   *
+   * @param args - Arguments to create Volume.
+   * @throws IOException
+   */
+  @Override
+  public void createVolume(KsmVolumeArgs args) throws IOException {
+    CreateVolumeRequest.Builder req =
+        CreateVolumeRequest.newBuilder();
+    VolumeInfo volumeInfo = args.getProtobuf();
+    req.setVolumeInfo(volumeInfo);
+
+    final CreateVolumeResponse resp;
+    try {
+      resp = rpcProxy.createVolume(NULL_RPC_CONTROLLER,
+          req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+
+    if (resp.getStatus() != Status.OK) {
+      throw new IOException("Volume creation failed, error: "
+          + resp.getStatus());
+    }
+  }
+
+  /**
+   * Changes the owner of a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  @Override
+  public void setOwner(String volume, String owner) throws IOException {
+
+  }
+
+  /**
+   * Changes the Quota on a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  @Override
+  public void setQuota(String volume, long quota) throws IOException {
+
+  }
+
+  /**
+   * Checks if the specified user can access this volume.
+   *
+   * @param volume - volume
+   * @param userName - user name
+   * @throws IOException
+   */
+  @Override
+  public void checkVolumeAccess(String volume, String userName) throws
+      IOException {
+
+  }
+
+  /**
+   * Gets the volume information.
+   *
+   * @param volume - Volume name.
+   * @return KsmVolumeArgs, or an exception on failure.
+   * @throws IOException
+   */
+  @Override
+  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
+    return null;
+  }
+
+  /**
+   * Deletes an existing empty volume.
+   *
+   * @param volume - Name of the volume.
+   * @throws IOException
+   */
+  @Override
+  public void deleteVolume(String volume) throws IOException {
+
+  }
+
+  /**
+   * Lists volumes owned by a specific user.
+   *
+   * @param userName - user name
+   * @param prefix - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- listing resumes from the entry after
+   * this key.
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  @Override
+  public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
+                                              String prevKey, long maxKeys)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Lists all volumes in the cluster.
+   *
+   * @param prefix - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- listing resumes from the entry after
+   * this key.
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  @Override
+  public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
+      maxKeys) throws IOException {
+    return null;
+  }
+
+  /**
+   * Return the proxy object underlying this protocol translator.
+   *
+   * @return the proxy object underlying this protocol translator.
+   */
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return null;
+  }
+}
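
The translator wraps a KeySpaceManagerProtocolPB RPC proxy; the ObjectStoreHandler hunk later in this mail constructs it exactly this way. Condensed here for reference, assuming conf is an OzoneConfiguration and args is a KsmVolumeArgs:

long ksmVersion = RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
KeySpaceManagerProtocolClientSideTranslatorPB ksmClient =
    new KeySpaceManagerProtocolClientSideTranslatorPB(
        RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
            ksmAddress, UserGroupInformation.getCurrentUser(), conf,
            NetUtils.getDefaultSocketFactory(conf),
            Client.getRpcTimeout(conf)));

// Translates to a CreateVolumeRequest and throws IOException unless the
// response status is OK.
ksmClient.createVolume(args);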

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index 3ac25b8..4ce7275 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -41,8 +41,11 @@ enum Status {
     VOLUME_NOT_UNIQUE = 2;
     VOLUME_NOT_FOUND = 3;
     VOLUME_NOT_EMPTY = 4;
-    USER_NOT_FOUND = 5;
-    ACCESS_DENIED = 6;
+    VOLUME_ALREADY_EXISTS = 5;
+    USER_NOT_FOUND = 6;
+    USER_TOO_MANY_VOLUMES = 7;
+    ACCESS_DENIED = 8;
+    INTERNAL_ERROR = 9;
 }
 
 
@@ -66,6 +69,10 @@ message CreateVolumeResponse {
     required Status status = 1;
 }
 
+message VolumeList {
+    repeated string volumeNames = 1;
+}
+
 /**
     Changes the Volume Properties -- like ownership and quota for a volume.
 */
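
VolumeList is not referenced by the hunks shown in this mail; presumably VolumeManagerImpl uses it as the LevelDB value that records the volumes owned by each user. A hedged sketch of building and parsing one with the generated protobuf API:

// Hypothetical: the per-user volume list stored in ksm.db.
VolumeList volumes = VolumeList.newBuilder()
    .addVolumeNames("vol-one")
    .addVolumeNames("vol-two")
    .build();
byte[] value = volumes.toByteArray();            // bytes stored under the user key
VolumeList parsed = VolumeList.parseFrom(value); // throws InvalidProtocolBufferException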

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index cb4880d..f44b3aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -30,6 +30,9 @@ import java.util.Map;
 import com.sun.jersey.api.container.ContainerFactory;
 import com.sun.jersey.api.core.ApplicationAdapter;
 
+import org.apache.hadoop.ksm.protocolPB
+    .KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
@@ -62,6 +65,8 @@ public final class ObjectStoreHandler implements Closeable {
       LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
 
   private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
+  private final KeySpaceManagerProtocolClientSideTranslatorPB
+      keySpaceManagerClient;
   private final StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
 
@@ -84,21 +89,34 @@ public final class ObjectStoreHandler implements Closeable {
     if (OzoneConsts.OZONE_HANDLER_DISTRIBUTED.equalsIgnoreCase(shType)) {
       RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
           ProtobufRpcEngine.class);
-      long version =
+      long scmVersion =
           RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-      InetSocketAddress address =
+      InetSocketAddress scmAddress =
           OzoneClientUtils.getScmAddressForClients(conf);
       this.storageContainerLocationClient =
           new StorageContainerLocationProtocolClientSideTranslatorPB(
-              RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
-              address, UserGroupInformation.getCurrentUser(), conf,
-              NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
+              RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
+              scmAddress, UserGroupInformation.getCurrentUser(), conf,
+              NetUtils.getDefaultSocketFactory(conf),
+              Client.getRpcTimeout(conf)));
+      long ksmVersion =
+          RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
+      InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
+      this.keySpaceManagerClient =
+          new KeySpaceManagerProtocolClientSideTranslatorPB(
+              RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
+              ksmAddress, UserGroupInformation.getCurrentUser(), conf,
+              NetUtils.getDefaultSocketFactory(conf),
+              Client.getRpcTimeout(conf)));
+
       storageHandler = new DistributedStorageHandler(new OzoneConfiguration(),
-          this.storageContainerLocationClient);
+          this.storageContainerLocationClient,
+          this.keySpaceManagerClient);
     } else {
       if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
         storageHandler = new LocalStorageHandler(conf);
         this.storageContainerLocationClient = null;
+        this.keySpaceManagerClient = null;
       } else {
         throw new IllegalArgumentException(
             String.format("Unrecognized value for %s: %s,"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index e10a3e2..5f960b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -80,6 +80,7 @@ public final class OzoneConsts {
   public static final String BLOCK_DB = "block.db";
   public static final String NODEPOOL_DB = "nodepool.db";
   public static final String OPEN_CONTAINERS_DB = "openContainers.db";
+  public static final String KSM_DB_NAME = "ksm.db";
 
   /**
    * Supports Bucket Versioning.
@@ -87,7 +88,7 @@ public final class OzoneConsts {
   public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
 
   /**
-   * Ozone handler types
+   * Ozone handler types.
    */
   public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
   public static final String OZONE_HANDLER_LOCAL = "local";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
index a23d47b..a773b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
@@ -37,4 +37,13 @@ public final class KSMConfigKeys {
   public static final String OZONE_KSM_BIND_HOST_DEFAULT =
       "0.0.0.0";
   public static final int OZONE_KSM_PORT_DEFAULT = 9862;
+
+  // Size of the off-heap LevelDB cache used by KSM, in MB; defaults to 128 MB.
+  public static final String OZONE_KSM_DB_CACHE_SIZE_MB =
+      "ozone.ksm.leveldb.cache.size.mb";
+  public static final int OZONE_KSM_DB_CACHE_SIZE_DEFAULT = 128;
+
+  public static final String OZONE_KSM_USER_MAX_VOLUME =
+      "ozone.ksm.user.max.volume";
+  public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024;
 }
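
Both keys follow the usual Configuration conventions; for example (values arbitrary):

OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB, 256); // LevelDB cache, in MB
conf.setInt(KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME, 16);   // per-user volume cap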

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
new file mode 100644
index 0000000..c75c8fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining KeySpaceManager statistics.
+ */
+public class KSMMetrics {
+  // KSM op metrics
+  private @Metric MutableCounterLong numVolumeCreates;
+
+  // Failure Metrics
+  private @Metric MutableCounterLong numVolumeCreateFails;
+
+  public KSMMetrics() {
+  }
+
+  public static KSMMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register("KSMMetrics",
+        "Key Space Manager Metrics",
+        new KSMMetrics());
+  }
+
+  public void incNumVolumeCreates() {
+    numVolumeCreates.incr();
+  }
+
+  public void incNumVolumeCreateFails() {
+    numVolumeCreateFails.incr();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCreates() {
+    return numVolumeCreates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCreateFails() {
+    return numVolumeCreateFails.value();
+  }
+}
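
A quick sketch of how the counters are meant to move (assumes the default
metrics system is usable in-process; illustrative only):

    KSMMetrics metrics = KSMMetrics.create();
    metrics.incNumVolumeCreates();       // record one attempted create
    metrics.incNumVolumeCreateFails();   // record one failed create
    assert metrics.getNumVolumeCreates() == 1;
    assert metrics.getNumVolumeCreateFails() == 1;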

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 7250885..2ffeee7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -17,12 +17,13 @@
 
 package org.apache.hadoop.ozone.ksm;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ksm.helpers.VolumeArgs;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.OzoneClientUtils;
@@ -38,13 +39,15 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
-    .KeyspaceManagerService.newReflectiveBlockingService;
+import static org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.KeyspaceManagerService
+    .newReflectiveBlockingService;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
@@ -52,12 +55,13 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
  */
 @InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
 public class KeySpaceManager implements KeyspaceManagerProtocol {
-  // TODO: Support JMX
   private static final Logger LOG =
       LoggerFactory.getLogger(KeySpaceManager.class);
 
   private final RPC.Server ksmRpcServer;
   private final InetSocketAddress ksmRpcAddress;
+  private final VolumeManager volumeManager;
+  private final KSMMetrics metrics;
 
   public KeySpaceManager(OzoneConfiguration conf) throws IOException {
     final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
@@ -75,8 +79,8 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
         handlerCount);
     ksmRpcAddress = updateListenAddress(conf,
         OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
-
-    //TODO : Add call to register MXBean for JMX.
+    volumeManager = new VolumeManagerImpl(this, conf);
+    metrics = KSMMetrics.create();
   }
 
   /**
@@ -108,6 +112,19 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
     return rpcServer;
   }
 
+  public KSMMetrics getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Returns listening address of Key Space Manager RPC server.
+   *
+   * @return listen address of Key Space Manager RPC server
+   */
+  @VisibleForTesting
+  public InetSocketAddress getClientRpcAddress() {
+    return ksmRpcAddress;
+  }
   /**
    * Main entry point for starting KeySpaceManager.
    *
@@ -168,10 +185,23 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
   public void start() {
     LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
         ksmRpcAddress));
+    volumeManager.start();
     ksmRpcServer.start();
   }
 
   /**
+   * Stop service.
+   */
+  public void stop() {
+    try {
+      ksmRpcServer.stop();
+      volumeManager.stop();
+    } catch (IOException e) {
+      LOG.info("Key Space Manager stop failed.", e);
+    }
+  }
+
+  /**
    * Wait until service has completed shutdown.
    */
   public void join() {
@@ -179,7 +209,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
       ksmRpcServer.join();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOG.info("Interrupted during KeyspaceManager join.");
+      LOG.info("Interrupted during KeyspaceManager join.", e);
     }
   }
 
@@ -190,8 +220,9 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
    * @throws IOException
    */
   @Override
-  public void createVolume(VolumeArgs args) throws IOException {
-
+  public void createVolume(KsmVolumeArgs args) throws IOException {
+    metrics.incNumVolumeCreates();
+    volumeManager.createVolume(args);
   }
 
   /**
@@ -239,7 +270,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
    * @throws IOException
    */
   @Override
-  public VolumeArgs getVolumeinfo(String volume) throws IOException {
+  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
     return null;
   }
 
@@ -266,7 +297,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
    * @throws IOException
    */
   @Override
-  public List<VolumeArgs> listVolumeByUser(String userName, String prefix,
+  public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
       String prevKey, long maxKeys) throws IOException {
     return null;
   }
@@ -282,7 +313,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
    * @throws IOException
    */
   @Override
-  public List<VolumeArgs> listAllVolumes(String prefix, String prevKey, long
+  public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
       maxKeys) throws IOException {
     return null;
   }
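
Putting the pieces together, the KSM lifecycle this patch enables looks
roughly like the sketch below (illustrative only; the configuration must
set OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS so the volume manager
can open its LevelDB store):

    OzoneConfiguration conf = new OzoneConfiguration();
    KeySpaceManager ksm = new KeySpaceManager(conf);
    ksm.start();                            // starts VolumeManager and RPC
    KsmVolumeArgs args = KsmVolumeArgs.newBuilder()
        .setAdminName("hdfs")
        .setOwnerName("bilbo")
        .setVolume("volume-one")
        .setQuotaInBytes(Long.MAX_VALUE)
        .build();
    ksm.createVolume(args);                 // counted in KSMMetrics
    ksm.stop();
    ksm.join();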

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
new file mode 100644
index 0000000..e5bb4bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManager.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+
+import java.io.IOException;
+
+/**
+ * KSM volume manager interface.
+ */
+public interface VolumeManager {
+  /**
+   * Start volume manager.
+   */
+  void start();
+
+  /**
+   * Stop volume manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * Create a new volume.
+   * @param args - Volume args to create a volume
+   */
+  void createVolume(KsmVolumeArgs args) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
new file mode 100644
index 0000000..1e63127
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.LevelDBStore;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
+import static org.apache.hadoop.ozone.ksm
+    .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.ksm
+    .KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.ozone.ksm
+    .KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
+import static org.apache.hadoop.ozone.ksm
+    .KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME;
+import static org.apache.hadoop.ozone.ksm.exceptions
+    .KSMException.ResultCodes;
+
+/**
+ * KSM volume management code.
+ */
+public class VolumeManagerImpl implements VolumeManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VolumeManagerImpl.class);
+
+  private final KeySpaceManager ksm;
+  private final LevelDBStore store;
+  private final ReadWriteLock lock;
+  private final int maxUserVolumeCount;
+
+  /**
+   * Constructor.
+   * @param conf - Ozone configuration.
+   * @throws IOException
+   */
+  public VolumeManagerImpl(KeySpaceManager ksm, OzoneConfiguration conf)
+      throws IOException {
+    File metaDir = OzoneUtils.getScmMetadirPath(conf);
+    final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
+        OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
+    Options options = new Options();
+    options.cacheSize(cacheSize * OzoneConsts.MB);
+    File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
+    this.ksm = ksm;
+    this.store = new LevelDBStore(ksmDBFile, options);
+    lock = new ReentrantReadWriteLock();
+    this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
+        OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() throws IOException {
+    store.close();
+  }
+
+  /**
+   * Creates a volume.
+   * @param args - KsmVolumeArgs.
+   */
+  @Override
+  public void createVolume(KsmVolumeArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    lock.writeLock().lock();
+    WriteBatch batch = store.createWriteBatch();
+    try {
+      byte[] volumeName = store.get(DFSUtil.string2Bytes(args.getVolume()));
+
+      // Check if the volume already exists.
+      if (volumeName != null) {
+        LOG.error("volume:{} already exists", args.getVolume());
+        throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
+      }
+
+      // Next count the number of volumes for the user
+      String dbUserName = "$" + args.getOwnerName();
+      byte[] volumeList  = store.get(DFSUtil.string2Bytes(dbUserName));
+      List<String> prevVolList;
+      if (volumeList != null) {
+        VolumeList vlist = VolumeList.parseFrom(volumeList);
+        prevVolList = new LinkedList<>(vlist.getVolumeNamesList());
+      } else {
+        prevVolList = new LinkedList<>();
+      }
+
+      if (prevVolList.size() >= maxUserVolumeCount) {
+        LOG.error("Too many volumes for user:{}", args.getOwnerName());
+        throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
+      }
+
+      // Commit the volume information to leveldb
+      VolumeInfo volumeInfo = args.getProtobuf();
+      batch.put(DFSUtil.string2Bytes(args.getVolume()),
+          volumeInfo.toByteArray());
+
+      prevVolList.add(args.getVolume());
+      VolumeList newVolList = VolumeList.newBuilder()
+          .addAllVolumeNames(prevVolList).build();
+      batch.put(DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray());
+      store.commitWriteBatch(batch);
+      LOG.info("Created volume:{} user:{}",
+          args.getVolume(), args.getOwnerName());
+    } catch (IOException | DBException ex) {
+      ksm.getMetrics().incNumVolumeCreateFails();
+      LOG.error("Volume creation failed for user:{} volname:{}",
+          args.getOwnerName(), args.getVolume(), ex);
+      throw ex;
+    } finally {
+      store.closeWriteBatch(batch);
+      lock.writeLock().unlock();
+    }
+  }
+}
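
The resulting layout in ksm.db has two record types: the volume name keys
a serialized VolumeInfo, and "$" + owner keys a VolumeList holding that
user's volume names. A read-back sketch (illustrative only; error handling
elided, store opened as above):

    byte[] rawInfo = store.get(DFSUtil.string2Bytes("volume-one"));
    if (rawInfo != null) {
      VolumeInfo info = VolumeInfo.parseFrom(rawInfo);     // per-volume record
    }
    byte[] rawList = store.get(DFSUtil.string2Bytes("$bilbo"));
    if (rawList != null) {
      VolumeList volumes = VolumeList.parseFrom(rawList);  // per-user index
    }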

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
new file mode 100644
index 0000000..1a1b3a9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.ksm.exceptions;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown by KSM.
+ */
+public class KSMException extends IOException {
+  private final KSMException.ResultCodes result;
+
+  /**
+   * Constructs an {@code IOException} with {@code null}
+   * as its error detail message.
+   */
+  public KSMException(KSMException.ResultCodes result) {
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   * the
+   * {@link #getMessage()} method)
+   */
+  public KSMException(String message, KSMException.ResultCodes result) {
+    super(message);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified detail message
+   * and cause.
+   * <p>
+   * <p> Note that the detail message associated with {@code cause} is
+   * <i>not</i> automatically incorporated into this exception's detail
+   * message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   * the
+   * {@link #getMessage()} method)
+   * @param cause The cause (which is saved for later retrieval by the {@link
+   * #getCause()} method).  (A null value is permitted, and indicates that the
+   * cause is nonexistent or unknown.)
+   * @since 1.6
+   */
+  public KSMException(String message, Throwable cause,
+                      KSMException.ResultCodes result) {
+    super(message, cause);
+    this.result = result;
+  }
+
+  /**
+   * Constructs an {@code IOException} with the specified cause and a
+   * detail message of {@code (cause==null ? null : cause.toString())}
+   * (which typically contains the class and detail message of {@code cause}).
+   * This constructor is useful for IO exceptions that are little more
+   * than wrappers for other throwables.
+   *
+   * @param cause The cause (which is saved for later retrieval by the {@link
+   * #getCause()} method).  (A null value is permitted, and indicates that the
+   * cause is nonexistent or unknown.)
+   * @since 1.6
+   */
+  public KSMException(Throwable cause, KSMException.ResultCodes result) {
+    super(cause);
+    this.result = result;
+  }
+
+  /**
+   * Returns resultCode.
+   * @return ResultCode
+   */
+  public KSMException.ResultCodes getResult() {
+    return result;
+  }
+
+  /**
+   * Error codes to make it easy to decode these exceptions.
+   */
+  public enum ResultCodes {
+    FAILED_TOO_MANY_USER_VOLUMES,
+    FAILED_VOLUME_ALREADY_EXISTS,
+    FAILED_INTERNAL_ERROR
+  }
+}
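
The result code travels with the exception, so callers can branch without
parsing message strings; a small illustrative sketch:

    try {
      throw new KSMException("volume already exists",
          KSMException.ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
    } catch (KSMException e) {
      switch (e.getResult()) {
      case FAILED_VOLUME_ALREADY_EXISTS:
        // map to a protocol status, retry with a new name, etc.
        break;
      default:
        // FAILED_TOO_MANY_USER_VOLUMES, FAILED_INTERNAL_ERROR
        break;
      }
    }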

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
new file mode 100644
index 0000000..09fd87f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.ksm.exceptions;
+// Exception thrown by KSM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
index 0725d25..aa52c17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeyspaceManagerProtocolServerSideTranslatorPB.java
@@ -18,9 +18,40 @@ package org.apache.hadoop.ozone.protocolPB;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
 import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
-import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CreateVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.InfoVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ListVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ListVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.Status;
+
+
+import java.io.IOException;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -42,47 +73,63 @@ public class KeyspaceManagerProtocolServerSideTranslatorPB implements
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.CreateVolumeResponse createVolume(
-      RpcController controller, KeySpaceManagerProtocolProtos
-      .CreateVolumeRequest
-      request) throws ServiceException {
-    return null;
+  public CreateVolumeResponse createVolume(
+      RpcController controller, CreateVolumeRequest request)
+      throws ServiceException {
+    CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
+    resp.setStatus(Status.OK);
+    try {
+      impl.createVolume(KsmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
+    } catch (IOException e) {
+      if (e instanceof KSMException) {
+        KSMException ksmException = (KSMException) e;
+        if (ksmException.getResult() ==
+            ResultCodes.FAILED_VOLUME_ALREADY_EXISTS) {
+          resp.setStatus(Status.VOLUME_ALREADY_EXISTS);
+        } else if (ksmException.getResult() ==
+            ResultCodes.FAILED_TOO_MANY_USER_VOLUMES) {
+          resp.setStatus(Status.USER_TOO_MANY_VOLUMES);
+        } else {
+          resp.setStatus(Status.INTERNAL_ERROR);
+        }
+      } else {
+        resp.setStatus(Status.INTERNAL_ERROR);
+      }
+    }
+    return resp.build();
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.SetVolumePropertyResponse
-      setVolumeProperty(RpcController controller, KeySpaceManagerProtocolProtos
-      .SetVolumePropertyRequest request) throws ServiceException {
+  public SetVolumePropertyResponse setVolumeProperty(
+      RpcController controller, SetVolumePropertyRequest request)
+      throws ServiceException {
     return null;
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse
-      checkVolumeAccess(RpcController controller, KeySpaceManagerProtocolProtos
-      .CheckVolumeAccessRequest request) throws ServiceException {
+  public CheckVolumeAccessResponse checkVolumeAccess(
+      RpcController controller, CheckVolumeAccessRequest request)
+      throws ServiceException {
     return null;
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.InfoVolumeResponse infoVolume(
-      RpcController controller,
-      KeySpaceManagerProtocolProtos.InfoVolumeRequest request)
+  public InfoVolumeResponse infoVolume(
+      RpcController controller, InfoVolumeRequest request)
       throws ServiceException {
     return null;
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.DeleteVolumeResponse deleteVolume(
-      RpcController controller, KeySpaceManagerProtocolProtos
-      .DeleteVolumeRequest
-      request) throws ServiceException {
+  public DeleteVolumeResponse deleteVolume(
+      RpcController controller, DeleteVolumeRequest request)
+      throws ServiceException {
     return null;
   }
 
   @Override
-  public KeySpaceManagerProtocolProtos.ListVolumeResponse listVolumes(
-      RpcController controller,
-      KeySpaceManagerProtocolProtos.ListVolumeRequest request)
+  public ListVolumeResponse listVolumes(
+      RpcController controller, ListVolumeRequest request)
       throws ServiceException {
     return null;
   }
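
On the wire the outcome is carried in the response's Status field rather
than as an exception; a hedged sketch of how a caller might decode it (the
client-side translator is outside this patch, so this helper is
hypothetical):

    static void checkCreateVolumeResponse(CreateVolumeResponse resp)
        throws IOException {
      switch (resp.getStatus()) {
      case OK:
        return;                            // volume was created
      case VOLUME_ALREADY_EXISTS:
        throw new IOException("Volume already exists");
      case USER_TOO_MANY_VOLUMES:
        throw new IOException("Per-user volume limit reached");
      default:
        throw new IOException("Internal KSM error");
      }
    }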

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index f30f2ae..e96d3d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRespons
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -47,7 +49,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.Date;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.Locale;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
 import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
@@ -62,7 +70,9 @@ public final class DistributedStorageHandler implements StorageHandler {
       LoggerFactory.getLogger(DistributedStorageHandler.class);
 
   private final StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocation;
+      storageContainerLocationClient;
+  private final KeySpaceManagerProtocolClientSideTranslatorPB
+      keySpaceManagerClient;
   private final XceiverClientManager xceiverClientManager;
 
   private int chunkSize;
@@ -72,11 +82,15 @@ public final class DistributedStorageHandler implements StorageHandler {
    *
    * @param conf configuration
    * @param storageContainerLocation StorageContainerLocationProtocol proxy
+   * @param keySpaceManagerClient KeySpaceManager proxy
    */
   public DistributedStorageHandler(OzoneConfiguration conf,
       StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocation) {
-    this.storageContainerLocation = storageContainerLocation;
+                                       storageContainerLocation,
+      KeySpaceManagerProtocolClientSideTranslatorPB
+                                       keySpaceManagerClient) {
+    this.keySpaceManagerClient = keySpaceManagerClient;
+    this.storageContainerLocationClient = storageContainerLocation;
     this.xceiverClientManager = new XceiverClientManager(conf);
 
     chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -92,21 +106,15 @@ public final class DistributedStorageHandler implements StorageHandler {
 
   @Override
   public void createVolume(VolumeArgs args) throws IOException, OzoneException {
-    String containerKey = buildContainerKey(args.getVolumeName());
-    XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
-    try {
-      VolumeInfo volume = new VolumeInfo();
-      volume.setVolumeName(args.getVolumeName());
-      volume.setQuota(args.getQuota());
-      volume.setOwner(new VolumeOwner(args.getUserName()));
-      volume.setCreatedOn(dateToString(new Date()));
-      volume.setCreatedBy(args.getAdminName());
-      KeyData containerKeyData = fromVolumeToContainerKeyData(
-          xceiverClient.getPipeline().getContainerName(), containerKey, volume);
-      putKey(xceiverClient, containerKeyData, args.getRequestID());
-    } finally {
-      xceiverClientManager.releaseClient(xceiverClient);
-    }
+    long quota = args.getQuota() == null ?
+        Long.MAX_VALUE : args.getQuota().sizeInBytes();
+    KsmVolumeArgs volumeArgs = KsmVolumeArgs.newBuilder()
+        .setAdminName(args.getAdminName())
+        .setOwnerName(args.getUserName())
+        .setVolume(args.getVolumeName())
+        .setQuotaInBytes(quota)
+        .build();
+    keySpaceManagerClient.createVolume(volumeArgs);
   }
 
   @Override
@@ -293,9 +301,9 @@ public final class DistributedStorageHandler implements StorageHandler {
   }
 
   /**
-   * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} of nodes
-   * capable of serving container protocol operations.  The container is
-   * selected based on the specified container key.
+   * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
+   * of nodes capable of serving container protocol operations.
+   * The container is selected based on the specified container key.
    *
    * @param containerKey container key
    * @return XceiverClient connected to a container
@@ -304,7 +312,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   private XceiverClientSpi acquireXceiverClient(String containerKey)
       throws IOException {
     Set<LocatedContainer> locatedContainers =
-        storageContainerLocation.getStorageContainerLocations(
+        storageContainerLocationClient.getStorageContainerLocations(
             new HashSet<>(Arrays.asList(containerKey)));
     Pipeline pipeline = newPipelineFromLocatedContainer(
         locatedContainers.iterator().next());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
index d94e6a1..fcd260f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.ozone.web.utils;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
@@ -30,6 +33,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.MediaType;
+import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
@@ -272,4 +276,23 @@ public final class OzoneUtils {
         .build();
 
   }
+
+  /**
+   * Returns the Ozone metadata directory, creating it if it does not exist.
+   *
+   * @param conf - Configuration
+   *
+   * @return File MetaDir
+   */
+  public static File getScmMetadirPath(Configuration conf) {
+    String metaDirPath = conf.getTrimmed(OzoneConfigKeys
+        .OZONE_CONTAINER_METADATA_DIRS);
+    Preconditions.checkNotNull(metaDirPath);
+    File dirPath = new File(metaDirPath);
+    if (!dirPath.exists() && !dirPath.mkdirs()) {
+      throw new IllegalArgumentException("Unable to create paths. Path: " +
+          dirPath);
+    }
+    return dirPath;
+  }
 }
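
Usage sketch for the new helper (illustrative only; the path is an example
value):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS,
        "/tmp/ozone-metadata");
    // Fails a checkNotNull if the key is unset, and throws
    // IllegalArgumentException if the directory cannot be created.
    File metaDir = OzoneUtils.getScmMetadirPath(conf);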

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 21d0b03..e2049a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -19,7 +19,11 @@
 package org.apache.hadoop.utils;
 
 import org.fusesource.leveldbjni.JniDBFactory;
-import org.iq80.leveldb.*;
+import org.iq80.leveldb.WriteBatch;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.WriteOptions;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
 
 import java.io.Closeable;
 import java.io.File;
@@ -32,6 +36,7 @@ public class LevelDBStore implements Closeable {
   private DB db;
   private final File dbFile;
   private final Options dbOptions;
+  private final WriteOptions writeOptions;
 
   /**
    * Opens a DB file.
@@ -49,6 +54,7 @@ public class LevelDBStore implements Closeable {
       throw new IOException("Db is null");
     }
     this.dbFile = dbPath;
+    this.writeOptions = new WriteOptions().sync(true);
   }
 
   /**
@@ -65,6 +71,7 @@ public class LevelDBStore implements Closeable {
       throw new IOException("Db is null");
     }
     this.dbFile = dbPath;
+    this.writeOptions = new WriteOptions().sync(true);
   }
 
 
@@ -75,9 +82,7 @@ public class LevelDBStore implements Closeable {
    * @param value - value
    */
   public void put(byte[] key, byte[] value) {
-    WriteOptions options = new WriteOptions();
-    options.sync(true);
-    db.put(key, value, options);
+    db.put(key, value, writeOptions);
   }
 
   /**
@@ -167,7 +172,7 @@ public class LevelDBStore implements Closeable {
    * @param wb
    */
   public void commitWriteBatch(WriteBatch wb) {
-    db.write(wb);
+    db.write(wb, writeOptions);
   }
 
   /**
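
With the shared WriteOptions, single puts and committed batches are both
synchronous now; a usage sketch (illustrative only; path and keys are
example values, error handling elided):

    LevelDBStore store = new LevelDBStore(
        new File("/tmp/example.db"), new Options().createIfMissing(true));
    WriteBatch batch = store.createWriteBatch();
    try {
      batch.put(DFSUtil.string2Bytes("key1"), DFSUtil.string2Bytes("val1"));
      batch.put(DFSUtil.string2Bytes("key2"), DFSUtil.string2Bytes("val2"));
      store.commitWriteBatch(batch);   // durable via WriteOptions.sync(true)
    } finally {
      store.closeWriteBatch(batch);
      store.close();
    }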

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 64f4bb2..9caec2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -26,8 +26,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.ksm.KeySpaceManager;
 import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
 import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
@@ -67,6 +70,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
 
   private final OzoneConfiguration conf;
   private final StorageContainerManager scm;
+  private final KeySpaceManager ksm;
   private final Path tempPath;
 
   /**
@@ -76,11 +80,13 @@ public final class MiniOzoneCluster extends MiniDFSCluster
    * @param scm     StorageContainerManager, already running
    * @throws IOException if there is an I/O error
    */
-  private MiniOzoneCluster(Builder builder, StorageContainerManager scm)
+  private MiniOzoneCluster(Builder builder, StorageContainerManager scm,
+                           KeySpaceManager ksm)
       throws IOException {
     super(builder);
     this.conf = builder.conf;
     this.scm = scm;
+    this.ksm = ksm;
     tempPath = Paths.get(builder.getPath(), builder.getRunID());
   }
 
@@ -126,18 +132,28 @@ public final class MiniOzoneCluster extends MiniDFSCluster
   public void shutdown() {
     super.shutdown();
     LOG.info("Shutting down the Mini Ozone Cluster");
-    if (scm == null) {
-      return;
+
+    if (ksm != null) {
+      LOG.info("Shutting down the keySpaceManager");
+      ksm.stop();
+      ksm.join();
+    }
+
+    if (scm != null) {
+      LOG.info("Shutting down the StorageContainerManager");
+      scm.stop();
+      scm.join();
     }
-    LOG.info("Shutting down the StorageContainerManager");
-    scm.stop();
-    scm.join();
   }
 
   public StorageContainerManager getStorageContainerManager() {
     return this.scm;
   }
 
+  public KeySpaceManager getKeySpaceManager() {
+    return this.ksm;
+  }
+
   /**
    * Creates an {@link OzoneClient} connected to this cluster's REST service.
    * Callers take ownership of the client and must close it when done.
@@ -336,6 +352,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
 
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
+      conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
 
       // Use random ports for ozone containers in mini cluster,
       // in order to launch multiple container servers per node.
@@ -344,11 +361,15 @@ public final class MiniOzoneCluster extends MiniDFSCluster
 
       StorageContainerManager scm = new StorageContainerManager(conf);
       scm.start();
+
+      KeySpaceManager ksm = new KeySpaceManager(conf);
+      ksm.start();
+
       String addressString =  scm.getDatanodeRpcAddress().getHostString() +
           ":" + scm.getDatanodeRpcAddress().getPort();
       conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, addressString);
 
-      MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm);
+      MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm, ksm);
       try {
         cluster.waitOzoneReady();
         if (waitForChillModeFinish) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
new file mode 100644
index 0000000..43175a9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestDistributedOzoneVolumes.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Rule;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.junit.rules.Timeout;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Test ozone volume in the distributed storage handler scenario.
+ */
+public class TestDistributedOzoneVolumes extends TestOzoneHelper {
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestDistributedOzoneVolumes.class);
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300000);
+
+  private static MiniOzoneCluster cluster = null;
+  private static int port = 0;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    Logger.getLogger("org.apache.http").setLevel(Level.DEBUG);
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    port = dataNode.getInfoPort();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates Volumes on Ozone Store.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumes() throws IOException {
+    super.testCreateVolumes(port);
+    Assert.assertEquals(1, cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreates());
+    Assert.assertEquals(0, cluster.getKeySpaceManager()
+        .getMetrics().getNumVolumeCreateFails());
+  }
+
+  /**
+   * Create Volumes with Quota.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithQuota() throws IOException {
+    super.testCreateVolumesWithQuota(port);
+  }
+
+  /**
+   * Create Volumes with Invalid Quota.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithInvalidQuota() throws IOException {
+    super.testCreateVolumesWithInvalidQuota(port);
+  }
+
+  /**
+   * To create a volume, a user name must be specified in the OZONE_USER
+   * header. This test verifies that we get an error when the call is made
+   * without an OZONE user name.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithInvalidUser() throws IOException {
+    super.testCreateVolumesWithInvalidUser(port);
+  }
+
+  /**
+   * Only admins can create volumes in Ozone. This test uses the simple
+   * userauth backend, in which hdfs and root are the admin users.
+   * <p>
+   * This test tries to create a volume as user bilbo.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesWithOutAdminRights() throws IOException {
+    super.testCreateVolumesWithOutAdminRights(port);
+  }
+
+  /**
+   * Create a bunch of volumes in a loop.
+   *
+   * @throws IOException
+   */
+  public void testCreateVolumesInLoop() throws IOException {
+    super.testCreateVolumesInLoop(port);
+  }
+  /**
+   * Get volumes owned by the user.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesByUser() throws IOException {
+    super.testGetVolumesByUser(port);
+  }
+
+  /**
+   * Admins can read volumes belonging to other users.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesOfAnotherUser() throws IOException {
+    super.testGetVolumesOfAnotherUser(port);
+  }
+
+  /**
+   * Reading volumes that belong to another user without admin rights
+   * should fail.
+   *
+   * @throws IOException
+   */
+  public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
+    super.testGetVolumesOfAnotherUserShouldFail(port);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8393b4b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestLocalOzoneVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestLocalOzoneVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestLocalOzoneVolumes.java
new file mode 100644
index 0000000..78e6c0f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestLocalOzoneVolumes.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Test ozone volume in the local storage handler scenario.
+ */
+public class TestLocalOzoneVolumes extends TestOzoneHelper {
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300000);
+
+  private static MiniOzoneCluster cluster = null;
+  private static int port = 0;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "local" , which uses a local directory to
+   * emulate Ozone backend.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    URL p = conf.getClass().getResource("");
+    String path = p.getPath()
+        .concat(TestLocalOzoneVolumes.class.getSimpleName());
+    path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+        OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
+
+    conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
+    Logger.getLogger("org.apache.http").setLevel(Level.DEBUG);
+
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    port = dataNode.getInfoPort();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates Volumes on Ozone Store.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumes() throws IOException {
+    super.testCreateVolumes(port);
+  }
+
+  /**
+   * Create Volumes with Quota.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumesWithQuota() throws IOException {
+    super.testCreateVolumesWithQuota(port);
+  }
+
+  /**
+   * Create Volumes with Invalid Quota.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumesWithInvalidQuota() throws IOException {
+    super.testCreateVolumesWithInvalidQuota(port);
+  }
+
+  /**
+   * To create a volume, a user name must be specified in the OZONE_USER
+   * header. This test verifies that we get an error when the call is made
+   * without an OZONE user name.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumesWithInvalidUser() throws IOException {
+    super.testCreateVolumesWithInvalidUser(port);
+  }
+
+  /**
+   * Only admins can create volumes in Ozone. This test uses the simple
+   * userauth backend, in which hdfs and root are the admin users.
+   * <p>
+   * This test tries to create a volume as user bilbo.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateVolumesWithOutAdminRights() throws IOException {
+    super.testCreateVolumesWithOutAdminRights(port);
+  }
+
+  /**
+   * Create a bunch of volumes in a loop.
+   *
+   * @throws IOException
+   */
+  //@Test
+  public void testCreateVolumesInLoop() throws IOException {
+    super.testCreateVolumesInLoop(port);
+  }
+  /**
+   * Get volumes owned by the user.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testGetVolumesByUser() throws IOException {
+    super.testGetVolumesByUser(port);
+  }
+
+  /**
+   * Admins can read volumes belonging to other users.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testGetVolumesOfAnotherUser() throws IOException {
+    super.testGetVolumesOfAnotherUser(port);
+  }
+
+  /**
+   * Reading volumes that belong to another user without admin rights
+   * should fail.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
+    super.testGetVolumesOfAnotherUserShouldFail(port);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org