You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/22 02:50:43 UTC

[hbase] 15/26: HBASE-21725 Implement BufferedMutator Based on AsyncBufferedMutator

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit e1edfd063672399b5178538451435cc8706144e5
Author: zhangduo <zh...@apache.org>
AuthorDate: Sun Apr 14 20:32:38 2019 +0800

    HBASE-21725 Implement BufferedMutator Based on AsyncBufferedMutator
---
 .../hadoop/hbase/client/BufferedMutator.java       |  10 ++
 .../BufferedMutatorOverAsyncBufferedMutator.java   | 175 +++++++++++++++++++++
 .../hadoop/hbase/client/BufferedMutatorParams.java |  23 ++-
 .../client/ConnectionOverAsyncConnection.java      |  19 ++-
 .../hadoop/hbase/client/TestBufferedMutator.java   |  82 ----------
 .../hadoop/hbase/client/TestBufferedMutator.java   |  90 +++++++++++
 6 files changed, 309 insertions(+), 90 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 7805f77..8ad6a79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -62,7 +62,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface BufferedMutator extends Closeable {
   /**
    * Key to use setting non-default BufferedMutator implementation in Configuration.
+   * <p/>
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
+   *             any more.
    */
+  @Deprecated
   String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
 
   /**
@@ -179,12 +183,18 @@ public interface BufferedMutator extends Closeable {
 
   /**
    * Set rpc timeout for this mutator instance
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the
+   *             {@link BufferedMutatorParams}.
    */
+  @Deprecated
   void setRpcTimeout(int timeout);
 
   /**
    * Set operation timeout for this mutator instance
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the
+   *             {@link BufferedMutatorParams}.
    */
+  @Deprecated
   void setOperationTimeout(int timeout);
 
   /**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
new file mode 100644
index 0000000..a7d4595
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
+ */
+@InterfaceAudience.Private
+class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
+
+  private final AsyncBufferedMutator mutator;
+
+  private final ExceptionListener listener;
+
+  private List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+  private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
+    new ConcurrentLinkedQueue<>();
+
+  private final static int BUFFERED_FUTURES_THRESHOLD = 1024;
+
+  BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
+      ExceptionListener listener) {
+    this.mutator = mutator;
+    this.listener = listener;
+  }
+
+  @Override
+  public TableName getName() {
+    return mutator.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return mutator.getConfiguration();
+  }
+
+  @Override
+  public void mutate(Mutation mutation) throws IOException {
+    mutate(Collections.singletonList(mutation));
+  }
+
+  private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");
+
+  // not always work, so may return an empty string
+  private String getHostnameAndPort(Throwable error) {
+    Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage());
+    if (matcher.matches()) {
+      return matcher.group(1);
+    } else {
+      return "";
+    }
+  }
+
+  private RetriesExhaustedWithDetailsException makeError() {
+    List<Row> rows = new ArrayList<>();
+    List<Throwable> throwables = new ArrayList<>();
+    List<String> hostnameAndPorts = new ArrayList<>();
+    for (;;) {
+      Pair<Mutation, Throwable> pair = errors.poll();
+      if (pair == null) {
+        break;
+      }
+      rows.add(pair.getFirst());
+      throwables.add(pair.getSecond());
+      hostnameAndPorts.add(getHostnameAndPort(pair.getSecond()));
+    }
+    return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
+  }
+
+  @Override
+  public void mutate(List<? extends Mutation> mutations) throws IOException {
+    List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
+    List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
+    for (int i = 0, n = fs.size(); i < n; i++) {
+      CompletableFuture<Void> toComplete = new CompletableFuture<>();
+      final int index = i;
+      addListener(fs.get(index), (r, e) -> {
+        if (e != null) {
+          errors.add(Pair.newPair(mutations.get(index), e));
+          toComplete.completeExceptionally(e);
+        } else {
+          toComplete.complete(r);
+        }
+      });
+      toBuffered.add(toComplete);
+    }
+    synchronized (this) {
+      futures.addAll(toBuffered);
+      if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
+        tryCompleteFuture();
+      }
+      if (!errors.isEmpty()) {
+        RetriesExhaustedWithDetailsException error = makeError();
+        listener.onException(error, this);
+      }
+    }
+  }
+
+  private void tryCompleteFuture() {
+    futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    mutator.close();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    mutator.flush();
+    synchronized (this) {
+      List<CompletableFuture<Void>> toComplete = this.futures;
+      this.futures = new ArrayList<>();
+      try {
+        CompletableFuture.allOf(toComplete.toArray(new CompletableFuture<?>[toComplete.size()]))
+          .join();
+      } catch (CompletionException e) {
+        // just ignore, we will record the actual error in the errors field
+      }
+      if (!errors.isEmpty()) {
+        RetriesExhaustedWithDetailsException error = makeError();
+        listener.onException(error, this);
+      }
+    }
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return mutator.getWriteBufferSize();
+  }
+
+  @Override
+  public void setRpcTimeout(int timeout) {
+    // no effect
+  }
+
+  @Override
+  public void setOperationTimeout(int timeout) {
+    // no effect
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 3f6c565..49fb77b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -101,13 +101,21 @@ public class BufferedMutatorParams implements Cloneable {
     return this;
   }
 
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client
+   *             implementation so you can not set it any more.
+   */
+  @Deprecated
   public long getWriteBufferPeriodicFlushTimerTickMs() {
     return writeBufferPeriodicFlushTimerTickMs;
   }
 
   /**
    * Set the TimerTick how often the buffer timeout if checked.
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client
+   *             implementation so you can not set it any more.
    */
+  @Deprecated
   public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
     this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
     return this;
@@ -141,9 +149,12 @@ public class BufferedMutatorParams implements Cloneable {
   }
 
   /**
-   * @return Name of the class we will use when we construct a
-   * {@link BufferedMutator} instance or null if default implementation.
+   * @return Name of the class we will use when we construct a {@link BufferedMutator} instance or
+   *         null if default implementation.
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the
+   *             implementation has to use too many internal stuffs in HBase.
    */
+  @Deprecated
   public String getImplementationClassName() {
     return this.implementationClassName;
   }
@@ -151,7 +162,10 @@ public class BufferedMutatorParams implements Cloneable {
   /**
    * Specify a BufferedMutator implementation other than the default.
    * @param implementationClassName Name of the BufferedMutator implementation class
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the
+   *             implementation has to use too many internal stuffs in HBase.
    */
+  @Deprecated
   public BufferedMutatorParams implementationClassName(String implementationClassName) {
     this.implementationClassName = implementationClassName;
     return this;
@@ -169,11 +183,6 @@ public class BufferedMutatorParams implements Cloneable {
     return this;
   }
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see java.lang.Object#clone()
-   */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL",
     justification="The clone below is complete")
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index dfe7d8f..8ec7ab8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -87,7 +87,24 @@ class ConnectionOverAsyncConnection implements Connection {
 
   @Override
   public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
-    return oldConn.getBufferedMutator(params);
+    AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
+    if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
+      builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
+    }
+    if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
+      builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
+    }
+    if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
+      builder.setWriteBufferSize(params.getWriteBufferSize());
+    }
+    if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
+      builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
+        TimeUnit.MILLISECONDS);
+    }
+    if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
+      builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
+    }
+    return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
   }
 
   @Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
deleted file mode 100644
index 96bb846..0000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category({ SmallTests.class, ClientTests.class })
-public class TestBufferedMutator {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestBufferedMutator.class);
-
-  @Rule
-  public TestName name = new TestName();
-
-  /**
-   * My BufferedMutator. Just to prove that I can insert a BM other than default.
-   */
-  public static class MyBufferedMutator extends BufferedMutatorImpl {
-    MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory,
-        RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
-      super(conn, rpcCallerFactory, rpcFactory, params);
-    }
-  }
-
-  @Test
-  public void testAlternateBufferedMutatorImpl() throws IOException {
-    BufferedMutatorParams params =
-      new BufferedMutatorParams(TableName.valueOf(name.getMethodName()));
-    Configuration conf = HBaseConfiguration.create();
-    conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName());
-    try (ConnectionImplementation connection = ConnectionFactory.createConnectionImpl(conf, null,
-      UserProvider.instantiate(conf).getCurrent())) {
-      BufferedMutator bm = connection.getBufferedMutator(params);
-      // Assert we get default BM if nothing specified.
-      assertTrue(bm instanceof BufferedMutatorImpl);
-      // Now try and set my own BM implementation.
-      params.implementationClassName(MyBufferedMutator.class.getName());
-      bm = connection.getBufferedMutator(params);
-      assertTrue(bm instanceof MyBufferedMutator);
-    }
-    // Now try creating a Connection after setting an alterate BufferedMutator into
-    // the configuration and confirm we get what was expected.
-    conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
-    try (Connection connection = ConnectionFactory.createConnectionImpl(conf, null,
-      UserProvider.instantiate(conf).getCurrent())) {
-      BufferedMutator bm = connection.getBufferedMutator(params);
-      assertTrue(bm instanceof MyBufferedMutator);
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
new file mode 100644
index 0000000..23e69ee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBufferedMutator {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBufferedMutator.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("test");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 1024;
+
+  private static byte[] VALUE = new byte[1024];
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, CF);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) {
+      mutator.mutate(IntStream.range(0, COUNT / 2)
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+        .collect(Collectors.toList()));
+      mutator.flush();
+      mutator.mutate(IntStream.range(COUNT / 2, COUNT)
+        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+        .collect(Collectors.toList()));
+      mutator.close();
+      verifyData();
+    }
+  }
+
+  private void verifyData() throws IOException {
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < COUNT; i++) {
+        Result r = table.get(new Get(Bytes.toBytes(i)));
+        assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ));
+      }
+    }
+  }
+}