You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/13 21:58:12 UTC
[03/11] hbase git commit: HBASE-21570 Add write buffer periodic flush
support for AsyncBufferedMutator
HBASE-21570 Add write buffer periodic flush support for AsyncBufferedMutator
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b09b87d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b09b87d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b09b87d1
Branch: refs/heads/HBASE-20952
Commit: b09b87d143730db00ec56114a752d3a74f8982c4
Parents: da9508d
Author: zhangduo <zh...@apache.org>
Authored: Tue Dec 11 08:39:43 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Tue Dec 11 14:51:26 2018 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncBufferedMutator.java | 16 +-
.../client/AsyncBufferedMutatorBuilder.java | 19 +++
.../client/AsyncBufferedMutatorBuilderImpl.java | 19 ++-
.../hbase/client/AsyncBufferedMutatorImpl.java | 67 +++++---
.../client/AsyncConnectionConfiguration.java | 37 +++--
.../hbase/client/AsyncConnectionImpl.java | 11 +-
.../hbase/client/TestAsyncBufferMutator.java | 161 ++++++++++++++++++-
7 files changed, 277 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
index 6fe4b9a..7b21eb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -18,13 +18,16 @@
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
/**
* Used to communicate with a single HBase table in batches. Obtain an instance from a
* {@link AsyncConnection} and call {@link #close()} afterwards.
@@ -52,7 +55,9 @@ public interface AsyncBufferedMutator extends Closeable {
* part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
* @param mutation The data to send.
*/
- CompletableFuture<Void> mutate(Mutation mutation);
+ default CompletableFuture<Void> mutate(Mutation mutation) {
+ return Iterables.getOnlyElement(mutate(Collections.singletonList(mutation)));
+ }
/**
* Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the wire
@@ -81,4 +86,11 @@ public interface AsyncBufferedMutator extends Closeable {
* @return The size of the write buffer in bytes.
*/
long getWriteBufferSize();
+
+ /**
+ * Returns the periodical flush interval in the given time unit; 0 means disabled.
+ */
+ default long getPeriodicalFlushTimeout(TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
index 45959bb..c617c8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -46,6 +46,25 @@ public interface AsyncBufferedMutatorBuilder {
AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
/**
+ * Set the periodical flush interval. If the data in the buffer has not been flushed for a long
+ * time, i.e., it reaches this timeout limit, we will flush it automatically.
+ * <p/>
+ * Notice that setting the timeout to 0 or a negative value disables periodical flush; it does
+ * not mean 'flush immediately'. If you want to flush immediately then you should not use this
+ * class, as it is designed to be 'buffered'.
+ */
+ default AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Disable the periodical flush, i.e., set the timeout to 0.
+ */
+ default AsyncBufferedMutatorBuilder disableWriteBufferPeriodicFlush() {
+ return setWriteBufferPeriodicFlush(0, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
* <p>
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
index 227d02b..eb8af17 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
/**
* The implementation of {@link AsyncBufferedMutatorBuilder}.
@@ -28,14 +29,20 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@InterfaceAudience.Private
class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
+ private final HashedWheelTimer periodicalFlushTimer;
+
private final AsyncTableBuilder<?> tableBuilder;
private long writeBufferSize;
+ private long periodicFlushTimeoutNs;
+
public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
- AsyncTableBuilder<?> tableBuilder) {
+ AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
+ this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
+ this.periodicalFlushTimer = periodicalFlushTimer;
}
@Override
@@ -77,8 +84,14 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
}
@Override
- public AsyncBufferedMutator build() {
- return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+ public AsyncBufferedMutatorBuilder setWriteBufferPeriodicFlush(long timeout, TimeUnit unit) {
+ this.periodicFlushTimeoutNs = unit.toNanos(timeout);
+ return this;
}
+ @Override
+ public AsyncBufferedMutator build() {
+ return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
+ periodicFlushTimeoutNs);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
index 5a92ace..318c6c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -29,16 +30,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
/**
* The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTable}.
*/
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+ private final HashedWheelTimer periodicalFlushTimer;
+
private final AsyncTable<?> table;
private final long writeBufferSize;
+ private final long periodicFlushTimeoutNs;
+
private List<Mutation> mutations = new ArrayList<>();
private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -47,9 +56,15 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
private boolean closed;
- AsyncBufferedMutatorImpl(AsyncTable<?> table, long writeBufferSize) {
+ @VisibleForTesting
+ Timeout periodicFlushTask;
+
+ AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+ long writeBufferSize, long periodicFlushTimeoutNs) {
+ this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
+ this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
}
@Override
@@ -62,7 +77,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
return table.getConfiguration();
}
- private void internalFlush() {
+ // will be overridden in test
+ @VisibleForTesting
+ protected void internalFlush() {
+ if (periodicFlushTask != null) {
+ periodicFlushTask.cancel();
+ periodicFlushTask = null;
+ }
List<Mutation> toSend = this.mutations;
if (toSend.isEmpty()) {
return;
@@ -86,29 +107,10 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
}
@Override
- public CompletableFuture<Void> mutate(Mutation mutation) {
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- long heapSize = mutation.heapSize();
- synchronized (this) {
- if (closed) {
- future.completeExceptionally(new IOException("Already closed"));
- return future;
- }
- mutations.add(mutation);
- futures.add(future);
- bufferedSize += heapSize;
- if (bufferedSize >= writeBufferSize) {
- internalFlush();
- }
- }
- return future;
- }
-
- @Override
public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
List<CompletableFuture<Void>> futures =
- Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
- .collect(Collectors.toList());
+ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
+ .collect(Collectors.toList());
long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
synchronized (this) {
if (closed) {
@@ -116,6 +118,20 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
futures.forEach(f -> f.completeExceptionally(ioe));
return futures;
}
+ if (this.mutations.isEmpty() && periodicFlushTimeoutNs > 0) {
+ periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
+ synchronized (AsyncBufferedMutatorImpl.this) {
+ // confirm that we are still valid: if there was already an internalFlush call before us,
+ // then we should not execute any more. internalFlush sets periodicFlushTask to null, and
+ // a new task may have been scheduled since then, so here we check whether the references
+ // are equal.
+ if (timeout == periodicFlushTask) {
+ periodicFlushTask = null;
+ internalFlush();
+ }
+ }
+ }, periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+ }
this.mutations.addAll(mutations);
this.futures.addAll(futures);
bufferedSize += heapSize;
@@ -141,4 +157,9 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
public long getWriteBufferSize() {
return writeBufferSize;
}
+
+ @Override
+ public long getPeriodicalFlushTimeout(TimeUnit unit) {
+ return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index bd2add8..915e9dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,11 +39,12 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
@@ -91,32 +92,38 @@ class AsyncConnectionConfiguration {
private final long writeBufferSize;
+ private final long writeBufferPeriodicFlushTimeoutNs;
+
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
- this.rpcTimeoutNs = TimeUnit.MILLISECONDS
- .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+ this.rpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
this.readRpcTimeoutNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
this.writeRpcTimeoutNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
this.pauseNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
- conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+ conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.scanTimeoutNs = TimeUnit.MILLISECONDS
- .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
- HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+ .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
this.scannerCaching =
- conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
- this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
+ conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ this.metaScannerCaching =
+ conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
- this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
+ this.writeBufferPeriodicFlushTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS,
+ WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -159,7 +166,7 @@ class AsyncConnectionConfiguration {
return scannerCaching;
}
- int getMetaScannerCaching(){
+ int getMetaScannerCaching() {
return metaScannerCaching;
}
@@ -170,4 +177,8 @@ class AsyncConnectionConfiguration {
long getWriteBufferSize() {
return writeBufferSize;
}
+
+ long getWriteBufferPeriodicFlushTimeoutNs() {
+ return writeBufferPeriodicFlushTimeoutNs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index a05764e..078395b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -68,7 +68,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
- Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+ Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@@ -193,7 +193,7 @@ class AsyncConnectionImpl implements AsyncConnection {
String msg = "ZooKeeper available but no active master location found";
LOG.info(msg);
this.masterStubMakeFuture.getAndSet(null)
- .completeExceptionally(new MasterNotRunningException(msg));
+ .completeExceptionally(new MasterNotRunningException(msg));
return;
}
try {
@@ -216,7 +216,7 @@ class AsyncConnectionImpl implements AsyncConnection {
});
} catch (IOException e) {
this.masterStubMakeFuture.getAndSet(null)
- .completeExceptionally(new IOException("Failed to create async master stub", e));
+ .completeExceptionally(new IOException("Failed to create async master stub", e));
}
});
}
@@ -317,12 +317,13 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
- return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName));
+ return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
}
@Override
public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
ExecutorService pool) {
- return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
+ return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
+ RETRY_TIMER);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b09b87d1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
index 9fe4ca7..6eed326 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -31,6 +33,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -45,12 +48,15 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
+ HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -96,10 +102,10 @@ public class TestAsyncBufferMutator {
private void test(TableName tableName) throws InterruptedException {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator =
- CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
+ CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
- .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
- .collect(Collectors.toList()));
+ .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+ .collect(Collectors.toList()));
// exceeded the write buffer size, a flush will be called directly
fs.forEach(f -> f.join());
IntStream.range(COUNT / 2, COUNT).forEach(i -> {
@@ -115,9 +121,9 @@ public class TestAsyncBufferMutator {
futures.forEach(f -> f.join());
AsyncTable<?> table = CONN.getTable(tableName);
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
- .forEach(r -> {
- assertArrayEquals(VALUE, r.getValue(CF, CQ));
- });
+ .forEach(r -> {
+ assertArrayEquals(VALUE, r.getValue(CF, CQ));
+ });
}
@Test
@@ -142,4 +148,145 @@ public class TestAsyncBufferMutator {
}
}
}
+
+ @Test
+ public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
+ try (AsyncBufferedMutator mutator =
+ CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ CompletableFuture<?> future = mutator.mutate(put);
+ Thread.sleep(2000);
+ // assert that we have not flushed it out
+ assertFalse(future.isDone());
+ mutator.flush();
+ future.get();
+ }
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ @Test
+ public void testPeriodicFlush() throws InterruptedException, ExecutionException {
+ AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ CompletableFuture<?> future = mutator.mutate(put);
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ // a bit deep into the implementation
+ @Test
+ public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
+ .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
+ .setWriteBufferSize(10 * put.heapSize()).build()) {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ futures.add(mutator.mutate(put));
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ for (int i = 1;; i++) {
+ futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
+ if (mutator.periodicFlushTask == null) {
+ break;
+ }
+ }
+ assertTrue(task.isCancelled());
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ for (int i = 0; i < futures.size(); i++) {
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
+ }
+ }
+ }
+
+ @Test
+ public void testCancelPeriodicFlushByManuallyFlush()
+ throws InterruptedException, ExecutionException {
+ try (AsyncBufferedMutatorImpl mutator =
+ (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+ CompletableFuture<?> future =
+ mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ mutator.flush();
+ assertTrue(task.isCancelled());
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+ }
+
+ @Test
+ public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
+ CompletableFuture<?> future;
+ Timeout task;
+ try (AsyncBufferedMutatorImpl mutator =
+ (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
+ .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
+ future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
+ task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ }
+ assertTrue(task.isCancelled());
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ }
+
+ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {
+
+ private int flushCount;
+
+ AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
+ long writeBufferSize, long periodicFlushTimeoutNs) {
+ super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
+ }
+
+ @Override
+ protected void internalFlush() {
+ flushCount++;
+ super.internalFlush();
+ }
+ }
+
+ @Test
+ public void testRaceBetweenNormalFlushAndPeriodicFlush()
+ throws InterruptedException, ExecutionException {
+ Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+ try (AsyncBufferMutatorForTest mutator =
+ new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
+ 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
+ CompletableFuture<?> future = mutator.mutate(put);
+ Timeout task = mutator.periodicFlushTask;
+ // we should have scheduled a periodic flush task
+ assertNotNull(task);
+ synchronized (mutator) {
+ // synchronize on mutator to prevent the periodic flush from being executed
+ Thread.sleep(500);
+ // the timeout should be issued
+ assertTrue(task.isExpired());
+ // but no flush is issued as we hold the lock
+ assertEquals(0, mutator.flushCount);
+ assertFalse(future.isDone());
+ // manually flush, then release the lock
+ mutator.flush();
+ }
+ // this is a bit deep into the implementation in netty but anyway let's add a check here to
+ // confirm that an issued timeout cannot be canceled by the netty framework.
+ assertFalse(task.isCancelled());
+ // and the mutation is done
+ future.get();
+ AsyncTable<?> table = CONN.getTable(TABLE_NAME);
+ assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
+ // only the manual flush, the periodic flush should have been canceled by us
+ assertEquals(1, mutator.flushCount);
+ }
+ }
}