You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/03/29 13:59:29 UTC

[hadoop] branch trunk updated: HDDS-1300. Optimize non-recursive ozone filesystem apis. Contributed by Lokesh Jain.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6186ed9  HDDS-1300. Optimize non-recursive ozone filesystem apis. Contributed by Lokesh Jain.
6186ed9 is described below

commit 6186ed94b9c6f972cdda83e649e8a6a7ff60f5cb
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Fri Mar 29 19:27:29 2019 +0530

    HDDS-1300. Optimize non-recursive ozone filesystem apis. Contributed by Lokesh Jain.
---
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  59 +++
 .../ozone/client/protocol/ClientProtocol.java      |  67 +++-
 .../hadoop/ozone/client/rest/RestClient.java       |  22 ++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 144 ++++---
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   3 +
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   5 +-
 .../hadoop/ozone/om/exceptions/OMException.java    |   8 +-
 .../hadoop/ozone/om/helpers/OzoneFileStatus.java   |  26 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  57 ++-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  87 +++-
 .../src/main/proto/OzoneManagerProtocol.proto      |  46 ++-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 436 ++++++++++++++++-----
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  36 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 107 ++++-
 .../apache/hadoop/ozone/om/fs/OzoneManagerFS.java  |  13 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |  71 +++-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 189 ++++++++-
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   9 +-
 .../hadoop/fs/ozone/OzoneClientAdapterImpl.java    |  73 ++--
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |  75 +---
 .../hadoop/fs/ozone/TestOzoneFileInterfaces.java   |  13 +-
 21 files changed, 1232 insertions(+), 314 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 9a12ab7..8b3f8be 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
@@ -465,11 +466,69 @@ public class OzoneBucket extends WithMetadata {
               partNumberMarker, maxParts);
   }
 
+  /**
+   * OzoneFS api to get file status for an entry.
+   *
+   * @param keyName Key name
+   * @throws OMException if file does not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
   public OzoneFileStatus getFileStatus(String keyName) throws IOException {
     return proxy.getOzoneFileStatus(volumeName, name, keyName);
   }
 
   /**
+   * Ozone FS api to create a directory. Parent directories that do not exist
+   * are created for the input directory.
+   *
+   * @param keyName Key name
+   * @throws OMException if any entry in the path exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  public void createDirectory(String keyName) throws IOException {
+    proxy.createDirectory(volumeName, name, keyName);
+  }
+
+  /**
+   * OzoneFS api to create an input stream for a file.
+   *
+   * @param keyName Key name
+   * @throws OMException if given key is not found or it is not a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  public OzoneInputStream readFile(String keyName) throws IOException {
+    return proxy.readFile(volumeName, name, keyName);
+  }
+
+  /**
+   * OzoneFS api to create an output stream for a file.
+   *
+   * @param keyName   Key name
+   * @param overWrite if true existing file at the location will be overwritten
+   * @param recursive if true file would be created even if parent directories
+   *                    do not exist
+   * @throws OMException if given key is a directory
+   *                     if file exists and isOverwrite flag is false
+   *                     if an ancestor exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  public OzoneOutputStream createFile(String keyName, long size,
+      ReplicationType type, ReplicationFactor factor, boolean overWrite,
+      boolean recursive) throws IOException {
+    return proxy
+        .createFile(volumeName, name, keyName, size, type, factor, overWrite,
+            recursive);
+  }
+
+  /**
    * An Iterator to iterate over {@link OzoneKey} list.
    */
   private class KeyIterator implements Iterator<OzoneKey> {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 5378c6a..cb9cb30 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -537,12 +538,70 @@ public interface ClientProtocol {
 
   /**
    * Get the Ozone File Status for a particular Ozone key.
+   *
    * @param volumeName volume name.
    * @param bucketName bucket name.
-   * @param keyName key name.
+   * @param keyName    key name.
    * @return OzoneFileStatus for the key.
-   * @throws IOException
+   * @throws OMException if file does not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
    */
-  OzoneFileStatus getOzoneFileStatus(String volumeName,
-      String bucketName, String keyName) throws IOException;
+  OzoneFileStatus getOzoneFileStatus(String volumeName, String bucketName,
+      String keyName) throws IOException;
+
+  /**
+   * Creates directory with keyName as the absolute path for the directory.
+   *
+   * @param volumeName Volume name
+   * @param bucketName Bucket name
+   * @param keyName    Absolute path for the directory
+   * @throws OMException if any entry in the path exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  void createDirectory(String volumeName, String bucketName, String keyName)
+      throws IOException;
+
+  /**
+   * Creates an input stream for reading file contents.
+   *
+   * @param volumeName Volume name
+   * @param bucketName Bucket name
+   * @param keyName    Absolute path of the file to be read
+   * @return Input stream for reading the file
+   * @throws OMException if given key is not found or it is not a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  OzoneInputStream readFile(String volumeName, String bucketName,
+      String keyName) throws IOException;
+
+  /**
+   * Creates an output stream for writing to a file.
+   *
+   * @param volumeName Volume name
+   * @param bucketName Bucket name
+   * @param keyName    Absolute path of the file to be written
+   * @param size       Size of data to be written
+   * @param type       Replication Type
+   * @param factor     Replication Factor
+   * @param overWrite  if true existing file at the location will be overwritten
+   * @param recursive  if true file would be created even if parent directories
+   *                   do not exist
+   * @return Output stream for writing to the file
+   * @throws OMException if given key is a directory
+   *                     if file exists and isOverwrite flag is false
+   *                     if an ancestor exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  @SuppressWarnings("checkstyle:parameternumber")
+  OzoneOutputStream createFile(String volumeName, String bucketName,
+      String keyName, long size, ReplicationType type, ReplicationFactor factor,
+      boolean overWrite, boolean recursive) throws IOException;
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index 369b9fb..80e81fe 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -1091,4 +1091,26 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException("Ozone REST protocol does not " +
         "support this operation.");
   }
+
+  @Override
+  public void createDirectory(String volumeName, String bucketName,
+      String keyName) {
+    throw new UnsupportedOperationException(
+        "Ozone REST protocol does not " + "support this operation.");
+  }
+
+  @Override
+  public OzoneInputStream readFile(String volumeName, String bucketName,
+      String keyName) {
+    throw new UnsupportedOperationException(
+        "Ozone REST protocol does not " + "support this operation.");
+  }
+
+  @Override
+  public OzoneOutputStream createFile(String volumeName, String bucketName,
+      String keyName, long size, ReplicationType type, ReplicationFactor factor,
+      boolean overWrite, boolean recursive) {
+    throw new UnsupportedOperationException(
+        "Ozone REST protocol does not " + "support this operation.");
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 6ecda09..6b60f96 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -617,37 +617,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    KeyOutputStream keyOutputStream =
-        new KeyOutputStream.Builder()
-            .setHandler(openKey)
-            .setXceiverClientManager(xceiverClientManager)
-            .setOmClient(ozoneManagerClient)
-            .setChunkSize(chunkSize)
-            .setRequestID(requestId)
-            .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
-            .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
-            .setStreamBufferFlushSize(streamBufferFlushSize)
-            .setStreamBufferMaxSize(streamBufferMaxSize)
-            .setWatchTimeout(watchTimeout)
-            .setBlockSize(blockSize)
-            .setChecksumType(checksumType)
-            .setBytesPerChecksum(bytesPerChecksum)
-            .setMaxRetryCount(maxRetryCount)
-            .build();
-    keyOutputStream.addPreallocateBlocks(
-        openKey.getKeyInfo().getLatestVersionLocations(),
-        openKey.getOpenVersion());
-    final FileEncryptionInfo feInfo = keyOutputStream
-        .getFileEncryptionInfo();
-    if (feInfo != null) {
-      KeyProvider.KeyVersion decrypted = getDEK(feInfo);
-      final CryptoOutputStream cryptoOut = new CryptoOutputStream(
-          keyOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
-          decrypted.getMaterial(), feInfo.getIV());
-      return new OzoneOutputStream(cryptoOut);
-    } else {
-      return new OzoneOutputStream(keyOutputStream);
-    }
+    return createOutputStream(openKey, requestId, type, factor);
   }
 
   private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
@@ -674,20 +644,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         .setRefreshPipeline(true)
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
-    LengthInputStream lengthInputStream =
-        KeyInputStream.getFromOmKeyInfo(
-            keyInfo, xceiverClientManager, storageContainerLocationClient,
-            requestId, verifyChecksum);
-    FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
-    if (feInfo != null) {
-      final KeyProvider.KeyVersion decrypted  = getDEK(feInfo);
-      final CryptoInputStream cryptoIn =
-          new CryptoInputStream(lengthInputStream.getWrappedStream(),
-              OzoneKMSUtil.getCryptoCodec(conf, feInfo),
-              decrypted.getMaterial(), feInfo.getIV());
-      return new OzoneInputStream(cryptoIn);
-    }
-    return new OzoneInputStream(lengthInputStream.getWrappedStream());
+    return createInputStream(keyInfo, requestId);
   }
 
   @Override
@@ -978,7 +935,102 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
   @Override
   public OzoneFileStatus getOzoneFileStatus(String volumeName,
       String bucketName, String keyName) throws IOException {
-    return ozoneManagerClient.getFileStatus(volumeName, bucketName, keyName);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    return ozoneManagerClient.getFileStatus(keyArgs);
+  }
+
+  @Override
+  public void createDirectory(String volumeName, String bucketName,
+      String keyName) throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setKeyName(keyName).build();
+    ozoneManagerClient.createDirectory(keyArgs);
+  }
+
+  @Override
+  public OzoneInputStream readFile(String volumeName, String bucketName,
+      String keyName) throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs);
+    return createInputStream(keyInfo, UUID.randomUUID().toString());
+  }
+
+  @Override
+  public OzoneOutputStream createFile(String volumeName, String bucketName,
+      String keyName, long size, ReplicationType type, ReplicationFactor factor,
+      boolean overWrite, boolean recursive) throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setType(HddsProtos.ReplicationType.valueOf(type.name()))
+        .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+        .build();
+    OpenKeySession keySession =
+        ozoneManagerClient.createFile(keyArgs, overWrite, recursive);
+    return createOutputStream(keySession, UUID.randomUUID().toString(), type,
+        factor);
+  }
+
+  private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
+      String requestId) throws IOException {
+    LengthInputStream lengthInputStream = KeyInputStream
+        .getFromOmKeyInfo(keyInfo, xceiverClientManager,
+            storageContainerLocationClient, requestId, verifyChecksum);
+    FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
+    if (feInfo != null) {
+      final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
+      final CryptoInputStream cryptoIn =
+          new CryptoInputStream(lengthInputStream.getWrappedStream(),
+              OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+              decrypted.getMaterial(), feInfo.getIV());
+      return new OzoneInputStream(cryptoIn);
+    }
+    return new OzoneInputStream(lengthInputStream.getWrappedStream());
+  }
+
+  private OzoneOutputStream createOutputStream(OpenKeySession openKey,
+      String requestId, ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    KeyOutputStream keyOutputStream =
+        new KeyOutputStream.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setOmClient(ozoneManagerClient)
+            .setChunkSize(chunkSize)
+            .setRequestID(requestId)
+            .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
+            .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setBlockSize(blockSize)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
+            .setMaxRetryCount(maxRetryCount).build();
+    keyOutputStream
+        .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
+            openKey.getOpenVersion());
+    final FileEncryptionInfo feInfo = keyOutputStream.getFileEncryptionInfo();
+    if (feInfo != null) {
+      KeyProvider.KeyVersion decrypted = getDEK(feInfo);
+      final CryptoOutputStream cryptoOut =
+          new CryptoOutputStream(keyOutputStream,
+              OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+              decrypted.getMaterial(), feInfo.getIV());
+      return new OzoneOutputStream(cryptoOut);
+    } else {
+      return new OzoneOutputStream(keyOutputStream);
+    }
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index be879d8..2ceccbb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -188,6 +188,7 @@ public final class OmUtils {
     case ServiceList:
     case ListMultiPartUploadParts:
     case GetFileStatus:
+    case LookupFile:
       return true;
     case CreateVolume:
     case SetVolumeProperty:
@@ -212,6 +213,8 @@ public final class OmUtils {
     case CancelDelegationToken:
     case ApplyCreateKey:
     case ApplyInitiateMultiPartUpload:
+    case CreateDirectory:
+    case CreateFile:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 0cbab08..e184502 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -55,7 +55,10 @@ public enum OMAction implements AuditAction {
   LIST_MULTIPART_UPLOAD_PARTS,
 
   //FS Actions
-  GET_FILE_STATUS;
+  GET_FILE_STATUS,
+  CREATE_DIRECTORY,
+  CREATE_FILE,
+  LOOKUP_FILE;
 
   @Override
   public String getAction() {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index b2f805a..b56ab7f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -189,6 +189,12 @@ public class OMException extends IOException {
 
     TOKEN_CREATION_ERROR,
 
-    FILE_NOT_FOUND
+    FILE_NOT_FOUND,
+
+    DIRECTORY_NOT_FOUND,
+
+    FILE_ALREADY_EXISTS,
+
+    NOT_A_FILE
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java
index a32fc4a..462463d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java
@@ -33,9 +33,15 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
  * File Status of the Ozone Key.
  */
 public class OzoneFileStatus extends FileStatus {
+
+  private static final long serialVersionUID = 1L;
+
+  transient private OmKeyInfo keyInfo;
+
   public OzoneFileStatus(OmKeyInfo key, long blockSize, boolean isDirectory) {
     super(key.getDataSize(), isDirectory, key.getFactor().getNumber(),
         blockSize, key.getModificationTime(), getPath(key.getKeyName()));
+    keyInfo = key;
   }
 
   public OzoneFileStatus(FileStatus status) throws IOException {
@@ -43,10 +49,8 @@ public class OzoneFileStatus extends FileStatus {
   }
 
   // Use this constructor only for directories
-  public OzoneFileStatus(int replication, long blockSize,
-                         String keyName) {
-    super(0, true, replication, blockSize, 0,
-        getPath(keyName));
+  public OzoneFileStatus(String keyName) {
+    super(0, true, 0, 0, 0, getPath(keyName));
   }
 
   public FileStatusProto getProtobuf() throws IOException {
@@ -94,4 +98,18 @@ public class OzoneFileStatus extends FileStatus {
       return super.getModificationTime();
     }
   }
+
+  public OmKeyInfo getKeyInfo() {
+    return keyInfo;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 6926662..b3dc9c8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.protocol;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -400,14 +401,54 @@ public interface OzoneManagerProtocol
   OMFailoverProxyProvider getOMFailoverProxyProvider();
 
   /**
-   * Get File Status for an Ozone key.
-   * @param volumeName volume name.
-   * @param bucketName bucket name.
-   * @param keyName key name.
-   * @return OzoneFileStatus for the key.
-   * @throws IOException
+   * OzoneFS api to get file status for an entry.
+   *
+   * @param keyArgs Key args
+   * @throws OMException if file does not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  OzoneFileStatus getFileStatus(OmKeyArgs keyArgs) throws IOException;
+
+  /**
+   * Ozone FS api to create a directory. Parent directories that do not exist
+   * are created for the input directory.
+   *
+   * @param args Key args
+   * @throws OMException if any entry in the path exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  void createDirectory(OmKeyArgs args) throws IOException;
+
+  /**
+   * OzoneFS api to create an output stream for a file.
+   *
+   * @param keyArgs   Key args
+   * @param overWrite if true existing file at the location will be overwritten
+   * @param recursive if true file would be created even if parent directories
+   *                  do not exist
+   * @throws OMException if given key is a directory
+   *                     if file exists and isOverwrite flag is false
+   *                     if an ancestor exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  OpenKeySession createFile(OmKeyArgs keyArgs, boolean overWrite,
+      boolean recursive) throws IOException;
+
+  /**
+   * OzoneFS api to look up a file.
+   *
+   * @param keyArgs Key args
+   * @throws OMException if given key is not found or it is not a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
    */
-  OzoneFileStatus getFileStatus(String volumeName, String bucketName,
-                                String keyName) throws IOException;
+  OmKeyInfo lookupFile(OmKeyArgs keyArgs) throws IOException;
 }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 538e25d..968ff39 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
@@ -1224,20 +1226,21 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
 
   /**
    * Get File Status for an Ozone key.
-   * @param volumeName volume name.
-   * @param bucketName bucket name.
-   * @param keyName key name.
+   *
+   * @param args key args carrying the volume, bucket and key name.
    * @return OzoneFileStatus for the key.
    * @throws IOException
    */
-  public OzoneFileStatus getFileStatus(String volumeName, String bucketName,
-                                String keyName) throws IOException {
-    GetFileStatusRequest req = GetFileStatusRequest
-        .newBuilder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
+  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
         .build();
+    GetFileStatusRequest req =
+        GetFileStatusRequest.newBuilder()
+            .setKeyArgs(keyArgs)
+            .build();
 
     OMRequest omRequest = createOMRequest(Type.GetFileStatus)
         .setGetFileStatusRequest(req)
@@ -1251,4 +1254,68 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     }
     return OzoneFileStatus.getFromProtobuf(resp.getStatus());
   }
+
+  @Override
+  public void createDirectory(OmKeyArgs args) throws IOException {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .build();
+    CreateDirectoryRequest request = CreateDirectoryRequest.newBuilder()
+        .setKeyArgs(keyArgs)
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.CreateDirectory)
+        .setCreateDirectoryRequest(request)
+        .build();
+
+    handleError(submitRequest(omRequest));
+  }
+
+  @Override
+  public OmKeyInfo lookupFile(OmKeyArgs args)
+      throws IOException {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .build();
+    OzoneManagerProtocolProtos.LookupFileRequest lookupFileRequest =
+        OzoneManagerProtocolProtos.LookupFileRequest.newBuilder()
+            .setKeyArgs(keyArgs)
+            .build();
+    OMRequest omRequest = createOMRequest(Type.LookupFile)
+        .setLookupFileRequest(lookupFileRequest)
+        .build();
+    OzoneManagerProtocolProtos.LookupFileResponse resp =
+        handleError(submitRequest(omRequest)).getLookupFileResponse();
+    return OmKeyInfo.getFromProtobuf(resp.getKeyInfo());
+  }
+
+  @Override
+  public OpenKeySession createFile(OmKeyArgs args,
+      boolean overWrite, boolean recursive) throws IOException {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .setDataSize(args.getDataSize())
+        .setType(args.getType())
+        .setFactor(args.getFactor())
+        .build();
+    OzoneManagerProtocolProtos.CreateFileRequest createFileRequest =
+        OzoneManagerProtocolProtos.CreateFileRequest.newBuilder()
+            .setKeyArgs(keyArgs)
+            .setIsOverwrite(overWrite)
+            .setIsRecursive(recursive)
+            .build();
+    OMRequest omRequest = createOMRequest(Type.CreateFile)
+        .setCreateFileRequest(createFileRequest)
+        .build();
+    OzoneManagerProtocolProtos.CreateFileResponse resp =
+        handleError(submitRequest(omRequest)).getCreateFileResponse();
+    return new OpenKeySession(resp.getID(),
+        OmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion());
+  }
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index eac8d2a..ffd1eba 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -82,6 +82,9 @@ enum Type {
   CancelDelegationToken = 63;
 
   GetFileStatus = 70;
+  CreateDirectory = 71;
+  CreateFile = 72;
+  LookupFile = 73;
 }
 
 message OMRequest {
@@ -135,6 +138,9 @@ message OMRequest {
   optional hadoop.common.CancelDelegationTokenRequestProto cancelDelegationTokenRequest = 63;
 
   optional GetFileStatusRequest             getFileStatusRequest           = 70;
+  optional CreateDirectoryRequest           createDirectoryRequest         = 71;
+  optional CreateFileRequest                createFileRequest              = 72;
+  optional LookupFileRequest                lookupFileRequest              = 73;
 }
 
 message OMResponse {
@@ -191,6 +197,9 @@ message OMResponse {
   optional CancelDelegationTokenResponseProto cancelDelegationTokenResponse = 63;
 
   optional GetFileStatusResponse              getFileStatusResponse        = 70;
+  optional CreateDirectoryResponse            createDirectoryResponse      = 71;
+  optional CreateFileResponse                 createFileResponse           = 72;
+  optional LookupFileResponse                 lookupFileResponse           = 73;
 }
 
 enum Status {
@@ -243,6 +252,9 @@ enum Status {
     TOKEN_CREATION_ERROR = 43;
 
     FILE_NOT_FOUND = 44;
+    DIRECTORY_NOT_FOUND = 45;
+    FILE_ALREADY_EXISTS = 46;
+    NOT_A_FILE = 47;
 }
 
 
@@ -539,15 +551,43 @@ message KeyInfo {
 }
 
 message GetFileStatusRequest {
-    required string volumeName = 1;
-    required string bucketName = 2;
-    required string keyName = 3;
+    required KeyArgs keyArgs = 1;
 }
 
 message GetFileStatusResponse {
     required hadoop.fs.FileStatusProto status = 1;
 }
 
+// Request to create a directory; the target is identified by keyArgs.
+message CreateDirectoryRequest {
+    required KeyArgs keyArgs = 1;
+}
+
+// Reply to CreateDirectoryRequest. Carries no fields.
+message CreateDirectoryResponse {
+}
+
+// Request to create a file identified by keyArgs, with recursive and
+// overwrite flags.
+message CreateFileRequest {
+    required KeyArgs keyArgs = 1;
+    required bool isRecursive = 2;
+    required bool isOverwrite = 3;
+}
+
+// Reply to CreateFileRequest.
+message CreateFileResponse {
+
+    optional KeyInfo keyInfo = 1;
+    // clients' followup request may carry this ID for stateful operations
+    // (similar to a cookie).
+    optional uint64 ID = 2;
+    optional uint64 openVersion = 3;
+}
+
+// Request to look up a file identified by keyArgs.
+message LookupFileRequest {
+    required KeyArgs keyArgs = 1;
+}
+
+// Reply to LookupFileRequest carrying the key info of the file.
+message LookupFileResponse {
+    optional KeyInfo keyInfo = 1;
+}
+
 message CreateKeyRequest {
     required KeyArgs keyArgs = 1;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 83a60c0..ff360e5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -201,17 +203,20 @@ public class KeyManagerImpl implements KeyManager {
 
   private void validateBucket(String volumeName, String bucketName)
       throws IOException {
-    String volumeKey = metadataManager.getVolumeKey(volumeName);
     String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-
-    //Check if the volume exists
-    if (metadataManager.getVolumeTable().get(volumeKey) == null) {
-      LOG.error("volume not found: {}", volumeName);
-      throw new OMException("Volume not found",
-          OMException.ResultCodes.VOLUME_NOT_FOUND);
-    }
-    //Check if bucket already exists
+    // Check if bucket exists
     if (metadataManager.getBucketTable().get(bucketKey) == null) {
+      String volumeKey = metadataManager.getVolumeKey(volumeName);
+      // If the volume also does not exist, we should throw volume not found
+      // exception
+      if (metadataManager.getVolumeTable().get(volumeKey) == null) {
+        LOG.error("volume not found: {}", volumeName);
+        throw new OMException("Volume not found",
+            OMException.ResultCodes.VOLUME_NOT_FOUND);
+      }
+
+      // if the volume exists but bucket does not exist, throw bucket not found
+      // exception
       LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
       throw new OMException("Bucket not found",
           OMException.ResultCodes.BUCKET_NOT_FOUND);
@@ -388,103 +393,39 @@ public class KeyManagerImpl implements KeyManager {
     return edek;
   }
 
-  @SuppressWarnings("checkstyle:methodlength")
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
     validateBucket(volumeName, bucketName);
 
-    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
-    String keyName = args.getKeyName();
-    ReplicationFactor factor = args.getFactor();
-    ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();
     OmKeyInfo keyInfo;
     String openKey;
     long openVersion;
 
-    FileEncryptionInfo encInfo = null;
-    OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
-    BucketEncryptionKeyInfo ezInfo = bucketInfo.getEncryptionKeyInfo();
-    if (ezInfo != null) {
-      if (getKMSProvider() == null) {
-        throw new OMException("Invalid KMS provider, check configuration " +
-            CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
-            OMException.ResultCodes.INVALID_KMS_PROVIDER);
-      }
-
-      final String ezKeyName = ezInfo.getKeyName();
-      EncryptedKeyVersion edek = generateEDEK(ezKeyName);
-      encInfo = new FileEncryptionInfo(ezInfo.getSuite(), ezInfo.getVersion(),
-            edek.getEncryptedKeyVersion().getMaterial(),
-            edek.getEncryptedKeyIv(),
-            ezKeyName, edek.getEncryptionKeyVersionName());
-    }
+    FileEncryptionInfo encInfo;
 
     try {
-      if (args.getIsMultipartKey()) {
-        Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0,
-            "PartNumber Should be greater than zero");
-        // When key is multipart upload part key, we should take replication
-        // type and replication factor from original key which has done
-        // initiate multipart upload. If we have not found any such, we throw
-        // error no such multipart upload.
-        String uploadID = args.getMultipartUploadID();
-        Preconditions.checkNotNull(uploadID);
-        String multipartKey = metadataManager.getMultipartKey(volumeName,
-            bucketName, keyName, uploadID);
-        OmKeyInfo partKeyInfo = metadataManager.getOpenKeyTable().get(
-            multipartKey);
-        if (partKeyInfo == null) {
-          throw new OMException("No such Multipart upload is with specified " +
-              "uploadId " + uploadID,
-              ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
-        } else {
-          factor = partKeyInfo.getFactor();
-          type = partKeyInfo.getType();
-        }
-      } else {
-        // If user does not specify a replication strategy or
-        // replication factor, OM will use defaults.
-        if (factor == null) {
-          factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
-        }
-        if (type == null) {
-          type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
-        }
-      }
-      List<OmKeyLocationInfo> locations = new ArrayList<>();
-      String objectKey = metadataManager.getOzoneKey(
-          volumeName, bucketName, keyName);
+      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+      OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
+      encInfo = getFileEncryptionInfo(bucketInfo);
       // NOTE size of a key is not a hard limit on anything, it is a value that
-      // client should expect, in terms of current size of key. If client sets a
-      // value, then this value is used, otherwise, we allocate a single block
-      // which is the current size, if read by the client.
+      // client should expect, in terms of current size of key. If client sets
+      // a value, then this value is used, otherwise, we allocate a single
+      // block which is the current size, if read by the client.
       long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-
+      List<OmKeyLocationInfo> locations = new ArrayList<>();
       if (args.getIsMultipartKey()) {
-        // For this upload part we don't need to check in KeyTable. As this
-        // is not an actual key, it is a part of the key.
-        keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
+        keyInfo = prepareMultipartKeyInfo(args, size, locations, encInfo);
         //TODO args.getMetadata
-        openVersion = 0;
       } else {
-        keyInfo = metadataManager.getKeyTable().get(objectKey);
-        if (keyInfo != null) {
-          // the key already exist, the new blocks will be added as new version
-          // when locations.size = 0, the new version will have identical blocks
-          // as its previous version
-          openVersion = keyInfo.addNewVersion(locations);
-          keyInfo.setDataSize(size + keyInfo.getDataSize());
-        } else {
-          // the key does not exist, create a new object, the new blocks are the
-          // version 0
-          keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
-          openVersion = 0;
-        }
+        keyInfo = prepareKeyInfo(args, size, locations, encInfo);
       }
+
+      openVersion = keyInfo.getLatestVersionLocations().getVersion();
       openKey = metadataManager.getOpenKey(
           volumeName, bucketName, keyName, currentTime);
       if (metadataManager.getOpenKeyTable().get(openKey) != null) {
@@ -507,20 +448,28 @@ public class KeyManagerImpl implements KeyManager {
     } catch (IOException ex) {
       LOG.error("Key open failed for volume:{} bucket:{} key:{}",
           volumeName, bucketName, keyName, ex);
-      throw new OMException(ex.getMessage(),
-          ResultCodes.KEY_ALLOCATION_ERROR);
+      throw new OMException(ex.getMessage(), ResultCodes.KEY_ALLOCATION_ERROR);
     } finally {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
 
+    allocateBlockInKey(keyInfo, args.getDataSize(), currentTime);
+    return new OpenKeySession(currentTime, keyInfo, openVersion);
+  }
+
+  private void allocateBlockInKey(OmKeyInfo keyInfo, long size, long sessionId)
+      throws IOException {
+    String openKey = metadataManager
+        .getOpenKey(keyInfo.getVolumeName(), keyInfo.getBucketName(),
+            keyInfo.getKeyName(), sessionId);
     // requested size is not required but more like a optimization:
     // SCM looks at the requested, if it 0, no block will be allocated at
     // the point, if client needs more blocks, client can always call
     // allocateBlock. But if requested size is not 0, OM will preallocate
     // some blocks and piggyback to client, to save RPC calls.
-    if (args.getDataSize() > 0) {
+    if (size > 0) {
       List<OmKeyLocationInfo> locationInfos =
-          allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
+          allocateBlock(keyInfo, new ExcludeList(), size);
       keyInfo.appendNewBlocks(locationInfos);
     }
 
@@ -529,7 +478,69 @@ public class KeyManagerImpl implements KeyManager {
     if (!isRatisEnabled) {
       metadataManager.getOpenKeyTable().put(openKey, keyInfo);
     }
-    return new OpenKeySession(currentTime, keyInfo, openVersion);
+  }
+
+  /**
+   * Builds the key info for a regular (non-multipart) key that is being
+   * opened. If the key is already present in the key table, a new version is
+   * added to the stored entry and its data size is increased by
+   * {@code size}; otherwise a new key info is created from the arguments.
+   *
+   * @param args      key arguments (volume, bucket, key name, replication)
+   * @param size      size recorded for a new key, or added to the data size
+   *                  of an existing one
+   * @param locations block locations handed to the new version / new key
+   * @param encInfo   encryption info for a newly created key; unused when the
+   *                  key already exists
+   * @return the updated existing entry or the newly created key info
+   * @throws IOException declared by the signature; presumably raised by the
+   *                     key table lookup - TODO confirm
+   */
+  private OmKeyInfo prepareKeyInfo(OmKeyArgs args, long size,
+      List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
+      throws IOException {
+    ReplicationFactor factor = args.getFactor();
+    ReplicationType type = args.getType();
+    OmKeyInfo keyInfo;
+    // If user does not specify a replication strategy or
+    // replication factor, OM will use defaults.
+    // (factor and type are only consumed by the create branch below.)
+    if (factor == null) {
+      factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+    }
+    if (type == null) {
+      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+    }
+    String objectKey = metadataManager.getOzoneKey(
+        args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    keyInfo = metadataManager.getKeyTable().get(objectKey);
+    if (keyInfo != null) {
+      // the key already exist, the new blocks will be added as new version
+      // when locations.size = 0, the new version will have identical blocks
+      // as its previous version
+      keyInfo.addNewVersion(locations);
+      keyInfo.setDataSize(size + keyInfo.getDataSize());
+    } else {
+      // the key does not exist, create a new object, the new blocks are the
+      // version 0
+      keyInfo = createKeyInfo(args, locations, factor, type, size, encInfo);
+    }
+    return keyInfo;
+  }
+
+  /**
+   * Builds the key info for one part of a multipart upload. The replication
+   * factor and type are taken from the open-key table entry stored under the
+   * multipart key for the given upload ID.
+   *
+   * @param args      key arguments; the part number must be greater than zero
+   *                  and the multipart upload ID must not be null
+   * @param size      size passed on to the created key info
+   * @param locations block locations passed on to the created key info
+   * @param encInfo   encryption info passed on to the created key info
+   * @return key info created for this upload part
+   * @throws OMException with NO_SUCH_MULTIPART_UPLOAD_ERROR if no open-key
+   *                     entry exists for the upload ID
+   * @throws IOException declared by the signature; presumably raised by the
+   *                     open-key table lookup - TODO confirm
+   */
+  private OmKeyInfo prepareMultipartKeyInfo(OmKeyArgs args, long size,
+      List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
+      throws IOException {
+    ReplicationFactor factor;
+    ReplicationType type;
+
+    Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0,
+        "PartNumber Should be greater than zero");
+    // When key is multipart upload part key, we should take replication
+    // type and replication factor from original key which has done
+    // initiate multipart upload. If we have not found any such, we throw
+    // error no such multipart upload.
+    String uploadID = args.getMultipartUploadID();
+    Preconditions.checkNotNull(uploadID);
+    String multipartKey = metadataManager
+        .getMultipartKey(args.getVolumeName(), args.getBucketName(),
+            args.getKeyName(), uploadID);
+    OmKeyInfo partKeyInfo = metadataManager.getOpenKeyTable().get(
+        multipartKey);
+    if (partKeyInfo == null) {
+      throw new OMException("No such Multipart upload is with specified " +
+          "uploadId " + uploadID,
+          ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+    } else {
+      factor = partKeyInfo.getFactor();
+      type = partKeyInfo.getType();
+    }
+    // For this upload part we don't need to check in KeyTable. As this
+    // is not an actual key, it is a part of the key.
+    return createKeyInfo(args, locations, factor, type, size, encInfo);
   }
 
   public void applyOpenKey(KeyArgs omKeyArgs,
@@ -1329,18 +1340,27 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
-  public OzoneFileStatus getFileStatus(String volumeName, String bucketName,
-                                       String keyName) throws IOException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(keyName);
+  /**
+   * OzoneFS api to get file status for an entry.
+   *
+   * @param args Key args
+   * @throws OMException if file does not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
 
     metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
         validateBucket(volumeName, bucketName);
-        return new OzoneFileStatus(3, scmBlockSize, keyName);
+        return new OzoneFileStatus(keyName);
       }
 
       //Check if the key is a file.
@@ -1363,7 +1383,7 @@ public class KeyManagerImpl implements KeyManager {
       List<OmKeyInfo> keys = metadataManager.listKeys(volumeName, bucketName,
           null, dirKey, 1);
       if (keys.iterator().hasNext()) {
-        return new OzoneFileStatus(3, scmBlockSize, keyName);
+        return new OzoneFileStatus(keyName);
       }
 
       LOG.debug("Unable to get file status for the key: volume:" + volumeName +
@@ -1377,6 +1397,226 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  /**
+   * Ozone FS api to create a directory. A single key, the directory name
+   * with the trailing delimiter appended, is written to the key table;
+   * nothing is written for ancestors of the directory.
+   *
+   * @param args Key args
+   * @throws OMException if any entry in the path exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  public void createDirectory(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+
+    // Acquire the lock before entering the try block so that the finally
+    // clause only ever releases a lock that was actually obtained.
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    try {
+      // verify bucket exists
+      OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
+
+      // Check if this is the root of the filesystem.
+      if (keyName.length() == 0) {
+        return;
+      }
+
+      verifyNoFilesInPath(volumeName, bucketName, Paths.get(keyName), false);
+      String dir = addTrailingSlashIfNeeded(keyName);
+      String dirDbKey =
+          metadataManager.getOzoneKey(volumeName, bucketName, dir);
+      FileEncryptionInfo encInfo = getFileEncryptionInfo(bucketInfo);
+      OmKeyInfo dirDbKeyInfo =
+          createDirectoryKeyInfo(volumeName, bucketName, dir, new ArrayList<>(),
+              ReplicationFactor.ONE, ReplicationType.RATIS, encInfo);
+      metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo);
+
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+  }
+
+  private OmKeyInfo createDirectoryKeyInfo(String volumeName, String bucketName,
+      String keyName, List<OmKeyLocationInfo> locations,
+      ReplicationFactor factor, ReplicationType type,
+      FileEncryptionInfo encInfo) {
+    return new OmKeyInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, locations)))
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(0)
+        .setReplicationType(type)
+        .setReplicationFactor(factor)
+        .setFileEncryptionInfo(encInfo)
+        .build();
+  }
+
+  /**
+   * OzoneFS api to create an output stream for a file.
+   *
+   * @param args        Key args
+   * @param isOverWrite if true existing file at the location will be
+   *                    overwritten
+   * @param isRecursive if true file would be created even if parent
+   *                    directories do not exist
+   * @return session for the key opened for the file
+   * @throws OMException if given key is a directory
+   *                     if file exists and isOverwrite flag is false
+   *                     if an ancestor exists as a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  @Override
+  public OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
+      boolean isRecursive) throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+
+    // Acquire the lock before entering the try block so that the finally
+    // clause only ever releases a lock that was actually obtained.
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    try {
+      try {
+        OzoneFileStatus fileStatus = getFileStatus(args);
+        if (fileStatus.isDirectory()) {
+          throw new OMException("Can not write to directory: " + keyName,
+              ResultCodes.NOT_A_FILE);
+        }
+        if (fileStatus.isFile() && !isOverWrite) {
+          throw new OMException("File " + keyName + " already exists",
+              ResultCodes.FILE_ALREADY_EXISTS);
+        }
+      } catch (OMException ex) {
+        // A missing entry is the expected case for a new file. Any other
+        // result, including the two errors raised above, is propagated.
+        if (ex.getResult() != ResultCodes.FILE_NOT_FOUND) {
+          throw ex;
+        }
+      }
+
+      // For a non-recursive create the parent directory must already exist.
+      verifyNoFilesInPath(volumeName, bucketName,
+          Paths.get(keyName).getParent(), !isRecursive);
+      // TODO: Optimize call to openKey as keyInfo is already available in the
+      // filestatus. We can avoid some operations in openKey call.
+      return openKey(args);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+  }
+
+  /**
+   * OzoneFS api to lookup for a file.
+   *
+   * @param args Key args
+   * @return key info of the file
+   * @throws OMException if given key is not found or it is not a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  @Override
+  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+
+    // Acquire the lock before entering the try block so that the finally
+    // clause only ever releases a lock that was actually obtained.
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    try {
+      OzoneFileStatus fileStatus = getFileStatus(args);
+      if (fileStatus.isFile()) {
+        return fileStatus.getKeyInfo();
+      }
+      //if key is not of type file or if key is not found we throw an exception
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+
+    // This is a read path: the entry exists but is not a file.
+    throw new OMException("Can not read from directory: " + keyName,
+        ResultCodes.NOT_A_FILE);
+  }
+
+  /**
+   * Verify that none of the parent path exists as file in the filesystem.
+   * The path is walked upwards, one component at a time, until an existing
+   * directory is found or the path has no parent left.
+   *
+   * @param volumeName         Volume name
+   * @param bucketName         Bucket name
+   * @param path               Directory path. This is the absolute path of the
+   *                           directory for the ozone filesystem.
+   * @param directoryMustExist throws exception if true and given path does not
+   *                           exist as directory
+   * @throws OMException if ancestor exists as file in the filesystem
+   *                     if directoryMustExist flag is true and parent does
+   *                     not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  private void verifyNoFilesInPath(String volumeName, String bucketName,
+      Path path, boolean directoryMustExist) throws IOException {
+    OmKeyArgs.Builder argsBuilder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName);
+    while (path != null) {
+      // NOTE(review): Path#toString uses the platform separator; presumably
+      // this code only runs where that is the Ozone delimiter - confirm.
+      String keyName = path.toString();
+      try {
+        OzoneFileStatus fileStatus =
+            getFileStatus(argsBuilder.setKeyName(keyName).build());
+        if (fileStatus.isFile()) {
+          LOG.error("Unable to create directory (File already exists): "
+              + "volume: {} bucket: {} key: {}",
+              volumeName, bucketName, keyName);
+          throw new OMException(
+              "Unable to create directory at : volume: " + volumeName
+                  + " bucket: " + bucketName + " key: " + keyName,
+              ResultCodes.FILE_ALREADY_EXISTS);
+        } else if (fileStatus.isDirectory()) {
+          // An existing directory ends the walk.
+          break;
+        }
+      } catch (OMException ex) {
+        // Only "not found" is handled here; every other result, including
+        // the FILE_ALREADY_EXISTS raised above, is propagated unchanged.
+        if (ex.getResult() != ResultCodes.FILE_NOT_FOUND) {
+          throw ex;
+        }
+        if (directoryMustExist) {
+          // Keep the original exception as the cause.
+          throw new OMException("Parent directory does not exist",
+              ex, ResultCodes.DIRECTORY_NOT_FOUND);
+        }
+      }
+      path = path.getParent();
+    }
+  }
+
+  /**
+   * Produces the file encryption info for a key created in the given bucket.
+   *
+   * @param bucketInfo bucket whose encryption key info is consulted
+   * @return encryption info built from a freshly generated EDEK, or null when
+   *         the bucket has no encryption key info
+   * @throws OMException with INVALID_KMS_PROVIDER if the bucket is encrypted
+   *                     but no KMS provider is available
+   * @throws IOException declared by the signature; presumably raised while
+   *                     generating the EDEK - TODO confirm
+   */
+  private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo)
+      throws IOException {
+    BucketEncryptionKeyInfo bucketKeyInfo = bucketInfo.getEncryptionKeyInfo();
+    if (bucketKeyInfo == null) {
+      // Bucket is not encrypted.
+      return null;
+    }
+    if (getKMSProvider() == null) {
+      throw new OMException("Invalid KMS provider, check configuration " +
+          CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+          OMException.ResultCodes.INVALID_KMS_PROVIDER);
+    }
+
+    final String bucketKeyName = bucketKeyInfo.getKeyName();
+    final EncryptedKeyVersion generated = generateEDEK(bucketKeyName);
+    return new FileEncryptionInfo(bucketKeyInfo.getSuite(),
+        bucketKeyInfo.getVersion(),
+        generated.getEncryptedKeyVersion().getMaterial(),
+        generated.getEncryptedKeyIv(),
+        bucketKeyName, generated.getEncryptionKeyVersionName());
+  }
+
   private String addTrailingSlashIfNeeded(String key) {
     if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
       return key + OZONE_URI_DELIMITER;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 96a9439..6e6d3aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -68,6 +68,9 @@ public class OMMetrics {
   private @Metric MutableCounterLong numCompleteMultipartUploads;
 
   private @Metric MutableCounterLong numGetFileStatus;
+  private @Metric MutableCounterLong numCreateDirectory;
+  private @Metric MutableCounterLong numCreateFile;
+  private @Metric MutableCounterLong numLookupFile;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
@@ -101,6 +104,9 @@ public class OMMetrics {
   private @Metric MutableCounterLong numListMultipartUploadPartFails;
 
   private @Metric MutableCounterLong numGetFileStatusFails;
+  private @Metric MutableCounterLong numCreateDirectoryFails;
+  private @Metric MutableCounterLong numCreateFileFails;
+  private @Metric MutableCounterLong numLookupFileFails;
 
   // Metrics for total number of volumes, buckets and keys
 
@@ -297,6 +303,36 @@ public class OMMetrics {
     numGetFileStatusFails.incr();
   }
 
+  // Counts a createDirectory call; also bumps the key-op and FS-op totals.
+  public void incNumCreateDirectory() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numCreateDirectory.incr();
+  }
+
+  // Counts a failed createDirectory call.
+  public void incNumCreateDirectoryFails() {
+    numCreateDirectoryFails.incr();
+  }
+
+  // Counts a createFile call; also bumps the key-op and FS-op totals.
+  public void incNumCreateFile() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numCreateFile.incr();
+  }
+
+  // Counts a failed createFile call.
+  public void incNumCreateFileFails() {
+    numCreateFileFails.incr();
+  }
+
+  // Counts a lookupFile call; also bumps the key-op and FS-op totals.
+  public void incNumLookupFile() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numLookupFile.incr();
+  }
+
+  // Counts a failed lookupFile call.
+  public void incNumLookupFileFails() {
+    numLookupFileFails.incr();
+  }
+
   public void incNumListMultipartUploadPartFails() {
     numListMultipartUploadPartFails.incr();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 9fa297d..c8fdef4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2665,24 +2665,103 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
-  public OzoneFileStatus getFileStatus(String volumeName, String bucketName,
-                                       String keyName) throws IOException {
-    Map<String, String> auditMap = new HashMap<>();
-    auditMap.put(OzoneConsts.VOLUME, volumeName);
-    auditMap.put(OzoneConsts.BUCKET, bucketName);
-    auditMap.put(OzoneConsts.KEY, keyName);
-    metrics.incNumGetFileStatus();
+  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
+    if (isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
     try {
-      OzoneFileStatus ozoneFileStatus =
-          keyManager.getFileStatus(volumeName, bucketName, keyName);
-      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
-          .GET_FILE_STATUS, auditMap));
-      return ozoneFileStatus;
+      metrics.incNumGetFileStatus();
+      return keyManager.getFileStatus(args);
     } catch (IOException ex) {
       metrics.incNumGetFileStatusFails();
-      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
-          .GET_FILE_STATUS, auditMap, ex));
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS,
+              (args == null) ? null : args.toAuditMap(), ex));
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS,
+                (args == null) ? null : args.toAuditMap()));
+      }
+    }
+  }
+
+  /**
+   * Creates a directory through the key manager, with an ACL check (when
+   * ACLs are enabled), metrics and audit logging around the call.
+   *
+   * @param args key arguments naming the directory
+   * @throws IOException if the ACL check or the key manager call fails
+   */
+  @Override
+  public void createDirectory(OmKeyArgs args) throws IOException {
+    if (isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
+    try {
+      metrics.incNumCreateDirectory();
+      keyManager.createDirectory(args);
+    } catch (Exception ex) {
+      // Catch Exception rather than IOException (as createFile and lookupFile
+      // do) so that an unchecked failure is not followed by a success audit
+      // entry from the finally block. The rethrow keeps the original type.
+      metrics.incNumCreateDirectoryFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(
+          buildAuditMessageForFailure(OMAction.CREATE_DIRECTORY,
+              (args == null) ? null : args.toAuditMap(), ex));
       throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(
+            buildAuditMessageForSuccess(OMAction.CREATE_DIRECTORY,
+                (args == null) ? null : args.toAuditMap()));
+      }
+    }
+  }
+
+  /**
+   * Creates a file through the key manager, with an ACL check (when ACLs are
+   * enabled), metrics and audit logging around the call.
+   *
+   * @param args      key arguments naming the file
+   * @param overWrite passed through to the key manager
+   * @param recursive passed through to the key manager
+   * @return the session returned by the key manager
+   * @throws IOException if the ACL check or the key manager call fails
+   */
+  @Override
+  public OpenKeySession createFile(OmKeyArgs args, boolean overWrite,
+      boolean recursive) throws IOException {
+    if (isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
+    try {
+      metrics.incNumCreateFile();
+      return keyManager.createFile(args, overWrite, recursive);
+    } catch (Exception ex) {
+      // Any failure, checked or unchecked, is counted and audited as a
+      // failure before being rethrown.
+      metrics.incNumCreateFileFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.CREATE_FILE,
+          (args == null) ? null : args.toAuditMap(), ex));
+      throw ex;
+    } finally {
+      // The success entry is written only when the catch block did not run.
+      if(auditSuccess){
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.CREATE_FILE, (args == null) ? null : args.toAuditMap()));
+      }
+    }
+  }
+
+  /**
+   * Looks up a file through the key manager, with an ACL check (when ACLs
+   * are enabled), metrics and audit logging around the call.
+   *
+   * @param args key arguments naming the file
+   * @return the key info returned by the key manager
+   * @throws IOException if the ACL check or the key manager call fails
+   */
+  @Override
+  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
+    try {
+      metrics.incNumLookupFile();
+      return keyManager.lookupFile(args);
+    } catch (Exception ex) {
+      // NOTE(review): this is a READ operation but it is audited through the
+      // write-audit calls; presumably a read-audit variant is intended -
+      // confirm against the audit logger API.
+      metrics.incNumLookupFileFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
+          (args == null) ? null : args.toAuditMap(), ex));
+      throw ex;
+    } finally {
+      // The success entry is written only when the catch block did not run.
+      if(auditSuccess){
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.LOOKUP_FILE, (args == null) ? null : args.toAuditMap()));
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
index abd7794..cdde506 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.ozone.om.fs;
 
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 
 import java.io.IOException;
@@ -26,6 +29,12 @@ import java.io.IOException;
  * Ozone Manager FileSystem interface.
  */
 public interface OzoneManagerFS {
-  OzoneFileStatus getFileStatus(String volumeName, String bucketName,
-                                String keyName) throws IOException;
+  OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException;
+
+  void createDirectory(OmKeyArgs args) throws IOException;
+
+  OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
+      boolean isRecursive) throws IOException;
+
+  OmKeyInfo lookupFile(OmKeyArgs args) throws IOException;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 7660ed1..04b4881 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitK
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
@@ -333,6 +334,19 @@ public class OzoneManagerRequestHandler implements RequestHandler {
             getOzoneFileStatus(request.getGetFileStatusRequest());
         responseBuilder.setGetFileStatusResponse(getFileStatusResponse);
         break;
+      case CreateDirectory:
+        createDirectory(request.getCreateDirectoryRequest());
+        break;
+      case CreateFile:
+        OzoneManagerProtocolProtos.CreateFileResponse createFileResponse =
+            createFile(request.getCreateFileRequest());
+        responseBuilder.setCreateFileResponse(createFileResponse);
+        break;
+      case LookupFile:
+        OzoneManagerProtocolProtos.LookupFileResponse lookupFileResponse =
+            lookupFile(request.getLookupFileRequest());
+        responseBuilder.setLookupFileResponse(lookupFileResponse);
+        break;
       default:
         responseBuilder.setSuccess(false);
         responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -955,11 +969,62 @@ public class OzoneManagerRequestHandler implements RequestHandler {
 
   private GetFileStatusResponse getOzoneFileStatus(
       GetFileStatusRequest request) throws IOException {
-    GetFileStatusResponse.Builder rb = GetFileStatusResponse.newBuilder();
+    KeyArgs keyArgs = request.getKeyArgs();
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .build();
 
-    rb.setStatus(impl.getFileStatus(request.getVolumeName(),
-        request.getBucketName(), request.getKeyName()).getProtobuf());
+    GetFileStatusResponse.Builder rb = GetFileStatusResponse.newBuilder();
+    rb.setStatus(impl.getFileStatus(omKeyArgs).getProtobuf());
 
     return rb.build();
   }
+
+  private void createDirectory(CreateDirectoryRequest request)
+      throws IOException {
+    // Only the volume/bucket/key triple is forwarded to the OM implementation.
+    final KeyArgs protoArgs = request.getKeyArgs();
+    impl.createDirectory(new OmKeyArgs.Builder()
+        .setVolumeName(protoArgs.getVolumeName())
+        .setBucketName(protoArgs.getBucketName())
+        .setKeyName(protoArgs.getKeyName())
+        .build());
+  }
+
+  private OzoneManagerProtocolProtos.CreateFileResponse createFile(
+      OzoneManagerProtocolProtos.CreateFileRequest request) throws IOException {
+    // Translate the protobuf key arguments into the OM-side representation.
+    final KeyArgs protoArgs = request.getKeyArgs();
+    final OmKeyArgs args = new OmKeyArgs.Builder()
+        .setVolumeName(protoArgs.getVolumeName())
+        .setBucketName(protoArgs.getBucketName())
+        .setKeyName(protoArgs.getKeyName())
+        .setDataSize(protoArgs.getDataSize())
+        .setType(protoArgs.getType())
+        .setFactor(protoArgs.getFactor())
+        .build();
+    final OpenKeySession session = impl.createFile(args,
+        request.getIsOverwrite(), request.getIsRecursive());
+    return OzoneManagerProtocolProtos.CreateFileResponse.newBuilder()
+        .setKeyInfo(session.getKeyInfo().getProtobuf())
+        .setID(session.getId())
+        .setOpenVersion(session.getOpenVersion())
+        .build();
+  }
+
+  private OzoneManagerProtocolProtos.LookupFileResponse lookupFile(
+      OzoneManagerProtocolProtos.LookupFileRequest request)
+      throws IOException {
+    // A lookup is addressed by volume, bucket and key name only.
+    final KeyArgs protoArgs = request.getKeyArgs();
+    final OmKeyArgs args = new OmKeyArgs.Builder()
+        .setVolumeName(protoArgs.getVolumeName())
+        .setBucketName(protoArgs.getBucketName())
+        .setKeyName(protoArgs.getKeyName()).build();
+    return OzoneManagerProtocolProtos.LookupFileResponse.newBuilder()
+        .setKeyInfo(impl.lookupFile(args).getProtobuf())
+        .build();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index d5c17b5..d422227 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -20,9 +20,12 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -133,12 +136,8 @@ public class TestKeyManagerImpl {
   public void allocateBlockFailureInChillMode() throws Exception {
     KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
         metadataManager, conf, "om1", null);
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+    OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
-        .setBucketName(BUCKET_NAME)
-        .setFactor(ReplicationFactor.ONE)
-        .setType(ReplicationType.STAND_ALONE)
-        .setVolumeName(VOLUME_NAME)
         .build();
     OpenKeySession keySession = keyManager1.openKey(keyArgs);
     LambdaTestUtils.intercept(OMException.class,
@@ -152,13 +151,10 @@ public class TestKeyManagerImpl {
   public void openKeyFailureInChillMode() throws Exception {
     KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
         metadataManager, conf, "om1", null);
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+    OmKeyArgs keyArgs = createBuilder()
         .setKeyName(KEY_NAME)
-        .setBucketName(BUCKET_NAME)
-        .setFactor(ReplicationFactor.ONE)
         .setDataSize(1000)
-        .setType(ReplicationType.STAND_ALONE)
-        .setVolumeName(VOLUME_NAME).build();
+        .build();
     LambdaTestUtils.intercept(OMException.class,
         "ChillModePrecheck failed for allocateBlock", () -> {
           keyManager1.openKey(keyArgs);
@@ -167,17 +163,180 @@ public class TestKeyManagerImpl {
 
   @Test
   public void openKeyWithMultipleBlocks() throws IOException {
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+    OmKeyArgs keyArgs = createBuilder()
         .setKeyName(UUID.randomUUID().toString())
-        .setBucketName(BUCKET_NAME)
-        .setFactor(ReplicationFactor.ONE)
         .setDataSize(scmBlockSize * 10)
-        .setType(ReplicationType.STAND_ALONE)
-        .setVolumeName(VOLUME_NAME)
         .build();
     OpenKeySession keySession = keyManager.openKey(keyArgs);
     OmKeyInfo keyInfo = keySession.getKeyInfo();
     Assert.assertEquals(10,
         keyInfo.getLatestVersionLocations().getLocationList().size());
   }
+
+  @Test
+  public void testCreateDirectory() throws IOException {
+    // Create a nested directory whose parent directories do not exist yet
+    String keyName = RandomStringUtils.randomAlphabetic(5);
+    for (int i = 0; i < 5; i++) {
+      keyName += "/" + RandomStringUtils.randomAlphabetic(5);
+    }
+    // build the args only after keyName holds the full nested path
+    OmKeyArgs keyArgs = createBuilder().setKeyName(keyName).build();
+    keyManager.createDirectory(keyArgs);
+    Path path = Paths.get(keyName);
+    while (path != null) {
+      // verify the directory and each of its ancestors is a directory
+      keyArgs = createBuilder().setKeyName(path.toString()).build();
+      Assert.assertTrue(keyManager.getFileStatus(keyArgs).isDirectory());
+      path = path.getParent();
+    }
+
+    // make sure create directory fails where parent is a file
+    keyName = RandomStringUtils.randomAlphabetic(5);
+    keyArgs = createBuilder().setKeyName(keyName).build();
+    OpenKeySession keySession = keyManager.openKey(keyArgs);
+    keyArgs.setLocationInfoList(
+        keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyManager.commitKey(keyArgs, keySession.getId());
+    for (int i = 0; i < 5; i++) {
+      keyName += "/" + RandomStringUtils.randomAlphabetic(5);
+    }
+    // target a directory nested below the committed file, not the file itself
+    keyArgs = createBuilder().setKeyName(keyName).build();
+    try {
+      keyManager.createDirectory(keyArgs);
+      Assert.fail("Creation should fail for directory.");
+    } catch (OMException e) {
+      Assert.assertEquals(OMException.ResultCodes.FILE_ALREADY_EXISTS,
+          e.getResult());
+    }
+
+    // create directory for root directory
+    keyName = "";
+    keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+    keyManager.createDirectory(keyArgs);
+    Assert.assertTrue(keyManager.getFileStatus(keyArgs).isDirectory());
+
+    // create directory where parent is root
+    keyName = RandomStringUtils.randomAlphabetic(5);
+    keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+    keyManager.createDirectory(keyArgs);
+    Assert.assertTrue(keyManager.getFileStatus(keyArgs).isDirectory());
+  }
+
+  @Test
+  public void testOpenFile() throws IOException {
+    // create a file with a single-component key and commit it
+    String keyName = RandomStringUtils.randomAlphabetic(5);
+    OmKeyArgs keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    keyArgs.setLocationInfoList(
+        keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyManager.commitKey(keyArgs, keySession.getId());
+
+    // try to open created key with overWrite flag set to false
+    try {
+      keyManager.createFile(keyArgs, false, false);
+      Assert.fail("Open key should fail for non overwrite create");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.FILE_ALREADY_EXISTS) {
+        throw ex;
+      }
+    }
+
+    // create file should pass with overwrite flag set to true
+    keyManager.createFile(keyArgs, true, false);
+
+    // try to create a file where parent directories do not exist and
+    // recursive flag is set to false
+    keyName = RandomStringUtils.randomAlphabetic(5);
+    for (int i = 0; i < 5; i++) {
+      keyName += "/" + RandomStringUtils.randomAlphabetic(5);
+    }
+    keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+    try {
+      keyManager.createFile(keyArgs, false, false);
+      Assert.fail("Open file should fail for non recursive write");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.DIRECTORY_NOT_FOUND) {
+        throw ex;
+      }
+    }
+
+    // file create should pass when recursive flag is set to true
+    keySession = keyManager.createFile(keyArgs, false, true);
+    keyArgs.setLocationInfoList(
+        keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyManager.commitKey(keyArgs, keySession.getId());
+    Assert.assertTrue(keyManager
+        .getFileStatus(keyArgs).isFile());
+
+    // try creating a file over a directory
+    keyArgs = createBuilder()
+        .setKeyName("")
+        .build();
+    try {
+      keyManager.createFile(keyArgs, true, true);
+      Assert.fail("Create file should fail when the key is a directory");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) {
+        throw ex;
+      }
+    }
+  }
+
+  @Test
+  public void testLookupFile() throws IOException {
+    String keyName = RandomStringUtils.randomAlphabetic(5);
+    OmKeyArgs keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+
+    // lookup for a non-existent file
+    try {
+      keyManager.lookupFile(keyArgs);
+      Assert.fail("Lookup file should fail for non existent file");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.FILE_NOT_FOUND) {
+        throw ex;
+      }
+    }
+
+    // create a file, then lookup for the created file
+    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    keyArgs.setLocationInfoList(
+        keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyManager.commitKey(keyArgs, keySession.getId());
+    Assert.assertEquals(keyName, keyManager.lookupFile(keyArgs).getKeyName());
+
+    // lookup for a directory (the empty key) must fail with NOT_A_FILE
+    keyArgs = createBuilder()
+        .setKeyName("")
+        .build();
+    try {
+      keyManager.lookupFile(keyArgs);
+      Assert.fail("Lookup file should fail for a directory");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) {
+        throw ex;
+      }
+    }
+  }
+
+  private OmKeyArgs.Builder createBuilder() {
+    // Shared defaults; each test supplies the key name (and size if needed).
+    return new OmKeyArgs.Builder()
+        .setVolumeName(VOLUME_NAME).setBucketName(BUCKET_NAME)
+        .setType(ReplicationType.STAND_ALONE)
+        .setFactor(ReplicationFactor.ONE)
+        .setDataSize(0);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index efa444b..a08d58b 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -39,15 +39,14 @@ public interface OzoneClientAdapter {
 
   void close() throws IOException;
 
-  InputStream createInputStream(String key) throws IOException;
+  InputStream readFile(String key) throws IOException;
 
-  OzoneFSOutputStream createKey(String key) throws IOException;
+  OzoneFSOutputStream createFile(String key, boolean overWrite,
+      boolean recursive) throws IOException;
 
   void renameKey(String key, String newKeyName) throws IOException;
 
-  boolean isDirectory(BasicKeyInfo key);
-
-  boolean createDirectory(String keyName);
+  boolean createDirectory(String keyName) throws IOException;
 
   boolean deleteObject(String keyName);
 
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
index 149d0e6..e32ca12 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapterImpl.java
@@ -17,19 +17,18 @@
  */
 package org.apache.hadoop.fs.ozone;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.util.HashMap;
 import java.util.Iterator;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -163,22 +162,43 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   }
 
   @Override
-  public InputStream createInputStream(String key) throws IOException {
+  public InputStream readFile(String key) throws IOException {
     if (storageStatistics != null) {
       storageStatistics.incrementCounter(Statistic.OBJECTS_READ, 1);
     }
-    return bucket.readKey(key).getInputStream();
+    try {
+      return bucket.readFile(key).getInputStream();
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_NOT_FOUND
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileNotFoundException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
   }
 
   @Override
-  public OzoneFSOutputStream createKey(String key) throws IOException {
+  public OzoneFSOutputStream createFile(String key, boolean overWrite,
+      boolean recursive) throws IOException {
     if (storageStatistics != null) {
       storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
     }
-    OzoneOutputStream ozoneOutputStream =
-        bucket.createKey(key, 0, replicationType, replicationFactor,
-            new HashMap<>());
-    return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
+    try {
+      OzoneOutputStream ozoneOutputStream = bucket
+          .createFile(key, 0, replicationType, replicationFactor, overWrite,
+              recursive);
+      return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
+    } catch (OMException ex) {
+      if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
+          || ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
+        throw new FileAlreadyExistsException(
+            ex.getResult().name() + ": " + ex.getMessage());
+      } else {
+        throw ex;
+      }
+    }
   }
 
   @Override
@@ -190,39 +210,26 @@ public class OzoneClientAdapterImpl implements OzoneClientAdapter {
   }
 
   /**
-   * Helper method to check if an Ozone key is representing a directory.
-   *
-   * @param key key to be checked as a directory
-   * @return true if key is a directory, false otherwise
-   */
-  @Override
-  public boolean isDirectory(BasicKeyInfo key) {
-    LOG.trace("key name:{} size:{}", key.getName(),
-        key.getDataSize());
-    return key.getName().endsWith(OZONE_URI_DELIMITER)
-        && (key.getDataSize() == 0);
-  }
-
-  /**
    * Helper method to create an directory specified by key name in bucket.
    *
    * @param keyName key name to be created as directory
    * @return true if the key is created, false otherwise
    */
   @Override
-  public boolean createDirectory(String keyName) {
+  public boolean createDirectory(String keyName) throws IOException {
+    LOG.trace("creating dir for key:{}", keyName);
+    if (storageStatistics != null) {
+      storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
+    }
     try {
-      LOG.trace("creating dir for key:{}", keyName);
-      if (storageStatistics != null) {
-        storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
+      bucket.createDirectory(keyName);
+    } catch (OMException e) {
+      if (e.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS) {
+        throw new FileAlreadyExistsException(e.getMessage());
       }
-      bucket.createKey(keyName, 0, replicationType, replicationFactor,
-          new HashMap<>()).close();
-      return true;
-    } catch (IOException ioe) {
-      LOG.error("create key failed for key:{}", keyName, ioe);
-      return false;
+      throw e;
     }
+    return true;
   }
 
   /**
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 05fb77f..3a09e04 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -230,14 +230,8 @@ public class OzoneFileSystem extends FileSystem
     }
     statistics.incrementWriteOps(1);
     LOG.trace("open() path:{}", f);
-    final FileStatus fileStatus = getFileStatus(f);
     final String key = pathToKey(f);
-    if (fileStatus.isDirectory()) {
-      throw new FileNotFoundException("Can't open directory " + f + " to read");
-    }
-
-    return new FSDataInputStream(
-        new OzoneFSInputStream(adapter.createInputStream(key)));
+    return new FSDataInputStream(new OzoneFSInputStream(adapter.readFile(key)));
   }
 
   @Override
@@ -251,26 +245,9 @@ public class OzoneFileSystem extends FileSystem
     }
     statistics.incrementWriteOps(1);
     final String key = pathToKey(f);
-    final FileStatus status;
-    try {
-      status = getFileStatus(f);
-      if (status.isDirectory()) {
-        throw new FileAlreadyExistsException(f + " is a directory");
-      } else {
-        if (!overwrite) {
-          // path references a file and overwrite is disabled
-          throw new FileAlreadyExistsException(f + " already exists");
-        }
-        LOG.trace("Overwriting file {}", f);
-        adapter.deleteObject(key);
-      }
-    } catch (FileNotFoundException ignored) {
-      // this means the file is not found
-    }
-
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
-    return new FSDataOutputStream(adapter.createKey(key), statistics);
+    return createOutputStream(key, overwrite, true);
   }
 
   @Override
@@ -286,6 +263,7 @@ public class OzoneFileSystem extends FileSystem
           Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
     }
     statistics.incrementWriteOps(1);
+    final String key = pathToKey(path);
     final Path parent = path.getParent();
     if (parent != null) {
       // expect this to raise an exception if there is no parent
@@ -293,8 +271,13 @@ public class OzoneFileSystem extends FileSystem
         throw new FileAlreadyExistsException("Not a directory: " + parent);
       }
     }
-    return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
-        bufferSize, replication, blockSize, progress);
+    return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), false);
+  }
+
+  private FSDataOutputStream createOutputStream(String key, boolean overwrite,
+      boolean recursive) throws IOException {
+    return new FSDataOutputStream(adapter.createFile(key, overwrite, recursive),
+        statistics);
   }
 
   @Override
@@ -737,48 +720,14 @@ public class OzoneFileSystem extends FileSystem
   /**
    * Check whether the path is valid and then create directories.
    * Directory is represented using a key with no value.
-   * All the non-existent parent directories are also created.
    *
    * @param path directory path to be created
    * @return true if directory exists or created successfully.
    * @throws IOException
    */
   private boolean mkdir(Path path) throws IOException {
-    Path fPart = path;
-    Path prevfPart = null;
-    do {
-      LOG.trace("validating path:{}", fPart);
-      try {
-        FileStatus fileStatus = getFileStatus(fPart);
-        if (fileStatus.isDirectory()) {
-          // If path exists and a directory, exit
-          break;
-        } else {
-          // Found a file here, rollback and delete newly created directories
-          LOG.trace("Found a file with same name as directory, path:{}", fPart);
-          if (prevfPart != null) {
-            delete(prevfPart, true);
-          }
-          throw new FileAlreadyExistsException(String.format(
-              "Can't make directory for path '%s', it is a file.", fPart));
-        }
-      } catch (FileNotFoundException fnfe) {
-        LOG.trace("creating directory for fpart:{}", fPart);
-        String key = pathToKey(fPart);
-        String dirKey = addTrailingSlashIfNeeded(key);
-        if (!adapter.createDirectory(dirKey)) {
-          // Directory creation failed here,
-          // rollback and delete newly created directories
-          LOG.trace("Directory creation failed, path:{}", fPart);
-          if (prevfPart != null) {
-            delete(prevfPart, true);
-          }
-          return false;
-        }
-      }
-      prevfPart = fPart;
-      fPart = fPart.getParent();
-    } while (fPart != null);
+    String key = pathToKey(path);
+    adapter.createDirectory(key);
     return true;
   }
 
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
index e8b090b..b4b37c4 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,8 +188,8 @@ public class TestOzoneFileInterfaces {
     FileStatus status = fs.getFileStatus(path);
     assertEquals(statistics.getLong(
         StorageStatistics.CommonStatisticNames.OP_GET_FILE_STATUS).longValue(),
-        2);
-    assertEquals(statistics.getLong("objects_query").longValue(), 2);
+        1);
+    assertEquals(statistics.getLong("objects_query").longValue(), 1);
     // The timestamp of the newly created file should always be greater than
     // the time when the test was started
     assertTrue("Modification time has not been recorded: " + status,
@@ -269,9 +270,13 @@ public class TestOzoneFileInterfaces {
     verifyOwnerGroup(status);
 
     long currentTime = System.currentTimeMillis();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(o3fs.pathToKey(path))
+        .build();
     OzoneFileStatus omStatus =
-        cluster.getOzoneManager().getFileStatus(volumeName,
-        bucketName, o3fs.pathToKey(path));
+        cluster.getOzoneManager().getFileStatus(keyArgs);
     //Another get file status here, incremented the counter.
     Assert.assertEquals(numFileStatus + 2,
         cluster.getOzoneManager().getMetrics().getNumGetFileStatus());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org