You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/12/14 06:39:05 UTC
hbase git commit: HBASE-17277 Allow alternate BufferedMutator
implemenation Specify the name of an alternate BufferedMutator implementation
by either:
Repository: hbase
Updated Branches:
refs/heads/master a9310436d -> 68ce3f1e3
HBASE-17277 Allow alternate BufferedMutator implemenation Specify the name of an alternate BufferedMutator implementation by either:
+ Setting "hbase.client.bufferedmutator.classname" to the name of the
alternate implementation class
+ Or, by setting implementationClassName on BufferedMutatorParams and
passing the amended BufferedMutatorParams when calling Connection#getBufferedMutator.
Add a test to exercise both means.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68ce3f1e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68ce3f1e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68ce3f1e
Branch: refs/heads/master
Commit: 68ce3f1e3b22e8a8c6b17072738be6a08489dfca
Parents: a931043
Author: Michael Stack <st...@apache.org>
Authored: Wed Dec 7 12:33:23 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Dec 13 22:38:58 2016 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/BufferedMutator.java | 5 +
.../hbase/client/BufferedMutatorImpl.java | 12 ++-
.../hbase/client/BufferedMutatorParams.java | 21 +++-
.../hbase/client/ConnectionImplementation.java | 27 ++++-
.../hbase/client/TestBufferedMutator.java | 100 +++++++++++++++++++
5 files changed, 161 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index fcc9af7..cea9304 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -64,6 +64,11 @@ import java.util.List;
@InterfaceStability.Evolving
public interface BufferedMutator extends Closeable {
/**
+ * Key to use setting non-default BufferedMutator implementation in Configuration.
+ */
+ public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+
+ /**
* Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
*/
TableName getName();
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 54955a0..0085767 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -41,11 +41,13 @@ import java.util.concurrent.atomic.AtomicLong;
* <p>
* Used to communicate with a single HBase table similar to {@link Table}
* but meant for batched, potentially asynchronous puts. Obtain an instance from
- * a {@link Connection} and call {@link #close()} afterwards.
+ * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
+ * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
+ * or by setting alternate classname via the key {} in Configuration.
* </p>
*
* <p>
- * While this can be used accross threads, great care should be used when doing so.
+ * While this can be used across threads, great care should be used when doing so.
* Errors are global to the buffered mutator and the Exceptions can be thrown on any
* thread that causes the flush for requests.
* </p>
@@ -57,6 +59,12 @@ import java.util.concurrent.atomic.AtomicLong;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {
+ /**
+ * Key to use setting non-default BufferedMutator implementation
+ * classname via Configuration.
+ */
+ public static final String HBASE_BUFFEREDMUTATOR_CLASSNAME_KEY =
+ "hbase.client.bufferedmutator.classname";
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index d4cdead..aacb5f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -38,6 +38,8 @@ public class BufferedMutatorParams {
private long writeBufferSize = UNSET;
private int maxKeyValueSize = UNSET;
private ExecutorService pool = null;
+ private String implementationClassName = null;
+
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -96,6 +98,23 @@ public class BufferedMutatorParams {
return this;
}
+ /**
+ * @return Name of the class we will use when we construct a
+ * {@link BufferedMutator} instance or null if default implementation.
+ */
+ public String getImplementationClassName() {
+ return this.implementationClassName;
+ }
+
+ /**
+ * Specify a BufferedMutator implementation other than the default.
+ * @param implementationClassName Name of the BufferedMutator implementation class
+ */
+ public BufferedMutatorParams implementationClassName(String implementationClassName) {
+ this.implementationClassName = implementationClassName;
+ return this;
+ }
+
public BufferedMutator.ExceptionListener getListener() {
return listener;
}
@@ -107,4 +126,4 @@ public class BufferedMutatorParams {
this.listener = listener;
return this;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index e75d9a5..0c512be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -185,6 +186,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final ClientBackoffPolicy backoffPolicy;
/**
+ * Allow setting an alternate BufferedMutator implementation via
+ * config. If null, use default.
+ */
+ private final String alternateBufferedMutatorClassName;
+
+ /**
* constructor
* @param conf Configuration object
*/
@@ -244,6 +251,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
+ // Is there an alternate BufferedMutator to use?
+ this.alternateBufferedMutatorClassName =
+ this.conf.get(BufferedMutator.CLASSNAME_KEY);
+
try {
this.registry = setupRegistry();
retrieveClusterId();
@@ -315,7 +326,21 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
- return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+ // Look to see if an alternate BufferedMutation implementation is wanted.
+ // Look in params and in config. If null, use default.
+ String implementationClassName = params.getImplementationClassName();
+ if (implementationClassName == null) {
+ implementationClassName = this.alternateBufferedMutatorClassName;
+ }
+ if (implementationClassName == null) {
+ return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+ }
+ try {
+ return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
+ this, rpcCallerFactory, rpcControllerFactory, params);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
new file mode 100644
index 0000000..84eb948
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class, ClientTests.class})
+public class TestBufferedMutator {
+
+ /**
+ * Registry that does nothing.
+ * Otherwise, default Registry wants zookeeper up and running.
+ */
+ public static class DoNothingRegistry implements Registry {
+ @Override
+ public void init(Connection connection) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public RegionLocations getMetaRegionLocation() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getClusterId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getCurrentNrHRS() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+ }
+
+ /**
+ * My BufferedMutator.
+ * Just to prove that I can insert a BM other than default.
+ */
+ public static class MyBufferedMutator extends BufferedMutatorImpl {
+ MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
+ RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+ super(conn, rpcCallerFactory, rpcFactory, params);
+ }
+ }
+
+ @Test
+ public void testAlternateBufferedMutatorImpl() throws IOException {
+ BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t"));
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(RegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingRegistry.class.getName());
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ BufferedMutator bm = connection.getBufferedMutator(params);
+ // Assert we get default BM if nothing specified.
+ assertTrue(bm instanceof BufferedMutatorImpl);
+ // Now try and set my own BM implementation.
+ params.implementationClassName(MyBufferedMutator.class.getName());
+ bm = connection.getBufferedMutator(params);
+ assertTrue(bm instanceof MyBufferedMutator);
+ }
+ // Now try creating a Connection after setting an alterate BufferedMutator into
+ // the configuration and confirm we get what was expected.
+ conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ BufferedMutator bm = connection.getBufferedMutator(params);
+ assertTrue(bm instanceof MyBufferedMutator);
+ }
+ }
+}