You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/15 10:08:18 UTC

[3/3] hbase git commit: HBASE-18012 Move RpcServer.Connection to a separated file

HBASE-18012 Move RpcServer.Connection to a separated file


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/341223d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/341223d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/341223d8

Branch: refs/heads/master
Commit: 341223d86c567e2124802b0a19e1ba7bdc560cad
Parents: 5cdaca5
Author: zhangduo <zh...@apache.org>
Authored: Wed May 10 11:05:38 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon May 15 18:07:38 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/nio/SingleByteBuff.java |   4 +-
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   4 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 186 +---
 .../hadoop/hbase/ipc/NettyServerCall.java       |  13 +-
 .../hbase/ipc/NettyServerRpcConnection.java     | 206 +++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 820 +-----------------
 .../org/apache/hadoop/hbase/ipc/ServerCall.java |   7 +-
 .../hadoop/hbase/ipc/ServerRpcConnection.java   | 852 +++++++++++++++++++
 .../hadoop/hbase/ipc/SimpleRpcServer.java       | 717 +---------------
 .../hbase/ipc/SimpleRpcServerResponder.java     | 316 +++++++
 .../hadoop/hbase/ipc/SimpleServerCall.java      |  18 +-
 .../hbase/ipc/SimpleServerRpcConnection.java    | 428 ++++++++++
 .../hbase/security/HBaseSaslRpcServer.java      |  24 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |  11 +-
 14 files changed, 1869 insertions(+), 1737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 9f6b7b5..6ed90ad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.nio;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
@@ -27,8 +29,6 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
 import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import sun.nio.ch.DirectBuffer;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index f16fc50..f476b11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -75,8 +75,8 @@ public class CallRunner {
    * @deprecated As of release 2.0, this will be removed in HBase 3.0
    */
   @Deprecated
-  public ServerCall getCall() {
-    return (ServerCall) call;
+  public ServerCall<?> getCall() {
+    return (ServerCall<?>) call;
   }
 
   public void setStatus(MonitoredRPCHandler status) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index c18b894..4a4ddba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.ipc;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -46,10 +45,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -57,31 +53,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.htrace.TraceInfo;
 
 /**
  * An RPC server with Netty4 implementation.
- *
  */
+@InterfaceAudience.Private
 public class NettyRpcServer extends RpcServer {
 
   public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
@@ -187,166 +173,6 @@ public class NettyRpcServer extends RpcServer {
     return ((InetSocketAddress) serverChannel.localAddress());
   }
 
-  public class NettyConnection extends RpcServer.Connection {
-
-    protected Channel channel;
-
-    NettyConnection(Channel channel) {
-      super();
-      this.channel = channel;
-      InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
-      this.addr = inetSocketAddress.getAddress();
-      if (addr == null) {
-        this.hostAddress = "*Unknown*";
-      } else {
-        this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
-      }
-      this.remotePort = inetSocketAddress.getPort();
-      this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
-          null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
-      this.setConnectionHeaderResponseCall =
-          new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
-              0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
-      this.authFailedCall =
-          new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
-              null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
-    }
-
-    void readPreamble(ByteBuf buffer) throws IOException {
-      byte[] rpcHead =
-          { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
-      if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
-         doBadPreambleHandling("Expected HEADER="
-            + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER="
-            + Bytes.toStringBinary(rpcHead) + " from " + toString());
-         return;
-      }
-      // Now read the next two bytes, the version and the auth to use.
-      int version = buffer.readByte();
-      byte authbyte = buffer.readByte();
-      this.authMethod = AuthMethod.valueOf(authbyte);
-      if (version != CURRENT_VERSION) {
-        String msg = getFatalConnectionString(version, authbyte);
-        doBadPreambleHandling(msg, new WrongVersionException(msg));
-        return;
-      }
-      if (authMethod == null) {
-        String msg = getFatalConnectionString(version, authbyte);
-        doBadPreambleHandling(msg, new BadAuthException(msg));
-        return;
-      }
-      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-        if (allowFallbackToSimpleAuth) {
-          metrics.authenticationFallback();
-          authenticatedWithFallback = true;
-        } else {
-          AccessDeniedException ae = new AccessDeniedException(
-              "Authentication is required");
-          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
-          ((NettyServerCall) authFailedCall)
-              .sendResponseIfReady(ChannelFutureListener.CLOSE);
-          return;
-        }
-      }
-      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
-        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
-          null);
-        authMethod = AuthMethod.SIMPLE;
-        // client has already sent the initial Sasl message and we
-        // should ignore it. Both client and server should fall back
-        // to simple auth from now on.
-        skipInitialSaslHandshake = true;
-      }
-      if (authMethod != AuthMethod.SIMPLE) {
-        useSasl = true;
-      }
-      connectionPreambleRead = true;
-    }
-
-    private void doBadPreambleHandling(final String msg) throws IOException {
-      doBadPreambleHandling(msg, new FatalConnectionException(msg));
-    }
-
-    private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
-      LOG.warn(msg);
-      NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
-          null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
-      setupResponse(null, fakeCall, e, msg);
-      // closes out the connection.
-      fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
-    }
-
-    void process(final ByteBuf buf) throws IOException, InterruptedException {
-      if (connectionHeaderRead) {
-        this.callCleanup = new RpcServer.CallCleanup() {
-          @Override
-          public void run() {
-            buf.release();
-          }
-        };
-        process(new SingleByteBuff(buf.nioBuffer()));
-      } else {
-        byte[] data = new byte[buf.readableBytes()];
-        buf.readBytes(data, 0, data.length);
-        ByteBuffer connectionHeader = ByteBuffer.wrap(data);
-        buf.release();
-        process(connectionHeader);
-      }
-    }
-
-    void process(ByteBuffer buf) throws IOException, InterruptedException {
-      process(new SingleByteBuff(buf));
-    }
-
-    void process(ByteBuff buf) throws IOException, InterruptedException {
-      try {
-        if (skipInitialSaslHandshake) {
-          skipInitialSaslHandshake = false;
-          if (callCleanup != null) {
-            callCleanup.run();
-          }
-          return;
-        }
-
-        if (useSasl) {
-          saslReadAndProcess(buf);
-        } else {
-          processOneRpc(buf);
-        }
-      } catch (Exception e) {
-        if (callCleanup != null) {
-          callCleanup.run();
-        }
-        throw e;
-      } finally {
-        this.callCleanup = null;
-      }
-    }
-
-    @Override
-    public synchronized void close() {
-      disposeSasl();
-      channel.close();
-      callCleanup = null;
-    }
-
-    @Override
-    public boolean isConnectionOpen() {
-      return channel.isOpen();
-    }
-
-    @Override
-    public ServerCall createCall(int id, final BlockingService service,
-        final MethodDescriptor md, RequestHeader header, Message param,
-        CellScanner cellScanner, RpcServer.Connection connection, long size,
-        TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
-        CallCleanup reqCleanup) {
-      return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
-          tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
-          reqCleanup);
-    }
-  }
-
   private class Initializer extends ChannelInitializer<SocketChannel> {
 
     final int maxRequestSize;
@@ -368,7 +194,7 @@ public class NettyRpcServer extends RpcServer {
   }
 
   private class ConnectionHeaderHandler extends ByteToMessageDecoder {
-    private NettyConnection connection;
+    private NettyServerRpcConnection connection;
 
     ConnectionHeaderHandler() {
     }
@@ -379,7 +205,7 @@ public class NettyRpcServer extends RpcServer {
       if (byteBuf.readableBytes() < 6) {
         return;
       }
-      connection = new NettyConnection(ctx.channel());
+      connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel());
       connection.readPreamble(byteBuf);
       ((MessageDecoder) ctx.pipeline().get("decoder"))
           .setConnection(connection);
@@ -390,9 +216,9 @@ public class NettyRpcServer extends RpcServer {
 
   private class MessageDecoder extends ChannelInboundHandlerAdapter {
 
-    private NettyConnection connection;
+    private NettyServerRpcConnection connection;
 
-    void setConnection(NettyConnection connection) {
+    void setConnection(NettyServerRpcConnection connection) {
       this.connection = connection;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index a3f23dd..3cb9a5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -25,7 +25,6 @@ import java.net.InetAddress;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -38,30 +37,26 @@ import org.apache.htrace.TraceInfo;
  * result.
  */
 @InterfaceAudience.Private
-class NettyServerCall extends ServerCall {
+class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
 
   NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
-      Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
+      Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
       TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
       ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
     super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
         receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
   }
 
-  NettyConnection getConnection() {
-    return (NettyConnection) this.connection;
-  }
-
   /**
    * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
    * respond to client. This is called by the RPC code in the context of the Handler thread.
    */
   @Override
   public synchronized void sendResponseIfReady() throws IOException {
-    getConnection().channel.writeAndFlush(this);
+    connection.channel.writeAndFlush(this);
   }
 
   public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
-    getConnection().channel.writeAndFlush(this).addListener(listener);
+    connection.channel.writeAndFlush(this).addListener(listener);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
new file mode 100644
index 0000000..7985295
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * RpcConnection implementation for netty rpc server.
+ */
+@InterfaceAudience.Private
+class NettyServerRpcConnection extends ServerRpcConnection {
+
+  final Channel channel;
+
+  NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
+    super(rpcServer);
+    this.channel = channel;
+    InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
+    this.addr = inetSocketAddress.getAddress();
+    if (addr == null) {
+      this.hostAddress = "*Unknown*";
+    } else {
+      this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
+    }
+    this.remotePort = inetSocketAddress.getPort();
+    this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+        null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
+    this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
+        null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
+        rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
+    this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
+        null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
+        rpcServer.cellBlockBuilder, null);
+  }
+
+  void readPreamble(ByteBuf buffer) throws IOException {
+    byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
+    if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
+      doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) +
+          " but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString());
+      return;
+    }
+    // Now read the next two bytes, the version and the auth to use.
+    int version = buffer.readByte();
+    byte authbyte = buffer.readByte();
+    this.authMethod = AuthMethod.valueOf(authbyte);
+    if (version != NettyRpcServer.CURRENT_VERSION) {
+      String msg = getFatalConnectionString(version, authbyte);
+      doBadPreambleHandling(msg, new WrongVersionException(msg));
+      return;
+    }
+    if (authMethod == null) {
+      String msg = getFatalConnectionString(version, authbyte);
+      doBadPreambleHandling(msg, new BadAuthException(msg));
+      return;
+    }
+    if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+      if (this.rpcServer.allowFallbackToSimpleAuth) {
+        this.rpcServer.metrics.authenticationFallback();
+        authenticatedWithFallback = true;
+      } else {
+        AccessDeniedException ae = new AccessDeniedException("Authentication is required");
+        this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
+        ((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE);
+        return;
+      }
+    }
+    if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+      doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
+        null);
+      authMethod = AuthMethod.SIMPLE;
+      // client has already sent the initial Sasl message and we
+      // should ignore it. Both client and server should fall back
+      // to simple auth from now on.
+      skipInitialSaslHandshake = true;
+    }
+    if (authMethod != AuthMethod.SIMPLE) {
+      useSasl = true;
+    }
+    connectionPreambleRead = true;
+  }
+
+  private void doBadPreambleHandling(final String msg) throws IOException {
+    doBadPreambleHandling(msg, new FatalConnectionException(msg));
+  }
+
+  private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
+    NettyRpcServer.LOG.warn(msg);
+    NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null,
+        null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
+        this.rpcServer.cellBlockBuilder, null);
+    this.rpcServer.setupResponse(null, fakeCall, e, msg);
+    // closes out the connection.
+    fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
+  }
+
+  void process(final ByteBuf buf) throws IOException, InterruptedException {
+    if (connectionHeaderRead) {
+      this.callCleanup = new RpcServer.CallCleanup() {
+        @Override
+        public void run() {
+          buf.release();
+        }
+      };
+      process(new SingleByteBuff(buf.nioBuffer()));
+    } else {
+      byte[] data = new byte[buf.readableBytes()];
+      buf.readBytes(data, 0, data.length);
+      ByteBuffer connectionHeader = ByteBuffer.wrap(data);
+      buf.release();
+      process(connectionHeader);
+    }
+  }
+
+  void process(ByteBuffer buf) throws IOException, InterruptedException {
+    process(new SingleByteBuff(buf));
+  }
+
+  void process(ByteBuff buf) throws IOException, InterruptedException {
+    try {
+      if (skipInitialSaslHandshake) {
+        skipInitialSaslHandshake = false;
+        if (callCleanup != null) {
+          callCleanup.run();
+        }
+        return;
+      }
+
+      if (useSasl) {
+        saslReadAndProcess(buf);
+      } else {
+        processOneRpc(buf);
+      }
+    } catch (Exception e) {
+      if (callCleanup != null) {
+        callCleanup.run();
+      }
+      throw e;
+    } finally {
+      this.callCleanup = null;
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    disposeSasl();
+    channel.close();
+    callCleanup = null;
+  }
+
+  @Override
+  public boolean isConnectionOpen() {
+    return channel.isOpen();
+  }
+
+  @Override
+  public NettyServerCall createCall(int id, final BlockingService service,
+      final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
+      long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+      CallCleanup reqCleanup) {
+    return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, tinfo,
+        remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+        this.rpcServer.cellBlockBuilder, reqCleanup);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index bbc329c..d68a05e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,34 +20,22 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 
-import java.io.ByteArrayInputStream;
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.security.GeneralSecurityException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.atomic.LongAdder;
 
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.crypto.cipher.CryptoCipherFactory;
-import org.apache.commons.crypto.random.CryptoRandom;
-import org.apache.commons.crypto.random.CryptoRandomFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,65 +47,36 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.htrace.TraceInfo;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * An RPC server that hosts protobuf described Services.
  *
@@ -262,739 +221,6 @@ public abstract class RpcServer implements RpcServerInterface,
     void run();
   }
 
-  /** Reads calls from a connection and queues them for handling. */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      value="VO_VOLATILE_INCREMENT",
-      justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
-  public abstract class Connection implements Closeable {
-    // If initial preamble with version and magic has been read or not.
-    protected boolean connectionPreambleRead = false;
-    // If the connection header has been read or not.
-    protected boolean connectionHeaderRead = false;
-
-    protected CallCleanup callCleanup;
-
-    // Cache the remote host & port info so that even if the socket is
-    // disconnected, we can say where it used to connect to.
-    protected String hostAddress;
-    protected int remotePort;
-    protected InetAddress addr;
-    protected ConnectionHeader connectionHeader;
-
-    /**
-     * Codec the client asked use.
-     */
-    protected Codec codec;
-    /**
-     * Compression codec the client asked us use.
-     */
-    protected CompressionCodec compressionCodec;
-    protected BlockingService service;
-
-    protected AuthMethod authMethod;
-    protected boolean saslContextEstablished;
-    protected boolean skipInitialSaslHandshake;
-    private ByteBuffer unwrappedData;
-    // When is this set? FindBugs wants to know! Says NP
-    private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
-    protected boolean useSasl;
-    protected SaslServer saslServer;
-    protected CryptoAES cryptoAES;
-    protected boolean useWrap = false;
-    protected boolean useCryptoAesWrap = false;
-    // Fake 'call' for failed authorization response
-    protected static final int AUTHORIZATION_FAILED_CALLID = -1;
-    protected ServerCall authFailedCall;
-    protected ByteArrayOutputStream authFailedResponse =
-        new ByteArrayOutputStream();
-    // Fake 'call' for SASL context setup
-    protected static final int SASL_CALLID = -33;
-    protected ServerCall saslCall;
-    // Fake 'call' for connection header response
-    protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
-    protected ServerCall setConnectionHeaderResponseCall;
-
-    // was authentication allowed with a fallback to simple auth
-    protected boolean authenticatedWithFallback;
-
-    protected boolean retryImmediatelySupported = false;
-
-    public UserGroupInformation attemptingUser = null; // user name before auth
-    protected User user = null;
-    protected UserGroupInformation ugi = null;
-
-    public Connection() {
-      this.callCleanup = null;
-    }
-
-    @Override
-    public String toString() {
-      return getHostAddress() + ":" + remotePort;
-    }
-
-    public String getHostAddress() {
-      return hostAddress;
-    }
-
-    public InetAddress getHostInetAddress() {
-      return addr;
-    }
-
-    public int getRemotePort() {
-      return remotePort;
-    }
-
-    public VersionInfo getVersionInfo() {
-      if (connectionHeader.hasVersionInfo()) {
-        return connectionHeader.getVersionInfo();
-      }
-      return null;
-    }
-
-    protected String getFatalConnectionString(final int version, final byte authByte) {
-      return "serverVersion=" + CURRENT_VERSION +
-      ", clientVersion=" + version + ", authMethod=" + authByte +
-      ", authSupported=" + (authMethod != null) + " from " + toString();
-    }
-
-    protected UserGroupInformation getAuthorizedUgi(String authorizedId)
-        throws IOException {
-      UserGroupInformation authorizedUgi;
-      if (authMethod == AuthMethod.DIGEST) {
-        TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
-            secretManager);
-        authorizedUgi = tokenId.getUser();
-        if (authorizedUgi == null) {
-          throw new AccessDeniedException(
-              "Can't retrieve username from tokenIdentifier.");
-        }
-        authorizedUgi.addTokenIdentifier(tokenId);
-      } else {
-        authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
-      }
-      authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
-      return authorizedUgi;
-    }
-
-    /**
-     * Set up cell block codecs
-     * @throws FatalConnectionException
-     */
-    protected void setupCellBlockCodecs(final ConnectionHeader header)
-        throws FatalConnectionException {
-      // TODO: Plug in other supported decoders.
-      if (!header.hasCellBlockCodecClass()) return;
-      String className = header.getCellBlockCodecClass();
-      if (className == null || className.length() == 0) return;
-      try {
-        this.codec = (Codec)Class.forName(className).newInstance();
-      } catch (Exception e) {
-        throw new UnsupportedCellCodecException(className, e);
-      }
-      if (!header.hasCellBlockCompressorClass()) return;
-      className = header.getCellBlockCompressorClass();
-      try {
-        this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
-      } catch (Exception e) {
-        throw new UnsupportedCompressionCodecException(className, e);
-      }
-    }
-
-    /**
-     * Set up cipher for rpc encryption with Apache Commons Crypto
-     *
-     * @throws FatalConnectionException
-     */
-    protected void setupCryptoCipher(final ConnectionHeader header,
-        RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
-        throws FatalConnectionException {
-      // If simple auth, return
-      if (saslServer == null) return;
-      // check if rpc encryption with Crypto AES
-      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-      boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
-          .getSaslQop().equalsIgnoreCase(qop);
-      boolean isCryptoAesEncryption = isEncryption && conf.getBoolean(
-          "hbase.rpc.crypto.encryption.aes.enabled", false);
-      if (!isCryptoAesEncryption) return;
-      if (!header.hasRpcCryptoCipherTransformation()) return;
-      String transformation = header.getRpcCryptoCipherTransformation();
-      if (transformation == null || transformation.length() == 0) return;
-       // Negotiates AES based on complete saslServer.
-       // The Crypto metadata need to be encrypted and send to client.
-      Properties properties = new Properties();
-      // the property for SecureRandomFactory
-      properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
-          conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
-              "org.apache.commons.crypto.random.JavaCryptoRandom"));
-      // the property for cipher class
-      properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
-          conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
-              "org.apache.commons.crypto.cipher.JceCipher"));
-
-      int cipherKeyBits = conf.getInt(
-          "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
-      // generate key and iv
-      if (cipherKeyBits % 8 != 0) {
-        throw new IllegalArgumentException("The AES cipher key size in bits" +
-            " should be a multiple of byte");
-      }
-      int len = cipherKeyBits / 8;
-      byte[] inKey = new byte[len];
-      byte[] outKey = new byte[len];
-      byte[] inIv = new byte[len];
-      byte[] outIv = new byte[len];
-
-      try {
-        // generate the cipher meta data with SecureRandom
-        CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
-        secureRandom.nextBytes(inKey);
-        secureRandom.nextBytes(outKey);
-        secureRandom.nextBytes(inIv);
-        secureRandom.nextBytes(outIv);
-
-        // create CryptoAES for server
-        cryptoAES = new CryptoAES(transformation, properties,
-            inKey, outKey, inIv, outIv);
-        // create SaslCipherMeta and send to client,
-        //  for client, the [inKey, outKey], [inIv, outIv] should be reversed
-        RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
-        ccmBuilder.setTransformation(transformation);
-        ccmBuilder.setInIv(getByteString(outIv));
-        ccmBuilder.setInKey(getByteString(outKey));
-        ccmBuilder.setOutIv(getByteString(inIv));
-        ccmBuilder.setOutKey(getByteString(inKey));
-        chrBuilder.setCryptoCipherMeta(ccmBuilder);
-        useCryptoAesWrap = true;
-      } catch (GeneralSecurityException | IOException ex) {
-        throw new UnsupportedCryptoException(ex.getMessage(), ex);
-      }
-    }
-
-    private ByteString getByteString(byte[] bytes) {
-      // return singleton to reduce object allocation
-      return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
-    }
-
-    protected UserGroupInformation createUser(ConnectionHeader head) {
-      UserGroupInformation ugi = null;
-
-      if (!head.hasUserInfo()) {
-        return null;
-      }
-      UserInformation userInfoProto = head.getUserInfo();
-      String effectiveUser = null;
-      if (userInfoProto.hasEffectiveUser()) {
-        effectiveUser = userInfoProto.getEffectiveUser();
-      }
-      String realUser = null;
-      if (userInfoProto.hasRealUser()) {
-        realUser = userInfoProto.getRealUser();
-      }
-      if (effectiveUser != null) {
-        if (realUser != null) {
-          UserGroupInformation realUserUgi =
-              UserGroupInformation.createRemoteUser(realUser);
-          ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
-        } else {
-          ugi = UserGroupInformation.createRemoteUser(effectiveUser);
-        }
-      }
-      return ugi;
-    }
-
-    protected void disposeSasl() {
-      if (saslServer != null) {
-        try {
-          saslServer.dispose();
-          saslServer = null;
-        } catch (SaslException ignored) {
-          // Ignored. This is being disposed of anyway.
-        }
-      }
-    }
-
-    /**
-     * No protobuf encoding of raw sasl messages
-     */
-    protected void doRawSaslReply(SaslStatus status, Writable rv,
-        String errorClass, String error) throws IOException {
-      ByteBufferOutputStream saslResponse = null;
-      DataOutputStream out = null;
-      try {
-        // In my testing, have noticed that sasl messages are usually
-        // in the ballpark of 100-200. That's why the initial capacity is 256.
-        saslResponse = new ByteBufferOutputStream(256);
-        out = new DataOutputStream(saslResponse);
-        out.writeInt(status.state); // write status
-        if (status == SaslStatus.SUCCESS) {
-          rv.write(out);
-        } else {
-          WritableUtils.writeString(out, errorClass);
-          WritableUtils.writeString(out, error);
-        }
-        saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
-        saslCall.sendResponseIfReady();
-      } finally {
-        if (saslResponse != null) {
-          saslResponse.close();
-        }
-        if (out != null) {
-          out.close();
-        }
-      }
-    }
-
-    public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
-        InterruptedException {
-      if (saslContextEstablished) {
-        if (LOG.isTraceEnabled())
-          LOG.trace("Have read input token of size " + saslToken.limit()
-              + " for processing by saslServer.unwrap()");
-
-        if (!useWrap) {
-          processOneRpc(saslToken);
-        } else {
-          byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
-          byte [] plaintextData;
-          if (useCryptoAesWrap) {
-            // unwrap with CryptoAES
-            plaintextData = cryptoAES.unwrap(b, 0, b.length);
-          } else {
-            plaintextData = saslServer.unwrap(b, 0, b.length);
-          }
-          processUnwrappedData(plaintextData);
-        }
-      } else {
-        byte[] replyToken;
-        try {
-          if (saslServer == null) {
-            switch (authMethod) {
-            case DIGEST:
-              if (secretManager == null) {
-                throw new AccessDeniedException(
-                    "Server is not configured to do DIGEST authentication.");
-              }
-              saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
-                  .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
-                  HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
-                      secretManager, this));
-              break;
-            default:
-              UserGroupInformation current = UserGroupInformation.getCurrentUser();
-              String fullName = current.getUserName();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Kerberos principal name is " + fullName);
-              }
-              final String names[] = SaslUtil.splitKerberosName(fullName);
-              if (names.length != 3) {
-                throw new AccessDeniedException(
-                    "Kerberos principal name does NOT have the expected "
-                        + "hostname part: " + fullName);
-              }
-              current.doAs(new PrivilegedExceptionAction<Object>() {
-                @Override
-                public Object run() throws SaslException {
-                  saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
-                      .getMechanismName(), names[0], names[1],
-                      HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
-                  return null;
-                }
-              });
-            }
-            if (saslServer == null)
-              throw new AccessDeniedException(
-                  "Unable to find SASL server implementation for "
-                      + authMethod.getMechanismName());
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Have read input token of size " + saslToken.limit()
-                + " for processing by saslServer.evaluateResponse()");
-          }
-          replyToken = saslServer
-              .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
-        } catch (IOException e) {
-          IOException sendToClient = e;
-          Throwable cause = e;
-          while (cause != null) {
-            if (cause instanceof InvalidToken) {
-              sendToClient = (InvalidToken) cause;
-              break;
-            }
-            cause = cause.getCause();
-          }
-          doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
-            sendToClient.getLocalizedMessage());
-          metrics.authenticationFailure();
-          String clientIP = this.toString();
-          // attempting user could be null
-          AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
-          throw e;
-        }
-        if (replyToken != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Will send token of size " + replyToken.length
-                + " from saslServer.");
-          }
-          doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
-              null);
-        }
-        if (saslServer.isComplete()) {
-          String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-          useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
-          ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("SASL server context established. Authenticated client: "
-              + ugi + ". Negotiated QoP is "
-              + saslServer.getNegotiatedProperty(Sasl.QOP));
-          }
-          metrics.authenticationSuccess();
-          AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
-          saslContextEstablished = true;
-        }
-      }
-    }
-
-    private void processUnwrappedData(byte[] inBuf) throws IOException,
-    InterruptedException {
-      ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
-      // Read all RPCs contained in the inBuf, even partial ones
-      while (true) {
-        int count;
-        if (unwrappedDataLengthBuffer.remaining() > 0) {
-          count = channelRead(ch, unwrappedDataLengthBuffer);
-          if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
-            return;
-        }
-
-        if (unwrappedData == null) {
-          unwrappedDataLengthBuffer.flip();
-          int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
-          unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
-        }
-
-        count = channelRead(ch, unwrappedData);
-        if (count <= 0 || unwrappedData.remaining() > 0)
-          return;
-
-        if (unwrappedData.remaining() == 0) {
-          unwrappedDataLengthBuffer.clear();
-          unwrappedData.flip();
-          processOneRpc(new SingleByteBuff(unwrappedData));
-          unwrappedData = null;
-        }
-      }
-    }
-
-    public void processOneRpc(ByteBuff buf) throws IOException,
-        InterruptedException {
-      if (connectionHeaderRead) {
-        processRequest(buf);
-      } else {
-        processConnectionHeader(buf);
-        this.connectionHeaderRead = true;
-        if (!authorizeConnection()) {
-          // Throw FatalConnectionException wrapping ACE so client does right thing and closes
-          // down the connection instead of trying to read non-existent retun.
-          throw new AccessDeniedException("Connection from " + this + " for service " +
-            connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
-        }
-        this.user = userProvider.create(this.ugi);
-      }
-    }
-
-    protected boolean authorizeConnection() throws IOException {
-      try {
-        // If auth method is DIGEST, the token was obtained by the
-        // real user for the effective user, therefore not required to
-        // authorize real user. doAs is allowed only for simple or kerberos
-        // authentication
-        if (ugi != null && ugi.getRealUser() != null
-            && (authMethod != AuthMethod.DIGEST)) {
-          ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
-        }
-        authorize(ugi, connectionHeader, getHostInetAddress());
-        metrics.authorizationSuccess();
-      } catch (AuthorizationException ae) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
-        }
-        metrics.authorizationFailure();
-        setupResponse(authFailedResponse, authFailedCall,
-          new AccessDeniedException(ae), ae.getMessage());
-        authFailedCall.sendResponseIfReady();
-        return false;
-      }
-      return true;
-    }
-
-    // Reads the connection header following version
-    protected void processConnectionHeader(ByteBuff buf) throws IOException {
-      if (buf.hasArray()) {
-        this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
-      } else {
-        CodedInputStream cis = UnsafeByteOperations
-            .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
-        cis.enableAliasing(true);
-        this.connectionHeader = ConnectionHeader.parseFrom(cis);
-      }
-      String serviceName = connectionHeader.getServiceName();
-      if (serviceName == null) throw new EmptyServiceNameException();
-      this.service = getService(services, serviceName);
-      if (this.service == null) throw new UnknownServiceException(serviceName);
-      setupCellBlockCodecs(this.connectionHeader);
-      RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
-          RPCProtos.ConnectionHeaderResponse.newBuilder();
-      setupCryptoCipher(this.connectionHeader, chrBuilder);
-      responseConnectionHeader(chrBuilder);
-      UserGroupInformation protocolUser = createUser(connectionHeader);
-      if (!useSasl) {
-        ugi = protocolUser;
-        if (ugi != null) {
-          ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
-        }
-        // audit logging for SASL authenticated users happens in saslReadAndProcess()
-        if (authenticatedWithFallback) {
-          LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
-              + " connecting from " + getHostAddress());
-        }
-        AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
-      } else {
-        // user is authenticated
-        ugi.setAuthenticationMethod(authMethod.authenticationMethod);
-        //Now we check if this is a proxy user case. If the protocol user is
-        //different from the 'user', it is a proxy user scenario. However,
-        //this is not allowed if user authenticated with DIGEST.
-        if ((protocolUser != null)
-            && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
-          if (authMethod == AuthMethod.DIGEST) {
-            // Not allowed to doAs if token authentication is used
-            throw new AccessDeniedException("Authenticated user (" + ugi
-                + ") doesn't match what the client claims to be ("
-                + protocolUser + ")");
-          } else {
-            // Effective user can be different from authenticated user
-            // for simple auth or kerberos auth
-            // The user is the real user. Now we create a proxy user
-            UserGroupInformation realUser = ugi;
-            ugi = UserGroupInformation.createProxyUser(protocolUser
-                .getUserName(), realUser);
-            // Now the user is a proxy user, set Authentication method Proxy.
-            ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
-          }
-        }
-      }
-      if (connectionHeader.hasVersionInfo()) {
-        // see if this connection will support RetryImmediatelyException
-        retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
-
-        AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
-            + " with version info: "
-            + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
-      } else {
-        AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
-            + " with unknown version info");
-      }
-    }
-
-    private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
-        throws FatalConnectionException {
-      // Response the connection header if Crypto AES is enabled
-      if (!chrBuilder.hasCryptoCipherMeta()) return;
-      try {
-        byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
-        // encrypt the Crypto AES cipher meta data with sasl server, and send to client
-        byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
-        Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
-        Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
-
-        doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
-      } catch (IOException ex) {
-        throw new UnsupportedCryptoException(ex.getMessage(), ex);
-      }
-    }
-
-    /**
-     * Send the response for connection header
-     */
-    private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
-        throws IOException {
-      ByteBufferOutputStream response = null;
-      DataOutputStream out = null;
-      try {
-        response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
-        out = new DataOutputStream(response);
-        out.writeInt(wrappedCipherMetaData.length);
-        out.write(wrappedCipherMetaData);
-
-        setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
-            .getByteBuffer());
-        setConnectionHeaderResponseCall.sendResponseIfReady();
-      } finally {
-        if (out != null) {
-          out.close();
-        }
-        if (response != null) {
-          response.close();
-        }
-      }
-    }
-
-    /**
-     * @param buf
-     *          Has the request header and the request param and optionally
-     *          encoded data buffer all in this one array.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected void processRequest(ByteBuff buf) throws IOException,
-        InterruptedException {
-      long totalRequestSize = buf.limit();
-      int offset = 0;
-      // Here we read in the header. We avoid having pb
-      // do its default 4k allocation for CodedInputStream. We force it to use
-      // backing array.
-      CodedInputStream cis;
-      if (buf.hasArray()) {
-        cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit())
-            .newCodedInput();
-      } else {
-        cis = UnsafeByteOperations.unsafeWrap(
-            new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit())
-            .newCodedInput();
-      }
-      cis.enableAliasing(true);
-      int headerSize = cis.readRawVarint32();
-      offset = cis.getTotalBytesRead();
-      Message.Builder builder = RequestHeader.newBuilder();
-      ProtobufUtil.mergeFrom(builder, cis, headerSize);
-      RequestHeader header = (RequestHeader) builder.build();
-      offset += headerSize;
-      int id = header.getCallId();
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
-            + " totalRequestSize: " + totalRequestSize + " bytes");
-      }
-      // Enforcing the call queue size, this triggers a retry in the client
-      // This is a bit late to be doing this check - we have already read in the
-      // total request.
-      if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
-        final ServerCall callTooBig = createCall(id, this.service, null,
-            null, null, null, this, totalRequestSize, null, null, 0,
-            this.callCleanup);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
-        setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
-            "Call queue is full on " + server.getServerName()
-                + ", is hbase.ipc.server.max.callqueue.size too small?");
-        callTooBig.sendResponseIfReady();
-        return;
-      }
-      MethodDescriptor md = null;
-      Message param = null;
-      CellScanner cellScanner = null;
-      try {
-        if (header.hasRequestParam() && header.getRequestParam()) {
-          md = this.service.getDescriptorForType().findMethodByName(
-              header.getMethodName());
-          if (md == null)
-            throw new UnsupportedOperationException(header.getMethodName());
-          builder = this.service.getRequestPrototype(md).newBuilderForType();
-          cis.resetSizeCounter();
-          int paramSize = cis.readRawVarint32();
-          offset += cis.getTotalBytesRead();
-          if (builder != null) {
-            ProtobufUtil.mergeFrom(builder, cis, paramSize);
-            param = builder.build();
-          }
-          offset += paramSize;
-        } else {
-          // currently header must have request param, so we directly throw
-          // exception here
-          String msg = "Invalid request header: "
-              + TextFormat.shortDebugString(header)
-              + ", should have param set in it";
-          LOG.warn(msg);
-          throw new DoNotRetryIOException(msg);
-        }
-        if (header.hasCellBlockMeta()) {
-          buf.position(offset);
-          ByteBuff dup = buf.duplicate();
-          dup.limit(offset + header.getCellBlockMeta().getLength());
-          cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(
-              this.codec, this.compressionCodec, dup);
-        }
-      } catch (Throwable t) {
-        InetSocketAddress address = getListenerAddress();
-        String msg = (address != null ? address : "(channel closed)")
-            + " is unable to read call parameter from client "
-            + getHostAddress();
-        LOG.warn(msg, t);
-
-        metrics.exception(t);
-
-        // probably the hbase hadoop version does not match the running hadoop
-        // version
-        if (t instanceof LinkageError) {
-          t = new DoNotRetryIOException(t);
-        }
-        // If the method is not present on the server, do not retry.
-        if (t instanceof UnsupportedOperationException) {
-          t = new DoNotRetryIOException(t);
-        }
-
-        final ServerCall readParamsFailedCall = createCall(id,
-            this.service, null, null, null, null, this, totalRequestSize, null,
-            null, 0, this.callCleanup);
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        setupResponse(responseBuffer, readParamsFailedCall, t,
-            msg + "; " + t.getMessage());
-        readParamsFailedCall.sendResponseIfReady();
-        return;
-      }
-
-      TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
-          .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
-          : null;
-      int timeout = 0;
-      if (header.hasTimeout() && header.getTimeout() > 0) {
-        timeout = Math.max(minClientRequestTimeout, header.getTimeout());
-      }
-      ServerCall call = createCall(id, this.service, md, header, param,
-          cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
-          this.callCleanup);
-
-      if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
-        callQueueSizeInBytes.add(-1 * call.getSize());
-
-        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
-        metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
-        setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
-            "Call queue is full on " + server.getServerName()
-                + ", too many items queued ?");
-        call.sendResponseIfReady();
-      }
-    }
-
-    public abstract boolean isConnectionOpen();
-
-    public abstract ServerCall createCall(int id, final BlockingService service,
-        final MethodDescriptor md, RequestHeader header, Message param,
-        CellScanner cellScanner, Connection connection, long size,
-        TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
-        CallCleanup reqCleanup);
-  }
-
   /**
    * Datastructure for passing a {@link BlockingService} and its associated class of
    * protobuf service interface.  For example, a server that fielded what is defined
@@ -1122,7 +348,7 @@ public abstract class RpcServer implements RpcServerInterface,
    * @param error error message, if the call failed
    * @throws IOException
    */
-  protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
+  protected void setupResponse(ByteArrayOutputStream response, ServerCall<?> call, Throwable t,
       String error) throws IOException {
     if (response != null) response.reset();
     call.setResponse(null, null, t, error);
@@ -1574,44 +800,4 @@ public abstract class RpcServer implements RpcServerInterface,
   public void setRsRpcServices(RSRpcServices rsRpcServices) {
     this.rsRpcServices = rsRpcServices;
   }
-
-  protected static class ByteBuffByteInput extends ByteInput {
-
-    private ByteBuff buf;
-    private int offset;
-    private int length;
-
-    ByteBuffByteInput(ByteBuff buf, int offset, int length) {
-      this.buf = buf;
-      this.offset = offset;
-      this.length = length;
-    }
-
-    @Override
-    public byte read(int offset) {
-      return this.buf.get(getAbsoluteOffset(offset));
-    }
-
-    private int getAbsoluteOffset(int offset) {
-      return this.offset + offset;
-    }
-
-    @Override
-    public int read(int offset, byte[] out, int outOffset, int len) {
-      this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
-      return len;
-    }
-
-    @Override
-    public int read(int offset, ByteBuffer out) {
-      int len = out.remaining();
-      this.buf.get(out, getAbsoluteOffset(offset), len);
-      return len;
-    }
-
-    @Override
-    public int size() {
-      return this.length;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 9294839..15fe3e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
@@ -52,7 +51,7 @@ import org.apache.htrace.TraceInfo;
  * the result.
  */
 @InterfaceAudience.Private
-abstract class ServerCall implements RpcCall {
+abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
 
   protected final int id;                             // the client's call id
   protected final BlockingService service;
@@ -61,7 +60,7 @@ abstract class ServerCall implements RpcCall {
   protected Message param;                      // the parameter passed
   // Optional cell data passed outside of protobufs.
   protected final CellScanner cellScanner;
-  protected final Connection connection;              // connection to client
+  protected final T connection;              // connection to client
   protected final long receiveTime;      // the time received when response is null
                                  // the time served when response is not null
   protected final int timeout;
@@ -96,7 +95,7 @@ abstract class ServerCall implements RpcCall {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
       justification="Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
-      Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo,
+      Message param, CellScanner cellScanner, T connection, long size, TraceInfo tinfo,
       InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
       CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
     this.id = id;