Posted to commits@ozone.apache.org by sa...@apache.org on 2020/07/22 13:00:35 UTC

[hadoop-ozone] 25/39: HDDS-3612. Allow mounting bucket under other volume (#1104)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch ozone-0.6.0
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 6b666bfdc23a8e7354bd05e93872480a48fcaa2d
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Fri Jul 17 06:02:31 2020 +0200

    HDDS-3612. Allow mounting bucket under other volume (#1104)
    
    (cherry picked from commit 7aff2f05271e1d421bc48d1a02c1b7ad596888cd)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 +
 .../org/apache/hadoop/ozone/client/BucketArgs.java |  33 +-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  23 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  11 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   5 -
 .../hadoop/ozone/om/exceptions/OMException.java    |   4 +-
 .../ozone/om/helpers/BucketEncryptionKeyInfo.java  |   4 +
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      | 142 +++++++--
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  |  18 ++
 .../hadoop/ozone/om/helpers/TestOmBucketInfo.java  |  22 +-
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |  18 +-
 .../dist/src/main/compose/ozonesecure/test.sh      |  13 +-
 .../dist/src/main/smoketest/basic/links.robot      | 152 ++++++++++
 .../dist/src/main/smoketest/commonlib.robot        |  34 +--
 .../smoketest/{commonlib.robot => lib/os.robot}    |  36 +--
 .../{s3/bucketdelete.robot => lib/os_tests.robot}  |  35 +--
 .../dist/src/main/smoketest/ozone-lib/shell.robot  |  48 +++
 .../src/main/smoketest/ozone-lib/shell_tests.robot |  58 ++++
 .../dist/src/main/smoketest/ozonefs/ozonefs.robot  |   2 +-
 .../dist/src/main/smoketest/ozonefs/setup.robot    |  16 +-
 hadoop-ozone/dist/src/main/smoketest/robot.robot   |  81 +++++
 .../src/main/smoketest/s3/MultipartUpload.robot    |   7 +-
 .../dist/src/main/smoketest/s3/bucketdelete.robot  |  12 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |  33 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  | 100 +++---
 .../src/main/proto/OmClientProtocol.proto          |   4 +
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 112 ++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 334 +++++++++++++++------
 .../org/apache/hadoop/ozone/om/ResolvedBucket.java | 111 +++++++
 .../om/request/bucket/OMBucketCreateRequest.java   |  14 +
 .../om/request/file/OMDirectoryCreateRequest.java  |   4 +
 .../ozone/om/request/file/OMFileCreateRequest.java |   6 +-
 .../om/request/key/OMAllocateBlockRequest.java     |   9 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  27 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   8 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |  15 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |  19 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  11 +
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |  48 ++-
 .../om/request/key/OMTrashRecoverRequest.java      |   7 +
 .../S3InitiateMultipartUploadRequest.java          |  33 +-
 .../multipart/S3MultipartUploadAbortRequest.java   |  25 +-
 .../S3MultipartUploadCommitPartRequest.java        |  42 +--
 .../S3MultipartUploadCompleteRequest.java          |  71 +++--
 .../request/file/TestOMDirectoryCreateRequest.java |   4 +
 .../ozone/om/request/key/TestOMKeyRequest.java     |   8 +
 .../s3/multipart/TestS3MultipartRequest.java       |  10 +
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |   4 +-
 .../hadoop/ozone/shell/bucket/BucketCommands.java  |   1 +
 .../ozone/shell/bucket/LinkBucketHandler.java      |  79 +++++
 50 files changed, 1448 insertions(+), 467 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a473948..e340b32 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -298,6 +298,8 @@ public final class OzoneConsts {
   public static final String BUCKET_ENCRYPTION_KEY = "bucketEncryptionKey";
   public static final String DELETED_KEYS_LIST = "deletedKeysList";
   public static final String UNDELETED_KEYS_LIST = "unDeletedKeysList";
+  public static final String SOURCE_VOLUME = "sourceVolume";
+  public static final String SOURCE_BUCKET = "sourceBucket";
 
 
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
index 5bae15d..6c5d1dd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
@@ -54,6 +54,8 @@ public final class BucketArgs {
    * Bucket encryption key name.
    */
   private String bucketEncryptionKey;
+  private final String sourceVolume;
+  private final String sourceBucket;
 
   /**
    * Private constructor, constructed via builder.
@@ -62,15 +64,19 @@ public final class BucketArgs {
    * @param acls list of ACLs.
    * @param metadata map of bucket metadata
    * @param bucketEncryptionKey bucket encryption key name
+   * @param sourceVolume source volume for bucket links, null otherwise
+   * @param sourceBucket source bucket for bucket links, null otherwise
    */
   private BucketArgs(Boolean versioning, StorageType storageType,
-                     List<OzoneAcl> acls, Map<String, String> metadata,
-                     String bucketEncryptionKey) {
+      List<OzoneAcl> acls, Map<String, String> metadata,
+      String bucketEncryptionKey, String sourceVolume, String sourceBucket) {
     this.acls = acls;
     this.versioning = versioning;
     this.storageType = storageType;
     this.metadata = metadata;
     this.bucketEncryptionKey = bucketEncryptionKey;
+    this.sourceVolume = sourceVolume;
+    this.sourceBucket = sourceBucket;
   }
 
   /**
@@ -123,6 +129,14 @@ public final class BucketArgs {
     return new BucketArgs.Builder();
   }
 
+  public String getSourceVolume() {
+    return sourceVolume;
+  }
+
+  public String getSourceBucket() {
+    return sourceBucket;
+  }
+
   /**
    * Builder for OmBucketInfo.
    */
@@ -132,6 +146,8 @@ public final class BucketArgs {
     private List<OzoneAcl> acls;
     private Map<String, String> metadata;
     private String bucketEncryptionKey;
+    private String sourceVolume;
+    private String sourceBucket;
 
     public Builder() {
       metadata = new HashMap<>();
@@ -161,13 +177,24 @@ public final class BucketArgs {
       this.bucketEncryptionKey = bek;
       return this;
     }
+
+    public BucketArgs.Builder setSourceVolume(String volume) {
+      sourceVolume = volume;
+      return this;
+    }
+
+    public BucketArgs.Builder setSourceBucket(String bucket) {
+      sourceBucket = bucket;
+      return this;
+    }
+
     /**
      * Constructs the BucketArgs.
      * @return instance of BucketArgs.
      */
     public BucketArgs build() {
       return new BucketArgs(versioning, storageType, acls, metadata,
-          bucketEncryptionKey);
+          bucketEncryptionKey, sourceVolume, sourceBucket);
     }
   }
 }
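
The new BucketArgs fields are the client-side switch for creating a link instead of a regular bucket. A minimal usage sketch, assuming the standard OzoneClient/ObjectStore/OzoneVolume entry points (not part of this diff); volume and bucket names are hypothetical:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.client.BucketArgs;
    import org.apache.hadoop.ozone.client.ObjectStore;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;
    import org.apache.hadoop.ozone.client.OzoneVolume;

    // Create "target/link1" as a link to "source/bucket1".
    // Only setSourceVolume/setSourceBucket are new in this commit.
    OzoneClient client = OzoneClientFactory.getRpcClient(new OzoneConfiguration());
    ObjectStore store = client.getObjectStore();
    OzoneVolume target = store.getVolume("target");

    BucketArgs linkArgs = BucketArgs.newBuilder()
        .setSourceVolume("source")
        .setSourceBucket("bucket1")
        .build();
    target.createBucket("link1", linkArgs);
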
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index d22b846..79712bb 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -109,6 +109,8 @@ public class OzoneBucket extends WithMetadata {
 
   private OzoneObj ozoneObj;
 
+  private String sourceVolume;
+  private String sourceBucket;
 
   private OzoneBucket(ConfigurationSource conf, String volumeName,
       String bucketName, ReplicationFactor defaultReplication,
@@ -138,11 +140,13 @@ public class OzoneBucket extends WithMetadata {
         .setResType(OzoneObj.ResourceType.BUCKET)
         .setStoreType(OzoneObj.StoreType.OZONE).build();
   }
+
   @SuppressWarnings("parameternumber")
   public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy,
       String volumeName, String bucketName, StorageType storageType,
       Boolean versioning, long creationTime, Map<String, String> metadata,
-      String encryptionKeyName) {
+      String encryptionKeyName,
+      String sourceVolume, String sourceBucket) {
     this(conf, volumeName, bucketName, null, null, proxy);
     this.storageType = storageType;
     this.versioning = versioning;
@@ -150,6 +154,8 @@ public class OzoneBucket extends WithMetadata {
     this.creationTime = Instant.ofEpochMilli(creationTime);
     this.metadata = metadata;
     this.encryptionKeyName = encryptionKeyName;
+    this.sourceVolume = sourceVolume;
+    this.sourceBucket = sourceBucket;
     modificationTime = Instant.now();
     if (modificationTime.isBefore(this.creationTime)) {
       modificationTime = Instant.ofEpochSecond(
@@ -161,9 +167,10 @@ public class OzoneBucket extends WithMetadata {
   public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy,
       String volumeName, String bucketName, StorageType storageType,
       Boolean versioning, long creationTime, long modificationTime,
-      Map<String, String> metadata, String encryptionKeyName) {
+      Map<String, String> metadata, String encryptionKeyName,
+      String sourceVolume, String sourceBucket) {
     this(conf, proxy, volumeName, bucketName, storageType, versioning,
-        creationTime, metadata, encryptionKeyName);
+        creationTime, metadata, encryptionKeyName, sourceVolume, sourceBucket);
     this.modificationTime = Instant.ofEpochMilli(modificationTime);
   }
 
@@ -306,6 +313,16 @@ public class OzoneBucket extends WithMetadata {
     return encryptionKeyName;
   }
 
+  public String getSourceVolume() {
+    return sourceVolume;
+  }
+
+  public String getSourceBucket() {
+    return sourceBucket;
+  }
+
   /**
    * Adds ACLs to the Bucket.
    * @param addAcl ACL to be added
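
On the read path, OzoneBucket carries the same two fields back to the caller, so a client can tell links apart from plain buckets. A short sketch, continuing the hypothetical names above:

    // Inspect a bucket handle: for links, the new getters expose the source.
    OzoneBucket bucket = target.getBucket("link1");
    if (bucket.getSourceVolume() != null && bucket.getSourceBucket() != null) {
      System.out.println("link -> " + bucket.getSourceVolume()
          + "/" + bucket.getSourceBucket());
    }
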
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index bfc2f0d..dc37f09 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -448,6 +448,8 @@ public class RpcClient implements ClientProtocol {
         .setIsVersionEnabled(isVersionEnabled)
         .addAllMetadata(bucketArgs.getMetadata())
         .setStorageType(storageType)
+        .setSourceVolume(bucketArgs.getSourceVolume())
+        .setSourceBucket(bucketArgs.getSourceBucket())
         .setAcls(listOfAcls.stream().distinct().collect(Collectors.toList()));
 
     if (bek != null) {
@@ -614,7 +616,10 @@ public class RpcClient implements ClientProtocol {
         bucketInfo.getModificationTime(),
         bucketInfo.getMetadata(),
         bucketInfo.getEncryptionKeyInfo() != null ? bucketInfo
-            .getEncryptionKeyInfo().getKeyName() : null);
+            .getEncryptionKeyInfo().getKeyName() : null,
+        bucketInfo.getSourceVolume(),
+        bucketInfo.getSourceBucket()
+    );
   }
 
   @Override
@@ -635,7 +640,9 @@ public class RpcClient implements ClientProtocol {
         bucket.getModificationTime(),
         bucket.getMetadata(),
         bucket.getEncryptionKeyInfo() != null ? bucket
-            .getEncryptionKeyInfo().getKeyName() : null))
+            .getEncryptionKeyInfo().getKeyName() : null,
+        bucket.getSourceVolume(),
+        bucket.getSourceBucket()))
         .collect(Collectors.toList());
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 31cccac..6b34e81 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -40,10 +40,6 @@ public enum OMAction implements AuditAction {
   PURGE_KEYS,
   DELETE_KEYS,
 
-  // S3 Bucket
-  CREATE_S3_BUCKET,
-  DELETE_S3_BUCKET,
-
   // READ Actions
   CHECK_VOLUME_ACCESS,
   LIST_BUCKETS,
@@ -53,7 +49,6 @@ public enum OMAction implements AuditAction {
   READ_VOLUME,
   READ_BUCKET,
   READ_KEY,
-  LIST_S3BUCKETS,
   INITIATE_MULTIPART_UPLOAD,
   COMMIT_MULTIPART_UPLOAD_PARTKEY,
   COMPLETE_MULTIPART_UPLOAD,
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index e2b3418..bab8d94 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -223,6 +223,8 @@ public class OMException extends IOException {
 
     INVALID_VOLUME_NAME,
 
-    PARTIAL_DELETE
+    PARTIAL_DELETE,
+
+    DETECTED_LOOP_IN_BUCKET_LINKS,
   }
 }
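
DETECTED_LOOP_IN_BUCKET_LINKS is the result code returned when following a chain of links never reaches a real bucket; it is exercised by the "Loop in link chain is detected" smoke test below. The actual resolution logic lives in OzoneManager/ResolvedBucket (listed in the diffstat, body not shown in full here); as an illustration only, loop detection amounts to tracking visited (volume, bucket) pairs while following source pointers:

    import java.util.HashSet;
    import java.util.Set;

    // Illustration only -- not the actual OzoneManager code.
    // "lookup" is a hypothetical metadata read returning OmBucketInfo.
    Set<String> visited = new HashSet<>();
    OmBucketInfo info = lookup(volume, bucket);
    while (info.isLink()) {
      if (!visited.add(info.getVolumeName() + "/" + info.getBucketName())) {
        throw new OMException("Loop detected in bucket links",
            OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS);
      }
      info = lookup(info.getSourceVolume(), info.getSourceBucket());
    }
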
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java
index e1ae0bb..c180138 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/BucketEncryptionKeyInfo.java
@@ -49,6 +49,10 @@ public class BucketEncryptionKeyInfo {
     return version;
   }
 
+  public BucketEncryptionKeyInfo copy() {
+    return new BucketEncryptionKeyInfo(version, suite, keyName);
+  }
+
   /**
    * Builder for BucketEncryptionKeyInfo.
    */
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index e9a8cbc..abbe395 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -76,6 +76,10 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
    */
   private BucketEncryptionKeyInfo bekInfo;
 
+  private final String sourceVolume;
+
+  private final String sourceBucket;
+
   /**
    * Private constructor, constructed via builder.
    * @param volumeName - Volume name.
@@ -87,19 +91,23 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
    * @param modificationTime - Bucket modification time.
    * @param metadata - metadata.
    * @param bekInfo - bucket encryption key info.
+   * @param sourceVolume - source volume for bucket links, null otherwise
+   * @param sourceBucket - source bucket for bucket links, null otherwise
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   private OmBucketInfo(String volumeName,
-                       String bucketName,
-                       List<OzoneAcl> acls,
-                       boolean isVersionEnabled,
-                       StorageType storageType,
-                       long creationTime,
-                       long modificationTime,
-                       long objectID,
-                       long updateID,
-                       Map<String, String> metadata,
-                       BucketEncryptionKeyInfo bekInfo) {
+      String bucketName,
+      List<OzoneAcl> acls,
+      boolean isVersionEnabled,
+      StorageType storageType,
+      long creationTime,
+      long modificationTime,
+      long objectID,
+      long updateID,
+      Map<String, String> metadata,
+      BucketEncryptionKeyInfo bekInfo,
+      String sourceVolume,
+      String sourceBucket) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.acls = acls;
@@ -111,6 +119,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     this.updateID = updateID;
     this.metadata = metadata;
     this.bekInfo = bekInfo;
+    this.sourceVolume = sourceVolume;
+    this.sourceBucket = sourceBucket;
   }
 
   /**
@@ -208,6 +218,18 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     return bekInfo;
   }
 
+  public String getSourceVolume() {
+    return sourceVolume;
+  }
+
+  public String getSourceBucket() {
+    return sourceBucket;
+  }
+
+  public boolean isLink() {
+    return sourceVolume != null && sourceBucket != null;
+  }
+
   /**
    * Returns new builder class that builds a OmBucketInfo.
    *
@@ -235,6 +257,10 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         (bekInfo != null) ? bekInfo.getKeyName() : null);
     auditMap.put(OzoneConsts.MODIFICATION_TIME,
         String.valueOf(this.modificationTime));
+    if (isLink()) {
+      auditMap.put(OzoneConsts.SOURCE_VOLUME, sourceVolume);
+      auditMap.put(OzoneConsts.SOURCE_BUCKET, sourceBucket);
+    }
     return auditMap;
   }
 
@@ -242,7 +268,22 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
    * Return a new copy of the object.
    */
   public OmBucketInfo copyObject() {
-    OmBucketInfo.Builder builder = new OmBucketInfo.Builder()
+    Builder builder = toBuilder();
+
+    if (bekInfo != null) {
+      builder.setBucketEncryptionKey(bekInfo.copy());
+    }
+
+    builder.acls.clear();
+    acls.forEach(acl -> builder.addAcl(new OzoneAcl(acl.getType(),
+        acl.getName(), (BitSet) acl.getAclBitSet().clone(),
+        acl.getAclScope())));
+
+    return builder.build();
+  }
+
+  public Builder toBuilder() {
+    return new Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setStorageType(storageType)
@@ -251,19 +292,11 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         .setModificationTime(modificationTime)
         .setObjectID(objectID)
         .setUpdateID(updateID)
-        .setBucketEncryptionKey(bekInfo != null ?
-            new BucketEncryptionKeyInfo(bekInfo.getVersion(),
-                bekInfo.getSuite(), bekInfo.getKeyName()) : null);
-
-    acls.forEach(acl -> builder.addAcl(new OzoneAcl(acl.getType(),
-        acl.getName(), (BitSet) acl.getAclBitSet().clone(),
-        acl.getAclScope())));
-
-    if (metadata != null) {
-      metadata.forEach((k, v) -> builder.addMetadata(k, v));
-    }
-    return builder.build();
-
+        .setBucketEncryptionKey(bekInfo)
+        .setSourceVolume(sourceVolume)
+        .setSourceBucket(sourceBucket)
+        .setAcls(acls)
+        .addAllMetadata(metadata);
   }
 
   /**
@@ -281,6 +314,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     private long updateID;
     private Map<String, String> metadata;
     private BucketEncryptionKeyInfo bekInfo;
+    private String sourceVolume;
+    private String sourceBucket;
 
     public Builder() {
       //Default values
@@ -362,6 +397,16 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
       return this;
     }
 
+    public Builder setSourceVolume(String volume) {
+      this.sourceVolume = volume;
+      return this;
+    }
+
+    public Builder setSourceBucket(String bucket) {
+      this.sourceBucket = bucket;
+      return this;
+    }
+
     /**
      * Constructs the OmBucketInfo.
      * @return instance of OmBucketInfo.
@@ -375,7 +420,7 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
 
       return new OmBucketInfo(volumeName, bucketName, acls, isVersionEnabled,
           storageType, creationTime, modificationTime, objectID, updateID,
-          metadata, bekInfo);
+          metadata, bekInfo, sourceVolume, sourceBucket);
     }
   }
 
@@ -397,6 +442,12 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     if (bekInfo != null && bekInfo.getKeyName() != null) {
       bib.setBeinfo(OMPBHelper.convert(bekInfo));
     }
+    if (sourceVolume != null) {
+      bib.setSourceVolume(sourceVolume);
+    }
+    if (sourceBucket != null) {
+      bib.setSourceBucket(sourceBucket);
+    }
     return bib.build();
   }
 
@@ -428,17 +479,28 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
     if (bucketInfo.hasBeinfo()) {
       obib.setBucketEncryptionKey(OMPBHelper.convert(bucketInfo.getBeinfo()));
     }
+    if (bucketInfo.hasSourceVolume()) {
+      obib.setSourceVolume(bucketInfo.getSourceVolume());
+    }
+    if (bucketInfo.hasSourceBucket()) {
+      obib.setSourceBucket(bucketInfo.getSourceBucket());
+    }
     return obib.build();
   }
 
   @Override
   public String getObjectInfo() {
+    String sourceInfo = sourceVolume != null && sourceBucket != null
+        ? ", source='" + sourceVolume + "/" + sourceBucket + "'"
+        : "";
+
     return "OMBucketInfo{" +
-        "volume='" + volumeName + '\'' +
-        ", bucket='" + bucketName + '\'' +
-        ", isVersionEnabled='" + isVersionEnabled + '\'' +
-        ", storageType='" + storageType + '\'' +
-        ", creationTime='" + creationTime + '\'' +
+        "volume='" + volumeName + "'" +
+        ", bucket='" + bucketName + "'" +
+        ", isVersionEnabled='" + isVersionEnabled + "'" +
+        ", storageType='" + storageType + "'" +
+        ", creationTime='" + creationTime + "'" +
+        sourceInfo +
         '}';
   }
 
@@ -460,6 +522,8 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
         storageType == that.storageType &&
         objectID == that.objectID &&
         updateID == that.updateID &&
+        Objects.equals(sourceVolume, that.sourceVolume) &&
+        Objects.equals(sourceBucket, that.sourceBucket) &&
         Objects.equals(metadata, that.metadata) &&
         Objects.equals(bekInfo, that.bekInfo);
   }
@@ -468,4 +532,22 @@ public final class OmBucketInfo extends WithObjectID implements Auditable {
   public int hashCode() {
     return Objects.hash(volumeName, bucketName);
   }
+
+  @Override
+  public String toString() {
+    return "OmBucketInfo{" +
+        "volumeName='" + volumeName + "'" +
+        ", bucketName='" + bucketName + "'" +
+        ", acls=" + acls +
+        ", isVersionEnabled=" + isVersionEnabled +
+        ", storageType=" + storageType +
+        ", creationTime=" + creationTime +
+        ", bekInfo=" + bekInfo +
+        ", sourceVolume='" + sourceVolume + "'" +
+        ", sourceBucket='" + sourceBucket + "'" +
+        ", objectID=" + objectID +
+        ", updateID=" + updateID +
+        ", metadata=" + metadata +
+        '}';
+  }
 }
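
Taken together: a bucket is a link exactly when both source fields are set (isLink()), and the fields survive the protobuf round trip. A sketch mirroring the new unit test below:

    // Hedged sketch, mirroring TestOmBucketInfo#protobufConversionOfBucketLink.
    OmBucketInfo link = OmBucketInfo.newBuilder()
        .setVolumeName("vol1")
        .setBucketName("bucket")
        .setSourceVolume("otherVol")
        .setSourceBucket("someBucket")
        .build();

    assert link.isLink();
    // sourceVolume/sourceBucket are carried through the protobuf form.
    assert link.equals(OmBucketInfo.getFromProtobuf(link.getProtobuf()));
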
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 2a882a4..c08c988 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -162,6 +162,24 @@ public final class OmKeyArgs implements Auditable {
     locationInfoList.add(locationInfo);
   }
 
+  public OmKeyArgs.Builder toBuilder() {
+    return new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(dataSize)
+        .setType(type)
+        .setFactor(factor)
+        .setLocationInfoList(locationInfoList)
+        .setIsMultipartKey(isMultipartKey)
+        .setMultipartUploadID(multipartUploadID)
+        .setMultipartUploadPartNumber(multipartUploadPartNumber)
+        .addAllMetadata(metadata)
+        .setRefreshPipeline(refreshPipeline)
+        .setSortDatanodesInPipeline(sortDatanodesInPipeline)
+        .setAcls(acls);
+  }
+
   /**
    * Builder class of OmKeyArgs.
    */
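
The new toBuilder() gives the OM a cheap way to rewrite a key request against the bucket a link resolves to: swap the volume and bucket names, keep every other field. A hedged sketch (the exact ResolvedBucket API only appears in the diffstat above):

    // Re-target key args at the link's source, preserving size, replication
    // config, metadata, ACLs, multipart state, etc. "sourceVolume" and
    // "sourceBucket" come from resolving the link chain.
    OmKeyArgs resolved = args.toBuilder()
        .setVolumeName(sourceVolume)
        .setBucketName(sourceBucket)
        .build();
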
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index 15468c7..650fc91 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@ -42,10 +42,21 @@ public class TestOmBucketInfo {
         .setStorageType(StorageType.ARCHIVE)
         .build();
 
-    OmBucketInfo afterSerialization =
-        OmBucketInfo.getFromProtobuf(bucket.getProtobuf());
+    Assert.assertEquals(bucket,
+        OmBucketInfo.getFromProtobuf(bucket.getProtobuf()));
+  }
+
+  @Test
+  public void protobufConversionOfBucketLink() {
+    OmBucketInfo bucket = OmBucketInfo.newBuilder()
+        .setBucketName("bucket")
+        .setVolumeName("vol1")
+        .setSourceVolume("otherVol")
+        .setSourceBucket("someBucket")
+        .build();
 
-    Assert.assertEquals(bucket, afterSerialization);
+    Assert.assertEquals(bucket,
+        OmBucketInfo.getFromProtobuf(bucket.getProtobuf()));
   }
 
   @Test
@@ -66,7 +77,10 @@ public class TestOmBucketInfo {
 
     /* Clone an omBucketInfo. */
     OmBucketInfo cloneBucketInfo = omBucketInfo.copyObject();
-    Assert.assertEquals(omBucketInfo, cloneBucketInfo);
+    Assert.assertNotSame(omBucketInfo, cloneBucketInfo);
+    Assert.assertEquals("Expected " + omBucketInfo + " and " + cloneBucketInfo
+            + " to be equal",
+        omBucketInfo, cloneBucketInfo);
 
     /* Reset acl & check not equal. */
     omBucketInfo.setAcls(Collections.singletonList(new OzoneAcl(
diff --git a/hadoop-ozone/dist/src/main/compose/ozone/test.sh b/hadoop-ozone/dist/src/main/compose/ozone/test.sh
index e0b1d62..c40339e 100755
--- a/hadoop-ozone/dist/src/main/compose/ozone/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozone/test.sh
@@ -26,24 +26,24 @@ source "$COMPOSE_DIR/../testlib.sh"
 
 start_docker_env
 
-#Due to the limitation of the current auditparser test, it should be the
-#first test in a clean cluster.
-
-#Disabling for now, audit parser tool during parse getting exception.
-#execute_robot_test om auditparser
-
 execute_robot_test scm lib
+execute_robot_test scm ozone-lib
 
 execute_robot_test scm basic
 
 execute_robot_test scm gdpr
 
-execute_robot_test scm -v SCHEME:ofs ozonefs/ozonefs.robot
-execute_robot_test scm -v SCHEME:o3fs ozonefs/ozonefs.robot
+for scheme in ofs o3fs; do
+  for bucket in link bucket; do
+    execute_robot_test scm -v SCHEME:${scheme} -v BUCKET_TYPE:${bucket} ozonefs/ozonefs.robot
+  done
+done
 
 execute_robot_test scm security/ozone-secure-token.robot
 
-execute_robot_test scm s3
+for bucket in link generated; do
+  execute_robot_test scm -v BUCKET:${bucket} s3
+done
 
 execute_robot_test scm recon
 
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
index 9c3f3ab..ce50fa0 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/test.sh
@@ -31,10 +31,15 @@ execute_robot_test scm basic
 
 execute_robot_test scm security
 
-execute_robot_test scm -v SCHEME:ofs ozonefs/ozonefs.robot
-execute_robot_test scm -v SCHEME:o3fs ozonefs/ozonefs.robot
-
-execute_robot_test s3g s3
+for scheme in ofs o3fs; do
+  for bucket in link bucket; do
+    execute_robot_test scm -v SCHEME:${scheme} -v BUCKET_TYPE:${bucket} ozonefs/ozonefs.robot
+  done
+done
+
+for bucket in link generated; do
+  execute_robot_test s3g -v BUCKET:${bucket} s3
+done
 
 #expects 4 pipelines, should be run before 
 #admincli which creates STANDALONE pipeline
diff --git a/hadoop-ozone/dist/src/main/smoketest/basic/links.robot b/hadoop-ozone/dist/src/main/smoketest/basic/links.robot
new file mode 100644
index 0000000..71c046e
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/basic/links.robot
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Test bucket links via Ozone CLI
+Library             OperatingSystem
+Resource            ../commonlib.robot
+Resource            ../ozone-lib/shell.robot
+Test Setup          Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Kinit test user     testuser     testuser.keytab
+Test Timeout        2 minutes
+Suite Setup         Create volumes
+
+*** Variables ***
+${prefix}    generated
+
+*** Keywords ***
+Create volumes
+    ${random} =         Generate Random String  5  [NUMBERS]
+    Set Suite Variable  ${source}  ${random}-source
+    Set Suite Variable  ${target}  ${random}-target
+    Execute             ozone sh volume create ${source}
+    Execute             ozone sh volume create ${target}
+    Run Keyword if      '${SECURITY_ENABLED}' == 'true'    Setup ACL tests
+
+Setup ACL tests
+    Execute             ozone sh bucket create ${source}/readable-bucket
+    Execute             ozone sh key put ${source}/readable-bucket/key-in-readable-bucket /etc/passwd
+    Execute             ozone sh bucket create ${source}/unreadable-bucket
+    Execute             ozone sh bucket link ${source}/readable-bucket ${target}/readable-link
+    Execute             ozone sh bucket link ${source}/readable-bucket ${target}/unreadable-link
+    Execute             ozone sh bucket link ${source}/unreadable-bucket ${target}/link-to-unreadable-bucket
+    Execute             ozone sh volume addacl --acl user:testuser2/scm@EXAMPLE.COM:r ${target}
+    Execute             ozone sh volume addacl --acl user:testuser2/scm@EXAMPLE.COM:rl ${source}
+    Execute             ozone sh bucket addacl --acl user:testuser2/scm@EXAMPLE.COM:rl ${source}/readable-bucket
+    Execute             ozone sh bucket addacl --acl user:testuser2/scm@EXAMPLE.COM:r ${target}/readable-link
+    Execute             ozone sh bucket addacl --acl user:testuser2/scm@EXAMPLE.COM:r ${target}/link-to-unreadable-bucket
+
+Can follow link with read access
+    Execute             kdestroy
+    Run Keyword         Kinit test user             testuser2         testuser2.keytab
+    ${result} =         Execute And Ignore Error    ozone sh key list ${target}/readable-link
+                        Should Contain              ${result}         key-in-readable-bucket
+
+Cannot follow link without read access
+    Execute             kdestroy
+    Run Keyword         Kinit test user             testuser2         testuser2.keytab
+    ${result} =         Execute And Ignore Error    ozone sh key list ${target}/unreadable-link
+                        Should Contain              ${result}         PERMISSION_DENIED
+
+ACL verified on source bucket
+    Execute             kdestroy
+    Run Keyword         Kinit test user             testuser2         testuser2.keytab
+    ${result} =         Execute                     ozone sh bucket info ${target}/link-to-unreadable-bucket
+                        Should Contain              ${result}         link-to-unreadable-bucket
+                        Should Not Contain          ${result}         PERMISSION_DENIED
+    ${result} =         Execute And Ignore Error    ozone sh key list ${target}/link-to-unreadable-bucket
+                        Should Contain              ${result}         PERMISSION_DENIED
+
+*** Test Cases ***
+Link to non-existent bucket
+                        Execute                     ozone sh bucket link ${source}/no-such-bucket ${target}/dangling-link
+    ${result} =         Execute And Ignore Error    ozone sh key list ${target}/dangling-link
+                        Should Contain              ${result}         BUCKET_NOT_FOUND
+
+Key create passthrough
+                        Execute                     ozone sh bucket link ${source}/bucket1 ${target}/link1
+                        Execute                     ozone sh bucket create ${source}/bucket1
+                        Execute                     ozone sh key put ${target}/link1/key1 /etc/passwd
+                        Key Should Match Local File     ${target}/link1/key1    /etc/passwd
+
+Key read passthrough
+                        Execute                     ozone sh key put ${source}/bucket1/key2 /opt/hadoop/NOTICE.txt
+                        Key Should Match Local File     ${source}/bucket1/key2    /opt/hadoop/NOTICE.txt
+
+Key list passthrough
+    ${target_list} =    Execute                     ozone sh key list ${target}/link1 | jq -r '.name'
+    ${source_list} =    Execute                     ozone sh key list ${source}/bucket1 | jq -r '.name'
+                        Should Be Equal             ${target_list}    ${source_list}
+                        Should Contain              ${source_list}    key1
+                        Should Contain              ${source_list}    key2
+
+Key delete passthrough
+                        Execute                     ozone sh key delete ${target}/link1/key2
+    ${source_list} =    Execute                     ozone sh key list ${source}/bucket1 | jq -r '.name'
+                        Should Not Contain          ${source_list}    key2
+
+Bucket list contains links
+    ${result} =         Execute                     ozone sh bucket list ${target}
+                        Should Contain              ${result}         link1
+                        Should Contain              ${result}         dangling-link
+
+Bucket info shows source
+    ${result} =         Execute                     ozone sh bucket info ${target}/link1 | jq -r '.sourceVolume, .sourceBucket' | xargs
+                        Should Be Equal             ${result}    ${source} bucket1
+
+Source and target have separate ACLs
+    Execute       ozone sh bucket addacl --acl user:user1:rwxy ${target}/link1
+    Verify ACL    bucket    ${target}/link1      USER    user1    READ WRITE READ_ACL WRITE_ACL
+    Verify ACL    bucket    ${source}/bucket1    USER    user1    ${EMPTY}
+
+    Execute       ozone sh bucket addacl --acl group:group2:r ${source}/bucket1
+    Verify ACL    bucket    ${target}/link1      GROUP   group2    ${EMPTY}
+    Verify ACL    bucket    ${source}/bucket1    GROUP   group2    READ
+
+Buckets and links share namespace
+                        Execute                     ozone sh bucket link ${source}/bucket2 ${target}/link2
+    ${result} =         Execute And Ignore Error    ozone sh bucket create ${target}/link2
+                        Should Contain              ${result}    BUCKET_ALREADY_EXISTS
+
+                        Execute                     ozone sh bucket create ${target}/bucket3
+    ${result} =         Execute And Ignore Error    ozone sh bucket link ${source}/bucket1 ${target}/bucket3
+                        Should Contain              ${result}    BUCKET_ALREADY_EXISTS
+
+Can follow link with read access
+    Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Can follow link with read access
+
+Cannot follow link without read access
+    Run Keyword if    '${SECURITY_ENABLED}' == 'true'    Cannot follow link without read access
+
+ACL verified on source bucket
+    Run Keyword if    '${SECURITY_ENABLED}' == 'true'    ACL verified on source bucket
+
+Loop in link chain is detected
+                        Execute                     ozone sh bucket link ${target}/loop1 ${target}/loop2
+                        Execute                     ozone sh bucket link ${target}/loop2 ${target}/loop3
+                        Execute                     ozone sh bucket link ${target}/loop3 ${target}/loop1
+    ${result} =         Execute And Ignore Error    ozone sh key list ${target}/loop2
+                        Should Contain              ${result}    DETECTED_LOOP
+
+Multiple links to same bucket are allowed
+    Execute                         ozone sh bucket link ${source}/bucket1 ${target}/link3
+    Execute                         ozone sh key put ${target}/link3/key3 /etc/group
+    Key Should Match Local File     ${target}/link1/key3    /etc/group
+
+Source bucket not affected by deleting link
+                        Execute                     ozone sh bucket delete ${target}/link1
+    ${bucket_list} =    Execute                     ozone sh bucket list ${target}
+                        Should Not Contain          ${bucket_list}    link1
+    ${source_list} =    Execute                     ozone sh key list ${source}/bucket1 | jq -r '.name'
+                        Should Contain              ${source_list}    key1
diff --git a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
index 407111a..bf3b3e9 100644
--- a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
@@ -18,44 +18,14 @@ Library             OperatingSystem
 Library             String
 Library             BuiltIn
 
+Resource            lib/os.robot
+
 *** Variables ***
 ${SECURITY_ENABLED}  false
 ${OM_HA_PARAM}       ${EMPTY}
 ${OM_SERVICE_ID}     om
 
 *** Keywords ***
-Execute
-    [arguments]                     ${command}
-    ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
-    Log                             ${output}
-    Should Be Equal As Integers     ${rc}                       0
-    [return]                        ${output}
-
-Execute And Ignore Error
-    [arguments]                     ${command}
-    ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
-    Log                             ${output}
-    [return]                        ${output}
-
-Execute and checkrc
-    [arguments]                     ${command}                  ${expected_error_code}
-    ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
-    Log                             ${output}
-    Should Be Equal As Integers     ${rc}                       ${expected_error_code}
-    [return]                        ${output}
-
-Compare files
-    [arguments]                 ${file1}                   ${file2}
-    ${checksumbefore} =         Execute                    md5sum ${file1} | awk '{print $1}'
-    ${checksumafter} =          Execute                    md5sum ${file2} | awk '{print $1}'
-                                Should Be Equal            ${checksumbefore}            ${checksumafter}
-
-Install aws cli
-    ${rc}              ${output} =                 Run And Return Rc And Output           which apt-get
-    Run Keyword if     '${rc}' == '0'              Install aws cli s3 debian
-    ${rc}              ${output} =                 Run And Return Rc And Output           yum --help
-    Run Keyword if     '${rc}' == '0'              Install aws cli s3 centos
-
 Kinit HTTP user
     ${hostname} =       Execute                    hostname
     Wait Until Keyword Succeeds      2min       10sec      Execute            kinit -k HTTP/${hostname}@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab
diff --git a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
similarity index 57%
copy from hadoop-ozone/dist/src/main/smoketest/commonlib.robot
copy to hadoop-ozone/dist/src/main/smoketest/lib/os.robot
index 407111a..af927f9 100644
--- a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/lib/os.robot
@@ -14,22 +14,12 @@
 # limitations under the License.
 
 *** Settings ***
-Library             OperatingSystem
-Library             String
-Library             BuiltIn
-
-*** Variables ***
-${SECURITY_ENABLED}  false
-${OM_HA_PARAM}       ${EMPTY}
-${OM_SERVICE_ID}     om
+Library     OperatingSystem
 
 *** Keywords ***
 Execute
     [arguments]                     ${command}
-    ${rc}                           ${output} =                 Run And Return Rc And Output           ${command}
-    Log                             ${output}
-    Should Be Equal As Integers     ${rc}                       0
-    [return]                        ${output}
+    Run Keyword And Return          Execute and checkrc             ${command}                  0
 
 Execute And Ignore Error
     [arguments]                     ${command}
@@ -50,18 +40,10 @@ Compare files
     ${checksumafter} =          Execute                    md5sum ${file2} | awk '{print $1}'
                                 Should Be Equal            ${checksumbefore}            ${checksumafter}
 
-Install aws cli
-    ${rc}              ${output} =                 Run And Return Rc And Output           which apt-get
-    Run Keyword if     '${rc}' == '0'              Install aws cli s3 debian
-    ${rc}              ${output} =                 Run And Return Rc And Output           yum --help
-    Run Keyword if     '${rc}' == '0'              Install aws cli s3 centos
-
-Kinit HTTP user
-    ${hostname} =       Execute                    hostname
-    Wait Until Keyword Succeeds      2min       10sec      Execute            kinit -k HTTP/${hostname}@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab
-
-Kinit test user
-    [arguments]                      ${user}       ${keytab}
-    ${hostname} =       Execute                    hostname
-    Set Suite Variable  ${TEST_USER}               ${user}/${hostname}@EXAMPLE.COM
-    Wait Until Keyword Succeeds      2min       10sec      Execute            kinit -k ${user}/${hostname}@EXAMPLE.COM -t /etc/security/keytabs/${keytab}
+Create Random File
+    ${postfix} =             Generate Random String  5  [NUMBERS]
+    ${tmpfile} =             Set Variable   /tmp/tempfile-${postfix}
+    File Should Not Exist    ${tmpfile}
+    ${content} =             Set Variable   "Random string"
+    Create File              ${tmpfile}    ${content}
+    [Return]                 ${tmpfile}
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot b/hadoop-ozone/dist/src/main/smoketest/lib/os_tests.robot
similarity index 53%
copy from hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot
copy to hadoop-ozone/dist/src/main/smoketest/lib/os_tests.robot
index bcba30d..dd4beaf 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/lib/os_tests.robot
@@ -14,24 +14,25 @@
 # limitations under the License.
 
 *** Settings ***
-Documentation       S3 gateway test with aws cli
-Library             OperatingSystem
-Library             String
-Resource            ../commonlib.robot
-Resource            commonawslib.robot
-Test Timeout        5 minutes
-Suite Setup         Setup s3 tests
-
-*** Variables ***
-${ENDPOINT_URL}       http://s3g:9878
-${BUCKET}             generated
+Resource    os.robot
+
 
 *** Test Cases ***
 
-Delete existing bucket
-# Bucket already is created in Test Setup.
-                   Execute AWSS3APICli                delete-bucket --bucket ${BUCKET}
+Execute
+    ${output} =        Execute      echo 42
+    Should Be Equal    ${output}    42
+
+Execute failing command
+    Run Keyword And Expect Error    *    Execute    false
+
+Execute And Ignore Error
+    ${output} =        Execute And Ignore Error    echo 123 && false
+    Should Be Equal    ${output}    123
+
+Execute and checkrc
+    ${output} =        Execute and checkrc    echo failure && exit 1    1
+    Should Be Equal    ${output}    failure
 
-Delete non-existent bucket
-    ${result} =    Execute AWSS3APICli and checkrc    delete-bucket --bucket nosuchbucket    255
-                   Should contain                     ${result}                              NoSuchBucket
+Execute and checkrc RC mismatch
+    Run Keyword And Expect Error    *    Execute and checkrc    echo failure && exit 3    1
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
new file mode 100644
index 0000000..2e56ae4
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell.robot
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Resource    ../lib/os.robot
+Library     String
+
+
+*** Keywords ***
+Bucket Exists
+    [arguments]    ${bucket}
+    ${rc}    ${output} =      Run And Return Rc And Output             timeout 15 ozone sh bucket info ${bucket}
+    Return From Keyword If    ${rc} != 0                               ${FALSE}
+    Return From Keyword If    'VOLUME_NOT_FOUND' in '''${output}'''    ${FALSE}
+    Return From Keyword If    'BUCKET_NOT_FOUND' in '''${output}'''    ${FALSE}
+    [Return]                  ${TRUE}
+
+Compare Key With Local File
+    [arguments]    ${key}    ${file}
+    ${postfix} =   Generate Random String  5  [NUMBERS]
+    ${tmpfile} =   Set Variable    /tmp/tempkey-${postfix}
+    Execute        ozone sh key get -f ${key} ${tmpfile}
+    ${rc} =        Run And Return Rc    diff -q ${file} ${tmpfile}
+    Execute        rm -f ${tmpfile}
+    ${result} =    Set Variable If    ${rc} == 0    ${TRUE}   ${FALSE}
+    [Return]       ${result}
+
+Key Should Match Local File
+    [arguments]    ${key}    ${file}
+    ${matches} =   Compare Key With Local File    ${key}    ${file}
+    Should Be True    ${matches}
+
+Verify ACL
+    [arguments]         ${object_type}   ${object}    ${type}   ${name}    ${acls}
+    ${actual_acls} =    Execute          ozone sh ${object_type} getacl ${object} | jq -r '.[] | select(.type == "${type}") | select(.name == "${name}") | .aclList[]' | xargs
+                        Should Be Equal    ${acls}    ${actual_acls}
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
new file mode 100644
index 0000000..56fbcf8
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/ozone-lib/shell_tests.robot
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Resource    ../lib/os.robot
+Resource    shell.robot
+
+
+*** Variables ***
+${OM_SERVICE_ID}     om
+
+
+*** Test Cases ***
+
+Bucket Exists should not if No Such Volume
+    ${exists} =                 Bucket Exists   o3://${OM_SERVICE_ID}/no-such-volume/any-bucket
+    Should Be Equal             ${exists}       ${FALSE}
+
+Bucket Exists should not if No Such Bucket
+    Execute And Ignore Error    ozone sh volume create o3://${OM_SERVICE_ID}/vol1
+    ${exists} =                 Bucket Exists   o3://${OM_SERVICE_ID}/vol1/no-such-bucket
+    Should Be Equal             ${exists}       ${FALSE}
+
+Bucket Exists
+    Execute And Ignore Error    ozone sh bucket create o3://${OM_SERVICE_ID}/vol1/bucket
+    ${exists} =                 Bucket Exists   o3://${OM_SERVICE_ID}/vol1/bucket
+    Should Be Equal             ${exists}       ${TRUE}
+
+Bucket Exists should not if No Such OM service
+    ${exists} =                 Bucket Exists   o3://no-such-host/any-volume/any-bucket
+    Should Be Equal             ${exists}       ${FALSE}
+
+
+Key Should Match Local File
+    [Setup]                     Execute    ozone sh key put o3://${OM_SERVICE_ID}/vol1/bucket/passwd /etc/passwd
+    Key Should Match Local File     o3://${OM_SERVICE_ID}/vol1/bucket/passwd    /etc/passwd
+
+Compare Key With Local File with Different File
+    ${random_file} =            Create Random File
+    ${matches} =                Compare Key With Local File     o3://${OM_SERVICE_ID}/vol1/bucket/passwd    ${random_file}
+    Should Be Equal             ${matches}     ${FALSE}
+    [Teardown]                  Remove File    ${random_file}
+
+Compare Key With Local File if File Does Not Exist
+    ${matches} =                Compare Key With Local File     o3://${OM_SERVICE_ID}/vol1/bucket/passwd    /no-such-file
+    Should Be Equal             ${matches}     ${FALSE}
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot b/hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot
index 6d0042b..450f1b6 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozonefs/ozonefs.robot
@@ -19,7 +19,7 @@ Library             OperatingSystem
 Resource            ../commonlib.robot
 Resource            setup.robot
 Test Timeout        5 minutes
-Suite Setup         Setup ${BUCKET_TYPE}s for FS test
+Suite Setup         Setup for FS test
 
 *** Test Cases ***
 List root
diff --git a/hadoop-ozone/dist/src/main/smoketest/ozonefs/setup.robot b/hadoop-ozone/dist/src/main/smoketest/ozonefs/setup.robot
index 16e059e..441822d 100644
--- a/hadoop-ozone/dist/src/main/smoketest/ozonefs/setup.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/ozonefs/setup.robot
@@ -29,12 +29,12 @@ ${BUCKET_IN_VOL2}    ${BUCKET_TYPE}3-${SCHEME}
 ${DEEP_DIR}          test/${SCHEME}/dir
 
 *** Keywords ***
-Setup buckets for FS test
+Setup for FS test
     Create volumes for FS test
-    Create buckets for FS test
+    Run Keyword    Create ${BUCKET_TYPE}s for FS test
     Sanity check for FS test
     Assign suite vars for FS test
-    Log    Completed setup for ${SCHEME} tests in ${VOLUME}/${BUCKET} using FS base URL: ${BASE_URL}
+    Log    Completed setup for ${SCHEME} tests with ${BUCKET_TYPE}s in ${VOLUME}/${BUCKET} using FS base URL: ${BASE_URL}
 
 Create volumes for FS test
     Execute And Ignore Error    ozone sh volume create ${VOLUME} --quota 100TB
@@ -45,6 +45,16 @@ Create buckets for FS test
     Execute                     ozone sh bucket create ${VOLUME}/${BUCKET2}
     Execute                     ozone sh bucket create ${VOL2}/${BUCKET_IN_VOL2}
 
+Create links for FS test
+    Execute And Ignore Error    ozone sh volume create ${VOLUME}-src --quota 100TB
+    Execute And Ignore Error    ozone sh volume create ${VOL2}-src --quota 100TB
+    Execute                     ozone sh bucket create ${VOLUME}-src/${BUCKET}-src
+    Execute                     ozone sh bucket create ${VOLUME}-src/${BUCKET2}-src
+    Execute                     ozone sh bucket create ${VOL2}-src/${BUCKET_IN_VOL2}-src
+    Execute                     ozone sh bucket link ${VOLUME}-src/${BUCKET}-src ${VOLUME}/${BUCKET}
+    Execute                     ozone sh bucket link ${VOLUME}-src/${BUCKET2}-src ${VOLUME}/${BUCKET2}
+    Execute                     ozone sh bucket link ${VOL2}-src/${BUCKET_IN_VOL2}-src ${VOL2}/${BUCKET_IN_VOL2}
+
 Sanity check for FS test
     ${result} =         Execute               ozone sh volume list
                         Should contain        ${result}               ${VOLUME}
diff --git a/hadoop-ozone/dist/src/main/smoketest/robot.robot b/hadoop-ozone/dist/src/main/smoketest/robot.robot
new file mode 100644
index 0000000..d677ef3
--- /dev/null
+++ b/hadoop-ozone/dist/src/main/smoketest/robot.robot
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*** Settings ***
+Documentation       Smoketest for Robot functions
+Resource            commonlib.robot
+Test Timeout        5 minutes
+
+*** Test Cases ***
+
+Ensure Leading without Leading
+    ${result} =        Ensure Leading    /    a/b
+    Should Be Equal    ${result}         /a/b
+
+Ensure Leading with Leading
+    ${result} =        Ensure Leading    _   _a_b_c
+    Should Be Equal    ${result}         _a_b_c
+
+Ensure Leading for empty
+    ${result} =        Ensure Leading    |    ${EMPTY}
+    Should Be Equal    ${result}         |
+
+
+Ensure Trailing without Trailing
+    ${result} =    Ensure Trailing    .    x.y.z
+    Should Be Equal    ${result}      x.y.z.
+
+Ensure Trailing with Trailing
+    ${result} =    Ensure Trailing    x   axbxcx
+    Should Be Equal    ${result}      axbxcx
+
+Ensure Trailing for empty
+    ${result} =        Ensure Trailing   =    ${EMPTY}
+    Should Be Equal    ${result}         =
+
+
+Format o3fs URL without path
+    ${result} =    Format o3fs URL    vol1    bucket1
+    Should Be Equal    ${result}    o3fs://bucket1.vol1/
+
+Format o3fs URL with path
+    ${result} =    Format o3fs URL    vol1    bucket1    dir/file
+    Should Be Equal    ${result}    o3fs://bucket1.vol1/dir/file
+
+
+Format ofs URL without path
+    ${result} =    Format ofs URL    vol1    bucket1
+    Should Be Equal    ${result}    ofs://vol1/bucket1
+
+Format ofs URL with path
+    ${result} =    Format ofs URL    vol1    bucket1    dir/file
+    Should Be Equal    ${result}    ofs://vol1/bucket1/dir/file
+
+
+Format FS URL with ofs scheme
+    ${result} =    Format FS URL    ofs    vol1    bucket1
+    ${expected} =  Format ofs URL    vol1    bucket1
+    Should Be Equal    ${result}    ${expected}
+
+Format FS URL with o3fs scheme
+    ${result} =    Format FS URL    o3fs    vol1    bucket1
+    ${expected} =  Format o3fs URL    vol1    bucket1
+    Should Be Equal    ${result}    ${expected}
+
+Format FS URL with unsupported scheme
+    ${result} =    Run Keyword And Expect Error   *    Format FS URL    http    org   apache
+    Should Contain    ${result}    http
+    Should Contain    ${result}    nsupported
+
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
index 004a496..1c6827a 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
@@ -88,7 +88,7 @@ Test Multipart Upload Complete
 
 #read file and check the key
     ${result} =                 Execute AWSS3ApiCli        get-object --bucket ${BUCKET} --key multipartKey1 /tmp/multipartKey1.result
-                                Execute                    cat /tmp/part1 /tmp/part2 >> /tmp/multipartKey1
+                                Execute                    cat /tmp/part1 /tmp/part2 > /tmp/multipartKey1
     Compare files               /tmp/multipartKey1         /tmp/multipartKey1.result
 
 Test Multipart Upload Complete Entity too small
@@ -156,7 +156,7 @@ Test Multipart Upload Complete Invalid part errors and complete mpu with few par
                         Should contain          ${result}    ETag
 
     ${result} =         Execute AWSS3ApiCli        get-object --bucket ${BUCKET} --key multipartKey3 /tmp/multipartKey3.result
-                        Execute                    cat /tmp/part1 /tmp/part3 >> /tmp/multipartKey3
+                        Execute                    cat /tmp/part1 /tmp/part3 > /tmp/multipartKey3
     Compare files       /tmp/multipartKey3         /tmp/multipartKey3.result
 
 Test abort Multipart upload
@@ -237,7 +237,6 @@ Test Multipart Upload Put With Copy
                         Should contain           ${result}    UploadId
 
     ${result} =         Execute AWSS3APICli      upload-part-copy --bucket ${BUCKET} --key copytest/destination --upload-id ${uploadID} --part-number 1 --copy-source ${BUCKET}/copytest/source
-                        Should contain           ${result}    ${BUCKET}
                         Should contain           ${result}    ETag
                         Should contain           ${result}    LastModified
     ${eTag1} =          Execute and checkrc      echo '${result}' | jq -r '.CopyPartResult.ETag'   0
@@ -260,13 +259,11 @@ Test Multipart Upload Put With Copy and range
                         Should contain           ${result}    UploadId
 
     ${result} =         Execute AWSS3APICli      upload-part-copy --bucket ${BUCKET} --key copyrange/destination --upload-id ${uploadID} --part-number 1 --copy-source ${BUCKET}/copyrange/source --copy-source-range bytes=0-10485758
-                        Should contain           ${result}    ${BUCKET}
                         Should contain           ${result}    ETag
                         Should contain           ${result}    LastModified
     ${eTag1} =          Execute and checkrc      echo '${result}' | jq -r '.CopyPartResult.ETag'   0
 
     ${result} =         Execute AWSS3APICli      upload-part-copy --bucket ${BUCKET} --key copyrange/destination --upload-id ${uploadID} --part-number 2 --copy-source ${BUCKET}/copyrange/source --copy-source-range bytes=10485758-10485760
-                        Should contain           ${result}    ${BUCKET}
                         Should contain           ${result}    ETag
                         Should contain           ${result}    LastModified
     ${eTag2} =          Execute and checkrc      echo '${result}' | jq -r '.CopyPartResult.ETag'   0
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot b/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot
index bcba30d..ce7b825 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/bucketdelete.robot
@@ -23,14 +23,20 @@ Test Timeout        5 minutes
 Suite Setup         Setup s3 tests
 
 *** Variables ***
-${ENDPOINT_URL}       http://s3g:9878
 ${BUCKET}             generated
+${ENDPOINT_URL}       http://s3g:9878
+
+*** Keywords ***
+Create bucket to be deleted
+    ${bucket} =    Run Keyword if    '${BUCKET}' == 'link'    Create link    to-be-deleted
+    ...            ELSE              Run Keyword              Create bucket
+    [return]       ${bucket}
 
 *** Test Cases ***
 
 Delete existing bucket
-# Bucket already is created in Test Setup.
-                   Execute AWSS3APICli                delete-bucket --bucket ${BUCKET}
+    ${bucket} =                Create bucket to be deleted
+    Execute AWSS3APICli        delete-bucket --bucket ${bucket}
 
 Delete non-existent bucket
     ${result} =    Execute AWSS3APICli and checkrc    delete-bucket --bucket nosuchbucket    255
diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
index 4595587..c263988 100644
--- a/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/s3/commonawslib.robot
@@ -15,12 +15,13 @@
 
 *** Settings ***
 Resource            ../commonlib.robot
-Resource            ../commonlib.robot
+Resource            ../ozone-lib/shell.robot
 
 *** Variables ***
+${ENDPOINT_URL}                http://s3g:9878
 ${OZONE_S3_HEADER_VERSION}     v4
 ${OZONE_S3_SET_CREDENTIALS}    true
-${BUCKET}                      bucket-999
+${BUCKET}                      generated
 
 *** Keywords ***
 Execute AWSS3APICli
@@ -38,6 +39,12 @@ Execute AWSS3Cli
     ${output} =       Execute                     aws s3 --endpoint-url ${ENDPOINT_URL} ${command}
     [return]          ${output}
 
+Install aws cli
+    ${rc}              ${output} =                 Run And Return Rc And Output           which apt-get
+    Run Keyword if     '${rc}' == '0'              Install aws cli s3 debian
+    ${rc}              ${output} =                 Run And Return Rc And Output           yum --help
+    Run Keyword if     '${rc}' == '0'              Install aws cli s3 centos
+
 Install aws cli s3 centos
     Execute            sudo -E yum install -y awscli
 
@@ -73,8 +80,9 @@ Setup dummy credentials for S3
 
 Create bucket
     ${postfix} =         Generate Random String  5  [NUMBERS]
-    Set Suite Variable   ${BUCKET}                  bucket-${postfix}
-                         Create bucket with name    ${BUCKET}
+    ${bucket} =          Set Variable               bucket-${postfix}
+                         Create bucket with name    ${bucket}
+    [Return]             ${bucket}
 
 Create bucket with name
     [Arguments]          ${bucket}
@@ -87,4 +95,19 @@ Setup s3 tests
     Run Keyword if    '${OZONE_S3_SET_CREDENTIALS}' == 'true'    Setup v4 headers
     ${result} =        Execute And Ignore Error                  ozone sh volume create o3://${OM_SERVICE_ID}/s3v
                        Should not contain                        ${result}          Failed
-    Run Keyword if    '${BUCKET}' == 'generated'                 Create bucket
+    ${BUCKET} =        Run Keyword if                            '${BUCKET}' == 'generated'            Create bucket
+    ...                ELSE                                      Set Variable    ${BUCKET}
+                       Set Suite Variable                        ${BUCKET}
+                       Run Keyword if                            '${BUCKET}' == 'link'                 Setup links for S3 tests
+
+Setup links for S3 tests
+    ${exists} =        Bucket Exists    o3://${OM_SERVICE_ID}/s3v/link
+    Return From Keyword If    ${exists}
+    Execute            ozone sh volume create o3://${OM_SERVICE_ID}/legacy
+    Execute            ozone sh bucket create o3://${OM_SERVICE_ID}/legacy/source-bucket
+    Create link        link
+
+Create link
+    [arguments]       ${bucket}
+    Execute           ozone sh bucket link o3://${OM_SERVICE_ID}/legacy/source-bucket o3://${OM_SERVICE_ID}/s3v/${bucket}
+    [return]          ${bucket}
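
For context: the link that the "Create link" keyword sets up via "ozone sh bucket link" could also be created programmatically. Below is a minimal Java sketch; the setSourceVolume/setSourceBucket setters are assumed to be part of this commit's BucketArgs changes and are not confirmed by this excerpt.

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.client.BucketArgs;
    import org.apache.hadoop.ozone.client.ObjectStore;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;

    public class CreateBucketLink {
      public static void main(String[] args) throws Exception {
        try (OzoneClient client =
            OzoneClientFactory.getRpcClient(new OzoneConfiguration())) {
          ObjectStore store = client.getObjectStore();
          // Mirrors: ozone sh bucket link legacy/source-bucket s3v/link
          BucketArgs linkArgs = BucketArgs.newBuilder()
              .setSourceVolume("legacy")         // assumed setter from this commit
              .setSourceBucket("source-bucket")  // assumed setter from this commit
              .build();
          store.getVolume("s3v").createBucket("link", linkArgs);
        }
      }
    }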
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index d4594ef..b80e357 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -19,11 +19,11 @@ package org.apache.hadoop.ozone.om;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.client.BlockID;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.junit.After;
@@ -44,7 +45,6 @@ import org.mockito.Mockito;
 /**
  * Test for OM metrics.
  */
-@SuppressWarnings("deprecation")
 public class TestOmMetrics {
 
   /**
@@ -62,8 +62,6 @@ public class TestOmMetrics {
 
   /**
    * Create a MiniDFSCluster for testing.
-   *
-   * @throws IOException
    */
   @Before
   public void setup() throws Exception {
@@ -233,20 +231,28 @@ public class TestOmMetrics {
     KeyManager keyManager = (KeyManager) HddsWhiteboxTestUtils
         .getInternalState(ozoneManager, "keyManager");
     KeyManager mockKm = Mockito.spy(keyManager);
-
-    Mockito.doReturn(null).when(mockKm).openKey(null);
-    Mockito.doNothing().when(mockKm).deleteKey(null);
-    Mockito.doReturn(null).when(mockKm).lookupKey(null, "");
-    Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
-    Mockito.doReturn(null).when(mockKm).listTrash(
-        null, null, null, null, 0);
-    Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong());
-    Mockito.doReturn(null).when(mockKm).initiateMultipartUpload(
-        any(OmKeyArgs.class));
+    BucketManager mockBm = Mockito.mock(BucketManager.class);
+
+    OmBucketInfo mockBucket = OmBucketInfo.newBuilder()
+        .setVolumeName("").setBucketName("")
+        .build();
+    Mockito.when(mockBm.getBucketInfo(any(), any())).thenReturn(mockBucket);
+    Mockito.doReturn(null).when(mockKm).openKey(any());
+    Mockito.doNothing().when(mockKm).deleteKey(any());
+    Mockito.doReturn(null).when(mockKm).lookupKey(any(), any());
+    Mockito.doReturn(null).when(mockKm).listKeys(any(), any(), any(), any(),
+        anyInt());
+    Mockito.doReturn(null).when(mockKm).listTrash(any(), any(), any(), any(),
+        anyInt());
+    Mockito.doNothing().when(mockKm).commitKey(any(), anyLong());
+    Mockito.doReturn(null).when(mockKm).initiateMultipartUpload(any());
 
     HddsWhiteboxTestUtils.setInternalState(
+        ozoneManager, "bucketManager", mockBm);
+    HddsWhiteboxTestUtils.setInternalState(
         ozoneManager, "keyManager", mockKm);
-    doKeyOps();
+    OmKeyArgs keyArgs = createKeyArgs();
+    doKeyOps(keyArgs);
 
     MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
     assertCounter("NumKeyOps", 7L, omMetrics);
@@ -259,34 +265,32 @@ public class TestOmMetrics {
     assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
 
 
-    ozoneManager.openKey(null);
-    ozoneManager.commitKey(createKeyArgs(), 0);
-    ozoneManager.openKey(null);
-    ozoneManager.commitKey(createKeyArgs(), 0);
-    ozoneManager.openKey(null);
-    ozoneManager.commitKey(createKeyArgs(), 0);
-    ozoneManager.deleteKey(null);
+    ozoneManager.openKey(keyArgs);
+    ozoneManager.commitKey(keyArgs, 0);
+    ozoneManager.openKey(keyArgs);
+    ozoneManager.commitKey(keyArgs, 0);
+    ozoneManager.openKey(keyArgs);
+    ozoneManager.commitKey(keyArgs, 0);
+    ozoneManager.deleteKey(keyArgs);
 
 
     omMetrics = getMetrics("OMMetrics");
     assertCounter("NumKeys", 2L, omMetrics);
 
     // inject exception to test for Failure Metrics
-    Mockito.doThrow(exception).when(mockKm).openKey(null);
-    Mockito.doThrow(exception).when(mockKm).deleteKey(null);
-    Mockito.doThrow(exception).when(mockKm).lookupKey(null, "");
+    Mockito.doThrow(exception).when(mockKm).openKey(any());
+    Mockito.doThrow(exception).when(mockKm).deleteKey(any());
+    Mockito.doThrow(exception).when(mockKm).lookupKey(any(), any());
     Mockito.doThrow(exception).when(mockKm).listKeys(
-        null, null, null, null, 0);
+        any(), any(), any(), any(), anyInt());
     Mockito.doThrow(exception).when(mockKm).listTrash(
-        null, null, null, null, 0);
-    Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class),
-        anyLong());
-    Mockito.doThrow(exception).when(mockKm).initiateMultipartUpload(
-        any(OmKeyArgs.class));
+        any(), any(), any(), any(), anyInt());
+    Mockito.doThrow(exception).when(mockKm).commitKey(any(), anyLong());
+    Mockito.doThrow(exception).when(mockKm).initiateMultipartUpload(any());
 
     HddsWhiteboxTestUtils.setInternalState(
         ozoneManager, "keyManager", mockKm);
-    doKeyOps();
+    doKeyOps(keyArgs);
 
     omMetrics = getMetrics("OMMetrics");
     assertCounter("NumKeyOps", 21L, omMetrics);
@@ -380,39 +384,39 @@ public class TestOmMetrics {
   /**
    * Test key operations with ignoring thrown exception.
    */
-  private void doKeyOps() {
+  private void doKeyOps(OmKeyArgs keyArgs) {
     try {
-      ozoneManager.openKey(null);
+      ozoneManager.openKey(keyArgs);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.deleteKey(null);
+      ozoneManager.deleteKey(keyArgs);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.lookupKey(null);
+      ozoneManager.lookupKey(keyArgs);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.listKeys(null, null, null, null, 0);
+      ozoneManager.listKeys("", "", null, null, 0);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.listTrash(null, null, null, null, 0);
+      ozoneManager.listTrash("", "", null, null, 0);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.commitKey(createKeyArgs(), 0);
+      ozoneManager.commitKey(keyArgs, 0);
     } catch (IOException ignored) {
     }
 
     try {
-      ozoneManager.initiateMultipartUpload(null);
+      ozoneManager.initiateMultipartUpload(keyArgs);
     } catch (IOException ignored) {
     }
 
@@ -420,12 +424,12 @@ public class TestOmMetrics {
 
   private OmKeyArgs createKeyArgs() {
     OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
-        .setBlockID(new BlockID(new ContainerBlockID(1, 1))).build();
+        .setBlockID(new BlockID(new ContainerBlockID(1, 1)))
+        .build();
     keyLocationInfo.setCreateVersion(0);
-    List<OmKeyLocationInfo> omKeyLocationInfoList = new ArrayList<>();
-    omKeyLocationInfoList.add(keyLocationInfo);
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setLocationInfoList(
-        omKeyLocationInfoList).build();
-    return keyArgs;
+
+    return new OmKeyArgs.Builder()
+        .setLocationInfoList(Collections.singletonList(keyLocationInfo))
+        .build();
   }
 }
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index f6eaf38..6859817 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -304,6 +304,8 @@ enum Status {
     INVALID_VOLUME_NAME = 61;
 
     PARTIAL_DELETE = 62;
+
+    DETECTED_LOOP_IN_BUCKET_LINKS = 63;
 }
 
 /**
@@ -483,6 +485,8 @@ message BucketInfo {
     optional uint64 objectID = 9;
     optional uint64 updateID = 10;
     optional uint64 modificationTime = 11;
+    optional string sourceVolume = 12;
+    optional string sourceBucket = 13;
 }
 
 enum StorageTypeProto {
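
Since the two new BucketInfo fields are proto2 optional strings, the generated Java code carries presence checks; a consumer of the wire format could detect a link with a sketch like the following, relying only on standard protobuf-generated accessors:

    import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;

    final class BucketLinkCheck {
      // proto2 optional fields generate has*() presence methods.
      static boolean isLink(BucketInfo info) {
        return info.hasSourceVolume() && info.hasSourceBucket();
      }
    }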
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index 2e0c6cf..4349d7c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -17,7 +17,6 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -25,7 +24,7 @@ import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -34,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.hadoop.util.StringUtils;
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.base.Preconditions;
 import org.iq80.leveldb.DBException;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,46 +137,34 @@ public class BucketManagerImpl implements BucketManager {
         throw new OMException("Bucket already exist",
             OMException.ResultCodes.BUCKET_ALREADY_EXISTS);
       }
+
       BucketEncryptionKeyInfo bek = bucketInfo.getEncryptionKeyInfo();
-      BucketEncryptionKeyInfo.Builder bekb = null;
-      if (bek != null) {
-        if (kmsProvider == null) {
-          throw new OMException("Invalid KMS provider, check configuration " +
-              CommonConfigurationKeys.HADOOP_SECURITY_KEY_PROVIDER_PATH,
-              OMException.ResultCodes.INVALID_KMS_PROVIDER);
-        }
-        if (bek.getKeyName() == null) {
-          throw new OMException("Bucket encryption key needed.", OMException
-              .ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
-        }
-        // Talk to KMS to retrieve the bucket encryption key info.
-        KeyProvider.Metadata metadata = getKMSProvider().getMetadata(
-            bek.getKeyName());
-        if (metadata == null) {
-          throw new OMException("Bucket encryption key " + bek.getKeyName()
-              + " doesn't exist.",
-              OMException.ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
-        }
-        // If the provider supports pool for EDEKs, this will fill in the pool
-        kmsProvider.warmUpEncryptedKeys(bek.getKeyName());
-        bekb = new BucketEncryptionKeyInfo.Builder()
-            .setKeyName(bek.getKeyName())
-            .setVersion(CryptoProtocolVersion.ENCRYPTION_ZONES)
-            .setSuite(CipherSuite.convert(metadata.getCipher()));
-      }
-      List<OzoneAcl> acls = new ArrayList<>();
-      acls.addAll(bucketInfo.getAcls());
-      volumeArgs.getAclMap().getDefaultAclList().forEach(
-          a -> acls.add(OzoneAcl.fromProtobufWithAccessType(a)));
-
-      OmBucketInfo.Builder omBucketInfoBuilder = OmBucketInfo.newBuilder()
-          .setVolumeName(bucketInfo.getVolumeName())
-          .setBucketName(bucketInfo.getBucketName())
-          .setAcls(acls)
-          .setStorageType(bucketInfo.getStorageType())
-          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
-          .setCreationTime(Time.now())
-          .addAllMetadata(bucketInfo.getMetadata());
+
+      boolean hasSourceVolume = bucketInfo.getSourceVolume() != null;
+      boolean hasSourceBucket = bucketInfo.getSourceBucket() != null;
+
+      if (hasSourceBucket != hasSourceVolume) {
+        throw new OMException("Both source volume and source bucket are " +
+            "required for bucket links",
+            OMException.ResultCodes.INVALID_REQUEST);
+      }
+
+      if (bek != null && hasSourceBucket) {
+        throw new OMException("Encryption cannot be set for bucket links",
+            OMException.ResultCodes.INVALID_REQUEST);
+      }
+
+      BucketEncryptionKeyInfo.Builder bekb =
+          createBucketEncryptionKeyInfoBuilder(bek);
+
+      OmBucketInfo.Builder omBucketInfoBuilder = bucketInfo.toBuilder()
+          .setCreationTime(Time.now());
+
+      List<OzoneManagerProtocolProtos.OzoneAclInfo> defaultAclList =
+          volumeArgs.getAclMap().getDefaultAclList();
+      for (OzoneManagerProtocolProtos.OzoneAclInfo a : defaultAclList) {
+        omBucketInfoBuilder.addAcl(OzoneAcl.fromProtobufWithAccessType(a));
+      }
 
       if (bekb != null) {
         omBucketInfoBuilder.setBucketEncryptionKey(bekb.build());
@@ -183,7 +172,14 @@ public class BucketManagerImpl implements BucketManager {
 
       OmBucketInfo omBucketInfo = omBucketInfoBuilder.build();
       commitBucketInfoToDB(omBucketInfo);
-      LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
+      if (hasSourceBucket) {
+        LOG.debug("created link {}/{} to bucket: {}/{}",
+            volumeName, bucketName,
+            omBucketInfo.getSourceVolume(), omBucketInfo.getSourceBucket());
+      } else {
+        LOG.debug("created bucket: {} in volume: {}", bucketName,
+            volumeName);
+      }
     } catch (IOException | DBException ex) {
       if (!(ex instanceof OMException)) {
         LOG.error("Bucket creation failed for bucket:{} in volume:{}",
@@ -199,6 +195,38 @@ public class BucketManagerImpl implements BucketManager {
     }
   }
 
+  @Nullable
+  public BucketEncryptionKeyInfo.Builder createBucketEncryptionKeyInfoBuilder(
+      BucketEncryptionKeyInfo bek) throws IOException {
+    BucketEncryptionKeyInfo.Builder bekb = null;
+    if (bek != null) {
+      if (kmsProvider == null) {
+        throw new OMException("Invalid KMS provider, check configuration " +
+            CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+            OMException.ResultCodes.INVALID_KMS_PROVIDER);
+      }
+      if (bek.getKeyName() == null) {
+        throw new OMException("Bucket encryption key needed.", OMException
+            .ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
+      }
+      // Talk to KMS to retrieve the bucket encryption key info.
+      KeyProvider.Metadata metadata = getKMSProvider().getMetadata(
+          bek.getKeyName());
+      if (metadata == null) {
+        throw new OMException("Bucket encryption key " + bek.getKeyName()
+            + " doesn't exist.",
+            OMException.ResultCodes.BUCKET_ENCRYPTION_KEY_NOT_FOUND);
+      }
+      // If the provider supports pool for EDEKs, this will fill in the pool
+      kmsProvider.warmUpEncryptedKeys(bek.getKeyName());
+      bekb = new BucketEncryptionKeyInfo.Builder()
+          .setKeyName(bek.getKeyName())
+          .setVersion(CryptoProtocolVersion.ENCRYPTION_ZONES)
+          .setSuite(CipherSuite.convert(metadata.getCipher()));
+    }
+    return bekb;
+  }
+
   private void commitBucketInfoToDB(OmBucketInfo omBucketInfo)
       throws IOException {
     String dbBucketKey =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8c0fcbd..0905d81 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -36,15 +36,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
@@ -133,6 +136,7 @@ import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
@@ -204,6 +208,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -231,6 +236,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       AuditLoggerType.OMLOGGER);
 
   private static final String OM_DAEMON = "om";
+
   private static boolean securityEnabled = false;
   private OzoneDelegationTokenSecretManager delegationTokenMgr;
   private OzoneBlockTokenSecretManager blockTokenMgr;
@@ -2025,22 +2031,29 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       try {
         checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
       } catch (OMException ex) {
         // For new keys key checkAccess call will fail as key doesn't exist.
         // Check user access for bucket.
         if (ex.getResult().equals(KEY_NOT_FOUND)) {
           checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
-              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+              bucket.realVolume(), bucket.realBucket(), args.getKeyName());
         } else {
           throw ex;
         }
       }
     }
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumKeyAllocates();
       return keyManager.openKey(args);
@@ -2048,12 +2061,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumKeyAllocateFails();
       auditSuccess = false;
       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.ALLOCATE_KEY,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-            OMAction.ALLOCATE_KEY, (args == null) ? null : args.toAuditMap()));
+            OMAction.ALLOCATE_KEY, auditMap));
       }
     }
   }
@@ -2061,24 +2074,29 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void commitKey(OmKeyArgs args, long clientID)
       throws IOException {
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       try {
         checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
       } catch (OMException ex) {
         // For new keys key checkAccess call will fail as key doesn't exist.
         // Check user access for bucket.
         if (ex.getResult().equals(KEY_NOT_FOUND)) {
           checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
-              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+              bucket.realVolume(), bucket.realBucket(), args.getKeyName());
         } else {
           throw ex;
         }
       }
     }
-    Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
-        args.toAuditMap();
+
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumKeyCommits();
       keyManager.commitKey(args, clientID);
@@ -2088,7 +2106,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       // As key also can have multiple versions, we need to increment keys
       // only if version is 0. Currently we have not complete support of
       // versioning of keys. So, this can be revisited later.
-      if (args != null && args.getLocationInfoList() != null &&
+      if (args.getLocationInfoList() != null &&
           args.getLocationInfoList().size() > 0 &&
           args.getLocationInfoList().get(0) != null &&
           args.getLocationInfoList().get(0).getCreateVersion() == 0) {
@@ -2105,25 +2123,30 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID,
       ExcludeList excludeList) throws IOException {
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       try {
         checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
       } catch (OMException ex) {
         // For new keys key checkAccess call will fail as key doesn't exist.
         // Check user access for bucket.
         if (ex.getResult().equals(KEY_NOT_FOUND)) {
           checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
-              args.getVolumeName(), args.getBucketName(), args.getKeyName());
+              bucket.realVolume(), bucket.realBucket(), args.getKeyName());
         } else {
           throw ex;
         }
       }
     }
+
     boolean auditSuccess = true;
-    Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
-        args.toAuditMap();
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumBlockAllocateCalls();
       return keyManager.allocateBlock(args, clientID, excludeList);
@@ -2150,11 +2173,18 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
     }
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumKeyLookups();
       return keyManager.lookupKey(args, getClientAddress());
@@ -2162,25 +2192,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumKeyLookupFails();
       auditSuccess = false;
       AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
-            (args == null) ? null : args.toAuditMap()));
+            auditMap));
       }
     }
   }
 
   @Override
   public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+    Preconditions.checkNotNull(args);
+
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
     }
-    Map<String, String> auditMap = (args == null) ? new LinkedHashMap<>() :
-        args.toAuditMap();
+
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
     auditMap.put(OzoneConsts.TO_KEY_NAME, toKeyName);
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumKeyRenames();
       keyManager.renameKey(args, toKeyName);
@@ -2202,20 +2239,25 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public void deleteKey(OmKeyArgs args) throws IOException {
+    Map<String, String> auditMap = args.toAuditMap();
     try {
+      ResolvedBucket bucket = resolveBucketLink(args);
+      args = bucket.update(args);
+
       if (isAclEnabled) {
         checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.DELETE,
-            args.getVolumeName(), args.getBucketName(), args.getKeyName());
+            bucket.realVolume(), bucket.realBucket(), args.getKeyName());
       }
+
       metrics.incNumKeyDeletes();
       keyManager.deleteKey(args);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction.DELETE_KEY,
-          (args == null) ? null : args.toAuditMap()));
+          auditMap));
       metrics.decNumKeys();
     } catch (Exception ex) {
       metrics.incNumKeyDeleteFails();
       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.DELETE_KEY,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     }
   }
@@ -2235,19 +2277,23 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
+
+    ResolvedBucket bucket = resolveBucketLink(Pair.of(volumeName, bucketName));
+
     if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET,
-          StoreType.OZONE, ACLType.LIST, volumeName, bucketName, keyPrefix);
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
+          bucket.realVolume(), bucket.realBucket(), keyPrefix);
     }
+
     boolean auditSuccess = true;
-    Map<String, String> auditMap = buildAuditMap(volumeName);
-    auditMap.put(OzoneConsts.BUCKET, bucketName);
+    Map<String, String> auditMap = bucket.audit();
     auditMap.put(OzoneConsts.START_KEY, startKey);
     auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
     auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
+
     try {
       metrics.incNumKeyLists();
-      return keyManager.listKeys(volumeName, bucketName,
+      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
           startKey, keyPrefix, maxKeys);
     } catch (IOException ex) {
       metrics.incNumKeyListFails();
@@ -2268,6 +2314,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       String bucketName, String startKeyName, String keyPrefix, int maxKeys)
       throws IOException {
 
+    // Bucket links are not supported: trash is listed from the bucket as given.
+
     if (isAclEnabled) {
       checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
           volumeName, bucketName, keyPrefix);
@@ -2528,66 +2576,75 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws
       IOException {
-    OmMultipartInfo multipartInfo;
+
+    Preconditions.checkNotNull(keyArgs);
+    ResolvedBucket bucket = resolveBucketLink(keyArgs);
+
+    Map<String, String> auditMap = bucket.audit(keyArgs.toAuditMap());
+
+    keyArgs = bucket.update(keyArgs);
+
     metrics.incNumInitiateMultipartUploads();
     try {
-      multipartInfo = keyManager.initiateMultipartUpload(keyArgs);
+      OmMultipartInfo result = keyManager.initiateMultipartUpload(keyArgs);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-          OMAction.INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null :
-              keyArgs.toAuditMap()));
+          OMAction.INITIATE_MULTIPART_UPLOAD, auditMap));
+      return result;
     } catch (IOException ex) {
       AUDIT.logWriteFailure(buildAuditMessageForFailure(
-          OMAction.INITIATE_MULTIPART_UPLOAD,
-          (keyArgs == null) ? null : keyArgs.toAuditMap(), ex));
+          OMAction.INITIATE_MULTIPART_UPLOAD, auditMap, ex));
       metrics.incNumInitiateMultipartUploadFails();
       throw ex;
     }
-    return multipartInfo;
   }
 
   @Override
   public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
       OmKeyArgs keyArgs, long clientID) throws IOException {
-    boolean auditSuccess = false;
-    OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+
+    Preconditions.checkNotNull(keyArgs);
+    ResolvedBucket bucket = resolveBucketLink(keyArgs);
+
+    Map<String, String> auditMap = bucket.audit(keyArgs.toAuditMap());
+
+    keyArgs = bucket.update(keyArgs);
+
     metrics.incNumCommitMultipartUploadParts();
     try {
-      commitUploadPartInfo = keyManager.commitMultipartUploadPart(keyArgs,
-          clientID);
-      auditSuccess = true;
+      OmMultipartCommitUploadPartInfo result =
+          keyManager.commitMultipartUploadPart(keyArgs, clientID);
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+          OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, auditMap));
+      return result;
     } catch (IOException ex) {
-      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
-          .INITIATE_MULTIPART_UPLOAD, (keyArgs == null) ? null : keyArgs
-          .toAuditMap(), ex));
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, auditMap, ex));
       metrics.incNumCommitMultipartUploadPartFails();
       throw ex;
-    } finally {
-      if (auditSuccess) {
-        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-            OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, (keyArgs == null) ? null :
-                keyArgs.toAuditMap()));
-      }
     }
-    return commitUploadPartInfo;
   }
 
   @Override
   public OmMultipartUploadCompleteInfo completeMultipartUpload(
       OmKeyArgs omKeyArgs, OmMultipartUploadCompleteList multipartUploadList)
       throws IOException {
-    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo;
-    metrics.incNumCompleteMultipartUploads();
 
-    Map<String, String> auditMap = (omKeyArgs == null) ? new LinkedHashMap<>() :
-        omKeyArgs.toAuditMap();
+    Preconditions.checkNotNull(omKeyArgs);
+    ResolvedBucket bucket = resolveBucketLink(omKeyArgs);
+
+    Map<String, String> auditMap = bucket.audit(omKeyArgs.toAuditMap());
     auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList
         .getMultipartMap().toString());
+
+    omKeyArgs = bucket.update(omKeyArgs);
+
+    metrics.incNumCompleteMultipartUploads();
     try {
-      omMultipartUploadCompleteInfo = keyManager.completeMultipartUpload(
-          omKeyArgs, multipartUploadList);
+      OmMultipartUploadCompleteInfo result = keyManager.completeMultipartUpload(
+          omKeyArgs, multipartUploadList);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
           .COMPLETE_MULTIPART_UPLOAD, auditMap));
-      return omMultipartUploadCompleteInfo;
+      return result;
     } catch (IOException ex) {
       metrics.incNumCompleteMultipartUploadFails();
       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
@@ -2599,8 +2656,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
 
-    Map<String, String> auditMap = (omKeyArgs == null) ? new LinkedHashMap<>() :
-        omKeyArgs.toAuditMap();
+    Preconditions.checkNotNull(omKeyArgs);
+    ResolvedBucket bucket = resolveBucketLink(omKeyArgs);
+
+    Map<String, String> auditMap = bucket.audit(omKeyArgs.toAuditMap());
+
+    omKeyArgs = bucket.update(omKeyArgs);
+
     metrics.incNumAbortMultipartUploads();
     try {
       keyManager.abortMultipartUpload(omKeyArgs);
@@ -2616,22 +2678,24 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @Override
-  public OmMultipartUploadListParts listParts(String volumeName,
-      String bucketName, String keyName, String uploadID, int partNumberMarker,
-      int maxParts) throws IOException {
-    Map<String, String> auditMap = new HashMap<>();
-    auditMap.put(OzoneConsts.VOLUME, volumeName);
-    auditMap.put(OzoneConsts.BUCKET, bucketName);
+  public OmMultipartUploadListParts listParts(final String volumeName,
+      final String bucketName, String keyName, String uploadID,
+      int partNumberMarker, int maxParts) throws IOException {
+
+    ResolvedBucket bucket = resolveBucketLink(Pair.of(volumeName, bucketName));
+
+    Map<String, String> auditMap = bucket.audit();
     auditMap.put(OzoneConsts.KEY, keyName);
     auditMap.put(OzoneConsts.UPLOAD_ID, uploadID);
     auditMap.put(OzoneConsts.PART_NUMBER_MARKER,
         Integer.toString(partNumberMarker));
     auditMap.put(OzoneConsts.MAX_PARTS, Integer.toString(maxParts));
+
     metrics.incNumListMultipartUploadParts();
     try {
       OmMultipartUploadListParts omMultipartUploadListParts =
-          keyManager.listParts(volumeName, bucketName, keyName, uploadID,
-              partNumberMarker, maxParts);
+          keyManager.listParts(bucket.realVolume(), bucket.realBucket(),
+              keyName, uploadID, partNumberMarker, maxParts);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
           .LIST_MULTIPART_UPLOAD_PARTS, auditMap));
       return omMultipartUploadListParts;
@@ -2647,15 +2711,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName, String prefix) throws IOException {
 
-    Map<String, String> auditMap = new HashMap<>();
-    auditMap.put(OzoneConsts.VOLUME, volumeName);
-    auditMap.put(OzoneConsts.BUCKET, bucketName);
+    ResolvedBucket bucket = resolveBucketLink(Pair.of(volumeName, bucketName));
+
+    Map<String, String> auditMap = bucket.audit();
     auditMap.put(OzoneConsts.PREFIX, prefix);
 
     metrics.incNumListMultipartUploads();
     try {
       OmMultipartUploadList omMultipartUploadList =
-          keyManager.listMultipartUploads(volumeName, bucketName, prefix);
+          keyManager.listMultipartUploads(bucket.realVolume(),
+              bucket.realBucket(), prefix);
       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
           .LIST_MULTIPART_UPLOADS, auditMap));
       return omMultipartUploadList;
@@ -2671,11 +2736,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
-    if (isAclEnabled) {
-      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
-    }
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumGetFileStatus();
       return keyManager.getFileStatus(args);
@@ -2683,14 +2750,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumGetFileStatusFails();
       auditSuccess = false;
       AUDIT.logReadFailure(
-          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS,
-              (args == null) ? null : args.toAuditMap(), ex));
+          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS, auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logReadSuccess(
-            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS,
-                (args == null) ? null : args.toAuditMap()));
+            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS, auditMap));
       }
     }
   }
@@ -2704,11 +2769,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public void createDirectory(OmKeyArgs args) throws IOException {
-    if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
-    }
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumCreateDirectory();
       keyManager.createDirectory(args);
@@ -2716,14 +2783,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumCreateDirectoryFails();
       auditSuccess = false;
       AUDIT.logWriteFailure(
-          buildAuditMessageForFailure(OMAction.CREATE_DIRECTORY,
-              (args == null) ? null : args.toAuditMap(), ex));
+          buildAuditMessageForFailure(OMAction.CREATE_DIRECTORY, auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logWriteSuccess(
-            buildAuditMessageForSuccess(OMAction.CREATE_DIRECTORY,
-                (args == null) ? null : args.toAuditMap()));
+            buildAuditMessageForSuccess(OMAction.CREATE_DIRECTORY, auditMap));
       }
     }
   }
@@ -2731,11 +2796,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public OpenKeySession createFile(OmKeyArgs args, boolean overWrite,
       boolean recursive) throws IOException {
-    if (isAclEnabled) {
-      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.WRITE,
-          args.getVolumeName(), args.getBucketName(), null);
-    }
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumCreateFile();
       return keyManager.createFile(args, overWrite, recursive);
@@ -2743,23 +2810,30 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumCreateFileFails();
       auditSuccess = false;
       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.CREATE_FILE,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-            OMAction.CREATE_FILE, (args == null) ? null : args.toAuditMap()));
+            OMAction.CREATE_FILE, auditMap));
       }
     }
   }
 
   @Override
   public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
     }
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumLookupFile();
       return keyManager.lookupFile(args, getClientAddress());
@@ -2767,12 +2841,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumLookupFileFails();
       auditSuccess = false;
       AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
-            OMAction.LOOKUP_FILE, (args == null) ? null : args.toAuditMap()));
+            OMAction.LOOKUP_FILE, auditMap));
       }
     }
   }
@@ -2780,11 +2854,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   @Override
   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
       String startKey, long numEntries) throws IOException {
+
+    ResolvedBucket bucket = resolveBucketLink(args);
+
     if (isAclEnabled) {
       checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
-          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
     }
+
     boolean auditSuccess = true;
+    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
+
+    args = bucket.update(args);
+
     try {
       metrics.incNumListStatus();
       return keyManager.listStatus(args, recursive, startKey, numEntries);
@@ -2792,12 +2874,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       metrics.incNumListStatusFails();
       auditSuccess = false;
       AUDIT.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
-          (args == null) ? null : args.toAuditMap(), ex));
+          auditMap, ex));
       throw ex;
     } finally {
       if (auditSuccess) {
         AUDIT.logReadSuccess(buildAuditMessageForSuccess(
-            OMAction.LIST_STATUS, (args == null) ? null : args.toAuditMap()));
+            OMAction.LIST_STATUS, auditMap));
       }
     }
   }
@@ -3314,4 +3396,60 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     jvmPauseMonitor.init(configuration);
     jvmPauseMonitor.start();
   }
+
+  public ResolvedBucket resolveBucketLink(KeyArgs args) throws IOException {
+    return resolveBucketLink(
+        Pair.of(args.getVolumeName(), args.getBucketName()));
+  }
+
+  public ResolvedBucket resolveBucketLink(OmKeyArgs args)
+      throws IOException {
+    return resolveBucketLink(
+        Pair.of(args.getVolumeName(), args.getBucketName()));
+  }
+
+  public ResolvedBucket resolveBucketLink(Pair<String, String> requested)
+      throws IOException {
+    Pair<String, String> resolved =
+        resolveBucketLink(requested, new HashSet<>());
+    return new ResolvedBucket(requested, resolved);
+  }
+
+  /**
+   * Resolves bucket symlinks. Read permission is required for following links.
+   *
+   * @param volumeAndBucket the bucket to be resolved (if it is a link)
+   * @param visited collects link buckets visited during the resolution to
+   *   avoid infinite loops
+   * @return bucket location possibly updated with its actual volume and bucket
+   *   after following bucket links
+   * @throws IOException (most likely OMException) if the ACL check fails,
+   *   the bucket is not found, a loop is detected in the links, etc.
+   */
+  private Pair<String, String> resolveBucketLink(
+      Pair<String, String> volumeAndBucket,
+      Set<Pair<String, String>> visited) throws IOException {
+
+    String volumeName = volumeAndBucket.getLeft();
+    String bucketName = volumeAndBucket.getRight();
+    OmBucketInfo info = bucketManager.getBucketInfo(volumeName, bucketName);
+    if (!info.isLink()) {
+      return volumeAndBucket;
+    }
+
+    if (!visited.add(volumeAndBucket)) {
+      throw new OMException("Detected loop in bucket links",
+          DETECTED_LOOP_IN_BUCKET_LINKS);
+    }
+
+    if (isAclEnabled) {
+      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.READ,
+          volumeName, bucketName, null);
+    }
+
+    return resolveBucketLink(
+        Pair.of(info.getSourceVolume(), info.getSourceBucket()),
+        visited);
+  }
+
 }
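
To make the resolution semantics above concrete, here is a small stand-alone model of the same walk (illustrative only, not OM code): links are followed transitively, non-links resolve to themselves, and revisiting a link triggers the loop condition that OM reports as DETECTED_LOOP_IN_BUCKET_LINKS.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class LinkWalk {
      public static void main(String[] args) {
        Map<String, String> links = new HashMap<>();
        links.put("s3v/link", "legacy/middle");
        links.put("legacy/middle", "legacy/source-bucket");

        System.out.println(resolve(links, "s3v/link")); // legacy/source-bucket

        links.put("legacy/source-bucket", "s3v/link");  // introduce a cycle
        try {
          resolve(links, "s3v/link");
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());           // Detected loop ...
        }
      }

      static String resolve(Map<String, String> links, String bucket) {
        Set<String> visited = new HashSet<>();
        // Buckets that are not links are absent from the map and resolve
        // to themselves.
        while (links.containsKey(bucket)) {
          if (!visited.add(bucket)) {
            throw new IllegalStateException("Detected loop in bucket links");
          }
          bucket = links.get(bucket);
        }
        return bucket;
      }
    }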
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ResolvedBucket.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ResolvedBucket.java
new file mode 100644
index 0000000..fef9b2e
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ResolvedBucket.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Bundles information about a bucket, which is possibly a symlink,
+ * and the real bucket that it resolves to, if it is indeed a link.
+ * For regular buckets, both {@code requested} and {@code resolved} point to
+ * the same bucket.
+ */
+public class ResolvedBucket {
+
+  private final Pair<String, String> requested;
+  private final Pair<String, String> resolved;
+
+  public ResolvedBucket(Pair<String, String> requested,
+      Pair<String, String> resolved) {
+    this.requested = requested;
+    this.resolved = resolved;
+  }
+
+  public Pair<String, String> requested() {
+    return requested;
+  }
+
+  public Pair<String, String> resolved() {
+    return resolved;
+  }
+
+  public String requestedVolume() {
+    return requested.getLeft();
+  }
+
+  public String requestedBucket() {
+    return requested.getRight();
+  }
+
+  public String realVolume() {
+    return resolved.getLeft();
+  }
+
+  public String realBucket() {
+    return resolved.getRight();
+  }
+
+  public OmKeyArgs update(OmKeyArgs args) {
+    return isLink()
+        ? args.toBuilder()
+            .setVolumeName(realVolume())
+            .setBucketName(realBucket())
+            .build()
+        : args;
+  }
+
+  public KeyArgs update(KeyArgs args) {
+    return isLink()
+        ? args.toBuilder()
+            .setVolumeName(realVolume())
+            .setBucketName(realBucket())
+            .build()
+        : args;
+  }
+
+  public boolean isLink() {
+    return !Objects.equals(requested, resolved);
+  }
+
+  public Map<String, String> audit() {
+    return audit(new LinkedHashMap<>());
+  }
+
+  /**
+   * Adds audit information about the bucket (and if it's a link, then the
+   * real bucket, too) to {@code auditMap}.
+   * @return the same map for convenience
+   */
+  public Map<String, String> audit(Map<String, String> auditMap) {
+    auditMap.putIfAbsent(OzoneConsts.VOLUME, requestedVolume());
+    auditMap.putIfAbsent(OzoneConsts.BUCKET, requestedBucket());
+    if (isLink()) {
+      auditMap.put(OzoneConsts.SOURCE_VOLUME, realVolume());
+      auditMap.put(OzoneConsts.SOURCE_BUCKET, realBucket());
+    }
+    return auditMap;
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
index 9d7d133..71d5458 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
@@ -115,6 +115,20 @@ public class OMBucketCreateRequest extends OMClientRequest {
       newBucketInfo.setBeinfo(getBeinfo(kmsProvider, bucketInfo));
     }
 
+    boolean hasSourceVolume = bucketInfo.getSourceVolume() != null;
+    boolean hasSourceBucket = bucketInfo.getSourceBucket() != null;
+
+    if (hasSourceBucket != hasSourceVolume) {
+      throw new OMException("Both source volume and source bucket are " +
+          "required for bucket links",
+          OMException.ResultCodes.INVALID_REQUEST);
+    }
+
+    if (hasSourceBucket && bucketInfo.hasBeinfo()) {
+      throw new OMException("Encryption cannot be set for bucket links",
+          OMException.ResultCodes.INVALID_REQUEST);
+    }
+
     newCreateBucketRequest.setBucketInfo(newBucketInfo.build());
 
     return getOmRequest().toBuilder().setUserInfo(getUserInfo())
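
The two checks above reject half-specified links and encrypted links up
front. A hedged client-side sketch of creating a valid link, using the
BucketArgs source fields introduced earlier in this commit (names are
examples, client is an OzoneClient):

    // Sketch: both source fields set together, no encryption info.
    BucketArgs linkArgs = new BucketArgs.Builder()
        .setSourceVolume("vol1")
        .setSourceBucket("bucket1")
        .build();
    client.getObjectStore().getVolume("vol2")
        .createBucket("link1", linkArgs);
    // Setting only one source field, or combining a source with bucket
    // encryption info, fails with ResultCodes.INVALID_REQUEST.
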
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index ec51333..7b2ab51 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -146,6 +146,10 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
     List<OmKeyInfo> missingParentInfos;
 
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acl
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 3b0b02b..7327626 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -166,6 +166,7 @@ public class OMFileCreateRequest extends OMKeyRequest {
 
     CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
     KeyArgs keyArgs = createFileRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
@@ -199,6 +200,10 @@ public class OMFileCreateRequest extends OMKeyRequest {
     IOException exception = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acl
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
@@ -310,7 +315,6 @@ public class OMFileCreateRequest extends OMKeyRequest {
     }
 
     // Audit Log outside the lock
-    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.CREATE_FILE, auditMap, exception,
         getOmRequest().getUserInfo()));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 1a39e0b..9e82888 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -161,8 +161,7 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String openKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, clientID);
+    String openKeyName = null;
 
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
@@ -172,6 +171,10 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     IOException exception = null;
 
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acl
       checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.WRITE, allocateBlockRequest.getClientID());
@@ -182,6 +185,8 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
       // Here we don't acquire bucket/volume lock because for a single client
       // allocateBlock is called in serial fashion.
 
+      openKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
+          keyName, clientID);
       openKeyInfo = omMetadataManager.getOpenKeyTable().get(openKeyName);
       if (openKeyInfo == null) {
         throw new OMException("Open Key not found " + openKeyName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index edeea3d..eb3769b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -125,37 +125,42 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     OmKeyInfo omKeyInfo = null;
     OMClientResponse omClientResponse = null;
     boolean bucketLockAcquired = false;
-    Result result = null;
+    Result result;
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String dbOzoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-    String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-        keyName, commitKeyRequest.getClientID());
 
     try {
+      commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
+      volumeName = commitKeyArgs.getVolumeName();
+      bucketName = commitKeyArgs.getBucketName();
+
       // check Acl
       checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
           keyName, IAccessAuthorizer.ACLType.WRITE,
           commitKeyRequest.getClientID());
 
+      String dbOzoneKey =
+          omMetadataManager.getOzoneKey(volumeName, bucketName,
+              keyName);
+      String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+          keyName, commitKeyRequest.getClientID());
+
       List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
       for (KeyLocation keyLocation : commitKeyArgs.getKeyLocationsList()) {
         locationInfoList.add(OmKeyLocationInfo.getFromProtobuf(keyLocation));
       }
 
-      bucketLockAcquired = omMetadataManager.getLock().acquireWriteLock(
-          BUCKET_LOCK, volumeName, bucketName);
+      bucketLockAcquired =
+          omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+              volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
       omKeyInfo = omMetadataManager.getOpenKeyTable().get(dbOpenKey);
-
       if (omKeyInfo == null) {
         throw new OMException("Failed to commit key, as " + dbOpenKey +
             "entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
-
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
 
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
@@ -183,7 +188,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       result = Result.FAILURE;
       exception = ex;
       omClientResponse = new OMKeyCommitResponse(createErrorOMResponse(
-            omResponse, exception));
+          omResponse, exception));
     } finally {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
@@ -207,7 +212,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       if (omKeyInfo.getKeyLocationVersions().size() == 1) {
         omMetrics.incNumKeys();
       }
-      LOG.debug("Key commited. Volume:{}, Bucket:{}, Key:{}", volumeName,
+      LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName,
           bucketName, keyName);
       break;
     case FAILURE:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index c6a7e52..f7f08dc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -165,6 +165,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     CreateKeyRequest createKeyRequest = getOmRequest().getCreateKeyRequest();
 
     KeyArgs keyArgs = createKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
@@ -184,6 +185,10 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     IOException exception = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acl
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
@@ -253,13 +258,10 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     }
 
     // Audit Log outside the lock
-
-    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.ALLOCATE_KEY, auditMap, exception,
         getOmRequest().getUserInfo()));
 
-
     switch (result) {
     case SUCCESS:
       LOG.debug("Key created. Volume:{}, Bucket:{}, Key:{}", volumeName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index b0eb6fd..8b75417 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -88,12 +88,13 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
     DeleteKeyRequest deleteKeyRequest = getOmRequest().getDeleteKeyRequest();
 
-    OzoneManagerProtocolProtos.KeyArgs deleteKeyArgs =
+    OzoneManagerProtocolProtos.KeyArgs keyArgs =
         deleteKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
-    String volumeName = deleteKeyArgs.getVolumeName();
-    String bucketName = deleteKeyArgs.getBucketName();
-    String keyName = deleteKeyArgs.getKeyName();
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
 
     OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumKeyDeletes();
@@ -101,8 +102,6 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     AuditLogger auditLogger = ozoneManager.getAuditLogger();
     OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
 
-    Map<String, String> auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
@@ -111,6 +110,10 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acl
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
index dc83ff6..91db347 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
@@ -101,12 +101,13 @@ public class OMKeyRenameRequest extends OMKeyRequest {
       long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
 
     RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
-    OzoneManagerProtocolProtos.KeyArgs renameKeyArgs =
+    OzoneManagerProtocolProtos.KeyArgs keyArgs =
         renameKeyRequest.getKeyArgs();
+    Map<String, String> auditMap = buildAuditMap(keyArgs, renameKeyRequest);
 
-    String volumeName = renameKeyArgs.getVolumeName();
-    String bucketName = renameKeyArgs.getBucketName();
-    String fromKeyName = renameKeyArgs.getKeyName();
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String fromKeyName = keyArgs.getKeyName();
     String toKeyName = renameKeyRequest.getToKeyName();
 
     OMMetrics omMetrics = ozoneManager.getMetrics();
@@ -114,9 +115,6 @@ public class OMKeyRenameRequest extends OMKeyRequest {
 
     AuditLogger auditLogger = ozoneManager.getAuditLogger();
 
-    Map<String, String> auditMap =
-        buildAuditMap(renameKeyArgs, renameKeyRequest);
-
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
 
@@ -132,6 +130,11 @@ public class OMKeyRenameRequest extends OMKeyRequest {
         throw new OMException("Key name is empty",
             OMException.ResultCodes.INVALID_KEY_NAME);
       }
+
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // check Acls to see if user has access to perform delete operation on
       // old key and create operation on new key
       checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
@@ -168,7 +171,7 @@ public class OMKeyRenameRequest extends OMKeyRequest {
       fromKeyValue.setKeyName(toKeyName);
 
       //Set modification time
-      fromKeyValue.setModificationTime(renameKeyArgs.getModificationTime());
+      fromKeyValue.setModificationTime(keyArgs.getModificationTime());
 
       // Add to cache.
       // fromKey should be deleted, toKey should be added with newly updated
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 0aec04d..e3f0a69 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -27,11 +27,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.PrefixManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -88,6 +90,15 @@ public abstract class OMKeyRequest extends OMClientRequest {
     super(omRequest);
   }
 
+  protected static KeyArgs resolveBucketLink(
+      OzoneManager ozoneManager, KeyArgs keyArgs,
+      Map<String, String> auditMap) throws IOException {
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(keyArgs);
+    keyArgs = bucket.update(keyArgs);
+    bucket.audit(auditMap);
+    return keyArgs;
+  }
+
   /**
   * This method avoids multiple rpc calls to SCM by allocating multiple blocks
    * in one rpc call.
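
This helper centralizes the pattern repeated by the write requests below:
build the audit map from the requested names, resolve, then re-read
volume/bucket from the returned args so that ACL checks, locks, and DB keys
all use the real bucket. A condensed sketch of that sequence (fragment,
names as in the surrounding hunks):

    // Sketch: common preamble of the key write requests in this commit.
    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
    keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
    String volumeName = keyArgs.getVolumeName();   // real volume from here on
    String bucketName = keyArgs.getBucketName();   // real bucket from here on
    checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
        IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY);
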
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index adc42d8..012df49 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.ozone.om.request.key;
 
 import com.google.common.base.Optional;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -42,7 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -85,10 +87,11 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
 
     OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumKeyDeletes();
-    Map<String, String> auditMap = null;
     String volumeName = deleteKeyArgs.getVolumeName();
     String bucketName = deleteKeyArgs.getBucketName();
-    String keyName = "";
+    Map<String, String> auditMap = new LinkedHashMap<>();
+    auditMap.put(VOLUME, volumeName);
+    auditMap.put(BUCKET, bucketName);
     List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
 
     AuditLogger auditLogger = ozoneManager.getAuditLogger();
@@ -99,10 +102,7 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
-
-    boolean acquiredLock =
-        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+    boolean acquiredLock = false;
 
     int indexFailed = 0;
     int length = deleteKeys.size();
@@ -112,12 +112,19 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
 
     boolean deleteStatus = true;
     try {
-
+      ResolvedBucket bucket = ozoneManager.resolveBucketLink(
+          Pair.of(volumeName, bucketName));
+      bucket.audit(auditMap);
+      volumeName = bucket.realVolume();
+      bucketName = bucket.realBucket();
+
+      acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+          volumeName, bucketName);
       // Validate bucket and volume exists or not.
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
       for (indexFailed = 0; indexFailed < length; indexFailed++) {
-        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String keyName = deleteKeyArgs.getKeys(indexFailed);
         String objectKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
             keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
@@ -187,8 +194,7 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
           omDoubleBufferHelper);
     }
 
-    auditMap = buildDeleteKeysAuditMap(volumeName, bucketName, deleteKeys,
-        unDeletedKeys.getKeysList());
+    addDeletedKeys(auditMap, deleteKeys, unDeletedKeys.getKeysList());
 
     auditLog(auditLogger, buildAuditMessage(DELETE_KEYS, auditMap, exception,
         userInfo));
@@ -221,21 +227,13 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
   }
 
   /**
-   * Build audit map for DeleteKeys request.
-   * @param volumeName
-   * @param bucketName
-   * @param deletedKeys
-   * @param unDeletedKeys
-   * @return
+   * Add key info to audit map for DeleteKeys request.
    */
-  private Map<String, String> buildDeleteKeysAuditMap(String volumeName,
-      String bucketName, List<String> deletedKeys, List<String> unDeletedKeys) {
-    Map< String, String > auditMap = new HashMap<>();
-    auditMap.put(VOLUME, volumeName);
-    auditMap.put(BUCKET, bucketName);
+  private static void addDeletedKeys(
+      Map<String, String> auditMap, List<String> deletedKeys,
+      List<String> unDeletedKeys) {
     auditMap.put(DELETED_KEYS_LIST, String.join(",", deletedKeys));
-    auditMap.put(UNDELETED_KEYS_LIST, String.join(",",
-        unDeletedKeys));
-    return auditMap;
+    auditMap.put(UNDELETED_KEYS_LIST, String.join(",", unDeletedKeys));
   }
+
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
index eac7842..232a0fb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.om.request.key;
 import java.io.IOException;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.response.key.OMTrashRecoverResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -86,6 +88,11 @@ public class OMTrashRecoverRequest extends OMKeyRequest {
     boolean acquireLock = false;
     OMClientResponse omClientResponse = null;
     try {
+      ResolvedBucket bucket = ozoneManager.resolveBucketLink(
+          Pair.of(volumeName, destinationBucket));
+      volumeName = bucket.realVolume();
+      destinationBucket = bucket.realBucket();
+
       // Check acl for the destination bucket.
       checkBucketAcls(ozoneManager, volumeName, destinationBucket, keyName,
           IAccessAuthorizer.ACLType.WRITE);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index 4f95fe4..aa96ba9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
@@ -96,8 +97,12 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
 
     Preconditions.checkNotNull(keyArgs.getMultipartUploadID());
 
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
+    final String requestedVolume = volumeName;
+    final String requestedBucket = bucketName;
     String keyName = keyArgs.getKeyName();
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
@@ -114,10 +119,14 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
         getOmRequest());
     OMClientResponse omClientResponse = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // TODO to support S3 ACL later.
       acquiredBucketLock =
-          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-              bucketName);
+          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
@@ -136,8 +145,9 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
       // multipart upload request is received, it returns multipart upload id
       // for the key.
 
-      String multipartKey = omMetadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, keyArgs.getMultipartUploadID());
+      String multipartKey = omMetadataManager.getMultipartKey(
+          volumeName, bucketName, keyName,
+          keyArgs.getMultipartUploadID());
 
       // Even if this key already exists in the KeyTable, it would be taken
       // care of in the final complete multipart upload. AWS S3 behavior is
@@ -154,8 +164,8 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
           .build();
 
       omKeyInfo = new OmKeyInfo.Builder()
-          .setVolumeName(keyArgs.getVolumeName())
-          .setBucketName(keyArgs.getBucketName())
+          .setVolumeName(volumeName)
+          .setBucketName(bucketName)
           .setKeyName(keyArgs.getKeyName())
           .setCreationTime(keyArgs.getModificationTime())
           .setModificationTime(keyArgs.getModificationTime())
@@ -180,8 +190,8 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
           new S3InitiateMultipartUploadResponse(
               omResponse.setInitiateMultiPartUploadResponse(
                   MultipartInfoInitiateResponse.newBuilder()
-                      .setVolumeName(volumeName)
-                      .setBucketName(bucketName)
+                      .setVolumeName(requestedVolume)
+                      .setBucketName(requestedBucket)
                       .setKeyName(keyName)
                       .setMultipartUploadID(keyArgs.getMultipartUploadID()))
                   .build(), multipartKeyInfo, omKeyInfo);
@@ -196,14 +206,14 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
       if (acquiredBucketLock) {
-        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
       }
     }
 
     // audit log
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
-        OMAction.INITIATE_MULTIPART_UPLOAD, buildKeyArgsAuditMap(keyArgs),
+        OMAction.INITIATE_MULTIPART_UPLOAD, auditMap,
         exception, getOmRequest().getUserInfo()));
 
     switch (result) {
@@ -217,6 +227,7 @@ public class S3InitiateMultipartUploadRequest extends OMKeyRequest {
       LOG.error("S3 InitiateMultipart Upload request for Key {} in " +
               "Volume/Bucket {}/{} is failed", keyName, volumeName, bucketName,
           exception);
+      break;
     default:
       LOG.error("Unrecognized Result for S3InitiateMultipartUploadRequest: {}",
           multipartInfoInitiateRequest);
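
Note the asymmetry above: the OM-internal metadata key and the OmKeyInfo are
built from the resolved volume/bucket, while the wire response echoes
requestedVolume/requestedBucket, since an S3 client addresses the bucket by
the name it asked for. Sketch of the two sides (fragment):

    // Internal metadata path: resolved names.
    String multipartKey = omMetadataManager.getMultipartKey(
        volumeName, bucketName, keyName, keyArgs.getMultipartUploadID());
    // Response path: client-visible, requested names.
    MultipartInfoInitiateResponse.newBuilder()
        .setVolumeName(requestedVolume)
        .setBucketName(requestedBucket)
        .setKeyName(keyName)
        .setMultipartUploadID(keyArgs.getMultipartUploadID());
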
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
index 4518a3b..0726fe4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
 import java.io.IOException;
+import java.util.Map;
 
 import com.google.common.base.Optional;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
@@ -85,9 +86,12 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
         .getAbortMultiPartUploadRequest();
     OzoneManagerProtocolProtos.KeyArgs keyArgs = multipartUploadAbortRequest
         .getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
+    final String requestedVolume = volumeName;
+    final String requestedBucket = bucketName;
     String keyName = keyArgs.getKeyName();
 
     ozoneManager.getMetrics().incNumAbortMultipartUploads();
@@ -101,15 +105,19 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
     OMClientResponse omClientResponse = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // TODO to support S3 ACL later.
       acquiredLock =
-          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
-              bucketName);
+          omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
+              volumeName, bucketName);
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
-      multipartKey = omMetadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, keyArgs.getMultipartUploadID());
+      multipartKey = omMetadataManager.getMultipartKey(
+          volumeName, bucketName, keyName, keyArgs.getMultipartUploadID());
 
       OmKeyInfo omKeyInfo =
           omMetadataManager.getOpenKeyTable().get(multipartKey);
@@ -118,7 +126,7 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
       // upload initiated for this key.
       if (omKeyInfo == null) {
         throw new OMException("Abort Multipart Upload Failed: volume: " +
-            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            requestedVolume + " bucket: " + requestedBucket + " key: " + keyName,
             OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
       }
 
@@ -152,14 +160,14 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
       if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
       }
     }
 
     // audit log
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
-        OMAction.ABORT_MULTIPART_UPLOAD, buildKeyArgsAuditMap(keyArgs),
+        OMAction.ABORT_MULTIPART_UPLOAD, auditMap,
         exception, getOmRequest().getUserInfo()));
 
     switch (result) {
@@ -173,6 +181,7 @@ public class S3MultipartUploadAbortRequest extends OMKeyRequest {
       LOG.error("Abort Multipart request is failed for KeyName {} in " +
               "VolumeName/Bucket {}/{}", keyName, volumeName, bucketName,
           exception);
+      break;
     default:
       LOG.error("Unrecognized Result for S3MultipartUploadAbortRequest: {}",
           multipartUploadAbortRequest);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
index 346ff87..283a22d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -89,6 +89,7 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
         getOmRequest().getCommitMultiPartUploadRequest();
 
     KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
@@ -111,6 +112,10 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
     OmMultipartKeyInfo multipartKeyInfo = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
       // TODO to support S3 ACL later.
       acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
           volumeName, bucketName);
@@ -118,16 +123,19 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
       String uploadID = keyArgs.getMultipartUploadID();
-      multipartKey = omMetadataManager.getMultipartKey(volumeName,
-          bucketName, keyName, uploadID);
+      multipartKey = omMetadataManager.getMultipartKey(volumeName, bucketName,
+          keyName, uploadID);
 
       multipartKeyInfo = omMetadataManager.getMultipartInfoTable()
           .get(multipartKey);
 
       long clientID = multipartCommitUploadPartRequest.getClientID();
 
-      openKey = omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
-          clientID);
+      openKey = omMetadataManager.getOpenKey(
+          volumeName, bucketName, keyName, clientID);
+
+      String ozoneKey = omMetadataManager.getOzoneKey(
+          volumeName, bucketName, keyName);
 
       omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
 
@@ -147,8 +155,7 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
       // Set the UpdateID to current transactionLogIndex
       omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
 
-      partName = omMetadataManager.getOzoneKey(volumeName, bucketName,
-          keyName) + clientID;
+      partName = ozoneKey + clientID;
 
       if (multipartKeyInfo == null) {
         // This can occur when user started uploading part by the time commit
@@ -217,15 +224,19 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
       if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
       }
     }
 
     // audit log
+    // Add MPU related information.
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
+        String.valueOf(keyArgs.getMultipartNumber()));
+    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);
     auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
         OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY,
-        buildAuditMap(keyArgs, partName), exception,
+        auditMap, exception,
         getOmRequest().getUserInfo()));
 
     switch (result) {
@@ -236,7 +247,8 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
     case FAILURE:
       ozoneManager.getMetrics().incNumCommitMultipartUploadPartFails();
       LOG.error("MultipartUpload Commit is failed for Key:{} in " +
-          "Volume/Bucket {}/{}", keyName, volumeName, bucketName, exception);
+          "Volume/Bucket {}/{}", keyName, volumeName, bucketName,
+          exception);
       break;
     default:
       LOG.error("Unrecognized Result for S3MultipartUploadCommitPartRequest: " +
@@ -246,15 +258,5 @@ public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
-  private Map<String, String> buildAuditMap(KeyArgs keyArgs, String partName) {
-    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
-
-    // Add MPU related information.
-    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
-        String.valueOf(keyArgs.getMultipartNumber()));
-    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);
-
-    return auditMap;
-  }
 }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index c4e315c..a9aefa0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -96,19 +96,19 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
 
     List<OzoneManagerProtocolProtos.Part> partsList =
         multipartUploadCompleteRequest.getPartsListList();
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
 
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
+    final String requestedVolume = volumeName;
+    final String requestedBucket = bucketName;
     String keyName = keyArgs.getKeyName();
     String uploadID = keyArgs.getMultipartUploadID();
+    String multipartKey = null;
 
     ozoneManager.getMetrics().incNumCompleteMultipartUploads();
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
-        bucketName, keyName, uploadID);
-    String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
 
     boolean acquiredLock = false;
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
@@ -117,6 +117,13 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
     IOException exception = null;
     Result result = null;
     try {
+      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
+      volumeName = keyArgs.getVolumeName();
+      bucketName = keyArgs.getBucketName();
+
+      multipartKey = omMetadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, uploadID);
+
       // TODO to support S3 ACL later.
 
       acquiredLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
@@ -124,12 +131,15 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
 
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
+      String ozoneKey = omMetadataManager.getOzoneKey(
+          volumeName, bucketName, keyName);
+
       OmMultipartKeyInfo multipartKeyInfo = omMetadataManager
           .getMultipartInfoTable().get(multipartKey);
 
       if (multipartKeyInfo == null) {
-        throw new OMException("Complete Multipart Upload Failed: volume: " +
-            volumeName + "bucket: " + bucketName + "key: " + keyName,
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName),
             OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
       }
       TreeMap<Integer, PartKeyInfo> partKeyInfoMap =
@@ -140,8 +150,8 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
           LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" +
                   " no parts in OM, parts given to upload are {}", ozoneKey,
               partsList);
-          throw new OMException("Complete Multipart Upload Failed: volume: " +
-              volumeName + "bucket: " + bucketName + "key: " + keyName,
+          throw new OMException(
+              failureMessage(requestedVolume, requestedBucket, keyName),
               OMException.ResultCodes.INVALID_PART);
         }
 
@@ -157,9 +167,9 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
                     "partNumber at index {} is {} for ozonekey is " +
                     "{}", i, currentPartNumber, i - 1, prevPartNumber,
                 ozoneKey);
-            throw new OMException("Complete Multipart Upload Failed: volume: " +
-                volumeName + "bucket: " + bucketName + "key: " + keyName +
-                "because parts are in Invalid order.",
+            throw new OMException(
+                failureMessage(requestedVolume, requestedBucket, keyName) +
+                " because parts are in invalid order.",
                 OMException.ResultCodes.INVALID_PART_ORDER);
           }
           prevPartNumber = currentPartNumber;
@@ -182,10 +192,10 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
               !partName.equals(partKeyInfo.getPartName())) {
             String omPartName = partKeyInfo == null ? null :
                 partKeyInfo.getPartName();
-            throw new OMException("Complete Multipart Upload Failed: volume: " +
-                volumeName + "bucket: " + bucketName + "key: " + keyName +
+            throw new OMException(
+                failureMessage(requestedVolume, requestedBucket, keyName) +
                 ". Provided Part info is { " + partName + ", " + partNumber +
-                "}, where as OM has partName " + omPartName,
+                "}, whereas OM has partName " + omPartName,
                 OMException.ResultCodes.INVALID_PART);
           }
 
@@ -200,9 +210,9 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
                       partKeyInfo.getPartNumber(),
                       currentPartKeyInfo.getDataSize(),
                       OzoneConsts.OM_MULTIPART_MIN_SIZE);
-              throw new OMException("Complete Multipart Upload Failed: " +
-                  "Entity too small: volume: " + volumeName + "bucket: " +
-                  bucketName + "key: " + keyName,
+              throw new OMException(
+                  failureMessage(requestedVolume, requestedBucket, keyName) +
+                  ". Entity too small.",
                   OMException.ResultCodes.ENTITY_TOO_SMALL);
             }
           }
@@ -275,8 +285,8 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
 
         omResponse.setCompleteMultiPartUploadResponse(
             MultipartUploadCompleteResponse.newBuilder()
-                .setVolume(volumeName)
-                .setBucket(bucketName)
+                .setVolume(requestedVolume)
+                .setBucket(requestedBucket)
                 .setKey(keyName)
                 .setHash(DigestUtils.sha256Hex(keyName)));
 
@@ -285,9 +295,9 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
 
         result = Result.SUCCESS;
       } else {
-        throw new OMException("Complete Multipart Upload Failed: volume: " +
-            volumeName + "bucket: " + bucketName + "key: " + keyName +
-            "because of empty part list",
+        throw new OMException(
+            failureMessage(requestedVolume, requestedBucket, keyName) +
+            " because of empty part list",
             OMException.ResultCodes.INVALID_REQUEST);
       }
 
@@ -300,12 +310,11 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
       if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK,
+            volumeName, bucketName);
       }
     }
 
-    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
     auditMap.put(OzoneConsts.MULTIPART_LIST, partsList.toString());
 
     // audit log
@@ -315,13 +324,15 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
 
     switch (result) {
     case SUCCESS:
-      LOG.debug("MultipartUpload Complete request is successfull for Key: {} " +
+      LOG.debug("MultipartUpload Complete request is successful for Key: {} " +
           "in Volume/Bucket {}/{}", keyName, volumeName, bucketName);
       break;
     case FAILURE:
       ozoneManager.getMetrics().incNumCompleteMultipartUploadFails();
       LOG.error("MultipartUpload Complete request failed for Key: {} " +
-          "in Volume/Bucket {}/{}", keyName, volumeName, bucketName, exception);
+          "in Volume/Bucket {}/{}", keyName, volumeName, bucketName,
+          exception);
+      break;
     default:
       LOG.error("Unrecognized Result for S3MultipartUploadCommitRequest: {}",
           multipartUploadCompleteRequest);
@@ -330,6 +341,12 @@ public class S3MultipartUploadCompleteRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
+  private static String failureMessage(String volume, String bucket,
+      String keyName) {
+    return "Complete Multipart Upload Failed: volume: " +
+        volume + " bucket: " + bucket + " key: " + keyName;
+  }
+
   private void updateCache(OMMetadataManager omMetadataManager,
       String ozoneKey, String multipartKey, OmKeyInfo omKeyInfo,
       long transactionLogIndex) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
index b714375..c09bf86 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.ozone.om.request.file;
 import java.util.UUID;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -86,6 +88,8 @@ public class TestOMDirectoryCreateRequest {
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+    when(ozoneManager.resolveBucketLink(any(KeyArgs.class)))
+        .thenReturn(new ResolvedBucket(Pair.of("", ""), Pair.of("", "")));
   }
 
   @After
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 49794a1..dd6caf4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -22,7 +22,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -150,6 +153,11 @@ public class TestOMKeyRequest {
     clientID = Time.now();
     dataSize = 1000L;
 
+    Pair<String, String> volumeAndBucket = Pair.of(volumeName, bucketName);
+    when(ozoneManager.resolveBucketLink(any(KeyArgs.class)))
+        .thenReturn(new ResolvedBucket(volumeAndBucket, volumeAndBucket));
+    when(ozoneManager.resolveBucketLink(any(Pair.class)))
+        .thenReturn(new ResolvedBucket(volumeAndBucket, volumeAndBucket));
   }
 
   @After
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 9950027..0271a7a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -29,6 +29,7 @@ import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
@@ -37,6 +38,8 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ResolvedBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
@@ -79,6 +82,13 @@ public class TestS3MultipartRequest {
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+    when(ozoneManager.resolveBucketLink(any(KeyArgs.class)))
+        .thenAnswer(inv -> {
+          KeyArgs args = (KeyArgs) inv.getArguments()[0];
+          return new ResolvedBucket(
+              Pair.of(args.getVolumeName(), args.getBucketName()),
+              Pair.of(args.getVolumeName(), args.getBucketName()));
+        });
   }
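
The stub above keeps requested == resolved, i.e. plain buckets. A test that
wants to exercise the link path could instead return differing pairs
(hedged sketch, names illustrative):

    // Sketch: simulate a link bucket in a unit test.
    when(ozoneManager.resolveBucketLink(any(KeyArgs.class)))
        .thenReturn(new ResolvedBucket(
            Pair.of("vol2", "link1"),      // requested (the link)
            Pair.of("vol1", "bucket1")));  // resolved (the real bucket)
    // ResolvedBucket#isLink() then returns true, and audit() adds the
    // source volume/bucket entries to the audit map.
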
 
 
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
index e723df7..25dea3d 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
@@ -477,9 +477,7 @@ public class BasicOzoneFileSystem extends FileSystem {
       result = innerDelete(f, recursive);
     } else {
       LOG.debug("delete: Path is a file: {}", f);
-      List<String> keyList = new ArrayList<>();
-      keyList.add(key);
-      result = adapter.deleteObjects(keyList);
+      result = adapter.deleteObject(key);
     }
 
     if (result) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/BucketCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/BucketCommands.java
index ea4ec70..2d80060 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/BucketCommands.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/BucketCommands.java
@@ -38,6 +38,7 @@ import picocli.CommandLine.ParentCommand;
         InfoBucketHandler.class,
         ListBucketHandler.class,
         CreateBucketHandler.class,
+        LinkBucketHandler.class,
         DeleteBucketHandler.class,
         AddAclBucketHandler.class,
         RemoveAclBucketHandler.class,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/LinkBucketHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/LinkBucketHandler.java
new file mode 100644
index 0000000..6671f2d
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/bucket/LinkBucketHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.shell.bucket;
+
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.shell.Handler;
+import org.apache.hadoop.ozone.shell.OzoneAddress;
+
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Parameters;
+
+import java.io.IOException;
+
+/**
+ * Creates a symlink to another bucket.
+ */
+@Command(name = "link",
+    description = "creates a symlink to another bucket")
+public class LinkBucketHandler extends Handler {
+
+  @Parameters(index = "0", arity = "1..1",
+      description = "The bucket to which the link should point.",
+      converter = BucketUri.class)
+  private OzoneAddress source;
+
+  @Parameters(index = "1", arity = "1..1",
+      description = "Address of the link bucket",
+      converter = BucketUri.class)
+  private OzoneAddress target;
+
+  @Override
+  protected OzoneAddress getAddress() {
+    return source;
+  }
+
+  /**
+   * Executes bucket link creation.
+   */
+  @Override
+  public void execute(OzoneClient client, OzoneAddress address)
+      throws IOException {
+
+    BucketArgs.Builder bb = new BucketArgs.Builder()
+        .setStorageType(StorageType.DEFAULT)
+        .setVersioning(false)
+        .setSourceVolume(source.getVolumeName())
+        .setSourceBucket(source.getBucketName());
+
+    String volumeName = target.getVolumeName();
+    String bucketName = target.getBucketName();
+
+    OzoneVolume vol = client.getObjectStore().getVolume(volumeName);
+    vol.createBucket(bucketName, bb.build());
+
+    if (isVerbose()) {
+      OzoneBucket bucket = vol.getBucket(bucketName);
+      printObjectAsJson(bucket);
+    }
+  }
+}
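
Example invocation of the new subcommand, with illustrative paths in the
usual o3 address form: ozone sh bucket link /vol1/bucket1 /vol2/link1.
The first argument is the existing source bucket, the second the link to
create, matching the @Parameters order above.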


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org