Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/08/08 20:36:41 UTC

[6/8] hadoop git commit: HDFS-12259. Ozone: OzoneClient: Refactor move ozone client from hadoop-hdfs to hadoop-hdfs-client. Contributed by Nandakumar.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
new file mode 100644
index 0000000..8e4ce92
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Maintains a list of ChunkInputStream instances. Reads proceed
+ * sequentially across the streams, based on offset.
+ */
+public class ChunkGroupInputStream extends InputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkGroupInputStream.class);
+
+  private static final int EOF = -1;
+
+  private final ArrayList<ChunkInputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+
+  public ChunkGroupInputStream() {
+    streamEntries = new ArrayList<>();
+    currentStreamIndex = 0;
+  }
+
+  @VisibleForTesting
+  public synchronized int getCurrentStreamIndex() {
+    return currentStreamIndex;
+  }
+
+  @VisibleForTesting
+  public long getRemainingOfIndex(int index) {
+    return streamEntries.get(index).getRemaining();
+  }
+
+  /**
+   * Append another stream to the end of the list.
+   *
+   * @param stream the stream instance.
+   * @param length the max number of bytes that should be read from this
+   *               stream.
+   */
+  public synchronized void addStream(InputStream stream, long length) {
+    streamEntries.add(new ChunkInputStreamEntry(stream, length));
+  }
+
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IndexOutOfBoundsException();
+    }
+    ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
+    int data = entry.read();
+    if (entry.getRemaining() <= 0) {
+      // the current stream is exhausted; move to the next one
+      currentStreamIndex += 1;
+    }
+    return data;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    int totalReadLen = 0;
+    while (len > 0) {
+      if (streamEntries.size() <= currentStreamIndex) {
+        return totalReadLen == 0 ? EOF : totalReadLen;
+      }
+      ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+      int readLen = Math.min(len, (int)current.getRemaining());
+      int actualLen = current.read(b, off, readLen);
+      // the underlying stream returned EOF without any data; return what
+      // has been read so far, or EOF if nothing was read
+      if (actualLen == EOF) {
+        return totalReadLen > 0 ? totalReadLen : EOF;
+      }
+      totalReadLen += actualLen;
+      // this means there is no more data to read beyond this point, return
+      if (actualLen != readLen) {
+        return totalReadLen;
+      }
+      off += readLen;
+      len -= readLen;
+      if (current.getRemaining() <= 0) {
+        currentStreamIndex += 1;
+      }
+    }
+    return totalReadLen;
+  }
+
+  private static class ChunkInputStreamEntry extends InputStream {
+
+    private final InputStream inputStream;
+    private final long length;
+    private long currentPosition;
+
+
+    ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
+      this.inputStream = chunkInputStream;
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    synchronized long getRemaining() {
+      return length - currentPosition;
+    }
+
+    @Override
+    public synchronized int read(byte[] b, int off, int len)
+        throws IOException {
+      int readLen = inputStream.read(b, off, len);
+      if (readLen != EOF) {
+        // do not move the position backwards when the stream reports EOF
+        currentPosition += readLen;
+      }
+      return readLen;
+    }
+
+    @Override
+    public synchronized int read() throws IOException {
+      int data = inputStream.read();
+      if (data != EOF) {
+        // only advance the position when a byte was actually returned
+        currentPosition += 1;
+      }
+      return data;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      inputStream.close();
+    }
+  }
+
+  public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
+      XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+          storageContainerLocationClient, String requestId)
+      throws IOException {
+    int index = 0;
+    long length = 0;
+    String containerKey;
+    ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
+    for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
+      // check index as sanity check
+      Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
+      String containerName = ksmKeyLocationInfo.getContainerName();
+      Pipeline pipeline =
+          storageContainerLocationClient.getContainer(containerName);
+      XceiverClientSpi xceiverClient =
+          xceiverClientManager.acquireClient(pipeline);
+      boolean success = false;
+      containerKey = ksmKeyLocationInfo.getBlockID();
+      try {
+        LOG.debug("get key accessing {} {}",
+            xceiverClient.getPipeline().getContainerName(), containerKey);
+        ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
+            .containerKeyDataForRead(
+                xceiverClient.getPipeline().getContainerName(), containerKey);
+        ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
+            .getKey(xceiverClient, containerKeyData, requestId);
+        List<ContainerProtos.ChunkInfo> chunks =
+            response.getKeyData().getChunksList();
+        for (ContainerProtos.ChunkInfo chunk : chunks) {
+          length += chunk.getLen();
+        }
+        success = true;
+        ChunkInputStream inputStream = new ChunkInputStream(
+            containerKey, xceiverClientManager, xceiverClient,
+            chunks, requestId);
+        groupInputStream.addStream(inputStream,
+            ksmKeyLocationInfo.getLength());
+      } finally {
+        if (!success) {
+          xceiverClientManager.releaseClient(xceiverClient);
+        }
+      }
+    }
+    return new LengthInputStream(groupInputStream, length);
+  }
+}
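
[For illustration beyond the patch itself: a minimal sketch of how ChunkGroupInputStream stitches its entries together. It relies only on the public addStream(InputStream, long) hook and plain in-memory streams, so it runs without a cluster; the class name and data below are made up.]

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;

    public class ChunkGroupReadSketch {
      public static void main(String[] args) throws IOException {
        ChunkGroupInputStream in = new ChunkGroupInputStream();
        byte[] first = "hello ".getBytes();
        byte[] second = "world".getBytes();
        // each backing stream is registered together with its length
        in.addStream(new ByteArrayInputStream(first), first.length);
        in.addStream(new ByteArrayInputStream(second), second.length);

        byte[] buf = new byte[first.length + second.length];
        // one read can span both entries; the group stream advances
        // currentStreamIndex whenever an entry is drained
        int n = in.read(buf, 0, buf.length);
        System.out.println(n + " bytes: " + new String(buf, 0, n));
        // prints: 11 bytes: hello world
      }
    }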

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
new file mode 100644
index 0000000..2cc12f4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Maintains a list of ChunkOutputStream instances. Writes proceed
+ * sequentially across the streams, based on offset.
+ *
+ * Note that a single write call may write to multiple containers. If the
+ * first container succeeds but a later one fails, the writes that already
+ * succeeded are not rolled back.
+ *
+ * TODO: multi-threaded access is currently not supported.
+ */
+public class ChunkGroupOutputStream extends OutputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkGroupOutputStream.class);
+
+  // array list's get(index) is O(1)
+  private final ArrayList<ChunkOutputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+  private long totalSize;
+  private long byteOffset;
+
+  // This has to be removed once HDFS-11888 is resolved.
+  // Local cache holding the names of the containers created so far.
+  private static Set<String> containersCreated = new HashSet<>();
+
+  public ChunkGroupOutputStream() {
+    this.streamEntries = new ArrayList<>();
+    this.currentStreamIndex = 0;
+    this.totalSize = 0;
+    this.byteOffset = 0;
+  }
+
+  @VisibleForTesting
+  public long getByteOffset() {
+    return byteOffset;
+  }
+
+  /**
+   * Append another stream to the end of the list. Note that the stream is
+   * not actually created at this point; only the metadata needed to create
+   * it is stored. The underlying stream is created lazily on the first
+   * write, if it does not exist already.
+   *
+   * @param containerKey the key to store in the container
+   * @param key the ozone key
+   * @param xceiverClientManager xceiver client manager instance
+   * @param xceiverClient xceiver client instance
+   * @param requestID the request id
+   * @param chunkSize the chunk size for this key's chunks
+   * @param length the total length of this key
+   */
+  public synchronized void addStream(String containerKey, String key,
+      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      String requestID, int chunkSize, long length) {
+    streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
+        xceiverClientManager, xceiverClient, requestID, chunkSize, length));
+    totalSize += length;
+  }
+
+  @VisibleForTesting
+  public synchronized void addStream(OutputStream outputStream, long length) {
+    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
+    totalSize += length;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IndexOutOfBoundsException();
+    }
+    ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
+    entry.write(b);
+    if (entry.getRemaining() <= 0) {
+      currentStreamIndex += 1;
+    }
+    byteOffset += 1;
+  }
+
+  /**
+   * Tries to write the byte sequence b[off:off+len) to the streams.
+   *
+   * NOTE: Throws an exception if the data does not fit into the remaining
+   * space; in that case nothing is written.
+   * TODO: may need to revisit this behaviour.
+   *
+   * @param b byte data
+   * @param off starting offset
+   * @param len length to write
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+    if (streamEntries.size() <= currentStreamIndex) {
+      throw new IOException("Write out of stream range! stream index:" +
+          currentStreamIndex);
+    }
+    if (totalSize - byteOffset < len) {
+      throw new IOException("Can not write " + len + " bytes with only " +
+          (totalSize - byteOffset) + " byte space");
+    }
+    while (len > 0) {
+      // in theory, this condition can never be violated due to the check
+      // above; still, do a sanity check.
+      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+      ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+      int writeLen = Math.min(len, (int)current.getRemaining());
+      current.write(b, off, writeLen);
+      if (current.getRemaining() <= 0) {
+        currentStreamIndex += 1;
+      }
+      len -= writeLen;
+      off += writeLen;
+      byteOffset += writeLen;
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    // currentStreamIndex may equal streamEntries.size() once every entry
+    // is full, so bound the loop by the list size as well
+    int last = Math.min(currentStreamIndex, streamEntries.size() - 1);
+    for (int i = 0; i <= last; i++) {
+      streamEntries.get(i).flush();
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ChunkOutputStreamEntry entry : streamEntries) {
+      entry.close();
+    }
+  }
+
+  private static class ChunkOutputStreamEntry extends OutputStream {
+    private OutputStream outputStream;
+    private final String containerKey;
+    private final String key;
+    private final XceiverClientManager xceiverClientManager;
+    private final XceiverClientSpi xceiverClient;
+    private final String requestId;
+    private final int chunkSize;
+    // total number of bytes that should be written to this stream
+    private final long length;
+    // the current position of this stream 0 <= currentPosition < length
+    private long currentPosition;
+
+    ChunkOutputStreamEntry(String containerKey, String key,
+        XceiverClientManager xceiverClientManager,
+        XceiverClientSpi xceiverClient, String requestId, int chunkSize,
+        long length) {
+      this.outputStream = null;
+      this.containerKey = containerKey;
+      this.key = key;
+      this.xceiverClientManager = xceiverClientManager;
+      this.xceiverClient = xceiverClient;
+      this.requestId = requestId;
+      this.chunkSize = chunkSize;
+
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    /**
+     * For testing purposes, wraps an externally created stream instance.
+     * @param  outputStream an existing writable output stream
+     * @param  length the length of data to write to the stream
+     */
+    ChunkOutputStreamEntry(OutputStream outputStream, long length) {
+      this.outputStream = outputStream;
+      this.containerKey = null;
+      this.key = null;
+      this.xceiverClientManager = null;
+      this.xceiverClient = null;
+      this.requestId = null;
+      this.chunkSize = -1;
+
+      this.length = length;
+      this.currentPosition = 0;
+    }
+
+    long getLength() {
+      return length;
+    }
+
+    long getRemaining() {
+      return length - currentPosition;
+    }
+
+    private synchronized void checkStream() {
+      if (this.outputStream == null) {
+        this.outputStream = new ChunkOutputStream(containerKey,
+            key, xceiverClientManager, xceiverClient,
+            requestId, chunkSize);
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      checkStream();
+      outputStream.write(b);
+      this.currentPosition += 1;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      checkStream();
+      outputStream.write(b, off, len);
+      this.currentPosition += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.flush();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.outputStream != null) {
+        this.outputStream.close();
+      }
+    }
+  }
+
+  public static ChunkGroupOutputStream getFromKsmKeyInfo(
+      KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+          storageContainerLocationClient,
+      int chunkSize, String requestId) throws IOException {
+    // TODO: the following createContainer and key writes may fail, in which
+    // case we should revert the above allocateKey to KSM.
+    // check index as sanity check
+    int index = 0;
+    String containerKey;
+    ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
+    for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
+      containerKey = subKeyInfo.getBlockID();
+
+      Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
+      String containerName = subKeyInfo.getContainerName();
+      Pipeline pipeline =
+          storageContainerLocationClient.getContainer(containerName);
+      XceiverClientSpi xceiverClient =
+          xceiverClientManager.acquireClient(pipeline);
+      // create the container if needed
+      // TODO: this should be driven by subKeyInfo.getShouldCreateContainer();
+      // the following workaround has to be reverted once HDFS-11888 is fixed.
+      if (!containersCreated.contains(containerName)) {
+        synchronized (containerName.intern()) {
+          // check again; there is a chance that some other thread has
+          // created it in the meantime.
+          if (!containersCreated.contains(containerName)) {
+            LOG.debug("Need to create container {}.", containerName);
+            try {
+              ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+            } catch (StorageContainerException ex) {
+              if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
+                // container already exists.
+                LOG.debug("Container {} already exists.", containerName);
+              } else {
+                LOG.error("Container creation failed for {}.",
+                    containerName, ex);
+                throw ex;
+              }
+            }
+            containersCreated.add(containerName);
+          }
+        }
+      }
+
+      groupOutputStream.addStream(containerKey, keyInfo.getKeyName(),
+          xceiverClientManager, xceiverClient, requestId, chunkSize,
+          subKeyInfo.getLength());
+    }
+    return groupOutputStream;
+  }
+}
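
[Again a sketch, not part of the change: the pre-sized write semantics of ChunkGroupOutputStream, exercised through the @VisibleForTesting addStream(OutputStream, long) overload with in-memory sinks; names and sizes are made up.]

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;

    public class ChunkGroupWriteSketch {
      public static void main(String[] args) throws IOException {
        ChunkGroupOutputStream out = new ChunkGroupOutputStream();
        ByteArrayOutputStream part1 = new ByteArrayOutputStream();
        ByteArrayOutputStream part2 = new ByteArrayOutputStream();
        // capacity is declared up front; writing more than the declared
        // total raises an IOException and nothing further is written
        out.addStream(part1, 6);
        out.addStream(part2, 5);

        byte[] data = "hello world".getBytes();
        out.write(data, 0, data.length); // spans both entries: 6 + 5 bytes
        out.close();
        System.out.println(part1 + "|" + part2); // prints: hello |world
      }
    }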

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
new file mode 100644
index 0000000..baf1887
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/LengthInputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+/**
+ * An input stream with length.
+ */
+public class LengthInputStream extends FilterInputStream {
+
+  private final long length;
+
+  /**
+   * Create a stream.
+   * @param in the underlying input stream.
+   * @param length the length of the stream.
+   */
+  public LengthInputStream(InputStream in, long length) {
+    super(in);
+    this.length = length;
+  }
+
+  /** @return the length. */
+  public long getLength() {
+    return length;
+  }
+
+  public InputStream getWrappedStream() {
+    return in;
+  }
+}
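
[Sketch, not part of the change: LengthInputStream is what getFromKsmKeyInfo() returns, and carrying the length lets callers size buffers up front; the payload below is made up.]

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.hadoop.ozone.client.io.LengthInputStream;

    public class LengthReadSketch {
      public static void main(String[] args) throws IOException {
        byte[] payload = "sample".getBytes();
        LengthInputStream lis = new LengthInputStream(
            new ByteArrayInputStream(payload), payload.length);
        // the advertised length allows exact-size buffer allocation
        byte[] buf = new byte[(int) lis.getLength()];
        int n = lis.read(buf);
        System.out.println(new String(buf, 0, n)); // prints: sample
      }
    }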

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
new file mode 100644
index 0000000..ca6f7aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+
+/**
+ * This class contains methods that define the translation between the Ozone
+ * domain model and the storage container domain model.
+ */
+final class OzoneContainerTranslation {
+
+  /**
+   * Creates key data intended for reading a container key.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @return KeyData intended for reading the container key
+   */
+  public static KeyData containerKeyDataForRead(String containerName,
+      String containerKey) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .build();
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private OzoneContainerTranslation() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
new file mode 100644
index 0000000..9551cdb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * OzoneInputStream is used to read data from Ozone.
+ * It uses SCM's {@link ChunkInputStream} for reading the data.
+ */
+public class OzoneInputStream extends InputStream {
+
+  private final ChunkGroupInputStream inputStream;
+
+  /**
+   * Constructs OzoneInputStream with ChunkGroupInputStream.
+   *
+   * @param inputStream the group input stream to wrap
+   */
+  public OzoneInputStream(ChunkGroupInputStream inputStream) {
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return inputStream.read();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    inputStream.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
new file mode 100644
index 0000000..5e2ad94
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * OzoneOutputStream is used to write data into Ozone.
+ * It uses {@link ChunkGroupOutputStream} for writing the data.
+ */
+public class OzoneOutputStream extends OutputStream {
+
+  private final ChunkGroupOutputStream outputStream;
+
+  /**
+   * Constructs OzoneOutputStream with ChunkGroupOutputStream.
+   *
+   * @param outputStream the group output stream to wrap
+   */
+  public OzoneOutputStream(ChunkGroupOutputStream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputStream.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    outputStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    //commitKey can be done here, if needed.
+    outputStream.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
new file mode 100644
index 0000000..493ece8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+/**
+ * This package contains Ozone I/O classes.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
new file mode 100644
index 0000000..7e2591a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+/**
+ * This package contains Ozone Client classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java
new file mode 100644
index 0000000..4955002
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/OzoneRestClient.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+
+/**
+ * Ozone REST client implementation. It connects to the Ozone Handler to
+ * execute client calls, using the REST protocol to talk to the server.
+ */
+public class OzoneRestClient implements OzoneClient, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneRestClient.class);
+
+  private static final String SCHEMA = "http://";
+  private static final int DEFAULT_OZONE_PORT = 50070;
+
+  private final URI uri;
+  private final UserGroupInformation ugi;
+  private final OzoneAcl.OzoneACLRights userRights;
+  private final OzoneAcl.OzoneACLRights groupRights;
+
+
+  /**
+   * Creates an OzoneRestClient instance with a new OzoneConfiguration.
+   *
+   * @throws IOException
+   */
+  public OzoneRestClient() throws IOException, URISyntaxException {
+    this(new OzoneConfiguration());
+  }
+
+  /**
+   * Creates an OzoneRestClient instance with the given configuration.
+   *
+   * @param conf configuration to use
+   *
+   * @throws IOException
+   */
+  public OzoneRestClient(Configuration conf)
+      throws IOException {
+    Preconditions.checkNotNull(conf);
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+    this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
+
+    // TODO: get the uri from the property ozone.rest.servers
+    URIBuilder ozoneURI = null;
+    try {
+      ozoneURI = new URIBuilder(SCHEMA + "localhost");
+      if (ozoneURI.getPort() < 1) { // URIBuilder returns -1 when unset
+        ozoneURI.setPort(DEFAULT_OZONE_PORT);
+      }
+      uri = ozoneURI.build();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void createVolume(String volumeName)
+      throws IOException {
+    createVolume(volumeName, ugi.getUserName());
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner)
+      throws IOException {
+
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES,
+        (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           OzoneAcl... acls)
+      throws IOException {
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota)
+      throws IOException {
+    createVolume(volumeName, owner, quota, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota, OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    Preconditions.checkNotNull(quota);
+    Preconditions.checkState(quota >= 0);
+
+    Set<OzoneAcl> aclSet = new HashSet<>();
+
+    if(acls != null) {
+      aclSet.addAll(Arrays.asList(acls));
+    }
+
+    LOG.info("Creating Volume: {}, with {} as owner and " +
+        "quota set to {} bytes.", volumeName, owner, quota);
+    HttpPost httpPost = null;
+    HttpEntity entity = null;
+    try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
+      URIBuilder builder = new URIBuilder(uri);
+      builder.setPath("/" + volumeName);
+      String quotaString = quota + Header.OZONE_QUOTA_BYTES;
+      builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quotaString);
+      httpPost = getHttpPost(owner, builder.build().toString());
+      for (OzoneAcl acl : aclSet) {
+        httpPost.addHeader(
+            Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl.toString());
+      }
+
+      HttpResponse response = httpClient.execute(httpPost);
+      entity = response.getEntity();
+      int errorCode = response.getStatusLine().getStatusCode();
+      if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
+        return;
+      }
+      if (entity != null) {
+        throw new IOException(EntityUtils.toString(entity));
+      } else {
+        throw new IOException("Unexpected null in http payload");
+      }
+    } catch (URISyntaxException | IllegalArgumentException ex) {
+      throw new IOException(ex.getMessage());
+    } finally {
+      EntityUtils.consume(entity);
+      OzoneClientUtils.releaseConnection(httpPost);
+    }
+  }
+
+  @Override
+  public void setVolumeOwner(String volumeName, String owner)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void setVolumeQuota(String volumeName, long quota)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(quota);
+    Preconditions.checkState(quota >= 0);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneVolume getVolumeDetails(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void deleteVolume(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix,
+                                             String user)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning)
+      throws IOException {
+    createBucket(volumeName, bucketName, versioning,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           StorageType storageType)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        storageType, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           OzoneAcl... acls)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, acls);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning, StorageType storageType,
+                           OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(versioning);
+    Preconditions.checkNotNull(storageType);
+
+    String owner = ugi.getUserName();
+    final List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    //User ACL
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            owner, userRights);
+    listOfAcls.add(userAcl);
+
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(owner).getGroupNames());
+    userGroups.stream().forEach((group) -> listOfAcls.add(
+        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
+
+    //ACLs passed as argument
+    if(acls != null) {
+      Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl));
+    }
+
+    LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
+        "Storage Type set to {}", volumeName, bucketName, versioning,
+        storageType);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  /**
+   * Converts the OzoneConsts.Versioning enum to a boolean.
+   *
+   * @param version
+   * @return corresponding boolean value
+   */
+  private boolean getBucketVersioningProtobuf(
+      Versioning version) {
+    if(version != null) {
+      switch(version) {
+      case ENABLED:
+        return true;
+      case NOT_DEFINED:
+      case DISABLED:
+      default:
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void addBucketAcls(String volumeName, String bucketName,
+                            List<OzoneAcl> addAcls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(addAcls);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void removeBucketAcls(String volumeName, String bucketName,
+                               List<OzoneAcl> removeAcls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(removeAcls);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void setBucketVersioning(String volumeName, String bucketName,
+                                  Versioning versioning)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(versioning);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void setBucketStorageType(String volumeName, String bucketName,
+                                   StorageType storageType)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(storageType);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void deleteBucket(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void checkBucketAccess(String volumeName, String bucketName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneBucket getBucketDetails(String volumeName,
+                                      String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneBucket> listBuckets(String volumeName,
+                                            String bucketPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneOutputStream createKey(String volumeName, String bucketName,
+                                     String keyName, long size)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneInputStream getKey(String volumeName, String bucketName,
+                                 String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void deleteKey(String volumeName, String bucketName,
+                        String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public List<OzoneKey> listKeys(String volumeName, String bucketName,
+                                 String keyPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneKey getKeyDetails(String volumeName, String bucketName,
+                                  String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  /**
+   * Converts Versioning to boolean.
+   *
+   * @param version
+   * @return corresponding boolean value
+   */
+  private boolean getBucketVersioningFlag(
+      Versioning version) {
+    if(version != null) {
+      switch(version) {
+      case ENABLED:
+        return true;
+      case DISABLED:
+      case NOT_DEFINED:
+      default:
+        return false;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns a standard HttpPost object to use for ozone post requests.
+   *
+   * @param user - if the request is made on behalf of a user, that user
+   * @param uriString - the URI string
+   * @return HttpPost
+   */
+  public HttpPost getHttpPost(String user, String uriString) {
+    HttpPost httpPost = new HttpPost(uriString);
+    addOzoneHeaders(httpPost);
+    if (user != null) {
+      httpPost.addHeader(Header.OZONE_USER, user);
+    }
+    return httpPost;
+  }
+
+  /**
+   * Add Ozone Headers.
+   *
+   * @param httpRequest - Http Request
+   */
+  private void addOzoneHeaders(HttpRequestBase httpRequest) {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+
+    httpRequest.addHeader(Header.OZONE_VERSION_HEADER,
+        Header.OZONE_V1_VERSION_HEADER);
+    httpRequest.addHeader(HttpHeaders.DATE,
+        format.format(new Date(Time.monotonicNow())));
+    httpRequest.addHeader(HttpHeaders.AUTHORIZATION,
+        Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+            ugi.getUserName());
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
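
[Sketch, not part of the change: the headers that getHttpPost() attaches can be inspected locally without contacting any server. The user name, volume and quota below are made-up values following the quota string format built in createVolume().]

    import org.apache.hadoop.ozone.client.rest.OzoneRestClient;
    import org.apache.http.client.methods.HttpPost;

    public class RestHeaderSketch {
      public static void main(String[] args) throws Exception {
        OzoneRestClient client = new OzoneRestClient();
        HttpPost post = client.getHttpPost("bilbo",
            "http://localhost:50070/volume-one?quota=1024BYTES");
        // prints x-ozone-version, Date, Authorization and x-ozone-user
        for (org.apache.http.Header h : post.getAllHeaders()) {
          System.out.println(h.getName() + ": " + h.getValue());
        }
      }
    }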

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java
new file mode 100644
index 0000000..5221a0e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/Header.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest.headers;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * OZONE specific HTTP headers.
+ */
+@InterfaceAudience.Private
+public final class Header {
+  public static final String OZONE_QUOTA_BYTES = "BYTES";
+  public static final String OZONE_QUOTA_MB = "MB";
+  public static final String OZONE_QUOTA_GB = "GB";
+  public static final String OZONE_QUOTA_TB = "TB";
+  public static final String OZONE_QUOTA_REMOVE = "remove";
+  public static final String OZONE_QUOTA_UNDEFINED = "undefined";
+  public static final String OZONE_EMPTY_STRING = "";
+  public static final String OZONE_DEFAULT_LIST_SIZE = "1000";
+
+  public static final String OZONE_USER = "x-ozone-user";
+  public static final String OZONE_SIMPLE_AUTHENTICATION_SCHEME = "OZONE";
+  public static final String OZONE_VERSION_HEADER = "x-ozone-version";
+  public static final String OZONE_V1_VERSION_HEADER = "v1";
+
+  public static final String OZONE_LIST_QUERY_SERVICE = "service";
+  public static final String OZONE_LIST_QUERY_VOLUME = "volume";
+  public static final String OZONE_LIST_QUERY_BUCKET = "bucket";
+  public static final String OZONE_LIST_QUERY_KEY = "key";
+
+  public static final String OZONE_REQUEST_ID = "x-ozone-request-id";
+  public static final String OZONE_SERVER_NAME = "x-ozone-server-name";
+
+  public static final String OZONE_STORAGE_TYPE = "x-ozone-storage-type";
+
+  public static final String OZONE_BUCKET_VERSIONING =
+      "x-ozone-bucket-versioning";
+
+  public static final String OZONE_ACLS = "x-ozone-acls";
+  public static final String OZONE_ACL_ADD = "ADD";
+  public static final String OZONE_ACL_REMOVE = "REMOVE";
+
+  public static final String OZONE_LIST_QUERY_TAG = "info";
+  public static final String OZONE_QUOTA_QUERY_TAG = "quota";
+  public static final String CONTENT_MD5 = "Content-MD5";
+  public static final String OZONE_LIST_QUERY_PREFIX = "prefix";
+  public static final String OZONE_LIST_QUERY_MAXKEYS = "max-keys";
+  public static final String OZONE_LIST_QUERY_PREVKEY = "prev-key";
+  public static final String OZONE_LIST_QUERY_ROOTSCAN = "root-scan";
+
+  private Header() {
+    // Never constructed.
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java
new file mode 100644
index 0000000..54157f0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/headers/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Ozone HTTP header definitions.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.ozone.client.rest.headers;
+
+import org.apache.hadoop.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java
new file mode 100644
index 0000000..ebcc104
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+/**
+ * This package contains Ozone rest client library classes.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java
new file mode 100644
index 0000000..daa9639
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClient.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.client.io.LengthInputStream;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.protocolPB
+    .KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.ksm.protocolPB
+    .KeySpaceManagerProtocolPB;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Ozone RPC client implementation. It connects to the KSM, SCM and
+ * DataNodes to execute client calls, using RPC as the transport
+ * protocol.
+ */
+public class OzoneRpcClient implements OzoneClient, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneRpcClient.class);
+
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private final KeySpaceManagerProtocolClientSideTranslatorPB
+      keySpaceManagerClient;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+
+  private final UserGroupInformation ugi;
+  private final OzoneAcl.OzoneACLRights userRights;
+  private final OzoneAcl.OzoneACLRights groupRights;
+
+  /**
+   * Creates an OzoneRpcClient instance with a new OzoneConfiguration.
+   *
+   * @throws IOException if the client cannot be initialized
+   */
+  public OzoneRpcClient() throws IOException {
+    this(new OzoneConfiguration());
+  }
+
+  /**
+   * Creates an OzoneRpcClient instance with the given configuration.
+   *
+   * @param conf configuration used to locate the KSM and SCM endpoints
+   * @throws IOException if the RPC proxies cannot be created
+   */
+  public OzoneRpcClient(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf);
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+    this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
+
+    long scmVersion =
+        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+    InetSocketAddress scmAddress =
+        OzoneClientUtils.getScmAddressForClients(conf);
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    this.storageContainerLocationClient =
+        new StorageContainerLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
+                scmAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+
+    long ksmVersion =
+        RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
+    InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
+    RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    this.keySpaceManagerClient =
+        new KeySpaceManagerProtocolClientSideTranslatorPB(
+            RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
+                ksmAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+
+    this.xceiverClientManager = new XceiverClientManager(conf);
+
+    int configuredChunkSize = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
+    if (configuredChunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
+      LOG.warn("The chunk size ({}) is not allowed to be more than"
+              + " the maximum size ({}),"
+              + " resetting to the maximum size.",
+          configuredChunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
+      chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
+    } else {
+      chunkSize = configuredChunkSize;
+    }
+  }
+
+  @Override
+  public void createVolume(String volumeName)
+      throws IOException {
+    createVolume(volumeName, ugi.getUserName());
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner)
+      throws IOException {
+
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES,
+        (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           OzoneAcl... acls)
+      throws IOException {
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota)
+      throws IOException {
+    createVolume(volumeName, owner, quota, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota, OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    Preconditions.checkState(quota >= 0);
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            owner, userRights);
+    KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
+    builder.setAdminName(ugi.getUserName())
+        .setOwnerName(owner)
+        .setVolume(volumeName)
+        .setQuotaInBytes(quota)
+        .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
+
+    List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(owner).getGroupNames());
+    userGroups.forEach(group -> listOfAcls.add(
+        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
+
+    //ACLs passed as argument
+    if (acls != null) {
+      listOfAcls.addAll(Arrays.asList(acls));
+    }
+
+    //Remove duplicates and set
+    for (OzoneAcl ozoneAcl :
+        listOfAcls.stream().distinct().collect(Collectors.toList())) {
+      builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(ozoneAcl));
+    }
+
+    LOG.info("Creating Volume: {}, with {} as owner and quota set to {} bytes.",
+        volumeName, owner, quota);
+    keySpaceManagerClient.createVolume(builder.build());
+  }
+
+  @Override
+  public void setVolumeOwner(String volumeName, String owner)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    keySpaceManagerClient.setOwner(volumeName, owner);
+  }
+
+  @Override
+  public void setVolumeQuota(String volumeName, long quota)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkState(quota >= 0);
+    keySpaceManagerClient.setQuota(volumeName, quota);
+  }
+
+  @Override
+  public OzoneVolume getVolumeDetails(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    KsmVolumeArgs volumeArgs =
+        keySpaceManagerClient.getVolumeInfo(volumeName);
+    return new OzoneVolume(volumeArgs);
+  }
+
+  @Override
+  public boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    return keySpaceManagerClient.checkVolumeAccess(volumeName,
+        KSMPBHelper.convertOzoneAcl(acl));
+  }
+
+  @Override
+  public void deleteVolume(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    keySpaceManagerClient.deleteVolume(volumeName);
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix,
+                                           String user)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning)
+      throws IOException {
+    createBucket(volumeName, bucketName, versioning,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           StorageType storageType)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        storageType, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           OzoneAcl... acls)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, acls);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning, StorageType storageType,
+                           OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(versioning);
+    Preconditions.checkNotNull(storageType);
+
+    KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setStorageType(storageType)
+        .setIsVersionEnabled(getBucketVersioningProtobuf(versioning));
+
+    String owner = ugi.getUserName();
+    final List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    //User ACL
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            owner, userRights);
+    listOfAcls.add(userAcl);
+
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(owner).getGroupNames());
+    userGroups.forEach(group -> listOfAcls.add(
+        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
+
+    //ACLs passed as argument
+    if (acls != null) {
+      listOfAcls.addAll(Arrays.asList(acls));
+    }
+
+    //Remove duplicates and set
+    builder.setAcls(listOfAcls.stream().distinct()
+        .collect(Collectors.toList()));
+    LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
+        "Storage Type set to {}", volumeName, bucketName, versioning,
+        storageType);
+    keySpaceManagerClient.createBucket(builder.build());
+  }
+
+  /**
+   * Converts OzoneConsts.Versioning enum to the boolean stored in the
+   * bucket protobuf.
+   *
+   * @param version bucket versioning value, may be null
+   * @return true if versioning is ENABLED, false otherwise
+   */
+  private boolean getBucketVersioningProtobuf(Versioning version) {
+    if (version != null) {
+      switch (version) {
+      case ENABLED:
+        return true;
+      case NOT_DEFINED:
+      case DISABLED:
+      default:
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void addBucketAcls(String volumeName, String bucketName,
+                            List<OzoneAcl> addAcls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(addAcls);
+    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setAddAcls(addAcls);
+    keySpaceManagerClient.setBucketProperty(builder.build());
+  }
+
+  @Override
+  public void removeBucketAcls(String volumeName, String bucketName,
+                               List<OzoneAcl> removeAcls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(removeAcls);
+    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setRemoveAcls(removeAcls);
+    keySpaceManagerClient.setBucketProperty(builder.build());
+  }
+
+  @Override
+  public void setBucketVersioning(String volumeName, String bucketName,
+                                  Versioning versioning)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(versioning);
+    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setIsVersionEnabled(getBucketVersioningFlag(versioning));
+    keySpaceManagerClient.setBucketProperty(builder.build());
+  }
+
+  @Override
+  public void setBucketStorageType(String volumeName, String bucketName,
+                                   StorageType storageType)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(storageType);
+    KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setStorageType(storageType);
+    keySpaceManagerClient.setBucketProperty(builder.build());
+  }
+
+  @Override
+  public void deleteBucket(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    keySpaceManagerClient.deleteBucket(volumeName, bucketName);
+  }
+
+  @Override
+  public void checkBucketAccess(String volumeName, String bucketName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneBucket getBucketDetails(String volumeName,
+                                      String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    KsmBucketInfo bucketInfo =
+        keySpaceManagerClient.getBucketInfo(volumeName, bucketName);
+    return new OzoneBucket(bucketInfo);
+  }
+
+  @Override
+  public Iterator<OzoneBucket> listBuckets(String volumeName,
+                                           String bucketPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneOutputStream createKey(String volumeName, String bucketName,
+                                     String keyName, long size)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    String requestId = UUID.randomUUID().toString();
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .build();
+
+    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
+    ChunkGroupOutputStream groupOutputStream =
+        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
+            storageContainerLocationClient, chunkSize, requestId);
+    return new OzoneOutputStream(groupOutputStream);
+  }
+
+  @Override
+  public OzoneInputStream getKey(String volumeName, String bucketName,
+                                 String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    String requestId = UUID.randomUUID().toString();
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
+    LengthInputStream lengthInputStream =
+        ChunkGroupInputStream.getFromKsmKeyInfo(
+            keyInfo, xceiverClientManager, storageContainerLocationClient,
+            requestId);
+    return new OzoneInputStream(
+        (ChunkGroupInputStream) lengthInputStream.getWrappedStream());
+  }
+
+  @Override
+  public void deleteKey(String volumeName, String bucketName,
+                        String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    keySpaceManagerClient.deleteKey(keyArgs);
+  }
+
+  @Override
+  public List<OzoneKey> listKeys(String volumeName, String bucketName,
+                                 String keyPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneKey getKeyDetails(String volumeName, String bucketName,
+                                String keyName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(keyName);
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    KsmKeyInfo keyInfo =
+        keySpaceManagerClient.lookupKey(keyArgs);
+    return new OzoneKey(keyInfo);
+  }
+
+  /**
+   * Converts OzoneConsts.Versioning enum to a boolean flag.
+   *
+   * @param version bucket versioning value, may be null
+   * @return true if versioning is ENABLED, false otherwise
+   */
+  private boolean getBucketVersioningFlag(Versioning version) {
+    if (version != null) {
+      switch (version) {
+      case ENABLED:
+        return true;
+      case DISABLED:
+      case NOT_DEFINED:
+      default:
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
+    IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
+    IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
+  }
+}
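
To make the call flow above concrete, here is a minimal usage sketch of
OzoneRpcClient; the volume, bucket and key names are placeholders, and it
assumes a reachable KSM/SCM plus that OzoneOutputStream and OzoneInputStream
behave as ordinary java.io streams:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.ozone.client.io.OzoneInputStream;
    import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
    import org.apache.hadoop.ozone.client.rpc.OzoneRpcClient;

    public class OzoneRpcClientSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = "hello ozone".getBytes(StandardCharsets.UTF_8);
        try (OzoneRpcClient client = new OzoneRpcClient()) {
          client.createVolume("vol1");            // owner defaults to current user
          client.createBucket("vol1", "bucket1"); // default versioning/storage type
          // Writes flow through ChunkGroupOutputStream to the datanodes.
          try (OzoneOutputStream out =
                   client.createKey("vol1", "bucket1", "key1", data.length)) {
            out.write(data);
          }
          // Reads come back through ChunkGroupInputStream.
          try (OzoneInputStream in = client.getKey("vol1", "bucket1", "key1")) {
            byte[] buf = new byte[data.length];
            int n = in.read(buf);
            System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
          }
          client.deleteKey("vol1", "bucket1", "key1");
        }
      }
    }

The try-with-resources around the client matters: close() tears down the SCM
and KSM proxies and the XceiverClientManager, as shown above.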

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java
new file mode 100644
index 0000000..0fcc3fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+/**
+ * This package contains Ozone rpc client library classes.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43d38114/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
new file mode 100644
index 0000000..e69300c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+
+/**
+ * Configuration keys and default values for the Key Space Manager (KSM).
+ */
+public final class KSMConfigKeys {
+  /**
+   * Never constructed.
+   */
+  private KSMConfigKeys() {
+  }
+
+  public static final String OZONE_KSM_HANDLER_COUNT_KEY =
+      "ozone.ksm.handler.count.key";
+  public static final int OZONE_KSM_HANDLER_COUNT_DEFAULT = 20;
+
+  public static final String OZONE_KSM_ADDRESS_KEY =
+      "ozone.ksm.address";
+  public static final String OZONE_KSM_BIND_HOST_DEFAULT =
+      "0.0.0.0";
+  public static final int OZONE_KSM_PORT_DEFAULT = 9862;
+
+  public static final String OZONE_KSM_HTTP_ENABLED_KEY =
+      "ozone.ksm.http.enabled";
+  public static final String OZONE_KSM_HTTP_BIND_HOST_KEY =
+      "ozone.ksm.http-bind-host";
+  public static final String OZONE_KSM_HTTPS_BIND_HOST_KEY =
+      "ozone.ksm.https-bind-host";
+  public static final String OZONE_KSM_HTTP_ADDRESS_KEY =
+      "ozone.ksm.http-address";
+  public static final String OZONE_KSM_HTTPS_ADDRESS_KEY =
+      "ozone.ksm.https-address";
+  public static final String OZONE_KSM_KEYTAB_FILE =
+      "ozone.ksm.keytab.file";
+  public static final String OZONE_KSM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0";
+  public static final int OZONE_KSM_HTTP_BIND_PORT_DEFAULT = 9874;
+  public static final int OZONE_KSM_HTTPS_BIND_PORT_DEFAULT = 9875;
+
+  // Size of the off-heap LevelDB cache for the KSM database, in MB.
+  public static final String OZONE_KSM_DB_CACHE_SIZE_MB =
+      "ozone.ksm.leveldb.cache.size.mb";
+  public static final int OZONE_KSM_DB_CACHE_SIZE_DEFAULT = 128;
+
+  public static final String OZONE_KSM_USER_MAX_VOLUME =
+      "ozone.ksm.user.max.volume";
+  public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024;
+
+  // KSM Default user/group permissions
+  public static final String OZONE_KSM_USER_RIGHTS =
+      "ozone.ksm.user.rights";
+  public static final OzoneAcl.OzoneACLRights OZONE_KSM_USER_RIGHTS_DEFAULT =
+      OzoneAcl.OzoneACLRights.READ_WRITE;
+
+  public static final String OZONE_KSM_GROUP_RIGHTS =
+      "ozone.ksm.group.rights";
+  public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
+      OzoneAcl.OzoneACLRights.READ_WRITE;
+}
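
For reference, these keys are consumed through the usual Configuration lookup
pattern, exactly as OzoneRpcClient does for the ACL defaults above. A small
illustrative sketch (the values printed depend on the loaded ozone-site.xml,
if any):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneAcl;
    import org.apache.hadoop.ozone.ksm.KSMConfigKeys;

    public class KsmConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the compiled-in default (20) when the key is unset.
        int handlers = conf.getInt(
            KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_KEY,
            KSMConfigKeys.OZONE_KSM_HANDLER_COUNT_DEFAULT);
        // Enum-valued keys resolve the same way.
        OzoneAcl.OzoneACLRights userRights = conf.getEnum(
            KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
            KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
        System.out.println(handlers + " handlers, user rights " + userRights);
      }
    }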


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org