You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/12/14 06:39:05 UTC

hbase git commit: HBASE-17277 Allow alternate BufferedMutator implemenation Specify the name of an alternate BufferedMutator implementation by either:

Repository: hbase
Updated Branches:
  refs/heads/master a9310436d -> 68ce3f1e3


HBASE-17277 Allow alternate BufferedMutator implemenation Specify the name of an alternate BufferedMutator implementation by either:

+ Setting "hbase.client.bufferedmutator.classname" to the name of the
alternate implementation class
+ Or, by setting implementationClassName on BufferedMutatorParams and
passing the amended BufferedMutatorParams when calling Connection#getBufferedMutator.

Add a test to exercise both means.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68ce3f1e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68ce3f1e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68ce3f1e

Branch: refs/heads/master
Commit: 68ce3f1e3b22e8a8c6b17072738be6a08489dfca
Parents: a931043
Author: Michael Stack <st...@apache.org>
Authored: Wed Dec 7 12:33:23 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Dec 13 22:38:58 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/BufferedMutator.java    |   5 +
 .../hbase/client/BufferedMutatorImpl.java       |  12 ++-
 .../hbase/client/BufferedMutatorParams.java     |  21 +++-
 .../hbase/client/ConnectionImplementation.java  |  27 ++++-
 .../hbase/client/TestBufferedMutator.java       | 100 +++++++++++++++++++
 5 files changed, 161 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index fcc9af7..cea9304 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -64,6 +64,11 @@ import java.util.List;
 @InterfaceStability.Evolving
 public interface BufferedMutator extends Closeable {
   /**
+   * Key to use setting non-default BufferedMutator implementation in Configuration.
+   */
+  public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
+
+  /**
    * Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
    */
   TableName getName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 54955a0..0085767 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -41,11 +41,13 @@ import java.util.concurrent.atomic.AtomicLong;
  * <p>
  * Used to communicate with a single HBase table similar to {@link Table}
  * but meant for batched, potentially asynchronous puts. Obtain an instance from
- * a {@link Connection} and call {@link #close()} afterwards.
+ * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
+ * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
+ * or by setting alternate classname via the key {} in Configuration.
  * </p>
  *
  * <p>
- * While this can be used accross threads, great care should be used when doing so.
+ * While this can be used across threads, great care should be used when doing so.
  * Errors are global to the buffered mutator and the Exceptions can be thrown on any
  * thread that causes the flush for requests.
  * </p>
@@ -57,6 +59,12 @@ import java.util.concurrent.atomic.AtomicLong;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class BufferedMutatorImpl implements BufferedMutator {
+  /**
+   * Key to use setting non-default BufferedMutator implementation
+   * classname via Configuration.
+   */
+  public static final String HBASE_BUFFEREDMUTATOR_CLASSNAME_KEY =
+      "hbase.client.bufferedmutator.classname";
 
   private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index d4cdead..aacb5f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -38,6 +38,8 @@ public class BufferedMutatorParams {
   private long writeBufferSize = UNSET;
   private int maxKeyValueSize = UNSET;
   private ExecutorService pool = null;
+  private String implementationClassName = null;
+
   private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
     @Override
     public void onException(RetriesExhaustedWithDetailsException exception,
@@ -96,6 +98,23 @@ public class BufferedMutatorParams {
     return this;
   }
 
+  /**
+   * @return Name of the class we will use when we construct a
+   * {@link BufferedMutator} instance or null if default implementation.
+   */
+  public String getImplementationClassName() {
+    return this.implementationClassName;
+  }
+
+  /**
+   * Specify a BufferedMutator implementation other than the default.
+   * @param implementationClassName Name of the BufferedMutator implementation class
+   */
+  public BufferedMutatorParams implementationClassName(String implementationClassName) {
+    this.implementationClassName = implementationClassName;
+    return this;
+  }
+
   public BufferedMutator.ExceptionListener getListener() {
     return listener;
   }
@@ -107,4 +126,4 @@ public class BufferedMutatorParams {
     this.listener = listener;
     return this;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index e75d9a5..0c512be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -185,6 +186,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   private final ClientBackoffPolicy backoffPolicy;
 
   /**
+   * Allow setting an alternate BufferedMutator implementation via
+   * config. If null, use default.
+   */
+  private final String alternateBufferedMutatorClassName;
+
+  /**
    * constructor
    * @param conf Configuration object
    */
@@ -244,6 +251,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
             ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
             ClusterStatusListener.Listener.class);
 
+    // Is there an alternate BufferedMutator to use?
+    this.alternateBufferedMutatorClassName =
+        this.conf.get(BufferedMutator.CLASSNAME_KEY);
+
     try {
       this.registry = setupRegistry();
       retrieveClusterId();
@@ -315,7 +326,21 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
       params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
     }
-    return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+    // Look to see if an alternate BufferedMutation implementation is wanted.
+    // Look in params and in config. If null, use default.
+    String implementationClassName = params.getImplementationClassName();
+    if (implementationClassName == null) {
+      implementationClassName = this.alternateBufferedMutatorClassName;
+    }
+    if (implementationClassName == null) {
+      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+    }
+    try {
+      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
+          this, rpcCallerFactory, rpcControllerFactory, params);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ce3f1e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
new file mode 100644
index 0000000..84eb948
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class, ClientTests.class})
+public class TestBufferedMutator {
+
+  /**
+   * Registry that does nothing.
+   * Otherwise, default Registry wants zookeeper up and running.
+   */
+  public static class DoNothingRegistry implements Registry {
+    @Override
+    public void init(Connection connection) {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public RegionLocations getMetaRegionLocation() throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public String getClusterId() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public int getCurrentNrHRS() throws IOException {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+  }
+
+  /**
+   * My BufferedMutator.
+   * Just to prove that I can insert a BM other than default.
+   */
+  public static class MyBufferedMutator extends BufferedMutatorImpl {
+    MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
+        RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+      super(conn, rpcCallerFactory, rpcFactory, params);
+    }
+  }
+
+  @Test
+  public void testAlternateBufferedMutatorImpl() throws IOException {
+    BufferedMutatorParams params =  new BufferedMutatorParams(TableName.valueOf("t"));
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(RegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingRegistry.class.getName());
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      BufferedMutator bm = connection.getBufferedMutator(params);
+      // Assert we get default BM if nothing specified.
+      assertTrue(bm instanceof BufferedMutatorImpl);
+      // Now try and set my own BM implementation.
+      params.implementationClassName(MyBufferedMutator.class.getName());
+      bm = connection.getBufferedMutator(params);
+      assertTrue(bm instanceof MyBufferedMutator);
+    }
+    // Now try creating a Connection after setting an alterate BufferedMutator into
+    // the configuration and confirm we get what was expected.
+    conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      BufferedMutator bm = connection.getBufferedMutator(params);
+      assertTrue(bm instanceof MyBufferedMutator);
+    }
+  }
+}