You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/02 02:01:14 UTC
[20/51] [abbrv] hbase git commit: HBASE-19486 Periodically ensure
records are not buffered too long by BufferedMutator
HBASE-19486 Periodically ensure records are not buffered too long by BufferedMutator
Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a1c36f7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a1c36f7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a1c36f7
Branch: refs/heads/HBASE-19397
Commit: 5a1c36f70ac52e6f4e85f11ea0602d46b4861ac0
Parents: e23f7af
Author: Niels Basjes <nb...@bol.com>
Authored: Fri Dec 29 22:22:34 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Dec 29 22:23:18 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/BufferedMutator.java | 65 +++++++++-
.../hbase/client/BufferedMutatorImpl.java | 124 +++++++++++++++++--
.../hbase/client/BufferedMutatorParams.java | 39 +++++-
.../hbase/client/ConnectionConfiguration.java | 26 ++++
.../hbase/client/ConnectionImplementation.java | 8 ++
.../hadoop/hbase/client/TestAsyncProcess.java | 114 ++++++++++++++++-
.../hbase/client/TestBufferedMutatorParams.java | 16 ++-
7 files changed, 366 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 4c6a0ef..7805f77 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -18,13 +18,12 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* <p>Used to communicate with a single HBase table similar to {@link Table} but meant for
@@ -64,7 +63,13 @@ public interface BufferedMutator extends Closeable {
/**
* Key to use setting non-default BufferedMutator implementation in Configuration.
*/
- public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+ String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+
+ /**
+ * Having the timer tick run more often than once every 100ms is needless and will
+ * probably cause too many timer events to fire, which has a negative impact on performance.
+ */
+ long MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS = 100;
/**
* Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
@@ -115,6 +120,56 @@ public interface BufferedMutator extends Closeable {
void flush() throws IOException;
/**
+ * Sets the maximum time before the buffer is automatically flushed checking once per second.
+ * @param timeoutMs The maximum number of milliseconds how long records may be buffered
+ * before they are flushed. Set to 0 to disable.
+ */
+ default void setWriteBufferPeriodicFlush(long timeoutMs) {
+ setWriteBufferPeriodicFlush(timeoutMs, 1000L);
+ }
+
+ /**
+ * Sets the maximum time before the buffer is automatically flushed.
+ * @param timeoutMs The maximum number of milliseconds how long records may be buffered
+ * before they are flushed. Set to 0 to disable.
+ * @param timerTickMs The number of milliseconds between each check if the
+ * timeout has been exceeded. Must be 100ms (as defined in
+ * {@link #MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS})
+ * or larger to avoid performance problems.
+ */
+ default void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
+ throw new UnsupportedOperationException(
+ "The BufferedMutator::setWriteBufferPeriodicFlush has not been implemented");
+ }
+
+ /**
+ * Disable periodic flushing of the write buffer.
+ */
+ default void disableWriteBufferPeriodicFlush() {
+ setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+ }
+
+ /**
+ * Returns the current periodic flush timeout value in milliseconds.
+ * @return The maximum number of milliseconds how long records may be buffered before they
+ * are flushed. The value 0 means this is disabled.
+ */
+ default long getWriteBufferPeriodicFlushTimeoutMs() {
+ throw new UnsupportedOperationException(
+ "The BufferedMutator::getWriteBufferPeriodicFlushTimeoutMs has not been implemented");
+ }
+
+ /**
+ * Returns the current periodic flush timertick interval in milliseconds.
+ * @return The number of milliseconds between each check if the timeout has been exceeded.
+ * This value only has a real meaning if the timeout has been set to > 0
+ */
+ default long getWriteBufferPeriodicFlushTimerTickMs() {
+ throw new UnsupportedOperationException(
+ "The BufferedMutator::getWriteBufferPeriodicFlushTimerTickMs has not been implemented");
+ }
+
+ /**
* Returns the maximum size in bytes of the write buffer for this HTable.
* <p>
* The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index cf85f57..13b1a81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -15,18 +15,20 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.BufferedMutatorParams.UNSET;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -75,6 +77,11 @@ public class BufferedMutatorImpl implements BufferedMutator {
*/
private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
private final long writeBufferSize;
+
+ private long writeBufferPeriodicFlushTimeoutMs;
+ private long writeBufferPeriodicFlushTimerTickMs = MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
+ private Timer writeBufferPeriodicFlushTimer = null;
+
private final int maxKeyValueSize;
private final ExecutorService pool;
private final AtomicInteger rpcTimeout;
@@ -99,15 +106,34 @@ public class BufferedMutatorImpl implements BufferedMutator {
cleanupPoolOnClose = false;
}
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
- this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
- params.getWriteBufferSize() : tableConf.getWriteBufferSize();
- this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
- params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
- this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
- params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
- this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ?
- params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
+ this.writeBufferSize =
+ params.getWriteBufferSize() != UNSET ?
+ params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+
+ // Set via the setter because it does value validation and starts/stops the TimerTask
+ long newWriteBufferPeriodicFlushTimeoutMs =
+ params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
+ ? params.getWriteBufferPeriodicFlushTimeoutMs()
+ : tableConf.getWriteBufferPeriodicFlushTimeoutMs();
+ long newWriteBufferPeriodicFlushTimerTickMs =
+ params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
+ ? params.getWriteBufferPeriodicFlushTimerTickMs()
+ : tableConf.getWriteBufferPeriodicFlushTimerTickMs();
+ this.setWriteBufferPeriodicFlush(
+ newWriteBufferPeriodicFlushTimeoutMs,
+ newWriteBufferPeriodicFlushTimerTickMs);
+
+ this.maxKeyValueSize =
+ params.getMaxKeyValueSize() != UNSET ?
+ params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
+
+ this.rpcTimeout = new AtomicInteger(
+ params.getRpcTimeout() != UNSET ?
+ params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
+
+ this.operationTimeout = new AtomicInteger(
+ params.getOperationTimeout() != UNSET ?
+ params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
this.ap = ap;
}
BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
@@ -161,6 +187,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
++toAddCount;
}
+ if (currentWriteBufferSize.get() == 0) {
+ firstRecordInBufferTimestamp = System.currentTimeMillis();
+ }
+
// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
@@ -182,6 +212,31 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
}
+ @VisibleForTesting
+ protected long getExecutedWriteBufferPeriodicFlushes() {
+ return executedWriteBufferPeriodicFlushes;
+ }
+
+ private long firstRecordInBufferTimestamp = 0;
+ private long executedWriteBufferPeriodicFlushes = 0;
+
+ private void timerCallbackForWriteBufferPeriodicFlush() {
+ if (currentWriteBufferSize.get() == 0) {
+ return; // Nothing to flush
+ }
+ long now = System.currentTimeMillis();
+ if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > now) {
+ return; // No need to flush yet
+ }
+ // The first record in the writebuffer has been in there too long --> flush
+ try {
+ executedWriteBufferPeriodicFlushes++;
+ flush();
+ } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
+ LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
+ }
+ }
+
// validate for well-formedness
public void validatePut(final Put put) throws IllegalArgumentException {
HTable.validatePut(put, maxKeyValueSize);
@@ -193,6 +248,10 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (this.closed) {
return;
}
+
+ // Stop any running Periodic Flush timer.
+ disableWriteBufferPeriodicFlush();
+
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
@@ -277,7 +336,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
/**
- * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
+ * Reuse the AsyncProcessTask when calling
+ * {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
* @param taker access the inner buffer.
* @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
*/
@@ -310,6 +370,48 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
@Override
+ public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
+ long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
+ long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+
+ // Both parameters have minimal values.
+ this.writeBufferPeriodicFlushTimeoutMs = Math.max(0, timeoutMs);
+ this.writeBufferPeriodicFlushTimerTickMs =
+ Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);
+
+ // If something changed we stop the old Timer.
+ if (this.writeBufferPeriodicFlushTimeoutMs != originalTimeoutMs ||
+ this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
+ if (writeBufferPeriodicFlushTimer != null) {
+ writeBufferPeriodicFlushTimer.cancel();
+ writeBufferPeriodicFlushTimer = null;
+ }
+ }
+
+ // If we have the need for a timer and there is none we start it
+ if (writeBufferPeriodicFlushTimer == null &&
+ writeBufferPeriodicFlushTimeoutMs > 0) {
+ writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
+ writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
+ }
+ }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
+ }
+ }
+
+ @Override
+ public long getWriteBufferPeriodicFlushTimeoutMs() {
+ return this.writeBufferPeriodicFlushTimeoutMs;
+ }
+
+ @Override
+ public long getWriteBufferPeriodicFlushTimerTickMs() {
+ return this.writeBufferPeriodicFlushTimerTickMs;
+ }
+
+ @Override
public void setRpcTimeout(int rpcTimeout) {
this.rpcTimeout.set(rpcTimeout);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 0648501..3f6c565 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.ExecutorService;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,6 +33,8 @@ public class BufferedMutatorParams implements Cloneable {
private final TableName tableName;
private long writeBufferSize = UNSET;
+ private long writeBufferPeriodicFlushTimeoutMs = UNSET;
+ private long writeBufferPeriodicFlushTimerTickMs = UNSET;
private int maxKeyValueSize = UNSET;
private ExecutorService pool = null;
private String implementationClassName = null;
@@ -88,6 +89,30 @@ public class BufferedMutatorParams implements Cloneable {
return this;
}
+ public long getWriteBufferPeriodicFlushTimeoutMs() {
+ return writeBufferPeriodicFlushTimeoutMs;
+ }
+
+ /**
+ * Set the max timeout before the buffer is automatically flushed.
+ */
+ public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs) {
+ this.writeBufferPeriodicFlushTimeoutMs = timeoutMs;
+ return this;
+ }
+
+ public long getWriteBufferPeriodicFlushTimerTickMs() {
+ return writeBufferPeriodicFlushTimerTickMs;
+ }
+
+ /**
+ * Set the TimerTick: how often the buffer timeout is checked.
+ */
+ public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
+ this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
+ return this;
+ }
+
public int getMaxKeyValueSize() {
return maxKeyValueSize;
}
@@ -154,11 +179,13 @@ public class BufferedMutatorParams implements Cloneable {
@Override
public BufferedMutatorParams clone() {
BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName);
- clone.writeBufferSize = this.writeBufferSize;
- clone.maxKeyValueSize = maxKeyValueSize;
- clone.pool = this.pool;
- clone.listener = this.listener;
- clone.implementationClassName = this.implementationClassName;
+ clone.writeBufferSize = this.writeBufferSize;
+ clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
+ clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+ clone.maxKeyValueSize = this.maxKeyValueSize;
+ clone.pool = this.pool;
+ clone.listener = this.listener;
+ clone.implementationClassName = this.implementationClassName;
return clone;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index a582405..d996004 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -30,10 +30,18 @@ public class ConnectionConfiguration {
public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
+ public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS =
+ "hbase.client.write.buffer.periodicflush.timeout.ms";
+ public static final String WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS =
+ "hbase.client.write.buffer.periodicflush.timertick.ms";
+ public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
+ public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
private final long writeBufferSize;
+ private final long writeBufferPeriodicFlushTimeoutMs;
+ private final long writeBufferPeriodicFlushTimerTickMs;
private final int metaOperationTimeout;
private final int operationTimeout;
private final int scannerCaching;
@@ -56,6 +64,14 @@ public class ConnectionConfiguration {
ConnectionConfiguration(Configuration conf) {
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
+
+ this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT);
+
this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -105,6 +121,8 @@ public class ConnectionConfiguration {
@VisibleForTesting
protected ConnectionConfiguration() {
this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT;
+ this.writeBufferPeriodicFlushTimeoutMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
+ this.writeBufferPeriodicFlushTimerTickMs = WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT;
this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
@@ -133,6 +151,14 @@ public class ConnectionConfiguration {
return writeBufferSize;
}
+ public long getWriteBufferPeriodicFlushTimeoutMs() {
+ return writeBufferPeriodicFlushTimeoutMs;
+ }
+
+ public long getWriteBufferPeriodicFlushTimerTickMs() {
+ return writeBufferPeriodicFlushTimerTickMs;
+ }
+
public int getMetaOperationTimeout() {
return metaOperationTimeout;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 653fbfd..562630f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -363,6 +363,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
params.writeBufferSize(connectionConfig.getWriteBufferSize());
}
+ if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
+ params.setWriteBufferPeriodicFlushTimeoutMs(
+ connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
+ }
+ if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
+ params.setWriteBufferPeriodicFlushTimerTickMs(
+ connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
+ }
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index ba6756b..fc59793 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -48,7 +48,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
@@ -1102,6 +1101,119 @@ public class TestAsyncProcess {
}
@Test
+ public void testSettingWriteBufferPeriodicFlushParameters() throws Exception {
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+
+ checkPeriodicFlushParameters(conn, ap,
+ 1234, 1234,
+ 1234, 1234);
+ checkPeriodicFlushParameters(conn, ap,
+ 0, 0,
+ 0, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+ checkPeriodicFlushParameters(conn, ap,
+ -1234, 0,
+ -1234, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+ checkPeriodicFlushParameters(conn, ap,
+ 1, 1,
+ 1, BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+ }
+
+ private void checkPeriodicFlushParameters(ClusterConnection conn,
+ MyAsyncProcess ap,
+ long setTO, long expectTO,
+ long setTT, long expectTT
+ ) {
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+ // The BufferedMutatorParams does nothing with the value
+ bufferParam.setWriteBufferPeriodicFlushTimeoutMs(setTO);
+ bufferParam.setWriteBufferPeriodicFlushTimerTickMs(setTT);
+ Assert.assertEquals(setTO, bufferParam.getWriteBufferPeriodicFlushTimeoutMs());
+ Assert.assertEquals(setTT, bufferParam.getWriteBufferPeriodicFlushTimerTickMs());
+
+ // The BufferedMutatorImpl corrects illegal values (indirect via BufferedMutatorParams)
+ BufferedMutatorImpl ht1 = new BufferedMutatorImpl(conn, bufferParam, ap);
+ Assert.assertEquals(expectTO, ht1.getWriteBufferPeriodicFlushTimeoutMs());
+ Assert.assertEquals(expectTT, ht1.getWriteBufferPeriodicFlushTimerTickMs());
+
+ // The BufferedMutatorImpl corrects illegal values (direct via setter)
+ BufferedMutatorImpl ht2 =
+ new BufferedMutatorImpl(conn, createBufferedMutatorParams(ap, DUMMY_TABLE), ap);
+ ht2.setWriteBufferPeriodicFlush(setTO, setTT);
+ Assert.assertEquals(expectTO, ht2.getWriteBufferPeriodicFlushTimeoutMs());
+ Assert.assertEquals(expectTT, ht2.getWriteBufferPeriodicFlushTimerTickMs());
+
+ }
+
+ @Test
+ public void testWriteBufferPeriodicFlushTimeoutMs() throws Exception {
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+
+ bufferParam.setWriteBufferPeriodicFlushTimeoutMs(1); // Flush ASAP
+ bufferParam.setWriteBufferPeriodicFlushTimerTickMs(1); // Request 1ms; raised to the 100ms minimum
+ bufferParam.writeBufferSize(10000); // Write buffer set to much larger than the single record
+
+ BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
+
+ // Verify if BufferedMutator has the right settings.
+ Assert.assertEquals(10000, ht.getWriteBufferSize());
+ Assert.assertEquals(1, ht.getWriteBufferPeriodicFlushTimeoutMs());
+ Assert.assertEquals(BufferedMutator.MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
+ ht.getWriteBufferPeriodicFlushTimerTickMs());
+
+ Put put = createPut(1, true);
+
+ Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+ // ----- Insert, flush immediately, MUST NOT flush automatically
+ ht.mutate(put);
+ ht.flush();
+
+ Thread.sleep(1000);
+ Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+ // ----- Insert, NO flush, MUST flush automatically
+ ht.mutate(put);
+ Assert.assertEquals(0, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+ // The timerTick should fire every 100ms, so after twice that we must have
+ // seen at least 1 tick and we should see an automatic flush
+ Thread.sleep(200);
+ Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+ // Ensure it does not flush twice
+ Thread.sleep(200);
+ Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+
+ // ----- DISABLE AUTO FLUSH, Insert, NO flush, MUST NOT flush automatically
+ ht.disableWriteBufferPeriodicFlush();
+ ht.mutate(put);
+ Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+ // Wait for at least 1 timerTick, we should see NO flushes.
+ Thread.sleep(200);
+ Assert.assertEquals(1, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertTrue(ht.getCurrentWriteBufferSize() > 0);
+
+ // Reenable periodic flushing, a flush seems to take about 1 second
+ // so we wait for 2 seconds and it should have finished the flush.
+ ht.setWriteBufferPeriodicFlush(1, 100);
+ Thread.sleep(2000);
+ Assert.assertEquals(2, ht.getExecutedWriteBufferPeriodicFlushes());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+ }
+
+
+ @Test
public void testBufferedMutatorImplWithSharedPool() throws Exception {
ClusterConnection conn = createHConnection();
MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a1c36f7/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
index 0646f41..2c2935e 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -30,7 +29,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -136,13 +134,21 @@ public class TestBufferedMutatorParams {
BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf(tableName));
BufferedMutator.ExceptionListener listener = new MockExceptionListener();
- bmp.writeBufferSize(17).maxKeyValueSize(13).pool(pool).listener(listener);
+ bmp
+ .writeBufferSize(17)
+ .setWriteBufferPeriodicFlushTimeoutMs(123)
+ .setWriteBufferPeriodicFlushTimerTickMs(456)
+ .maxKeyValueSize(13)
+ .pool(pool)
+ .listener(listener);
bmp.implementationClassName("someClassName");
BufferedMutatorParams clone = bmp.clone();
// Confirm some literals
assertEquals(tableName, clone.getTableName().toString());
assertEquals(17, clone.getWriteBufferSize());
+ assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
+ assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(13, clone.getMaxKeyValueSize());
assertEquals("someClassName", clone.getImplementationClassName());
@@ -168,6 +174,10 @@ public class TestBufferedMutatorParams {
assertEquals(some.getTableName().toString(),
clone.getTableName().toString());
assertEquals(some.getWriteBufferSize(), clone.getWriteBufferSize());
+ assertEquals(some.getWriteBufferPeriodicFlushTimeoutMs(),
+ clone.getWriteBufferPeriodicFlushTimeoutMs());
+ assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
+ clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
assertTrue(some.getListener() == clone.getListener());
assertTrue(some.getPool() == clone.getPool());