You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/21 13:59:43 UTC

hbase git commit: HBASE-18081 The way we process connection preamble in SimpleRpcServer is broken

Repository: hbase
Updated Branches:
  refs/heads/master 1520c8fd4 -> 1ceb25cf0


HBASE-18081 The way we process connection preamble in SimpleRpcServer is broken


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1ceb25cf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1ceb25cf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1ceb25cf

Branch: refs/heads/master
Commit: 1ceb25cf09f5057bb9ec23eef90373c8febbc6e2
Parents: 1520c8f
Author: zhangduo <zh...@apache.org>
Authored: Fri May 19 22:12:00 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun May 21 20:36:33 2017 +0800

----------------------------------------------------------------------
 .../hbase/ipc/SimpleServerRpcConnection.java    |  54 ++++----
 .../ipc/TestRpcServerSlowConnectionSetup.java   | 136 +++++++++++++++++++
 2 files changed, 161 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1ceb25cf/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index 50a1a6b..b2507d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -63,6 +63,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
   final SocketChannel channel;
   private ByteBuff data;
   private ByteBuffer dataLengthBuffer;
+  private ByteBuffer preambleBuffer;
   protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue =
       new ConcurrentLinkedDeque<>();
   final Lock responseWriteLock = new ReentrantLock();
@@ -130,22 +131,25 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
   }
 
   private int readPreamble() throws IOException {
-    int count;
-    // Check for 'HBas' magic.
-    this.dataLengthBuffer.flip();
-    if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
-      return doBadPreambleHandling(
-        "Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
-            Bytes.toStringBinary(dataLengthBuffer.array()) + " from " + toString());
+    if (preambleBuffer == null) {
+      preambleBuffer = ByteBuffer.allocate(6);
     }
-    // Now read the next two bytes, the version and the auth to use.
-    ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
-    count = this.rpcServer.channelRead(channel, versionAndAuthBytes);
-    if (count < 0 || versionAndAuthBytes.remaining() > 0) {
+    int count = this.rpcServer.channelRead(channel, preambleBuffer);
+    if (count < 0 || preambleBuffer.remaining() > 0) {
       return count;
     }
-    int version = versionAndAuthBytes.get(0);
-    byte authbyte = versionAndAuthBytes.get(1);
+    // Check for 'HBas' magic.
+    preambleBuffer.flip();
+    for (int i = 0; i < HConstants.RPC_HEADER.length; i++) {
+      if (HConstants.RPC_HEADER[i] != preambleBuffer.get(i)) {
+        return doBadPreambleHandling("Expected HEADER=" +
+            Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER=" +
+            Bytes.toStringBinary(preambleBuffer.array(), 0, HConstants.RPC_HEADER.length) +
+            " from " + toString());
+      }
+    }
+    int version = preambleBuffer.get(HConstants.RPC_HEADER.length);
+    byte authbyte = preambleBuffer.get(HConstants.RPC_HEADER.length + 1);
     this.authMethod = AuthMethod.valueOf(authbyte);
     if (version != SimpleRpcServer.CURRENT_VERSION) {
       String msg = getFatalConnectionString(version, authbyte);
@@ -178,8 +182,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
     if (authMethod != AuthMethod.SIMPLE) {
       useSasl = true;
     }
-
-    dataLengthBuffer.clear();
+    preambleBuffer = null; // do not need it anymore
     connectionPreambleRead = true;
     return count;
   }
@@ -200,26 +203,19 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
    * @throws InterruptedException
    */
   public int readAndProcess() throws IOException, InterruptedException {
-    // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it
-    // does, read in the rest of the connection preamble, the version and the auth method.
-    // Else it will be length of the data to read (or -1 if a ping). We catch the integer
-    // length into the 4-byte this.dataLengthBuffer.
-    int count = read4Bytes();
-    if (count < 0 || dataLengthBuffer.remaining() > 0) {
-      return count;
-    }
-
     // If we have not read the connection setup preamble, look to see if that is on the wire.
     if (!connectionPreambleRead) {
-      count = readPreamble();
+      int count = readPreamble();
       if (!connectionPreambleRead) {
         return count;
       }
+    }
 
-      count = read4Bytes();
-      if (count < 0 || dataLengthBuffer.remaining() > 0) {
-        return count;
-      }
+    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
+    // integer length into the 4-byte this.dataLengthBuffer.
+    int count = read4Bytes();
+    if (count < 0 || dataLengthBuffer.remaining() > 0) {
+      return count;
     }
 
     // We have read a length and we have read the preamble. It is either the connection header

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ceb25cf/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
new file mode 100644
index 0000000..fba5ca7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, MediumTests.class })
+public class TestRpcServerSlowConnectionSetup {
+
+  private RpcServer server;
+
+  private Socket socket;
+
+  @Parameter
+  public Class<? extends RpcServer> rpcServerImpl;
+
+  @Parameters(name = "{index}: rpcServerImpl={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { SimpleRpcServer.class },
+      new Object[] { NettyRpcServer.class });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl.getName());
+    server = RpcServerFactory.createRpcServer(null, "testRpcServer",
+      Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
+      new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1));
+    server.start();
+    socket = new Socket("localhost", server.getListenerAddress().getPort());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (socket != null) {
+      socket.close();
+    }
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    int rpcHeaderLen = HConstants.RPC_HEADER.length;
+    byte[] preamble = new byte[rpcHeaderLen + 2];
+    System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
+    preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
+    preamble[rpcHeaderLen + 1] = AuthMethod.SIMPLE.code;
+    socket.getOutputStream().write(preamble, 0, rpcHeaderLen + 1);
+    socket.getOutputStream().flush();
+    Thread.sleep(5000);
+    socket.getOutputStream().write(preamble, rpcHeaderLen + 1, 1);
+    socket.getOutputStream().flush();
+
+    ConnectionHeader header = ConnectionHeader.newBuilder()
+        .setServiceName(TestRpcServiceProtos.TestProtobufRpcProto.getDescriptor().getFullName())
+        .setVersionInfo(ProtobufUtil.getVersionInfo()).build();
+    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
+    dos.writeInt(header.getSerializedSize());
+    header.writeTo(dos);
+    dos.flush();
+
+    int callId = 10;
+    Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
+        EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
+        HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
+    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
+    dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
+    requestHeader.writeDelimitedTo(dos);
+    call.param.writeDelimitedTo(dos);
+    dos.flush();
+
+    DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
+    int size = dis.readInt();
+    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(dis);
+    assertEquals(callId, responseHeader.getCallId());
+    EmptyResponseProto.Builder builder = EmptyResponseProto.newBuilder();
+    builder.mergeDelimitedFrom(dis);
+    assertEquals(size, IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader, builder.build()));
+  }
+}