You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/12/29 14:31:08 UTC

hbase git commit: HBASE-19486 Periodically ensure records are not buffered too long by BufferedMutator

Repository: hbase
Updated Branches:
  refs/heads/branch-2 5612f2f69 -> e2f07aafb


HBASE-19486 Periodically ensure records are not buffered too long by BufferedMutator

Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2f07aaf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2f07aaf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2f07aaf

Branch: refs/heads/branch-2
Commit: e2f07aafb67a45dcf108cdbd437d7d1161f8d04c
Parents: 5612f2f
Author: Niels Basjes <nb...@bol.com>
Authored: Fri Dec 29 22:22:34 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Dec 29 22:24:30 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/BufferedMutator.java    |  65 +++++++++-
 .../hbase/client/BufferedMutatorImpl.java       | 124 +++++++++++++++++--
 .../hbase/client/BufferedMutatorParams.java     |  39 +++++-
 .../hbase/client/ConnectionConfiguration.java   |  26 ++++
 .../hbase/client/ConnectionImplementation.java  |   8 ++
 .../hadoop/hbase/client/TestAsyncProcess.java   | 114 ++++++++++++++++-
 .../hbase/client/TestBufferedMutatorParams.java |  16 ++-
 7 files changed, 366 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 4c6a0ef..7805f77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -18,13 +18,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * <p>Used to communicate with a single HBase table similar to {@link Table} but meant for
@@ -64,7 +63,13 @@ public interface BufferedMutator extends Closeable {
   /**
    * Key to use setting non-default BufferedMutator implementation in Configuration.
    */
-  public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+  String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+
+  /**
+   * Having the timer tick run more often than once every 100ms is needless and will
+   * probably cause too many timer events to fire, having a negative impact on performance.
+   */
+  long MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = 100;
 
   /**
    * Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
@@ -115,6 +120,56 @@ public interface BufferedMutator extends Closeable {
   void flush() throws IOException;
 
   /**
+   * Sets the maximum time before the buffer is automatically flushed, checking once per second.
+   * @param timeoutMs    The maximum number of milliseconds that records may be buffered
+   *                     before they are flushed. Set to 0 to disable.
+   */
+  default void setWriteBufferPeriodicFlush(long timeoutMs) {
+    setWriteBufferPeriodicFlush(timeoutMs, 1000L);
+  }
+
+  /**
+   * Sets the maximum time before the buffer is automatically flushed.
+   * @param timeoutMs    The maximum number of milliseconds that records may be buffered
+   *                     before they are flushed. Set to 0 to disable.
+   * @param timerTickMs  The number of milliseconds between each check if the
+   *                     timeout has been exceeded. Must be 100ms (as defined in
+   *                     {@link #MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS})
+   *                     or larger to avoid performance problems.
+   */
+  default void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
+    throw new UnsupportedOperationException(
+            "The BufferedMutator::setWriteBufferPeriodicFlush has not been implemented");
+  }
+
+  /**
+   * Disable periodic flushing of the write buffer.
+   */
+  default void disableWriteBufferPeriodicFlush() {
+    setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+  }
+
+  /**
+   * Returns the current periodic flush timeout value in milliseconds.
+   * @return The maximum number of milliseconds that records may be buffered before they
+   *   are flushed. The value 0 means this is disabled.
+   */
+  default long getWriteBufferPeriodicFlushTimeoutMs() {
+    throw new UnsupportedOperationException(
+            "The BufferedMutator::getWriteBufferPeriodicFlushTimeoutMs has not been implemented");
+  }
+
+  /**
+   * Returns the current periodic flush timertick interval in milliseconds.
+   * @return The number of milliseconds between each check if the timeout has been exceeded.
+   *   This value only has a real meaning if the timeout has been set to > 0
+   */
+  default long getWriteBufferPeriodicFlushTimerTickMs() {
+    throw new UnsupportedOperationException(
+            "The BufferedMutator::getWriteBufferPeriodicFlushTimerTickMs has not been implemented");
+  }
+
+  /**
    * Returns the maximum size in bytes of the write buffer for this HTable.
    * <p>
    * The default value comes from the configuration parameter {@code hbase.client.write.buffer}.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index cf85f57..13b1a81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -15,18 +15,20 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -75,6 +77,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
    */
   private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
   private final long writeBufferSize;
+
+  private long  writeBufferPeriodicFlushTimeoutMs;
+  private long  writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
+  private Timer writeBufferPeriodicFlushTimer = null;
+
   private final int maxKeyValueSize;
   private final ExecutorService pool;
   private final AtomicInteger rpcTimeout;
@@ -99,15 +106,34 @@ public class BufferedMutatorImpl implements BufferedMutator {
       cleanupPoolOnClose = false;
     }
     ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
-    this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
-        params.getWriteBufferSize() : tableConf.getWriteBufferSize();
-    this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
-        params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
-    this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
-    params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
-    this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ?
-    params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
+    this.writeBufferSize =
+            params.getWriteBufferSize() != UNSET ?
+            params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+
+    // Set via the setter because it does value validation and starts/stops the TimerTask
+    long newWriteBufferPeriodicFlushTimeoutMs =
+            params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
+              ? params.getWriteBufferPeriodicFlushTimeoutMs()
+              : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
+    long newWriteBufferPeriodicFlushTimerTickMs =
+            params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
+              ? params.getWriteBufferPeriodicFlushTimerTickMs()
+              : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
+    this.setWriteBufferPeriodicFlush(
+            newWriteBufferPeriodicFlushTimeoutMs,
+            newWriteBufferPeriodicFlushTimerTickMs);
+
+    this.maxKeyValueSize =
+            params.getMaxKeyValueSize() != UNSET ?
+            params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
+
+    this.rpcTimeout = new AtomicInteger(
+            params.getRpcTimeout() != UNSET ?
+            params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
+
+    this.operationTimeout = new AtomicInteger(
+            params.getOperationTimeout() != UNSET ?
+            params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
     this.ap = ap;
   }
   BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
@@ -161,6 +187,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
       ++toAddCount;
     }
 
+    if (currentWriteBufferSize.get() == 0) {
+      firstRecordInBufferTimestamp = System.currentTimeMillis();
+    }
+
     // This behavior is highly non-intuitive... it does not protect us against
     // 94-incompatible behavior, which is a timing issue because hasError, the below code
     // and setter of hasError are not synchronized. Perhaps it should be removed.
@@ -182,6 +212,31 @@ public class BufferedMutatorImpl implements BufferedMutator {
     }
   }
 
+  @VisibleForTesting
+  protected long getExecutedWriteBufferPeriodicFlushes() {
+    return executedWriteBufferPeriodicFlushes.get();
+  }
+
+  private volatile long firstRecordInBufferTimestamp = 0; // read by the periodic flush timer
+  private final AtomicLong executedWriteBufferPeriodicFlushes = new AtomicLong(0);
+
+  /** Timer tick: flush if the oldest buffered record is past the timeout; log any failure. */
+  private void timerCallbackForWriteBufferPeriodicFlush() {
+    if (currentWriteBufferSize.get() == 0) {
+      return; // Nothing to flush
+    }
+    long now = System.currentTimeMillis();
+    if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
+      return; // No need to flush yet
+    }
+    try {
+      executedWriteBufferPeriodicFlushes.incrementAndGet();
+      flush();
+    } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+      LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush", e); // keep the trace
+    }
+  }
+
   // validate for well-formedness
   public void validatePut(final Put put) throws IllegalArgumentException {
     HTable.validatePut(put, maxKeyValueSize);
@@ -193,6 +248,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
       if (this.closed) {
         return;
       }
+
+      // Stop any running Periodic Flush timer.
+      disableWriteBufferPeriodicFlush();
+
       // As we can have an operation in progress even if the buffer is empty, we call
       // backgroundFlushCommits at least one time.
       backgroundFlushCommits(true);
@@ -277,7 +336,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   /**
-   * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
+   * Reuse the AsyncProcessTask when calling
+   * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
    * @param taker access the inner buffer.
    * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
    */
@@ -310,6 +370,48 @@ public class BufferedMutatorImpl implements BufferedMutator {
   }
 
   @Override
+  public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
+    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
+    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+
+    // Both parameters have minimal values.
+    this.writeBufferPeriodicFlushTimeoutMs   = Math.max(0, timeoutMs);
+    this.writeBufferPeriodicFlushTimerTickMs =
+            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
+
+    // If something changed we stop the old Timer.
+    if (this.writeBufferPeriodicFlushTimeoutMs   != originalTimeoutMs  ||
+        this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
+      if (writeBufferPeriodicFlushTimer != null) {
+        writeBufferPeriodicFlushTimer.cancel();
+        writeBufferPeriodicFlushTimer = null;
+      }
+    }
+
+    // If we have the need for a timer and there is none we start it
+    if (writeBufferPeriodicFlushTimer == null &&
+        writeBufferPeriodicFlushTimeoutMs > 0) {
+      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
+      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
+        }
+      }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
+    }
+  }
+
+  @Override
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return this.writeBufferPeriodicFlushTimeoutMs;
+  }
+
+  @Override
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return this.writeBufferPeriodicFlushTimerTickMs;
+  }
+
+  @Override
   public void setRpcTimeout(int rpcTimeout) {
     this.rpcTimeout.set(rpcTimeout);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 0648501..3f6c565 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.ExecutorService;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -34,6 +33,8 @@ public class BufferedMutatorParams implements Cloneable {
 
   private final TableName tableName;
   private long writeBufferSize = UNSET;
+  private long writeBufferPeriodicFlushTimeoutMs = UNSET;
+  private long writeBufferPeriodicFlushTimerTickMs = UNSET;
   private int maxKeyValueSize = UNSET;
   private ExecutorService pool = null;
   private String implementationClassName = null;
@@ -88,6 +89,30 @@ public class BufferedMutatorParams implements Cloneable {
     return this;
   }
 
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return writeBufferPeriodicFlushTimeoutMs;
+  }
+
+  /**
+   * Set the max timeout before the buffer is automatically flushed.
+   */
+  public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs) {
+    this.writeBufferPeriodicFlushTimeoutMs = timeoutMs;
+    return this;
+  }
+
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return writeBufferPeriodicFlushTimerTickMs;
+  }
+
+  /**
+   * Set the TimerTick, i.e. how often the buffer timeout is checked.
+   */
+  public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
+    this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
+    return this;
+  }
+
   public int getMaxKeyValueSize() {
     return maxKeyValueSize;
   }
@@ -154,11 +179,13 @@ public class BufferedMutatorParams implements Cloneable {
   @Override
   public BufferedMutatorParams clone() {
     BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName);
-    clone.writeBufferSize = this.writeBufferSize;
-    clone.maxKeyValueSize = maxKeyValueSize;
-    clone.pool = this.pool;
-    clone.listener = this.listener;
-    clone.implementationClassName = this.implementationClassName;
+    clone.writeBufferSize                     = this.writeBufferSize;
+    clone.writeBufferPeriodicFlushTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
+    clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+    clone.maxKeyValueSize                     = this.maxKeyValueSize;
+    clone.pool                                = this.pool;
+    clone.listener                            = this.listener;
+    clone.implementationClassName             = this.implementationClassName;
     return clone;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index a582405..d996004 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -30,10 +30,18 @@ public class ConnectionConfiguration {
 
   public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
   public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
+  public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS =
+          "hbase.client.write.buffer.periodicflush.timeout.ms";
+  public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS =
+          "hbase.client.write.buffer.periodicflush.timertick.ms";
+  public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
+  public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
   public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
   public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
 
   private final long writeBufferSize;
+  private final long writeBufferPeriodicFlushTimeoutMs;
+  private final long writeBufferPeriodicFlushTimerTickMs;
   private final int metaOperationTimeout;
   private final int operationTimeout;
   private final int scannerCaching;
@@ -56,6 +64,14 @@ public class ConnectionConfiguration {
   ConnectionConfiguration(Configuration conf) {
     this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
 
+    this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
+
+    this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+            WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT);
+
     this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
 
@@ -105,6 +121,8 @@ public class ConnectionConfiguration {
   @VisibleForTesting
   protected ConnectionConfiguration() {
     this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
+    this.writeBufferPeriodicFlushTimeoutMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
+    this.writeBufferPeriodicFlushTimerTickMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT;
     this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
     this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
@@ -133,6 +151,14 @@ public class ConnectionConfiguration {
     return writeBufferSize;
   }
 
+  public long getWriteBufferPeriodicFlushTimeoutMs() {
+    return writeBufferPeriodicFlushTimeoutMs;
+  }
+
+  public long getWriteBufferPeriodicFlushTimerTickMs() {
+    return writeBufferPeriodicFlushTimerTickMs;
+  }
+
   public int getMetaOperationTimeout() {
     return metaOperationTimeout;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 653fbfd..562630f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -363,6 +363,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
       params.writeBufferSize(connectionConfig.getWriteBufferSize());
     }
+    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
+      params.setWriteBufferPeriodicFlushTimeoutMs(
+              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
+    }
+    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
+      params.setWriteBufferPeriodicFlushTimerTickMs(
+              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
+    }
     if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
       params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index ba6756b..fc59793 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
@@ -1102,6 +1101,119 @@ public class TestAsyncProcess {
   }
 
   @Test
+  public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+
+    checkPeriodicFlushParameters(conn, ap,
+            1234, 1234,
+            1234, 1234);
+    checkPeriodicFlushParameters(conn, ap,
+               0,    0,
+               0,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+    checkPeriodicFlushParameters(conn, ap,
+           -1234,    0,
+           -1234,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+    checkPeriodicFlushParameters(conn, ap,
+               1,    1,
+               1,    BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+  }
+
+  private void checkPeriodicFlushParameters(ClusterConnection conn,
+                                            MyAsyncProcess ap,
+                                            long setTO, long expectTO,
+                                            long setTT, long expectTT
+                                            ) {
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+    // The BufferedMutatorParams does nothing with the value
+    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
+    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
+    Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
+
+    // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
+    BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
+    Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
+
+    // The BufferedMutatorImpl corrects illegal values (direct via setter)
+    BufferedMutatorImpl ht2 =
+            new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
+    ht2.setWriteBufferPeriodicFlush(setTO, setTT);
+    Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
+
+  }
+
+  @Test
+  public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
+    ClusterConnection conn = createHConnection();
+    MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+    BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+    bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1);     // Flush ASAP
+    bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Below the minimum: raised to 100ms
+    bufferParam.writeBufferSize(10000);  // Write buffer set to much larger than the single record
+
+    BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
+
+    // Verify if BufferedMutator has the right settings.
+    Assert.assertEquals(10000, ht.getWriteBufferSize());
+    Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
+    Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+            ht.getWriteBufferPeriodicFlushTimerTickMs());
+
+    Put put = createPut(1, true);
+
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- Insert, flush immediately, MUST NOT flush automatically
+    ht.mutate(put);
+    ht.flush();
+
+    Thread.sleep(1000);
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- Insert, NO flush, MUST flush automatically
+    ht.mutate(put);
+    Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // The timerTick should fire every 100ms, so after twice that we must have
+    // seen at least 1 tick and we should see an automatic flush
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // Ensure it does not flush twice
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+    // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
+    ht.disableWriteBufferPeriodicFlush();
+    ht.mutate(put);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // Wait for at least 1 timerTick, we should see NO flushes.
+    Thread.sleep(200);
+    Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+    // Reenable periodic flushing, a flush seems to take about 1 second
+    // so we wait for 2 seconds and it should have finished the flush.
+    ht.setWriteBufferPeriodicFlush(1, 100);
+    Thread.sleep(2000);
+    Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
+    Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+  }
+
+
+  @Test
   public void testBufferedMutatorImplWithSharedPool() throws Exception {
     ClusterConnection conn = createHConnection();
     MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f07aaf/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index 0646f41..2c2935e 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -30,7 +29,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -136,13 +134,21 @@ public class TestBufferedMutatorParams {
     BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName));
 
     BufferedMutator.ExceptionListener listener = new MockExceptionListener();
-    bmp.writeBufferSize(17).maxKeyValueSize(13).pool(pool).listener(listener);
+    bmp
+            .writeBufferSize(17)
+            .setWriteBufferPeriodicFlushTimeoutMs(123)
+            .setWriteBufferPeriodicFlushTimerTickMs(456)
+            .maxKeyValueSize(13)
+            .pool(pool)
+            .listener(listener);
     bmp.implementationClassName("someClassName");
     BufferedMutatorParams clone = bmp.clone();
 
     // Confirm some literals
     assertEquals(tableName, clone.getTableName().toString());
     assertEquals(17, clone.getWriteBufferSize());
+    assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
+    assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(13, clone.getMaxKeyValueSize());
     assertEquals("someClassName", clone.getImplementationClassName());
 
@@ -168,6 +174,10 @@ public class TestBufferedMutatorParams {
     assertEquals(some.getTableName().toString(),
         clone.getTableName().toString());
     assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize());
+    assertEquals(some.getWriteBufferPeriodicFlushTimeoutMs(),
+        clone.getWriteBufferPeriodicFlushTimeoutMs());
+    assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
+        clone.getWriteBufferPeriodicFlushTimerTickMs());
     assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
     assertTrue(some.getListener() == clone.getListener());
     assertTrue(some.getPool() == clone.getPool());