You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2022/07/12 18:17:12 UTC

[hbase] branch branch-2 updated: HBASE-27097 SimpleRpcServer is broken (#4613)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 0018cbec583 HBASE-27097 SimpleRpcServer is broken (#4613)
0018cbec583 is described below

commit 0018cbec583548ff447fb84465e7d52f2d1c4613
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Tue Jul 12 11:07:23 2022 -0700

    HBASE-27097 SimpleRpcServer is broken (#4613)
    
    Replace BufferChain#write(channel,int) with a simpler #write(channel)
    implementation that does not attempt to "chunk" data to be written. This
    method was used exclusively by SimpleRpcServer. The code was unnecessarily
    complex and caused short writes when values were large, so it was corrected
    and simplified. Any difference in performance from this change will be
    limited to SimpleRpcServer. Testing under load confirms the fix and does
    not show significant regression.
    
    SimpleRpcServer and its related code are now also marked as @Deprecated.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    
    Conflicts:
            hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
---
 .../org/apache/hadoop/hbase/ipc/BufferChain.java   | 61 +++++++---------------
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   | 14 +----
 .../hadoop/hbase/ipc/SimpleRpcServerResponder.java |  1 +
 .../apache/hadoop/hbase/ipc/SimpleServerCall.java  |  1 +
 .../hbase/ipc/SimpleServerRpcConnection.java       |  1 +
 .../apache/hadoop/hbase/ipc/TestBufferChain.java   | 50 +++---------------
 6 files changed, 29 insertions(+), 99 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index 534a467eda1..5877a06d930 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -23,13 +23,12 @@ import java.nio.channels.GatheringByteChannel;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Chain of ByteBuffers. Used writing out an array of byte buffers. Writes in chunks.
+ * Chain of ByteBuffers. Used for writing out an array of byte buffers.
  */
 @InterfaceAudience.Private
 class BufferChain {
   private final ByteBuffer[] buffers;
   private int remaining = 0;
-  private int bufferOffset = 0;
   private int size;
 
   BufferChain(ByteBuffer... buffers) {
@@ -61,51 +60,27 @@ class BufferChain {
     return remaining > 0;
   }
 
-  /**
-   * Write out our chain of buffers in chunks
-   * @param channel   Where to write
-   * @param chunkSize Size of chunks to write.
-   * @return Amount written. n
-   */
-  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
-    int chunkRemaining = chunkSize;
-    ByteBuffer lastBuffer = null;
-    int bufCount = 0;
-    int restoreLimit = -1;
-
-    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
-      lastBuffer = buffers[bufferOffset + bufCount];
-      if (!lastBuffer.hasRemaining()) {
-        bufferOffset++;
-        continue;
-      }
-      bufCount++;
-      if (lastBuffer.remaining() > chunkRemaining) {
-        restoreLimit = lastBuffer.limit();
-        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
-        chunkRemaining = 0;
-        break;
-      } else {
-        chunkRemaining -= lastBuffer.remaining();
-      }
-    }
-    assert lastBuffer != null;
-    if (chunkRemaining == chunkSize) {
-      assert !hasRemaining();
-      // no data left to write
+  long write(GatheringByteChannel channel) throws IOException {
+    if (!hasRemaining()) {
       return 0;
     }
-    try {
-      long ret = channel.write(buffers, bufferOffset, bufCount);
-      if (ret > 0) {
-        remaining = (int) (remaining - ret);
-      }
-      return ret;
-    } finally {
-      if (restoreLimit >= 0) {
-        lastBuffer.limit(restoreLimit);
+    long written = 0;
+    for (ByteBuffer bb : this.buffers) {
+      if (bb.hasRemaining()) {
+        final int pos = bb.position();
+        final int result = channel.write(bb);
+        if (result <= 0) {
+          // No bytes accepted by the channel. Return how much we were able to write so far.
+          return written;
+        }
+        // Adjust the position of buffers already written so we don't write out
+        // duplicate data upon retry of incomplete write with the same buffer chain.
+        bb.position(pos + result);
+        remaining -= result;
+        written += result;
       }
     }
+    return written;
   }
 
   int size() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 156981ebd2d..34b17197afb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Message;
  * put itself on new queue for Responder to pull from and return result to client.
  * @see BlockingRpcClient
  */
+@Deprecated
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
 public class SimpleRpcServer extends RpcServer {
 
@@ -487,20 +488,9 @@ public class SimpleRpcServer extends RpcServer {
     return call(fakeCall, status);
   }
 
-  /**
-   * This is a wrapper around
-   * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data
-   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
-   * direct buffers as the size of buffer increases. This also minimizes extra copies in NIO layer
-   * as a result of multiple write operations required to write a large buffer.
-   * @param channel     writable byte channel to write to
-   * @param bufferChain Chain of buffers to write
-   * @return number of bytes written
-   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
-   */
   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
     throws IOException {
-    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    long count = bufferChain.write(channel);
     if (count > 0) {
       this.metrics.sentBytes(count);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
index 200c4ebd1af..b9d8d3dffc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
@@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 /**
  * Sends responses of RPC back to clients.
  */
+@Deprecated
 @InterfaceAudience.Private
 class SimpleRpcServerResponder extends Thread {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index 861da8055d1..5c5e9102115 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
  * Datastructure that holds all necessary to a method invocation and then afterward, carries the
  * result.
  */
+@Deprecated
 @InterfaceAudience.Private
 class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index ba7a9752a79..51e1bedba57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
     justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
+@Deprecated
 @InterfaceAudience.Private
 class SimpleServerRpcConnection extends ServerRpcConnection {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
index df85589ab8b..c7142f34f63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,7 +35,6 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
 import org.apache.hbase.thirdparty.com.google.common.io.Files;
@@ -68,53 +69,15 @@ public class TestBufferChain {
     assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
   }
 
-  @Test
-  public void testChainChunkBiggerThanWholeArray() throws IOException {
-    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
-    BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 8192);
-    assertNoRemaining(bufs);
-  }
-
-  @Test
-  public void testChainChunkBiggerThanSomeArrays() throws IOException {
-    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
-    BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 3);
-    assertNoRemaining(bufs);
-  }
-
   @Test
   public void testLimitOffset() throws IOException {
     ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
       stringBuf("XXXXworldY", 4, 5) };
     BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 3);
+    writeAndVerify(chain, "hello world");
     assertNoRemaining(bufs);
   }
 
-  @Test
-  public void testWithSpy() throws IOException {
-    ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
-      stringBuf("XXXXworldY", 4, 5) };
-    BufferChain chain = new BufferChain(bufs);
-    FileOutputStream fos = new FileOutputStream(tmpFile);
-    FileChannel ch = Mockito.spy(fos.getChannel());
-    try {
-      chain.write(ch, 2);
-      assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 2);
-      assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 3);
-      assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 8);
-      assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
-    } finally {
-      ch.close();
-      fos.close();
-    }
-  }
-
   private ByteBuffer stringBuf(String string, int position, int length) {
     ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
     buf.position(position);
@@ -137,14 +100,13 @@ public class TestBufferChain {
     return ret;
   }
 
-  private void writeAndVerify(BufferChain chain, String string, int chunkSize) throws IOException {
+  private void writeAndVerify(BufferChain chain, String string) throws IOException {
     FileOutputStream fos = new FileOutputStream(tmpFile);
     FileChannel ch = fos.getChannel();
     try {
       long remaining = string.length();
       while (chain.hasRemaining()) {
-        long n = chain.write(ch, chunkSize);
-        assertTrue(n == chunkSize || n == remaining);
+        long n = chain.write(ch);
         remaining -= n;
       }
       assertEquals(0, remaining);