You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2019/09/13 05:07:28 UTC

[hbase] branch branch-2 updated: HBASE-22760 : Pause/Resume/Query Snapshot Auto Cleanup Activity (#618)

This is an automated email from the ASF dual-hosted git repository.

anoopsamjohn pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6356885  HBASE-22760 : Pause/Resume/Query Snapshot Auto Cleanup Activity (#618)
6356885 is described below

commit 63568854b6399fa528847cbe57c809ccb5e21b55
Author: Viraj Jasani <vi...@gmail.com>
AuthorDate: Fri Sep 13 10:37:22 2019 +0530

    HBASE-22760 : Pause/Resume/Query Snapshot Auto Cleanup Activity (#618)
---
 .../java/org/apache/hadoop/hbase/client/Admin.java |  22 ++++
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  23 +++++
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |  12 +++
 .../hbase/client/ConnectionImplementation.java     |  14 +++
 .../org/apache/hadoop/hbase/client/HBaseAdmin.java |  34 +++++++
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  28 ++++++
 .../hbase/client/ShortCircuitMasterConnection.java |  19 ++++
 .../hbase/shaded/protobuf/RequestConverter.java    |  31 ++++++
 .../apache/hadoop/hbase/zookeeper/ZNodePaths.java  |  38 +++++--
 .../java/org/apache/hadoop/hbase/HConstants.java   |   2 -
 .../src/main/protobuf/Master.proto                 |  28 ++++++
 .../src/main/protobuf/SnapshotCleanup.proto        |  31 ++++++
 .../org/apache/hadoop/hbase/master/HMaster.java    |  49 +++++++--
 .../hadoop/hbase/master/MasterRpcServices.java     |  57 +++++++++++
 .../org/apache/hadoop/hbase/client/TestAdmin2.java |  27 +++++
 .../master/cleaner/TestSnapshotCleanerChore.java   |   1 -
 .../master/cleaner/TestSnapshotFromMaster.java     | 103 +++++++++++++++++++
 .../hbase/snapshot/SnapshotTestingUtils.java       |  23 +++++
 hbase-shell/src/main/ruby/hbase/admin.rb           |  16 +++
 hbase-shell/src/main/ruby/shell.rb                 |   2 +
 .../shell/commands/snapshot_cleanup_enabled.rb     |  39 +++++++
 .../ruby/shell/commands/snapshot_cleanup_switch.rb |  43 ++++++++
 hbase-shell/src/test/ruby/hbase/admin_test.rb      |  14 +++
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |  10 ++
 .../hbase/zookeeper/SnapshotCleanupTracker.java    | 112 +++++++++++++++++++++
 src/main/asciidoc/_chapters/ops_mgt.adoc           |  38 ++++++-
 26 files changed, 793 insertions(+), 23 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 35e1a95..c38d9c4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -3113,4 +3113,26 @@ public interface Admin extends Abortable, Closeable {
   default List<Boolean> hasUserPermissions(List<Permission> permissions) throws IOException {
     return hasUserPermissions(null, permissions);
   }
+
+  /**
+   * Turn on or off the auto snapshot cleanup based on TTL.
+   *
+   * @param on Set to <code>true</code> to enable, <code>false</code> to disable.
+   * @param synchronous If <code>true</code>, waits for any in-progress snapshot cleanup to
+   *   complete.
+   * @return Previous auto snapshot cleanup value
+   * @throws IOException if a remote or network exception occurs
+   */
+  boolean snapshotCleanupSwitch(final boolean on, final boolean synchronous)
+      throws IOException;
+
+  /**
+   * Query the current state of the auto snapshot cleanup based on TTL.
+   *
+   * @return <code>true</code> if the auto snapshot cleanup is enabled,
+   *   <code>false</code> otherwise.
+   * @throws IOException if a remote or network exception occurs
+   */
+  boolean isSnapshotCleanupEnabled() throws IOException;
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 78ed6d3..7493dfb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1462,4 +1462,27 @@ public interface AsyncAdmin {
   default CompletableFuture<List<Boolean>> hasUserPermissions(List<Permission> permissions) {
     return hasUserPermissions(null, permissions);
   }
+
+  /**
+   * Turn on or off the auto snapshot cleanup based on TTL.
+   * <p/>
+   * Note that the method itself is always non-blocking, which means it will always return
+   * immediately. The {@code sync} parameter only affects when we will complete the returned
+   * {@link CompletableFuture}.
+   *
+   * @param on Set to <code>true</code> to enable, <code>false</code> to disable.
+   * @param sync If <code>true</code>, waits for any in-progress snapshot cleanup to
+   *   complete.
+   * @return Previous auto snapshot cleanup value wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> snapshotCleanupSwitch(boolean on, boolean sync);
+
+  /**
+   * Query the current state of the auto snapshot cleanup based on TTL.
+   *
+   * @return true if the auto snapshot cleanup is enabled, false otherwise.
+   *   The return value will be wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Boolean> isSnapshotCleanupEnabled();
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index c90e383..3b51279 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -824,4 +824,16 @@ class AsyncHBaseAdmin implements AsyncAdmin {
       List<Permission> permissions) {
     return wrap(rawAdmin.hasUserPermissions(userName, permissions));
   }
+
+  @Override
+  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
+      final boolean sync) {
+    return wrap(rawAdmin.snapshotCleanupSwitch(on, sync));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
+    return wrap(rawAdmin.isSnapshotCleanupEnabled());
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 7f78ba0..39f7abf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1506,6 +1506,20 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
 
       @Override
+      public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(
+          RpcController controller, MasterProtos.SetSnapshotCleanupRequest request)
+          throws ServiceException {
+        return stub.switchSnapshotCleanup(controller, request);
+      }
+
+      @Override
+      public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
+          RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request)
+          throws ServiceException {
+        return stub.isSnapshotCleanupEnabled(controller, request);
+      }
+
+      @Override
       public MasterProtos.ExecProcedureResponse execProcedure(
           RpcController controller, MasterProtos.ExecProcedureRequest request)
           throws ServiceException {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index cd88890..0b2be19 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -183,6 +183,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMainte
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+  .IsSnapshotCleanupEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
@@ -206,6 +208,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSna
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
@@ -4324,4 +4327,35 @@ public class HBaseAdmin implements Admin {
         }
       });
   }
+
+  @Override
+  public boolean snapshotCleanupSwitch(boolean on, boolean synchronous) throws IOException {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(),
+        getRpcControllerFactory()) {
+
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        SetSnapshotCleanupRequest req =
+          RequestConverter.buildSetSnapshotCleanupRequest(on, synchronous);
+        return master.switchSnapshotCleanup(getRpcController(), req).getPrevSnapshotCleanup();
+      }
+    });
+
+  }
+
+  @Override
+  public boolean isSnapshotCleanupEnabled() throws IOException {
+    return executeCallable(new MasterCallable<Boolean>(getConnection(),
+        getRpcControllerFactory()) {
+
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        IsSnapshotCleanupEnabledRequest req =
+          RequestConverter.buildIsSnapshotCleanupEnabledRequest();
+        return master.isSnapshotCleanupEnabled(getRpcController(), req).getEnabled();
+      }
+    });
+
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index d6934f4..6a0099f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -205,6 +205,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
@@ -255,6 +257,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .SetSnapshotCleanupResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
@@ -3867,4 +3871,28 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
               resp -> resp.getHasUserPermissionList()))
         .call();
   }
+
+  @Override
+  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on,
+      final boolean sync) {
+    return this.<Boolean>newMasterCaller()
+        .action((controller, stub) -> this
+            .call(controller, stub,
+                RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
+                MasterService.Interface::switchSnapshotCleanup,
+                SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
+    return this.<Boolean>newMasterCaller()
+        .action((controller, stub) -> this
+            .call(controller, stub,
+                RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
+                MasterService.Interface::isSnapshotCleanupEnabled,
+                IsSnapshotCleanupEnabledResponse::getEnabled))
+        .call();
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index 842eab6..8d9c9b7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -100,6 +100,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+  .IsSnapshotCleanupEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+  .IsSnapshotCleanupEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
@@ -150,6 +154,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
@@ -267,6 +273,19 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
   }
 
   @Override
+  public SetSnapshotCleanupResponse switchSnapshotCleanup(RpcController controller,
+      SetSnapshotCleanupRequest request) throws ServiceException {
+    return stub.switchSnapshotCleanup(controller, request);
+  }
+
+  @Override
+  public IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
+      RpcController controller, IsSnapshotCleanupEnabledRequest request)
+      throws ServiceException {
+    return stub.isSnapshotCleanupEnabled(controller, request);
+  }
+
+  @Override
   public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
       RemoveReplicationPeerRequest request) throws ServiceException {
     return stub.removeReplicationPeer(controller, request);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index c60a4c6..2a30249 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJ
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
@@ -134,6 +136,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleaner
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .SetSnapshotCleanupRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetTableStateInMetaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
@@ -1891,4 +1895,31 @@ public final class RequestConverter {
         map(r -> buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, Bytes.toBytes(r))).
         collect(Collectors.toList());
   }
+
+
+  /**
+   * Creates SetSnapshotCleanupRequest for turning on/off auto snapshot cleanup
+   *
+   * @param enabled Set to <code>true</code> to enable,
+   *   <code>false</code> to disable.
+   * @param synchronous If <code>true</code>, waits for any in-progress snapshot cleanup to
+   *   complete.
+   * @return a SetSnapshotCleanupRequest
+   */
+  public static SetSnapshotCleanupRequest buildSetSnapshotCleanupRequest(
+    final boolean enabled, final boolean synchronous) {
+    return SetSnapshotCleanupRequest.newBuilder().setEnabled(enabled).setSynchronous(synchronous)
+      .build();
+  }
+
+  /**
+   * Creates IsSnapshotCleanupEnabledRequest to determine if auto snapshot cleanup
+   * based on TTL expiration is turned on
+   *
+   * @return IsSnapshotCleanupEnabledRequest
+   */
+  public static IsSnapshotCleanupEnabledRequest buildIsSnapshotCleanupEnabledRequest() {
+    return IsSnapshotCleanupEnabledRequest.newBuilder().build();
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
index b9a3412..c5e510f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
@@ -41,6 +41,7 @@ public class ZNodePaths {
   public static final char ZNODE_PATH_SEPARATOR = '/';
 
   public final static String META_ZNODE_PREFIX = "meta-region-server";
+  private static final String DEFAULT_SNAPSHOT_CLEANUP_ZNODE = "snapshot-cleanup";
 
   // base znode for this cluster
   public final String baseZNode;
@@ -89,6 +90,8 @@ public class ZNodePaths {
   public final String queuesZNode;
   // znode containing queues of hfile references to be replicated
   public final String hfileRefsZNode;
+  // znode containing the state of the snapshot auto-cleanup
+  final String snapshotCleanupZNode;
 
   public ZNodePaths(Configuration conf) {
     baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
@@ -123,20 +126,35 @@ public class ZNodePaths {
     queuesZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
     hfileRefsZNode = joinZNode(replicationZNode,
       conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs"));
+    snapshotCleanupZNode = joinZNode(baseZNode,
+        conf.get("zookeeper.znode.snapshot.cleanup", DEFAULT_SNAPSHOT_CLEANUP_ZNODE));
   }
 
   @Override
   public String toString() {
-    return "ZNodePaths [baseZNode=" + baseZNode + ", metaReplicaZNodes=" + metaReplicaZNodes
-        + ", rsZNode=" + rsZNode + ", drainingZNode=" + drainingZNode + ", masterAddressZNode="
-        + masterAddressZNode + ", backupMasterAddressesZNode=" + backupMasterAddressesZNode
-        + ", clusterStateZNode=" + clusterStateZNode + ", tableZNode=" + tableZNode
-        + ", clusterIdZNode=" + clusterIdZNode + ", splitLogZNode=" + splitLogZNode
-        + ", balancerZNode=" + balancerZNode + ", regionNormalizerZNode=" + regionNormalizerZNode
-        + ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode
-        + ", namespaceZNode=" + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode
-        + ", replicationZNode=" + replicationZNode + ", peersZNode=" + peersZNode
-        + ", queuesZNode=" + queuesZNode + ", hfileRefsZNode=" + hfileRefsZNode + "]";
+    return new StringBuilder()
+        .append("ZNodePaths [baseZNode=").append(baseZNode)
+        .append(", metaReplicaZNodes=").append(metaReplicaZNodes)
+        .append(", rsZNode=").append(rsZNode)
+        .append(", drainingZNode=").append(drainingZNode)
+        .append(", masterAddressZNode=").append(masterAddressZNode)
+        .append(", backupMasterAddressesZNode=").append(backupMasterAddressesZNode)
+        .append(", clusterStateZNode=").append(clusterStateZNode)
+        .append(", tableZNode=").append(tableZNode)
+        .append(", clusterIdZNode=").append(clusterIdZNode)
+        .append(", splitLogZNode=").append(splitLogZNode)
+        .append(", balancerZNode=").append(balancerZNode)
+        .append(", regionNormalizerZNode=").append(regionNormalizerZNode)
+        .append(", switchZNode=").append(switchZNode)
+        .append(", tableLockZNode=").append(tableLockZNode)
+        .append(", namespaceZNode=").append(namespaceZNode)
+        .append(", masterMaintZNode=").append(masterMaintZNode)
+        .append(", replicationZNode=").append(replicationZNode)
+        .append(", peersZNode=").append(peersZNode)
+        .append(", queuesZNode=").append(queuesZNode)
+        .append(", hfileRefsZNode=").append(hfileRefsZNode)
+        .append(", snapshotCleanupZNode=").append(snapshotCleanupZNode)
+        .append("]").toString();
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f7fcccd..209dec4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1501,8 +1501,6 @@ public final class HConstants {
   // User defined Default TTL config key
   public static final String DEFAULT_SNAPSHOT_TTL_CONFIG_KEY = "hbase.master.snapshot.ttl";
 
-  public static final String SNAPSHOT_CLEANER_DISABLE = "hbase.master.cleaner.snapshot.disable";
-
   /**
    * Configurations for master executor services.
    */
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 15783b5..ce1d08a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -318,6 +318,22 @@ enum MasterSwitchType {
   MERGE = 1;
 }
 
+message SetSnapshotCleanupRequest {
+  required bool enabled = 1;
+  optional bool synchronous = 2;
+}
+
+message SetSnapshotCleanupResponse {
+  required bool prev_snapshot_cleanup = 1;
+}
+
+message IsSnapshotCleanupEnabledRequest {
+}
+
+message IsSnapshotCleanupEnabledResponse {
+  required bool enabled = 1;
+}
+
 message SetSplitOrMergeEnabledRequest {
   required bool enabled = 1;
   optional bool synchronous = 2;
@@ -897,6 +913,18 @@ service MasterService {
   rpc RestoreSnapshot(RestoreSnapshotRequest) returns(RestoreSnapshotResponse);
 
   /**
+   * Turn on/off snapshot auto-cleanup based on TTL expiration
+   */
+  rpc SwitchSnapshotCleanup (SetSnapshotCleanupRequest)
+    returns (SetSnapshotCleanupResponse);
+
+  /**
+   * Determine if snapshot auto-cleanup based on TTL expiration is turned on
+   */
+  rpc IsSnapshotCleanupEnabled (IsSnapshotCleanupEnabledRequest)
+    returns (IsSnapshotCleanupEnabledResponse);
+
+  /**
    * Execute a distributed procedure.
    */
   rpc ExecProcedure(ExecProcedureRequest) returns(ExecProcedureResponse);
diff --git a/hbase-protocol-shaded/src/main/protobuf/SnapshotCleanup.proto b/hbase-protocol-shaded/src/main/protobuf/SnapshotCleanup.proto
new file mode 100644
index 0000000..154b0d6
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/SnapshotCleanup.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+// This file contains protocol buffers to represent the state of the snapshot auto cleanup based on TTL
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "SnapshotCleanupProtos";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message SnapshotCleanupState {
+    required bool snapshot_cleanup_enabled = 1;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b661b53..1f76a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -203,6 +203,7 @@ import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -313,6 +314,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   MetaLocationSyncer metaLocationSyncer;
   // Tracker for active master location, if any client ZK quorum specified
   MasterAddressSyncer masterAddressSyncer;
+  // Tracker for auto snapshot cleanup state
+  SnapshotCleanupTracker snapshotCleanupTracker;
 
   // Tracker for split and merge state
   private SplitOrMergeTracker splitOrMergeTracker;
@@ -770,6 +773,9 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
     this.drainingServerTracker.start();
 
+    this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
+    this.snapshotCleanupTracker.start();
+
     String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
     boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
       HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
@@ -1443,15 +1449,15 @@ public class HMaster extends HRegionServer implements MasterServices {
       replicationPeerManager);
     getChoreService().scheduleChore(replicationBarrierCleaner);
 
-    final boolean isSnapshotChoreDisabled = conf.getBoolean(HConstants.SNAPSHOT_CLEANER_DISABLE,
-        false);
-    if (isSnapshotChoreDisabled) {
+    final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker
+        .isSnapshotCleanupEnabled();
+    this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
+    if (isSnapshotChoreEnabled) {
+      getChoreService().scheduleChore(this.snapshotCleanerChore);
+    } else {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore..");
       }
-    } else {
-      this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
-      getChoreService().scheduleChore(this.snapshotCleanerChore);
     }
     serviceStarted = true;
     if (LOG.isTraceEnabled()) {
@@ -1544,6 +1550,37 @@ public class HMaster extends HRegionServer implements MasterServices {
     procedureExecutor.startWorkers();
   }
 
+  /**
+   * Turn on/off Snapshot Cleanup Chore
+   *
+   * @param on indicates whether Snapshot Cleanup Chore is to be run
+   */
+  void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
+    if (synchronous) {
+      synchronized (this.snapshotCleanerChore) {
+        switchSnapshotCleanup(on);
+      }
+    } else {
+      switchSnapshotCleanup(on);
+    }
+  }
+
+  private void switchSnapshotCleanup(final boolean on) {
+    try {
+      snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
+      if (on) {
+        if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
+          getChoreService().scheduleChore(this.snapshotCleanerChore);
+        }
+      } else {
+        getChoreService().cancelChore(this.snapshotCleanerChore);
+      }
+    } catch (KeeperException e) {
+      LOG.error("Error updating snapshot cleanup mode to {}", on, e);
+    }
+  }
+
+
   private void stopProcedureExecutor() {
     if (procedureExecutor != null) {
       configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 5c18d7f..fb83bc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -219,6 +219,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormaliz
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
@@ -271,6 +275,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .SetSnapshotCleanupRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .SetSnapshotCleanupResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetTableStateInMetaRequest;
@@ -1482,6 +1490,55 @@ public class MasterRpcServices extends RSRpcServices
   }
 
   @Override
+  public SetSnapshotCleanupResponse switchSnapshotCleanup(
+      RpcController controller, SetSnapshotCleanupRequest request)
+      throws ServiceException {
+    try {
+      master.checkInitialized();
+      final boolean enabled = request.getEnabled();
+      final boolean isSynchronous = request.hasSynchronous() && request.getSynchronous();
+      final boolean prevSnapshotCleanupRunning = this.switchSnapshotCleanup(enabled, isSynchronous);
+      return SetSnapshotCleanupResponse.newBuilder()
+          .setPrevSnapshotCleanup(prevSnapshotCleanupRunning).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
+      RpcController controller, IsSnapshotCleanupEnabledRequest request)
+      throws ServiceException {
+    try {
+      master.checkInitialized();
+      final boolean isSnapshotCleanupEnabled = master.snapshotCleanupTracker
+          .isSnapshotCleanupEnabled();
+      return IsSnapshotCleanupEnabledResponse.newBuilder()
+          .setEnabled(isSnapshotCleanupEnabled).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  /**
+   * Turn on/off snapshot auto-cleanup based on TTL
+   *
+   * @param enabledNewVal Set to <code>true</code> to enable, <code>false</code> to disable
+   * @param synchronous If <code>true</code>, it waits until current snapshot cleanup is completed,
+   *   if outstanding
+   * @return previous snapshot auto-cleanup mode
+   */
+  private synchronized boolean switchSnapshotCleanup(final boolean enabledNewVal,
+      final boolean synchronous) {
+    final boolean oldValue = master.snapshotCleanupTracker.isSnapshotCleanupEnabled();
+    master.switchSnapshotCleanup(enabledNewVal, synchronous);
+    LOG.info("{} Successfully set snapshot cleanup to {}", master.getClientIdAuditPrefix(),
+      enabledNewVal);
+    return oldValue;
+  }
+
+
+  @Override
   public RunCatalogScanResponse runCatalogScan(RpcController c,
       RunCatalogScanRequest req) throws ServiceException {
     rpcPreCheck("runCatalogScan");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 41fc218..8c79e71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -810,4 +810,31 @@ public class TestAdmin2 extends TestAdminBase {
     ADMIN.modifyTable(tableDesc);
     assertEquals(11111111, ADMIN.getDescriptor(tableName).getMaxFileSize());
   }
+
+  @Test
+  public void testSnapshotCleanupAsync() throws Exception {
+    testSnapshotCleanup(false);
+  }
+
+  @Test
+  public void testSnapshotCleanupSync() throws Exception {
+    testSnapshotCleanup(true);
+  }
+
+  private void testSnapshotCleanup(final boolean synchronous) throws IOException {
+    final boolean initialState = ADMIN.isSnapshotCleanupEnabled();
+    // Switch the snapshot auto cleanup state to opposite to initial state
+    boolean prevState = ADMIN.snapshotCleanupSwitch(!initialState, synchronous);
+    // The previous state should be the original state we observed
+    assertEquals(initialState, prevState);
+    // Current state should be opposite of the initial state
+    assertEquals(!initialState, ADMIN.isSnapshotCleanupEnabled());
+    // Reset the state back to what it was initially
+    prevState = ADMIN.snapshotCleanupSwitch(initialState, synchronous);
+    // The previous state should be the opposite of the initial state
+    assertEquals(!initialState, prevState);
+    // Current state should be the original state again
+    assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled());
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotCleanerChore.java
index e05213d..1a28950 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotCleanerChore.java
@@ -105,7 +105,6 @@ public class TestSnapshotCleanerChore {
     snapshotManager = Mockito.mock(SnapshotManager.class);
     Stoppable stopper = new StoppableImplementation();
     Configuration conf = getSnapshotCleanerConf();
-    conf.setStrings("hbase.master.cleaner.snapshot.disable", "false");
     SnapshotCleanerChore snapshotCleanerChore =
             new SnapshotCleanerChore(stopper, conf, snapshotManager);
     List<SnapshotProtos.SnapshotDescription> snapshotDescriptionList = new ArrayList<>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index fd183fc..fe98d5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -74,12 +76,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .IsSnapshotCleanupEnabledResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos
+    .SetSnapshotCleanupRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 
 /**
@@ -142,6 +151,7 @@ public class TestSnapshotFromMaster {
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
       ConstantSizeRegionSplitPolicy.class.getName());
     conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
+    conf.setInt("hbase.master.cleaner.snapshot.interval", 500);
   }
 
   @Before
@@ -282,6 +292,89 @@ public class TestSnapshotFromMaster {
     master.getMasterRpcServices().deleteSnapshot(null, request);
   }
 
+  @Test
+  public void testGetCompletedSnapshotsWithCleanup() throws Exception {
+    // Enable auto snapshot cleanup for the cluster
+    SetSnapshotCleanupRequest setSnapshotCleanupRequest =
+        SetSnapshotCleanupRequest.newBuilder().setEnabled(true).build();
+    master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
+
+    // first check when there are no snapshots
+    GetCompletedSnapshotsRequest request = GetCompletedSnapshotsRequest.newBuilder().build();
+    GetCompletedSnapshotsResponse response =
+        master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 0, response.getSnapshotsCount());
+
+    // write two snapshots to the fs
+    createSnapshotWithTtl("snapshot_01", 1L);
+    createSnapshotWithTtl("snapshot_02", 10L);
+
+    // check that we get both snapshots
+    response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
+
+    // check that 1 snapshot is auto cleaned after 1 sec of TTL expiration
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+    response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 1, response.getSnapshotsCount());
+  }
+
+  @Test
+  public void testGetCompletedSnapshotsWithoutCleanup() throws Exception {
+    // Disable auto snapshot cleanup for the cluster
+    SetSnapshotCleanupRequest setSnapshotCleanupRequest =
+        SetSnapshotCleanupRequest.newBuilder().setEnabled(false).build();
+    master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
+
+    // first check when there are no snapshots
+    GetCompletedSnapshotsRequest request = GetCompletedSnapshotsRequest.newBuilder().build();
+    GetCompletedSnapshotsResponse response =
+        master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 0, response.getSnapshotsCount());
+
+    // write two snapshots to the fs
+    createSnapshotWithTtl("snapshot_02", 1L);
+    createSnapshotWithTtl("snapshot_03", 1L);
+
+    // check that we get both snapshots
+    response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
+
+    // check that no snapshot is auto cleaned even after 1 sec of TTL expiration
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+    response = master.getMasterRpcServices().getCompletedSnapshots(null, request);
+    assertEquals("Found unexpected number of snapshots", 2, response.getSnapshotsCount());
+  }
+
+  @Test
+  public void testSnapshotCleanupStatus() throws Exception {
+    // Enable auto snapshot cleanup for the cluster
+    SetSnapshotCleanupRequest setSnapshotCleanupRequest =
+        SetSnapshotCleanupRequest.newBuilder().setEnabled(true).build();
+    master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
+
+    // Check if auto snapshot cleanup is enabled
+    IsSnapshotCleanupEnabledRequest isSnapshotCleanupEnabledRequest =
+        IsSnapshotCleanupEnabledRequest.newBuilder().build();
+    IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabledResponse =
+        master.getMasterRpcServices().isSnapshotCleanupEnabled(null,
+            isSnapshotCleanupEnabledRequest);
+    Assert.assertTrue(isSnapshotCleanupEnabledResponse.getEnabled());
+
+    // Disable auto snapshot cleanup for the cluster
+    setSnapshotCleanupRequest = SetSnapshotCleanupRequest.newBuilder()
+        .setEnabled(false).build();
+    master.getMasterRpcServices().switchSnapshotCleanup(null, setSnapshotCleanupRequest);
+
+    // Check if auto snapshot cleanup is disabled
+    isSnapshotCleanupEnabledRequest = IsSnapshotCleanupEnabledRequest
+        .newBuilder().build();
+    isSnapshotCleanupEnabledResponse =
+        master.getMasterRpcServices().isSnapshotCleanupEnabled(null,
+            isSnapshotCleanupEnabledRequest);
+    Assert.assertFalse(isSnapshotCleanupEnabledResponse.getEnabled());
+  }
+
   /**
    * Test that the snapshot hfile archive cleaner works correctly. HFiles that are in snapshots
    * should be retained, while those that are not in a snapshot should be deleted.
@@ -428,6 +521,16 @@ public class TestSnapshotFromMaster {
     return builder.getSnapshotDescription();
   }
 
+  private SnapshotDescription createSnapshotWithTtl(final String snapshotName, final long ttl)
+      throws IOException {
+    SnapshotTestingUtils.SnapshotMock snapshotMock =
+        new SnapshotTestingUtils.SnapshotMock(UTIL.getConfiguration(), fs, rootDir);
+    SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder =
+        snapshotMock.createSnapshotV2(snapshotName, "test", 0, ttl);
+    builder.commit();
+    return builder.getSnapshotDescription();
+  }
+
   @Test
   public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
     // Write some data
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
index 75dfc30..d019795 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.FSVisitor;
@@ -671,6 +672,12 @@ public final class SnapshotTestingUtils {
       return createSnapshot(snapshotName, tableName, numRegions, SnapshotManifestV2.DESCRIPTOR_VERSION);
     }
 
+    public SnapshotBuilder createSnapshotV2(final String snapshotName, final String tableName,
+        final int numRegions, final long ttl) throws IOException {
+      return createSnapshot(snapshotName, tableName, numRegions,
+          SnapshotManifestV2.DESCRIPTOR_VERSION, ttl);
+    }
+
     private SnapshotBuilder createSnapshot(final String snapshotName, final String tableName,
         final int version) throws IOException {
       return createSnapshot(snapshotName, tableName, TEST_NUM_REGIONS, version);
@@ -692,6 +699,22 @@ public final class SnapshotTestingUtils {
       return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
     }
 
+    private SnapshotBuilder createSnapshot(final String snapshotName, final String tableName,
+        final int numRegions, final int version, final long ttl) throws IOException {
+      TableDescriptor htd = createHtd(tableName);
+      RegionData[] regions = createTable(htd, numRegions);
+      SnapshotProtos.SnapshotDescription desc = SnapshotProtos.SnapshotDescription.newBuilder()
+          .setTable(htd.getTableName().getNameAsString())
+          .setName(snapshotName)
+          .setVersion(version)
+          .setCreationTime(EnvironmentEdgeManager.currentTime())
+          .setTtl(ttl)
+          .build();
+      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir, conf);
+      SnapshotDescriptionUtils.writeSnapshotInfo(desc, workingDir, fs);
+      return new SnapshotBuilder(conf, fs, rootDir, htd, desc, regions);
+    }
+
     public TableDescriptor createHtd(final String tableName) {
       return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
               .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TEST_FAMILY))
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index b1b4021..e53e970 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -513,6 +513,22 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
+    # Enable/disable snapshot auto-cleanup based on TTL expiration
+    # Returns previous snapshot auto-cleanup switch setting.
+    def snapshot_cleanup_switch(enable_disable)
+      @admin.snapshotCleanupSwitch(
+        java.lang.Boolean.valueOf(enable_disable), java.lang.Boolean.valueOf(false)
+      )
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Query the current state of the snapshot auto-cleanup based on TTL
+    # Returns the snapshot auto-cleanup state (true if enabled)
+    def snapshot_cleanup_enabled?
+      @admin.isSnapshotCleanupEnabled
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Truncates table (deletes all records by recreating the table)
     def truncate(table_name_str)
       puts "Truncating '#{table_name_str}' table (it may take a while):"
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 3c6f48f..5d456bf 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -354,6 +354,8 @@ Shell.load_command_group(
     compact_rs
     compaction_state
     trace
+    snapshot_cleanup_switch
+    snapshot_cleanup_enabled
     splitormerge_switch
     splitormerge_enabled
     clear_compaction_queues
diff --git a/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_enabled.rb b/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_enabled.rb
new file mode 100644
index 0000000..15122f5
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_enabled.rb
@@ -0,0 +1,39 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Prints if snapshot auto cleanup based on TTL is enabled
+
+module Shell
+  module Commands
+    class SnapshotCleanupEnabled < Command
+      def help
+        <<-EOF
+Query the snapshot auto-cleanup state.
+Examples:
+
+  hbase> snapshot_cleanup_enabled
+        EOF
+      end
+
+      def command
+        state = admin.snapshot_cleanup_enabled?
+        formatter.row([state.to_s])
+        state
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_switch.rb b/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_switch.rb
new file mode 100644
index 0000000..f63ef70
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/snapshot_cleanup_switch.rb
@@ -0,0 +1,43 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Switch snapshot auto-cleanup based on TTL expiration
+
+module Shell
+  module Commands
+    class SnapshotCleanupSwitch < Command
+      def help
+        <<-EOF
+Enable/Disable snapshot auto-cleanup based on snapshot TTL.
+Returns previous snapshot auto-cleanup switch state.
+Examples:
+
+  hbase> snapshot_cleanup_switch true
+  hbase> snapshot_cleanup_switch false
+        EOF
+      end
+
+      def command(enable_disable)
+        prev_state = admin.snapshot_cleanup_switch(enable_disable) ? 'true' : 'false'
+        formatter.row(["Previous snapshot cleanup state : #{prev_state}"])
+        prev_state
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index a43e394..aee3f2b 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -170,6 +170,20 @@ module Hbase
       end
     end
 
+    #-------------------------------------------------------------------------------
+
+    define_test 'snapshot auto cleanup should work' do
+      command(:snapshot_cleanup_switch, true)
+      output = capture_stdout { command(:snapshot_cleanup_enabled) }
+      assert(output.include?('true'))
+
+      command(:snapshot_cleanup_switch, false)
+      output = capture_stdout { command(:snapshot_cleanup_enabled) }
+      assert(output.include?('false'))
+    end
+
+    #-------------------------------------------------------------------------------
+
     define_test "create should fail with non-string/non-hash column args" do
       assert_raise(ArgumentError) do
         command(:create, @create_test_name, 123)
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 309dffb..600c35d 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -1383,6 +1383,16 @@ public class ThriftAdmin implements Admin {
   }
 
   @Override
+  public boolean snapshotCleanupSwitch(boolean on, boolean synchronous) {
+    throw new NotImplementedException("snapshotCleanupSwitch not supported in ThriftAdmin");
+  }
+
+  @Override
+  public boolean isSnapshotCleanupEnabled() {
+    throw new NotImplementedException("isSnapshotCleanupEnabled not supported in ThriftAdmin");
+  }
+
+  @Override
   public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
     return splitRegionAsync(regionName, null);
   }
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/SnapshotCleanupTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/SnapshotCleanupTracker.java
new file mode 100644
index 0000000..433c7ab
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/SnapshotCleanupTracker.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotCleanupProtos;
+
+/**
+ * Tracks status of snapshot auto cleanup based on TTL
+ */
+@InterfaceAudience.Private
+public class SnapshotCleanupTracker extends ZKNodeTracker {
+
+  /**
+   * Constructs a new ZK node tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher reference to the {@link ZKWatcher} which also contains configuration and
+   *   constants
+   * @param abortable used to abort if a fatal error occurs
+   */
+  public SnapshotCleanupTracker(ZKWatcher watcher, Abortable abortable) {
+    super(watcher, watcher.getZNodePaths().snapshotCleanupZNode, abortable);
+  }
+
+  /**
+   * Returns the current state of the snapshot auto cleanup based on TTL
+   *
+   * @return <code>true</code> if the snapshot auto cleanup is enabled,
+   *   <code>false</code> otherwise.
+   */
+  public boolean isSnapshotCleanupEnabled() {
+    byte[] snapshotCleanupZNodeData = super.getData(false);
+    try {
+      // if data in ZK is null, use default of on.
+      return snapshotCleanupZNodeData == null ||
+          parseFrom(snapshotCleanupZNodeData).getSnapshotCleanupEnabled();
+    } catch (DeserializationException dex) {
+      LOG.error("ZK state for Snapshot Cleanup could not be parsed " +
+          Bytes.toStringBinary(snapshotCleanupZNodeData), dex);
+      // return false to be safe.
+      return false;
+    }
+  }
+
+  /**
+   * Set snapshot auto cleanup on/off
+   *
+   * @param snapshotCleanupEnabled true if the snapshot auto cleanup should be on,
+   *   false otherwise
+   * @throws KeeperException if ZooKeeper operation fails
+   */
+  public void setSnapshotCleanupEnabled(final boolean snapshotCleanupEnabled)
+      throws KeeperException {
+    byte [] snapshotCleanupZNodeData = toByteArray(snapshotCleanupEnabled);
+    try {
+      ZKUtil.setData(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
+          snapshotCleanupZNodeData);
+    } catch(KeeperException.NoNodeException nne) {
+      ZKUtil.createAndWatch(watcher, watcher.getZNodePaths().snapshotCleanupZNode,
+          snapshotCleanupZNodeData);
+    }
+    super.nodeDataChanged(watcher.getZNodePaths().snapshotCleanupZNode);
+  }
+
+  private byte[] toByteArray(final boolean isSnapshotCleanupEnabled) {
+    SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
+        SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
+    builder.setSnapshotCleanupEnabled(isSnapshotCleanupEnabled);
+    return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+  }
+
+  private SnapshotCleanupProtos.SnapshotCleanupState parseFrom(final byte[] pbBytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(pbBytes);
+    SnapshotCleanupProtos.SnapshotCleanupState.Builder builder =
+        SnapshotCleanupProtos.SnapshotCleanupState.newBuilder();
+    try {
+      int magicLen = ProtobufUtil.lengthOfPBMagic();
+      ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen);
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+    return builder.build();
+  }
+
+}
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index e6668d2..d3c3826 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -2759,13 +2759,43 @@ Value 0 for this config indicates TTL: FOREVER
 
 
 
+.Enable/Disable Snapshot Auto Cleanup on running cluster:
 
-At any point of time, if Snapshot cleanup is supposed to be stopped due to
-some snapshot restore activity, it is advisable to disable Snapshot Cleaner with
- config:
+By default, snapshot auto cleanup based on TTL is enabled
+for any new cluster.
+At any point in time, if snapshot cleanup needs to be stopped due to
+some snapshot restore activity or any other reason, it is advisable
+to disable it using the shell command:
 
-`hbase.master.cleaner.snapshot.disable`: "true"
+----
+hbase> snapshot_cleanup_switch false
+----
+
+We can re-enable it using:
+
+----
+hbase> snapshot_cleanup_switch true
+----
+
+The shell command with switch false disables snapshot auto
+cleanup activity based on TTL and returns the previous state of
+the activity (true: running already, false: disabled already).
+
+Sample output for the above commands:
+----
+Previous snapshot cleanup state : true
+Took 0.0069 seconds
+=> "true"
+----
+
+We can query whether snapshot auto cleanup is enabled for
+the cluster using:
+
+----
+hbase> snapshot_cleanup_enabled
+----
 
+The command returns either true or false.
 
 [[ops.snapshots.list]]
 === Listing Snapshots