You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/07/05 01:43:48 UTC
hbase git commit: HBASE-18297 Provide a AsyncAdminBuilder to create
new AsyncAdmin instance
Repository: hbase
Updated Branches:
refs/heads/master 63607800c -> e71e5ece8
HBASE-18297 Provide a AsyncAdminBuilder to create new AsyncAdmin instance
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e71e5ece
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e71e5ece
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e71e5ece
Branch: refs/heads/master
Commit: e71e5ece88caa8f6633b678dbb35d2d12ebc04ec
Parents: 6360780
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Jul 5 09:18:02 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Jul 5 09:18:02 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncAdminBuilder.java | 93 ++++++++
.../hbase/client/AsyncAdminBuilderBase.java | 77 +++++++
.../hadoop/hbase/client/AsyncConnection.java | 43 +++-
.../hbase/client/AsyncConnectionImpl.java | 19 +-
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 12 +-
.../hbase/client/TestAsyncAdminBuilder.java | 214 +++++++++++++++++++
6 files changed, 440 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
new file mode 100644
index 0000000..d706949
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * For creating {@link AsyncAdmin}. The implementation should have default configurations set before
+ * returning the builder to user. So users are free to only set the configs they care about to
+ * create a new AsyncAdmin instance.
+ * @param <T> the concrete {@link AsyncAdmin} type returned by {@link #build()}
+ */
+@InterfaceAudience.Public
+public interface AsyncAdminBuilder<T extends AsyncAdmin> {
+
+ /**
+ * Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry
+ * times) are both limitations for retrying, we will stop retrying when we reach any of the
+ * limitations.
+ * @param timeout the operation timeout, expressed in {@code unit}
+ * @param unit the time unit of {@code timeout}
+ * @return this for invocation chaining
+ */
+ AsyncAdminBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set timeout for each rpc request.
+ * @param timeout the rpc timeout, expressed in {@code unit}
+ * @param unit the time unit of {@code timeout}
+ * @return this for invocation chaining
+ */
+ AsyncAdminBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
+ * retrying.
+ * @param timeout the base pause time (not a timeout, despite the name), expressed in
+ *          {@code unit}
+ * @param unit the time unit of {@code timeout}
+ * @return this for invocation chaining
+ */
+ AsyncAdminBuilder<T> setRetryPause(long timeout, TimeUnit unit);
+
+ /**
+ * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+ * we will stop retrying when we reach any of the limitations.
+ * <p>
+ * The default implementation converts the value with {@code retries2Attempts} and delegates to
+ * {@link #setMaxAttempts(int)}.
+ * @param maxRetries the max retry times
+ * @return this for invocation chaining
+ */
+ default AsyncAdminBuilder<T> setMaxRetries(int maxRetries) {
+ return setMaxAttempts(retries2Attempts(maxRetries));
+ }
+
+ /**
+ * Set the max attempt times for an admin operation. Usually it is the max retry times plus 1.
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+ * we will stop retrying when we reach any of the limitations.
+ * @param maxAttempts the max attempt times
+ * @return this for invocation chaining
+ */
+ AsyncAdminBuilder<T> setMaxAttempts(int maxAttempts);
+
+ /**
+ * Set the number of retries that are allowed before we start to log.
+ * @param startLogErrorsCnt the number of retries allowed before logging starts
+ * @return this for invocation chaining
+ */
+ AsyncAdminBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
+
+ /**
+ * Create an {@link AsyncAdmin} instance.
+ * @return an {@link AsyncAdmin} instance
+ */
+ T build();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
new file mode 100644
index 0000000..013e8d7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for all asynchronous admin builders.
+ * <p>
+ * Holds the timeout and retry settings shared by every builder. The constructor seeds them from
+ * the connection-level configuration, each setter overrides a single value and returns
+ * {@code this}, and subclasses only have to implement {@link #build()}.
+ * @param <T> the concrete {@link AsyncAdmin} type returned by {@link #build()}
+ */
+@InterfaceAudience.Private
+abstract class AsyncAdminBuilderBase<T extends AsyncAdmin> implements AsyncAdminBuilder<T> {
+
+ /** Timeout for each rpc request, in nanoseconds. */
+ protected long rpcTimeoutNs;
+
+ /** Timeout for a whole admin operation, in nanoseconds. */
+ protected long operationTimeoutNs;
+
+ /** Base pause time between retries, in nanoseconds. */
+ protected long pauseNs;
+
+ /** Max attempt times for an admin operation (usually the max retry times plus 1). */
+ protected int maxAttempts;
+
+ /** Number of retries that are allowed before we start to log. */
+ protected int startLogErrorsCnt;
+
+ /**
+ * Creates a builder whose settings all start from the connection-level defaults.
+ * @param connConf the connection configuration to read the defaults from
+ */
+ AsyncAdminBuilderBase(AsyncConnectionConfiguration connConf) {
+ this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
+ this.operationTimeoutNs = connConf.getOperationTimeoutNs();
+ this.pauseNs = connConf.getPauseNs();
+ // The configuration exposes a retry count while this field is an attempt count. Convert it the
+ // same way AsyncAdminBuilder#setMaxRetries does, so the default and an explicit
+ // setMaxRetries(n) with the same n yield the same number of attempts.
+ this.maxAttempts = ConnectionUtils.retries2Attempts(connConf.getMaxRetries());
+ this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
+ }
+
+ /** Stores the operation timeout converted to nanoseconds. */
+ @Override
+ public AsyncAdminBuilder<T> setOperationTimeout(long timeout, TimeUnit unit) {
+ this.operationTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ /** Stores the rpc timeout converted to nanoseconds. */
+ @Override
+ public AsyncAdminBuilder<T> setRpcTimeout(long timeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ /** Stores the base retry pause converted to nanoseconds. */
+ @Override
+ public AsyncAdminBuilder<T> setRetryPause(long timeout, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ /** Stores the max attempt times as given, without any validation or conversion. */
+ @Override
+ public AsyncAdminBuilder<T> setMaxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ /** Stores the number of retries allowed before logging starts. */
+ @Override
+ public AsyncAdminBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 22ed064..04ef78e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -96,17 +96,44 @@ public interface AsyncConnection extends Closeable {
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
/**
- * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned
- * {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so
- * typically you should not do any time consuming work inside these methods.
- * @return an AsyncAdmin instance for cluster administration
+ * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster.
+ * <p>
+ * The returned instance will use default configs; this is a shorthand for
+ * {@code getAdminBuilder().build()}. Use {@link #getAdminBuilder()} if you want to customize
+ * some configs.
+ * <p>
+ * The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc
+ * framework's callback thread, so typically you should not do any time consuming work in the
+ * callbacks you attach to it.
+ * @return an {@link AsyncAdmin} instance for cluster administration
+ */
+ default AsyncAdmin getAdmin() {
+ return getAdminBuilder().build();
+ }
+
+ /**
+ * Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}.
+ * <p>
+ * The admin operation's returned {@code CompletableFuture} will be finished directly in the rpc
+ * framework's callback thread, so typically you should not do any time consuming work in the
+ * callbacks you attach to it.
+ * @return a builder whose {@code build()} returns a {@link RawAsyncHBaseAdmin}
*/
- AsyncAdmin getAdmin();
+ AsyncAdminBuilder<RawAsyncHBaseAdmin> getAdminBuilder();
+
+ /**
+ * Retrieve an {@link AsyncAdmin} implementation to administer an HBase cluster.
+ * <p>
+ * The returned instance will use default configs; this is a shorthand for
+ * {@code getAdminBuilder(pool).build()}. Use {@link #getAdminBuilder(ExecutorService)}
+ * if you want to customize some configs.
+ * @param pool the thread pool to use for executing callback
+ * @return an {@link AsyncAdmin} instance for cluster administration
+ */
+ default AsyncAdmin getAdmin(ExecutorService pool) {
+ return getAdminBuilder(pool).build();
+ }
/**
- * Retrieve an AsyncAdmin implementation to administer an HBase cluster.
+ * Returns an {@link AsyncAdminBuilder} for creating {@link AsyncAdmin}.
* @param pool the thread pool to use for executing callback
- * @return an AsyncAdmin instance for cluster administration
+ * @return a builder whose {@code build()} returns an {@link AsyncHBaseAdmin}
*/
- AsyncAdmin getAdmin(ExecutorService pool);
+ AsyncAdminBuilder<AsyncHBaseAdmin> getAdminBuilder(ExecutorService pool);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c170bce..5dd40cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -278,12 +278,23 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
- public AsyncAdmin getAdmin() {
- return new RawAsyncHBaseAdmin(this);
+ public AsyncAdminBuilder<RawAsyncHBaseAdmin> getAdminBuilder() { // builder for raw admins
+ return new AsyncAdminBuilderBase<RawAsyncHBaseAdmin>(connConf) { // defaults come from connConf
+ @Override
+ public RawAsyncHBaseAdmin build() {
+ return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this); // admin reads this builder
+ }
+ };
}
@Override
- public AsyncAdmin getAdmin(ExecutorService pool) {
- return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool);
+ public AsyncAdminBuilder<AsyncHBaseAdmin> getAdminBuilder(ExecutorService pool) { // pool-backed
+ return new AsyncAdminBuilderBase<AsyncHBaseAdmin>(connConf) { // defaults come from connConf
+ @Override
+ public AsyncHBaseAdmin build() {
+ RawAsyncHBaseAdmin rawAdmin = new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, this);
+ return new AsyncHBaseAdmin(rawAdmin, pool); // wraps the raw admin together with the pool
+ }
+ };
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index fcfdf93..179fd7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -212,14 +212,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
private final NonceGenerator ng;
- RawAsyncHBaseAdmin(AsyncConnectionImpl connection) {
+ RawAsyncHBaseAdmin(AsyncConnectionImpl connection, AsyncAdminBuilderBase<?> builder) { // timeout/retry settings are copied from the builder instead of connection.connConf
this.connection = connection;
this.metaTable = connection.getRawTable(META_TABLE_NAME);
- this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
- this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
- this.pauseNs = connection.connConf.getPauseNs();
- this.maxAttempts = connection.connConf.getMaxRetries();
- this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
+ this.rpcTimeoutNs = builder.rpcTimeoutNs;
+ this.operationTimeoutNs = builder.operationTimeoutNs;
+ this.pauseNs = builder.pauseNs;
+ this.maxAttempts = builder.maxAttempts;
+ this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.ng = connection.getNonceGenerator();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e71e5ece/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
new file mode 100644
index 0000000..ea25ee4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminBuilder.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Checks that the rpc timeout, operation timeout and max retries set through
+ * {@link AsyncAdminBuilder} take effect on the admin it builds.
+ * <p>
+ * Every test installs a master coprocessor that delays or fails {@code getNamespaceDescriptor},
+ * starts its own mini cluster, and is run once per builder flavour (see {@link #params()}).
+ */
+@RunWith(Parameterized.class)
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncAdminBuilder {
+
+ private static final Log LOG = LogFactory.getLog(TestAsyncAdminBuilder.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static AsyncConnection ASYNC_CONN;
+
+ /** Supplies a fresh builder of the flavour under test; injected by the Parameterized runner. */
+ @Parameter
+ public Supplier<AsyncAdminBuilder<?>> getAdminBuilder;
+
+ private static AsyncAdminBuilder<RawAsyncHBaseAdmin> getRawAsyncAdminBuilder() {
+ return ASYNC_CONN.getAdminBuilder();
+ }
+
+ private static AsyncAdminBuilder<AsyncHBaseAdmin> getAsyncAdminBuilder() {
+ return ASYNC_CONN.getAdminBuilder(ForkJoinPool.commonPool());
+ }
+
+ /**
+ * @return one parameter set per builder flavour: the raw builder and the pool-backed builder
+ */
+ @Parameters
+ public static List<Object[]> params() {
+ return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBuilder::getRawAsyncAdminBuilder },
+ new Supplier<?>[] { TestAsyncAdminBuilder::getAsyncAdminBuilder });
+ }
+
+ // Connection-level defaults, in milliseconds for the two timeouts.
+ private static final int DEFAULT_RPC_TIMEOUT = 10000;
+ private static final int DEFAULT_OPERATION_TIMEOUT = 30000;
+ private static final int DEFAULT_RETRIES_NUMBER = 2;
+
+ /** Writes the default timeouts and retry number into the shared test configuration. */
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, DEFAULT_RPC_TIMEOUT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ DEFAULT_OPERATION_TIMEOUT);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ DEFAULT_RETRIES_NUMBER);
+ TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+ }
+
+ /** Closes the connection and shuts the mini cluster down; each test starts its own cluster. */
+ @After
+ public void tearDown() throws Exception {
+ IOUtils.closeQuietly(ASYNC_CONN);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * The coprocessor sleeps for DEFAULT_RPC_TIMEOUT on every call, so an rpc timeout of half that
+ * value must fail and an rpc timeout of twice that value must succeed.
+ */
+ @Test
+ public void testRpcTimeout() throws Exception {
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ TestRpcTimeoutCoprocessor.class.getName());
+ TEST_UTIL.startMiniCluster(2);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+
+ try {
+ getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ fail("We expect an exception here");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // This call must succeed. Any exception is left to propagate so that the test report keeps the
+ // full stack trace and cause instead of only e.getMessage().
+ getAdminBuilder.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ }
+
+ /**
+ * The coprocessor keeps failing until it has slept for DEFAULT_OPERATION_TIMEOUT in total, so an
+ * operation timeout of half that value must fail and one of twice that value must succeed.
+ */
+ @Test
+ public void testOperationTimeout() throws Exception {
+ // set retry number to 100 to make sure that this test is only affected by operation timeout
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ TestOperationTimeoutCoprocessor.class.getName());
+ TEST_UTIL.startMiniCluster(2);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+
+ try {
+ getAdminBuilder.get()
+ .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT / 2, TimeUnit.MILLISECONDS).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ fail("We expect an exception here");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // This call must succeed. Any exception is left to propagate so that the test report keeps the
+ // full stack trace and cause instead of only e.getMessage().
+ getAdminBuilder.get()
+ .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT * 2, TimeUnit.MILLISECONDS).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ }
+
+ /**
+ * The coprocessor fails its first DEFAULT_RETRIES_NUMBER calls, so a builder allowed half that
+ * many retries must fail and a builder allowed twice that many must succeed.
+ */
+ @Test
+ public void testMaxRetries() throws Exception {
+ // set operation timeout to 300s to make sure that this test is only affected by retry number
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 300000);
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ TestMaxRetriesCoprocessor.class.getName());
+ TEST_UTIL.startMiniCluster(2);
+ ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+
+ try {
+ getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER / 2).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ fail("We expect an exception here");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // This call must succeed. Any exception is left to propagate so that the test report keeps the
+ // full stack trace and cause instead of only e.getMessage().
+ // NOTE(review): the coprocessor's counter is never reset, so if the same coprocessor instance
+ // serves both calls this one presumably succeeds on its first attempt - confirm that this
+ // still exercises the larger retry budget.
+ getAdminBuilder.get().setMaxRetries(DEFAULT_RETRIES_NUMBER * 2).build()
+ .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR).get();
+ }
+
+ /** Delays every getNamespaceDescriptor call by DEFAULT_RPC_TIMEOUT milliseconds. */
+ public static class TestRpcTimeoutCoprocessor implements MasterObserver {
+ public TestRpcTimeoutCoprocessor() {
+ }
+
+ @Override
+ public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String namespace) throws IOException {
+ Threads.sleep(DEFAULT_RPC_TIMEOUT);
+ }
+ }
+
+ /**
+ * Sleeps for half of DEFAULT_RPC_TIMEOUT on every getNamespaceDescriptor call and fails the call
+ * until the accumulated sleep time reaches DEFAULT_OPERATION_TIMEOUT.
+ */
+ public static class TestOperationTimeoutCoprocessor implements MasterObserver {
+ AtomicLong sleepTime = new AtomicLong(0);
+
+ public TestOperationTimeoutCoprocessor() {
+ }
+
+ @Override
+ public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String namespace) throws IOException {
+ Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
+ if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
+ throw new IOException("call fail");
+ }
+ }
+ }
+
+ /** Fails the first DEFAULT_RETRIES_NUMBER getNamespaceDescriptor calls it sees. */
+ public static class TestMaxRetriesCoprocessor implements MasterObserver {
+ AtomicLong retryNum = new AtomicLong(0);
+
+ public TestMaxRetriesCoprocessor() {
+ }
+
+ @Override
+ public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
+ String namespace) throws IOException {
+ if (retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER) {
+ throw new IOException("call fail");
+ }
+ }
+ }
+}
\ No newline at end of file