You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 21:58:12 UTC

[03/11] hbase git commit: HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator

HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b09b87d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b09b87d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b09b87d1

Branch: refs/heads/HBASE-20952
Commit: b09b87d143730db00ec56114a752d3a74f8982c4
Parents: da9508d
Author: zhangduo <zh...@apache.org>
Authored: Tue Dec 11 08:39:43 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Dec 11 14:51:26 2018 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncBufferedMutator.java      |  16 +-
 .../client/AsyncBufferedMutatorBuilder.java     |  19 +++
 .../client/AsyncBufferedMutatorBuilderImpl.java |  19 ++-
 .../hbase/client/AsyncBufferedMutatorImpl.java  |  67 +++++---
 .../client/AsyncConnectionConfiguration.java    |  37 +++--
 .../hbase/client/AsyncConnectionImpl.java       |  11 +-
 .../hbase/client/TestAsyncBufferMutator.java    | 161 ++++++++++++++++++-
 7 files changed, 277 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6fe4b9a..7b21eb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -18,13 +18,16 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
 /**
  * Used to communicate with a single HBase table in batches. Obtain an instance from a
  * {@link AsyncConnection} and call {@link #close()} afterwards.
@@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
    * part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
    * @param mutation The data to send.
    */
-  CompletableFuture<Void> mutate(Mutation mutation);
+  default CompletableFuture<Void> mutate(Mutation mutation) {
+    return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation)));
+  }
 
   /**
    * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
@@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
    * @return The size of the write buffer in bytes.
    */
   long getWriteBufferSize();
+
+  /**
+   * Returns the periodical flush interval, 0 means disabled.
+   */
+  default long getPeriodicalFlushTimeout(TimeUnit unit) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index 45959bb..c617c8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -46,6 +46,25 @@ public interface AsyncBufferedMutatorBuilder {
   AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
 
   /**
+   * Set the periodical flush interval. If the data in the buffer has not been flush for a long
+   * time, i.e, reach this timeout limit, we will flush it automatically.
+   * <p/>
+   * Notice that, set the timeout to 0 or a negative value means disable periodical flush, not
+   * 'flush immediately'. If you want to flush immediately then you should not use this class, as it
+   * is designed to be 'buffered'.
+   */
+  default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
+   * Disable the periodical flush, i.e, set the timeout to 0.
+   */
+  default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
+    return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS);
+  }
+
+  /**
    * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
    * <p>
    * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 227d02b..eb8af17 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 
 /**
  * The implementation of {@link AsyncBufferedMutatorBuilder}.
@@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
 
+  private final HashedWheelTimer periodicalFlushTimer;
+
   private final AsyncTableBuilder<?> tableBuilder;
 
   private long writeBufferSize;
 
+  private long periodicFlushTimeoutNs;
+
   public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
-      AsyncTableBuilder<?> tableBuilder) {
+      AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
     this.tableBuilder = tableBuilder;
     this.writeBufferSize = connConf.getWriteBufferSize();
+    this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+    this.periodicalFlushTimer = periodicalFlushTimer;
   }
 
   @Override
@@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
   }
 
   @Override
-  public AsyncBufferedMutator build() {
-    return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+  public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+    this.periodicFlushTimeoutNs = unit.toNanos(timeout);
+    return this;
   }
 
+  @Override
+  public AsyncBufferedMutator build() {
+    return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
+      periodicFlushTimeoutNs);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 5a92ace..318c6c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
 /**
  * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
  */
 @InterfaceAudience.Private
 class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
+  private final HashedWheelTimer periodicalFlushTimer;
+
   private final AsyncTable<?> table;
 
   private final long writeBufferSize;
 
+  private final long periodicFlushTimeoutNs;
+
   private List<Mutation> mutations = new ArrayList<>();
 
   private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
 
   private boolean closed;
 
-  AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
+  @VisibleForTesting
+  Timeout periodicFlushTask;
+
+  AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+      long writeBufferSize, long periodicFlushTimeoutNs) {
+    this.periodicalFlushTimer = periodicalFlushTimer;
     this.table = table;
     this.writeBufferSize = writeBufferSize;
+    this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
   }
 
   @Override
@@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
     return table.getConfiguration();
   }
 
-  private void internalFlush() {
+  // will be overridden in test
+  @VisibleForTesting
+  protected void internalFlush() {
+    if (periodicFlushTask != null) {
+      periodicFlushTask.cancel();
+      periodicFlushTask = null;
+    }
     List<Mutation> toSend = this.mutations;
     if (toSend.isEmpty()) {
       return;
@@ -86,29 +107,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   }
 
   @Override
-  public CompletableFuture<Void> mutate(Mutation mutation) {
-    CompletableFuture<Void> future = new CompletableFuture<Void>();
-    long heapSize = mutation.heapSize();
-    synchronized (this) {
-      if (closed) {
-        future.completeExceptionally(new IOException("Already closed"));
-        return future;
-      }
-      mutations.add(mutation);
-      futures.add(future);
-      bufferedSize += heapSize;
-      if (bufferedSize >= writeBufferSize) {
-        internalFlush();
-      }
-    }
-    return future;
-  }
-
-  @Override
   public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
     List<CompletableFuture<Void>> futures =
-        Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
-            .collect(Collectors.toList());
+      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
+        .collect(Collectors.toList());
     long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
     synchronized (this) {
       if (closed) {
@@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
         futures.forEach(f -> f.completeExceptionally(ioe));
         return futures;
       }
+      if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
+        periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
+          synchronized (AsyncBufferedMutatorImpl.this) {
+            // confirm that we are still valid, if there is already an internalFlush call before us,
+            // then we should not execute any more. And in internalFlush we will set periodicFlush
+            // to null, and since we may schedule a new one, so here we check whether the references
+            // are equal.
+            if (timeout == periodicFlushTask) {
+              periodicFlushTask = null;
+              internalFlush();
+            }
+          }
+        }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+      }
       this.mutations.addAll(mutations);
       this.futures.addAll(futures);
       bufferedSize += heapSize;
@@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
   public long getWriteBufferSize() {
     return writeBufferSize;
   }
+
+  @Override
+  public long getPeriodicalFlushTimeout(TimeUnit unit) {
+    return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index bd2add8..915e9dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
 
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
 
   private final long writeBufferSize;
 
+  private final long writeBufferPeriodicFlushTimeoutNs;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
       conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.rpcTimeoutNs = TimeUnit.MILLISECONDS
-        .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+    this.rpcTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
     this.readRpcTimeoutNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
     this.writeRpcTimeoutNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
     this.pauseNs =
-        TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.startLogErrorsCnt =
-        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.scanTimeoutNs = TimeUnit.MILLISECONDS
-        .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-          HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+      .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+        HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
     this.scannerCaching =
-        conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
-    this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
+      conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.metaScannerCaching =
+      conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
     this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
-    this.writeBufferSize =  conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+    this.writeBufferPeriodicFlushTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+        WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
   }
 
   long getMetaOperationTimeoutNs() {
@@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
     return scannerCaching;
   }
 
-  int getMetaScannerCaching(){
+  int getMetaScannerCaching() {
     return metaScannerCaching;
   }
 
@@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
   long getWriteBufferSize() {
     return writeBufferSize;
   }
+
+  long getWriteBufferPeriodicFlushTimeoutNs() {
+    return writeBufferPeriodicFlushTimeoutNs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index a05764e..078395b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @VisibleForTesting
   static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
-      Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
 
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
 
@@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
         String msg = "ZooKeeper available but no active master location found";
         LOG.info(msg);
         this.masterStubMakeFuture.getAndSet(null)
-            .completeExceptionally(new MasterNotRunningException(msg));
+          .completeExceptionally(new MasterNotRunningException(msg));
         return;
       }
       try {
@@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
           });
       } catch (IOException e) {
         this.masterStubMakeFuture.getAndSet(null)
-            .completeExceptionally(new IOException("Failed to create async master stub", e));
+          .completeExceptionally(new IOException("Failed to create async master stub", e));
       }
     });
   }
@@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
-    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
   }
 
   @Override
   public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
       ExecutorService pool) {
-    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
+      RETRY_TIMER);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 9fe4ca7..6eed326 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,6 +33,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,12 +48,15 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncBufferMutator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
+    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
   private void test(TableName tableName) throws InterruptedException {
     List<CompletableFuture<Void>> futures = new ArrayList<>();
     try (AsyncBufferedMutator mutator =
-        CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
+      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
       List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
-          .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
-          .collect(Collectors.toList()));
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+        .collect(Collectors.toList()));
       // exceeded the write buffer size, a flush will be called directly
       fs.forEach(f -> f.join());
       IntStream.range(COUNT / 2, COUNT).forEach(i -> {
@@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
     futures.forEach(f -> f.join());
     AsyncTable<?> table = CONN.getTable(tableName);
     IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
-        .forEach(r -> {
-          assertArrayEquals(VALUE, r.getValue(CF, CQ));
-        });
+      .forEach(r -> {
+        assertArrayEquals(VALUE, r.getValue(CF, CQ));
+      });
   }
 
   @Test
@@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
       }
     }
   }
+
+  @Test
+  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
+    try (AsyncBufferedMutator mutator =
+      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
+      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+      CompletableFuture<?> future = mutator.mutate(put);
+      Thread.sleep(2000);
+      // assert that we have not flushed it out
+      assertFalse(future.isDone());
+      mutator.flush();
+      future.get();
+    }
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  @Test
+  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
+    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    CompletableFuture<?> future = mutator.mutate(put);
+    future.get();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  // a bit deep into the implementation
+  @Test
+  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
+      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
+      .setWriteBufferSize(10 * put.heapSize()).build()) {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      futures.add(mutator.mutate(put));
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      for (int i = 1;; i++) {
+        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
+        if (mutator.periodicFlushTask == null) {
+          break;
+        }
+      }
+      assertTrue(task.isCancelled());
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      for (int i = 0; i < futures.size(); i++) {
+        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
+      }
+    }
+  }
+
+  @Test
+  public void testCancelPeriodicFlushByManuallyFlush()
+      throws InterruptedException, ExecutionException {
+    try (AsyncBufferedMutatorImpl mutator =
+      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+      CompletableFuture<?> future =
+        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      mutator.flush();
+      assertTrue(task.isCancelled());
+      future.get();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+    }
+  }
+
+  @Test
+  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
+    CompletableFuture<?> future;
+    Timeout task;
+    try (AsyncBufferedMutatorImpl mutator =
+      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+      task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+    }
+    assertTrue(task.isCancelled());
+    future.get();
+    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+  }
+
+  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
+
+    private int flushCount;
+
+    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+        long writeBufferSize, long periodicFlushTimeoutNs) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+    }
+
+    @Override
+    protected void internalFlush() {
+      flushCount++;
+      super.internalFlush();
+    }
+  }
+
+  @Test
+  public void testRaceBetweenNormalFlushAndPeriodicFlush()
+      throws InterruptedException, ExecutionException {
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try (AsyncBufferMutatorForTest mutator =
+      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+      CompletableFuture<?> future = mutator.mutate(put);
+      Timeout task = mutator.periodicFlushTask;
+      // we should have scheduled a periodic flush task
+      assertNotNull(task);
+      synchronized (mutator) {
+        // synchronized on mutator to prevent periodic flush to be executed
+        Thread.sleep(500);
+        // the timeout should be issued
+        assertTrue(task.isExpired());
+        // but no flush is issued as we hold the lock
+        assertEquals(0, mutator.flushCount);
+        assertFalse(future.isDone());
+        // manually flush, then release the lock
+        mutator.flush();
+      }
+      // this is a bit deep into the implementation in netty but anyway let's add a check here to
+      // confirm that an issued timeout can not be canceled by netty framework.
+      assertFalse(task.isCancelled());
+      // and the mutation is done
+      future.get();
+      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+      // only the manual flush, the periodic flush should have been canceled by us
+      assertEquals(1, mutator.flushCount);
+    }
+  }
 }