You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/16 15:01:01 UTC
[08/10] hbase git commit: HBASE-16584 Backport the new ipc
implementation in HBASE-16432 to branch-1
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
deleted file mode 100644
index e0c7586..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.ipc.RemoteException;
-
-import com.google.protobuf.Message;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-/**
- * Handles Hbase responses
- */
-@InterfaceAudience.Private
-public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private final AsyncRpcChannel channel;
-
- /**
- * Constructor
- * @param channel on which this response handler operates
- */
- public AsyncServerResponseHandler(AsyncRpcChannel channel) {
- this.channel = channel;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception {
- ByteBufInputStream in = new ByteBufInputStream(inBuffer);
- int totalSize = inBuffer.readableBytes();
- // Read the header
- RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
- int id = responseHeader.getCallId();
- AsyncCall call = channel.removePendingCall(id);
- if (call == null) {
- // So we got a response for which we have no corresponding 'call' here on the client-side.
- // We probably timed out waiting, cleaned up all references, and now the server decides
- // to return a response. There is nothing we can do w/ the response at this stage. Clean
- // out the wire of the response so its out of the way and we can get other responses on
- // this connection.
- int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
- int whatIsLeftToRead = totalSize - readSoFar;
-
- // This is done through a Netty ByteBuf which has different behavior than InputStream.
- // It does not return number of bytes read but will update pointer internally and throws an
- // exception when too many bytes are to be skipped.
- inBuffer.skipBytes(whatIsLeftToRead);
- return;
- }
-
- if (responseHeader.hasException()) {
- RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
- RemoteException re = createRemoteException(exceptionResponse);
- if (exceptionResponse.getExceptionClassName()
- .equals(FatalConnectionException.class.getName())) {
- channel.close(re);
- } else {
- call.setFailed(re);
- }
- } else {
- Message value = null;
- // Call may be null because it may have timedout and been cleaned up on this side already
- if (call.responseDefaultType != null) {
- Message.Builder builder = call.responseDefaultType.newBuilderForType();
- ProtobufUtil.mergeDelimitedFrom(builder, in);
- value = builder.build();
- }
- CellScanner cellBlockScanner = null;
- if (responseHeader.hasCellBlockMeta()) {
- int size = responseHeader.getCellBlockMeta().getLength();
- byte[] cellBlock = new byte[size];
- inBuffer.readBytes(cellBlock, 0, cellBlock.length);
- cellBlockScanner = channel.client.createCellScanner(cellBlock);
- }
- call.setSuccess(value, cellBlockScanner);
- call.callStats.setResponseSizeBytes(totalSize);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- channel.close(cause);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- channel.close(new IOException("connection closed"));
- }
-
- /**
- * @param e Proto exception
- * @return RemoteException made from passed <code>e</code>
- */
- private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
- String innerExceptionClassName = e.getExceptionClassName();
- boolean doNotRetry = e.getDoNotRetry();
- return e.hasHostname() ?
- // If a hostname then add it to the RemoteWithExtrasException
- new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
- e.getPort(), doNotRetry)
- : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
new file mode 100644
index 0000000..d27602e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * Does RPC against a cluster. Manages connections per regionserver in the cluster.
+ * <p>
+ * See HBaseServer
+ */
+@InterfaceAudience.Private
+public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> {
+
+ protected final SocketFactory socketFactory; // how to create sockets
+
+ /**
+ * Used in test only. Construct an IPC client for the cluster {@code clusterId} with the default
+ * SocketFactory
+ */
+ @VisibleForTesting
+ BlockingRpcClient(Configuration conf) {
+ this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+ }
+
+ /**
+ * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory. This
+ * method is called with reflection by the RpcClientFactory to create an instance.
+ * @param conf configuration
+ * @param clusterId the cluster id
+ * @param localAddr client socket bind address.
+ * @param metrics the connection metrics
+ */
+ public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
+ super(conf, clusterId, localAddr, metrics);
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ }
+
+ /**
+ * Creates a connection. Can be overridden by a subclass for testing.
+ * @param remoteId - the ConnectionId to use for the connection creation.
+ */
+ protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+ return new BlockingRpcConnection(this, remoteId);
+ }
+
+ @Override
+ protected void closeInternal() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
new file mode 100644
index 0000000..4dc121c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcCallback;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
+ * remote address. Calls are multiplexed through this socket: responses may be delivered out of
+ * order.
+ */
+@InterfaceAudience.Private
+class BlockingRpcConnection extends RpcConnection implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class);
+
+ private final BlockingRpcClient rpcClient;
+
+ private final String threadName;
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "We are always under lock actually")
+ private Thread thread;
+
+ // connected socket. protected for writing UT.
+ protected Socket socket = null;
+ private DataInputStream in;
+ private DataOutputStream out;
+
+ private HBaseSaslRpcClient saslRpcClient;
+
+ // currently active calls
+ private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
+
+ private final CallSender callSender;
+
+ private boolean closed = false;
+
+ private byte[] connectionHeaderPreamble;
+
+ private byte[] connectionHeaderWithLength;
+
+ /**
+ * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
+ * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
+ * use a different thread for writing. This way, on interruptions, we either cancel the writes or
+ * ignore the answer if the write is already done, but we don't stop the write in the middle. This
+ * adds a thread per region server in the client, so it's kept as an option.
+ * <p>
+ * The implementation is simple: the client threads add their calls to the queue, and then wait
+ * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
+ * interruption, the client cancels its call. The CallSender checks that the call has not been
+ * canceled before writing it.
+ * <p>
+ * When the connection closes, all the calls not yet sent are dismissed. The client thread is
+ * notified with an appropriate exception, as if the call was already sent but the answer not yet
+ * received.
+ * </p>
+ */
+ private class CallSender extends Thread {
+
+ private final Queue<Call> callsToWrite;
+
+ private final int maxQueueSize;
+
+ public CallSender(String name, Configuration conf) {
+ int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+ callsToWrite = new ArrayDeque<>(queueSize);
+ this.maxQueueSize = queueSize;
+ setDaemon(true);
+ setName(name + " - writer");
+ }
+
+ public void sendCall(final Call call) throws IOException {
+ if (callsToWrite.size() >= maxQueueSize) {
+ throw new IOException("Can't add the call " + call.id +
+ " to the write queue. callsToWrite.size()=" + callsToWrite.size());
+ }
+ callsToWrite.offer(call);
+ BlockingRpcConnection.this.notifyAll();
+ }
+
+ public void remove(Call call) {
+ // Cancel this exact call; the no-arg remove() drops the queue head and throws when empty.
+ callsToWrite.remove(call);
+ // Removing it from the pending calls too, so we no longer know how many were cancelled.
+ calls.remove(call.id);
+ call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" +
+ (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" +
+ call.timeout));
+ }
+
+ /**
+ * Reads the call from the queue, write them on the socket.
+ */
+ @Override
+ public void run() {
+ synchronized (BlockingRpcConnection.this) {
+ while (!closed) {
+ if (callsToWrite.isEmpty()) {
+ // We should use another monitor object here for better performance since the read
+ // thread also uses ConnectionImpl.this. But this makes the locking schema more
+ // complicated, can do it later as an optimization.
+ try {
+ BlockingRpcConnection.this.wait();
+ } catch (InterruptedException e) {
+ }
+ // check if we need to quit, so continue the main loop instead of fallback.
+ continue;
+ }
+ Call call = callsToWrite.poll();
+ if (call.isDone()) {
+ continue;
+ }
+ try {
+ tracedWriteRequest(call);
+ } catch (IOException e) {
+ // exception here means the call has not been added to the pendingCalls yet, so we need
+ // to fail it by our own.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("call write error for call #" + call.id, e);
+ }
+ call.setException(e);
+ closeConn(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Fails the calls not yet sent. {@code e}, the reason for closing, is attached as the cause.
+ */
+ public void cleanup(IOException e) {
+ IOException ie = (IOException) new ConnectionClosingException(
+ "Connection to " + remoteId.address + " is closing.").initCause(e);
+ for (Call call : callsToWrite) {
+ call.setException(ie);
+ }
+ callsToWrite.clear();
+ }
+ }
+
+ BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
+ super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
+ rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+ this.rpcClient = rpcClient;
+ if (remoteId.getAddress().isUnresolved()) {
+ throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
+ }
+
+ this.connectionHeaderPreamble = getConnectionHeaderPreamble();
+ ConnectionHeader header = getConnectionHeader();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(header.getSerializedSize());
+ header.writeTo(dos);
+ assert baos.size() == 4 + header.getSerializedSize();
+ this.connectionHeaderWithLength = baos.getBuffer();
+
+ UserGroupInformation ticket = remoteId.ticket.getUGI();
+ this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() +
+ ") connection to " + remoteId.getAddress().toString() +
+ ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));
+
+ if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
+ callSender = new CallSender(threadName, this.rpcClient.conf);
+ callSender.start();
+ } else {
+ callSender = null;
+ }
+ }
+
+ // protected for writing UT.
+ protected void setupConnection() throws IOException {
+ short ioFailures = 0;
+ short timeoutFailures = 0;
+ while (true) {
+ try {
+ this.socket = this.rpcClient.socketFactory.createSocket();
+ this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
+ this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
+ if (this.rpcClient.localAddr != null) {
+ this.socket.bind(this.rpcClient.localAddr);
+ }
+ NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
+ this.socket.setSoTimeout(this.rpcClient.readTO);
+ return;
+ } catch (SocketTimeoutException toe) {
+ /*
+ * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
+ */
+ handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
+ } catch (IOException ie) {
+ handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
+ }
+ }
+ }
+
+ /**
+ * Handle connection failures. If the current number of retries is equal to the max number of
+ * retries, stop retrying and throw the exception; otherwise back off N seconds and try connecting
+ * again. This method is only called from inside setupIOstreams(), which is synchronized. Hence
+ * the sleep is synchronized; the locks will be retained.
+ * @param curRetries current number of retries
+ * @param maxRetries max number of retries allowed
+ * @param ioe failure reason
+ * @throws IOException if max number of retries is reached
+ */
+ private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
+ throws IOException {
+ closeSocket();
+
+ // throw the exception if the maximum number of retries is reached
+ if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
+ throw ioe;
+ }
+
+ // otherwise back off and retry
+ try {
+ Thread.sleep(this.rpcClient.failureSleep);
+ } catch (InterruptedException ie) {
+ ExceptionUtil.rethrowIfInterrupt(ie);
+ }
+
+ LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " +
+ this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
+ }
+
+ /*
+ * wait till someone signals us to start reading RPC response or it is idle too long, it is marked
+ * as to be closed, or the client is marked as not running.
+ * @return true if it is time to read a response; false otherwise.
+ */
+ private synchronized boolean waitForWork() {
+ // beware of the concurrent access to the calls list: we can add calls, but as well
+ // remove them.
+ long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
+ for (;;) {
+ if (thread == null) {
+ return false;
+ }
+ if (!calls.isEmpty()) {
+ return true;
+ }
+ if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
+ closeConn(
+ new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
+ return false;
+ }
+ try {
+ wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
+ }
+ while (waitForWork()) {
+ readResponse();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
+ }
+ }
+
+ private void disposeSasl() {
+ if (saslRpcClient != null) {
+ saslRpcClient.dispose();
+ saslRpcClient = null;
+ }
+ }
+
+ private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
+ throws IOException {
+ saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
+ this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
+ QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
+ return saslRpcClient.saslConnect(in2, out2);
+ }
+
+ /**
+ * If multiple clients with the same principal try to connect to the same server at the same time,
+ * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
+ * work around this, what is done is that the client backs off randomly and tries to initiate the
+ * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
+ * attempted.
+ * <p>
+ * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
+ * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
+ * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
+ * underlying authentication implementation, so there is no retry from other high level (for eg,
+ * HCM or HBaseAdmin).
+ * </p>
+ */
+ private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
+ final Exception ex, final UserGroupInformation user)
+ throws IOException, InterruptedException {
+ closeSocket();
+ user.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException, InterruptedException {
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < maxRetries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception encountered while connecting to " + "the server : " + ex);
+ }
+ // try re-login
+ relogin();
+ disposeSasl();
+ // have granularity of milliseconds
+ // we are sleeping with the Connection lock held but since this
+ // connection instance is being used for connecting to the server
+ // in question, it is okay
+ Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
+ return null;
+ } else {
+ String msg = "Couldn't setup connection for " +
+ UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+ LOG.warn(msg, ex);
+ throw (IOException) new IOException(msg).initCause(ex);
+ }
+ } else {
+ LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
+ }
+ if (ex instanceof RemoteException) {
+ throw (RemoteException) ex;
+ }
+ if (ex instanceof SaslException) {
+ String msg = "SASL authentication failed." +
+ " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
+ LOG.fatal(msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ throw new IOException(ex);
+ }
+ });
+ }
+
+ private void setupIOstreams() throws IOException {
+ if (socket != null) {
+ // The connection is already available. Perfect.
+ return;
+ }
+
+ if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not trying to connect to " + remoteId.address +
+ " this server is in the failed servers list");
+ }
+ throw new FailedServerException(
+ "This server is in the failed servers list: " + remoteId.address);
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to " + remoteId.address);
+ }
+
+ short numRetries = 0;
+ final short MAX_RETRIES = 5;
+ while (true) {
+ setupConnection();
+ InputStream inStream = NetUtils.getInputStream(socket);
+ // This creates a socket with a write timeout. This timeout cannot be changed.
+ OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
+ // Write out the preamble -- MAGIC, version, and auth to use.
+ writeConnectionHeaderPreamble(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ UserGroupInformation ticket = getUGI();
+ boolean continueSasl;
+ if (ticket == null) {
+ throw new FatalConnectionException("ticket/user is null");
+ }
+ try {
+ continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ } catch (Exception ex) {
+ ExceptionUtil.rethrowIfInterrupt(ex);
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket);
+ continue;
+ }
+ if (continueSasl) {
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because server told us so.
+ // do not change authMethod and useSasl here, we should start from secure when
+ // reconnecting because regionserver may change its sasl config after restart.
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ // Now write out the connection header
+ writeConnectionHeader();
+ break;
+ }
+ } catch (Throwable t) {
+ closeSocket();
+ IOException e = ExceptionUtil.asInterrupt(t);
+ if (e == null) {
+ this.rpcClient.failedServers.addToFailedServers(remoteId.address);
+ if (t instanceof LinkageError) {
+ // probably the hbase hadoop version does not match the running hadoop version
+ e = new DoNotRetryIOException(t);
+ } else if (t instanceof IOException) {
+ e = (IOException) t;
+ } else {
+ e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
+ }
+ }
+ throw e;
+ }
+
+ // start the receiver thread after the socket connection has been set up
+ thread = new Thread(this, threadName);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
+ */
+ private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
+ out.write(connectionHeaderPreamble);
+ out.flush();
+ }
+
+ /**
+ * Write the connection header.
+ */
+ private void writeConnectionHeader() throws IOException {
+ this.out.write(connectionHeaderWithLength);
+ this.out.flush();
+ }
+
+ private void tracedWriteRequest(Call call) throws IOException {
+ try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
+ writeRequest(call);
+ }
+ }
+
+ /**
+ * Initiates a call by sending the parameter to the remote server. Note: this is not called from
+ * the Connection thread, but by other threads.
+ * @see #readResponse()
+ */
+ private void writeRequest(Call call) throws IOException {
+ ByteBuffer cellBlock =
+ this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells);
+ CellBlockMeta cellBlockMeta;
+ if (cellBlock != null) {
+ cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
+ } else {
+ cellBlockMeta = null;
+ }
+ RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+
+ setupIOstreams();
+
+ // Now we're going to write the call. We take the lock, then check that the connection
+ // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+ // know where we stand, we have to close the connection.
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+
+ calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+ // from here, we do not throw any exception to upper layer as the call has been tracked in the
+ // pending calls map.
+ try {
+ call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+ } catch (IOException e) {
+ closeConn(e);
+ return;
+ }
+ notifyAll();
+ }
+
+ /*
+ * Receive a response. There is only one receiver, so no synchronization on 'in' is needed.
+ */
+ private void readResponse() {
+ Call call = null;
+ boolean expectedCall = false;
+ try {
+ // See HBaseServer.Call.setResponse for where we write out the response.
+ // Total size of the response. Unused. But have to read it in anyways.
+ int totalSize = in.readInt();
+
+ // Read the header
+ ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+ int id = responseHeader.getCallId();
+ call = calls.remove(id); // call.done have to be set before leaving this method
+ expectedCall = (call != null && !call.isDone());
+ if (!expectedCall) {
+ // So we got a response for which we have no corresponding 'call' here on the client-side.
+ // We probably timed out waiting, cleaned up all references, and now the server decides
+ // to return a response. There is nothing we can do w/ the response at this stage. Clean
+ // out the wire of the response so its out of the way and we can get other responses on
+ // this connection.
+ int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
+ int whatIsLeftToRead = totalSize - readSoFar;
+ IOUtils.skipFully(in, whatIsLeftToRead);
+ if (call != null) {
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ }
+ return;
+ }
+ if (responseHeader.hasException()) {
+ ExceptionResponse exceptionResponse = responseHeader.getException();
+ RemoteException re = createRemoteException(exceptionResponse);
+ call.setException(re);
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ if (isFatalConnectionException(exceptionResponse)) {
+ synchronized (this) {
+ closeConn(re);
+ }
+ }
+ } else {
+ Message value = null;
+ if (call.responseDefaultType != null) {
+ Builder builder = call.responseDefaultType.newBuilderForType();
+ ProtobufUtil.mergeDelimitedFrom(builder, in);
+ value = builder.build();
+ }
+ CellScanner cellBlockScanner = null;
+ if (responseHeader.hasCellBlockMeta()) {
+ int size = responseHeader.getCellBlockMeta().getLength();
+ byte[] cellBlock = new byte[size];
+ IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+ cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
+ this.compressor, cellBlock);
+ }
+ call.setResponse(value, cellBlockScanner);
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats
+ .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ }
+ } catch (IOException e) {
+ if (expectedCall) {
+ call.setException(e);
+ }
+ if (e instanceof SocketTimeoutException) {
+ // Clean up open calls but don't treat this as a fatal condition,
+ // since we expect certain responses to not make it by the specified
+ // {@link ConnectionId#rpcTimeout}.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ignored", e);
+ }
+ } else {
+ synchronized (this) {
+ closeConn(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected synchronized void callTimeout(Call call) {
+ // call sender
+ calls.remove(call.id);
+ }
+
+ // just close socket input and output.
+ private void closeSocket() {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(socket);
+ out = null;
+ in = null;
+ socket = null;
+ }
+
+ // close socket, reader, and clean up all pending calls.
+ private void closeConn(IOException e) {
+ if (thread == null) {
+ return;
+ }
+ thread.interrupt();
+ thread = null;
+ closeSocket();
+ if (callSender != null) {
+ callSender.cleanup(e);
+ }
+ for (Call call : calls.values()) {
+ call.setException(e);
+ }
+ calls.clear();
+ }
+
+ // release all resources, the connection will not be used any more.
+ @Override
+ public synchronized void shutdown() {
+ closed = true;
+ if (callSender != null) {
+ callSender.interrupt();
+ }
+ closeConn(new IOException("connection to " + remoteId.address + " closed"));
+ }
+
+ @Override
+ public void cleanupConnection() {
+ // do nothing
+ }
+
+ @Override
+ public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
+ throws IOException {
+ pcrc.notifyOnCancel(new RpcCallback<Object>() {
+
+ @Override
+ public void run(Object parameter) {
+ setCancelled(call);
+ synchronized (BlockingRpcConnection.this) {
+ if (callSender != null) {
+ callSender.remove(call);
+ } else {
+ calls.remove(call.id);
+ }
+ }
+ }
+ }, new CancellationCallback() {
+
+ @Override
+ public void run(boolean cancelled) throws IOException {
+ if (cancelled) {
+ setCancelled(call);
+ return;
+ }
+ scheduleTimeoutTask(call);
+ if (callSender != null) {
+ callSender.sendCall(call);
+ } else {
+ tracedWriteRequest(call);
+ }
+ }
+ });
+ }
+
+ @Override
+ public synchronized boolean isActive() {
+ return thread != null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..c628c31
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * We will expose the connection to upper layer before initialized, so we need to buffer the calls
+ * passed in and write them out once the connection is established.
+ */
+@InterfaceAudience.Private
+class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+ private enum BufferCallAction {
+ FLUSH, FAIL
+ }
+
+ public static final class BufferCallEvent {
+
+ public final BufferCallAction action;
+
+ public final IOException error;
+
+ private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+ IOException error) {
+ this.action = action;
+ this.error = error;
+ }
+
+ public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+ return SUCCESS_EVENT;
+ }
+
+ public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
+ return new BufferCallEvent(BufferCallAction.FAIL, error);
+ }
+ }
+
+ private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
+ null);
+
+ private final Map<Integer, Call> id2Call = new HashMap<>();
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+ if (msg instanceof Call) {
+ Call call = (Call) msg;
+ id2Call.put(call.id, call);
+ // The call is already in track so here we set the write operation as success.
+ // We will fail the call directly if we can not write it out.
+ promise.trySuccess();
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof BufferCallEvent) {
+ BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+ switch (bcEvt.action) {
+ case FLUSH:
+ for (Call call : id2Call.values()) {
+ ctx.write(call);
+ }
+ break;
+ case FAIL:
+ for (Call call : id2Call.values()) {
+ call.setException(bcEvt.error);
+ }
+ break;
+ }
+ ctx.flush();
+ ctx.pipeline().remove(this);
+ } else if (evt instanceof CallEvent) {
+ // just remove the call for now until we add other call event other than timeout and cancel.
+ id2Call.remove(((CallEvent) evt).call.id);
+ } else {
+ ctx.fireUserEventTriggered(evt);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 5f90837..a6203d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,36 +19,50 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+
+import io.netty.util.Timeout;
+
+import java.io.IOException;
+
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-import java.io.IOException;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
/** A call waiting for a value. */
@InterfaceAudience.Private
-public class Call {
- final int id; // call id
- final Message param; // rpc request method param object
+class Call {
+ final int id; // call id
+ final Message param; // rpc request method param object
/**
- * Optionally has cells when making call. Optionally has cells set on response. Used
- * passing cells to the rpc and receiving the response.
+ * Optionally has cells when making call. Optionally has cells set on response. Used for passing
+ * cells to the rpc and receiving the response.
*/
CellScanner cells;
- Message response; // value, null if error
- // The return type. Used to create shell into which we deserialize the response if any.
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Direct access is only allowed after done")
+ Message response; // value, null if error
+ // The return type. Used to create shell into which we deserialize the response if any.
Message responseDefaultType;
- IOException error; // exception, null if value
- volatile boolean done; // true when call is done
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Direct access is only allowed after done")
+ IOException error; // exception, null if value
+ private boolean done; // true when call is done
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
+ final int priority;
final MetricsConnection.CallStats callStats;
+ final RpcCallback<Call> callback;
+ final Span span;
+ Timeout timeoutTask;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
- final CellScanner cells, final Message responseDefaultType, int timeout,
- MetricsConnection.CallStats callStats) {
+ final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
+ RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
this.md = md;
this.cells = cells;
@@ -57,73 +71,74 @@ public class Call {
this.responseDefaultType = responseDefaultType;
this.id = id;
this.timeout = timeout;
+ this.priority = priority;
+ this.callback = callback;
+ this.span = Trace.currentSpan();
+ }
+
+ @Override
+ public String toString() {
+ return "callId: " + this.id + " methodName: " + this.md.getName() + " param {"
+ + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
}
/**
- * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
- * @return true if the call is on timeout, false otherwise.
+ * called from timeoutTask, prevent self cancel
*/
- public boolean checkAndSetTimeout() {
- if (timeout == 0){
- return false;
- }
-
- long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
- if (waitTime >= timeout) {
- IOException ie = new CallTimeoutException("Call id=" + id +
- ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
- setException(ie); // includes a notify
- return true;
- } else {
- return false;
+ public void setTimeout(IOException error) {
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.error = error;
}
+ callback.run(this);
}
- public int remainingTime() {
- if (timeout == 0) {
- return Integer.MAX_VALUE;
+ private void callComplete() {
+ if (timeoutTask != null) {
+ timeoutTask.cancel();
}
-
- int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
- return remaining > 0 ? remaining : 0;
+ callback.run(this);
}
- @Override
- public String toString() {
- return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
- (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
- }
-
- /** Indicate when the call is complete and the
- * value or error are available. Notifies by default. */
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
- }
-
- /** Set the exception when there is an error.
- * Notify the caller the call is done.
- *
+ /**
+ * Set the exception when there is an error. Notify the caller the call is done.
* @param error exception thrown by the call; either local or remote
*/
public void setException(IOException error) {
- this.error = error;
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.error = error;
+ }
callComplete();
}
/**
- * Set the return value when there is no error.
- * Notify the caller the call is done.
- *
+ * Set the return value when there is no error. Notify the caller the call is done.
* @param response return value of the call.
* @param cells Can be null
*/
public void setResponse(Message response, final CellScanner cells) {
- this.response = response;
- this.cells = cells;
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ this.done = true;
+ this.response = response;
+ this.cells = cells;
+ }
callComplete();
}
+ public synchronized boolean isDone() {
+ return done;
+ }
+
public long getStartTime() {
return this.callStats.getStartTime();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
new file mode 100644
index 0000000..a6777c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Client side call cancelled.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallCancelledException extends HBaseIOException {
+
+ private static final long serialVersionUID = 309775809470318208L;
+
+ /**
+ * @param message the detail message, passed unchanged to the superclass constructor
+ */
+ public CallCancelledException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
new file mode 100644
index 0000000..1c2ea32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to tell the netty handlers that a call has been cancelled or has timed out.
+ */
+@InterfaceAudience.Private
+class CallEvent {
+
+ /** The reason this event was raised for the call. */
+ public enum Type {
+ TIMEOUT, CANCELLED
+ }
+
+ final Type type;
+
+ // the call this event refers to
+ final Call call;
+
+ CallEvent(Type type, Call call) {
+ this.type = type;
+ this.call = call;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
new file mode 100644
index 0000000..0dac2d1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Helper class for building cell block.
+ */
+@InterfaceAudience.Private
+class CellBlockBuilder {
+
+ // LOG is being used in TestCellBlockBuilder
+ static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
+
+ private final Configuration conf;
+
+ /**
+ * How much we think the decompressor will expand the original compressed content.
+ */
+ private final int cellBlockDecompressionMultiplier;
+
+ private final int cellBlockBuildingInitialBufferSize;
+
+ public CellBlockBuilder(Configuration conf) {
+ this.conf = conf;
+ this.cellBlockDecompressionMultiplier =
+ conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
+ // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+ // #buildCellBlock.
+ this.cellBlockBuildingInitialBufferSize =
+ ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
+ }
+
+ private interface OutputStreamSupplier {
+
+ OutputStream get(int expectedSize);
+
+ int size();
+ }
+
+ private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
+
+ private ByteBufferOutputStream baos;
+
+ @Override
+ public OutputStream get(int expectedSize) {
+ baos = new ByteBufferOutputStream(expectedSize);
+ return baos;
+ }
+
+ @Override
+ public int size() {
+ return baos.size();
+ }
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
+ * @param codec
+ * @param compressor
+ * @param cellScanner
+ * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+ * been flipped and is ready for reading. Use limit to find total size.
+ * @throws IOException
+ */
+ public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cellScanner) throws IOException {
+ ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
+ if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+ ByteBuffer bb = supplier.baos.getByteBuffer();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ return bb.hasRemaining() ? bb : null;
+ } else {
+ return null;
+ }
+ }
+
+ private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
+
+ private final ByteBufAllocator alloc;
+
+ private ByteBuf buf;
+
+ public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
+ this.alloc = alloc;
+ }
+
+ @Override
+ public OutputStream get(int expectedSize) {
+ buf = alloc.buffer(expectedSize);
+ return new ByteBufOutputStream(buf);
+ }
+
+ @Override
+ public int size() {
+ return buf.writerIndex();
+ }
+ }
+
+ public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
+ ByteBufAllocator alloc) throws IOException {
+ ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
+ if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+ return supplier.buf;
+ } else {
+ return null;
+ }
+ }
+
+ private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
+ if (cellScanner == null) {
+ return false;
+ }
+ if (codec == null) {
+ throw new CellScannerButNoCodecException();
+ }
+ int bufferSize = cellBlockBuildingInitialBufferSize;
+ encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
+ if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() +
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
+ }
+ return true;
+ }
+
+ private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
+ CompressionCodec compressor) throws IOException {
+ Compressor poolCompressor = null;
+ try {
+ if (compressor != null) {
+ if (compressor instanceof Configurable) {
+ ((Configurable) compressor).setConf(this.conf);
+ }
+ poolCompressor = CodecPool.getCompressor(compressor);
+ os = compressor.createOutputStream(os, poolCompressor);
+ }
+ Codec.Encoder encoder = codec.getEncoder(os);
+ while (cellScanner.advance()) {
+ encoder.write(cellScanner.current());
+ }
+ encoder.flush();
+ } catch (BufferOverflowException | IndexOutOfBoundsException e) {
+ throw new DoNotRetryIOException(e);
+ } finally {
+ os.close();
+ if (poolCompressor != null) {
+ CodecPool.returnCompressor(poolCompressor);
+ }
+ }
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
+ * @param codec
+ * @param compressor
+ * @param cellScanner
+ * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate our own
+ * ByteBuffer.
+ * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+ * been flipped and is ready for reading. Use limit to find total size. If
+ * <code>pool</code> was not null, then this returned ByteBuffer came from there and
+ * should be returned to the pool when done.
+ * @throws IOException
+ */
+ public ByteBuffer buildCellBlock(Codec codec, CompressionCodec compressor,
+ CellScanner cellScanner, BoundedByteBufferPool pool) throws IOException {
+ if (cellScanner == null) {
+ return null;
+ }
+ if (codec == null) {
+ throw new CellScannerButNoCodecException();
+ }
+ ByteBufferOutputStream bbos;
+ ByteBuffer bb = null;
+ if (pool != null) {
+ bb = pool.getBuffer();
+ bbos = new ByteBufferOutputStream(bb);
+ } else {
+ bbos = new ByteBufferOutputStream(cellBlockBuildingInitialBufferSize);
+ }
+ encodeCellsTo(bbos, cellScanner, codec, compressor);
+ if (bbos.size() == 0) {
+ if (pool != null) {
+ pool.putBuffer(bb);
+ }
+ return null;
+ }
+ return bbos.getByteBuffer();
+ }
+
+ /**
+ * @param codec to use for cellblock
+ * @param cellBlock to encode
+ * @return CellScanner to work against the content of <code>cellBlock</code>
+ * @throws IOException if encoding fails
+ */
+ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ final byte[] cellBlock) throws IOException {
+ return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
+ }
+
+ /**
+ * @param codec
+ * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+ * position()'ed at the start of the cell block and limit()'ed at the end.
+ * @return CellScanner to work against the content of <code>cellBlock</code>
+ * @throws IOException
+ */
+ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ ByteBuffer cellBlock) throws IOException {
+ if (compressor != null) {
+ cellBlock = decompress(compressor, cellBlock);
+ }
+ // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
+ // make Cells directly over the passed BB. This method is called at client side and we don't
+ // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
+ // of the Cells at user's app level will make it not possible to GC the response byte[]
+ return codec.getDecoder(new ByteBufferInputStream(cellBlock));
+ }
+
+ private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+ throws IOException {
+ // GZIPCodec fails w/ NPE if no configuration.
+ if (compressor instanceof Configurable) {
+ ((Configurable) compressor).setConf(this.conf);
+ }
+ Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+ CompressionInputStream cis =
+ compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
+ ByteBufferOutputStream bbos;
+ try {
+ // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
+ // TODO: Reuse buffers.
+ bbos =
+ new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ IOUtils.copy(cis, bbos);
+ bbos.close();
+ cellBlock = bbos.getByteBuffer();
+ } finally {
+ CodecPool.returnDecompressor(poolDecompressor);
+ }
+ return cellBlock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
new file mode 100644
index 0000000..ffd27b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown if a cell scanner is supplied but there is no codec to encode it with.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CellScannerButNoCodecException extends HBaseIOException {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
new file mode 100644
index 0000000..f710d54
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The default netty event loop config: one shared {@link NioEventLoopGroup} paired with
+ * {@link NioSocketChannel} as the channel class.
+ */
+@InterfaceAudience.Private
+class DefaultNettyEventLoopConfig {
+
+ // Created once, when this class is initialized. The group is built with a thread count of 0
+ // (presumably netty's default sizing -- confirm against the netty docs) and a thread factory
+ // named "Default-IPC-NioEventLoopGroup" that is passed daemon=true and Thread.MAX_PRIORITY.
+ public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair
+ .<EventLoopGroup, Class<? extends Channel>> newPair(
+ new NioEventLoopGroup(0,
+ new DefaultThreadFactory("Default-IPC-NioEventLoopGroup", true, Thread.MAX_PRIORITY)),
+ NioSocketChannel.class);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
new file mode 100644
index 0000000..aaaea1f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
+ * standard behavior of a {@link HBaseRpcController}. Used in testing. Every method of this class
+ * forwards directly to the wrapped delegate.
+ */
+@InterfaceAudience.Private
+public class DelegatingHBaseRpcController implements HBaseRpcController {
+
+ private final HBaseRpcController delegate;
+
+ /**
+ * @param delegate the controller every call is forwarded to
+ */
+ public DelegatingHBaseRpcController(HBaseRpcController delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void reset() {
+ delegate.reset();
+ }
+
+ @Override
+ public boolean failed() {
+ return delegate.failed();
+ }
+
+ @Override
+ public String errorText() {
+ return delegate.errorText();
+ }
+
+ @Override
+ public void startCancel() {
+ delegate.startCancel();
+ }
+
+ @Override
+ public void setFailed(String reason) {
+ delegate.setFailed(reason);
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return delegate.isCanceled();
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback) {
+ delegate.notifyOnCancel(callback);
+ }
+
+ @Override
+ public CellScanner cellScanner() {
+ return delegate.cellScanner();
+ }
+
+ @Override
+ public void setCellScanner(CellScanner cellScanner) {
+ delegate.setCellScanner(cellScanner);
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ delegate.setPriority(priority);
+ }
+
+ @Override
+ public void setPriority(TableName tn) {
+ delegate.setPriority(tn);
+ }
+
+ @Override
+ public int getPriority() {
+ return delegate.getPriority();
+ }
+
+ @Override
+ public int getCallTimeout() {
+ return delegate.getCallTimeout();
+ }
+
+ @Override
+ public void setCallTimeout(int callTimeout) {
+ delegate.setCallTimeout(callTimeout);
+ }
+
+ @Override
+ public boolean hasCallTimeout() {
+ return delegate.hasCallTimeout();
+ }
+
+ @Override
+ public void setFailed(IOException e) {
+ delegate.setFailed(e);
+ }
+
+ @Override
+ public IOException getFailed() {
+ return delegate.getFailed();
+ }
+
+ @Override
+ public void setDone(CellScanner cellScanner) {
+ delegate.setDone(cellScanner);
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
+ throws IOException {
+ delegate.notifyOnCancel(callback, action);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
deleted file mode 100644
index ad4224b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
- * standard behavior of a {@link PayloadCarryingRpcController}.
- */
-@InterfaceAudience.Private
-public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
- private PayloadCarryingRpcController delegate;
-
- public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public CellScanner cellScanner() {
- return delegate.cellScanner();
- }
-
- @Override
- public void setCellScanner(final CellScanner cellScanner) {
- delegate.setCellScanner(cellScanner);
- }
-
- @Override
- public void setPriority(int priority) {
- delegate.setPriority(priority);
- }
-
- @Override
- public void setPriority(final TableName tn) {
- delegate.setPriority(tn);
- }
-
- @Override
- public int getPriority() {
- return delegate.getPriority();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
new file mode 100644
index 0000000..721148b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that the rpc server told the client to fall back to simple auth, but the client is
+ * configured to disallow the fallback.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FallbackDisallowedException extends HBaseIOException {
+
+ private static final long serialVersionUID = -6942845066279358253L;
+
+ public FallbackDisallowedException() {
+ super("Server asks us to fall back to SIMPLE auth, "
+ + "but this client is configured to only allow secure connections.");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
new file mode 100644
index 0000000..2c4b335
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
+ * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
+ * having to protobuf them (for performance reasons). This class is used for ferrying data across
+ * the proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public interface HBaseRpcController extends RpcController, CellScannable {
+
+ static final int PRIORITY_UNSET = -1;
+
+ /**
+ * Only used to send cells to rpc server, the returned cells should be set by
+ * {@link #setDone(CellScanner)}.
+ */
+ void setCellScanner(CellScanner cellScanner);
+
+ /**
+ * @param priority Priority for this request; should fall roughly in the range
+ * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+ */
+ void setPriority(int priority);
+
+ /**
+ * @param tn Set priority based off the table we are going against.
+ */
+ void setPriority(final TableName tn);
+
+ /**
+ * @return The priority of this request
+ */
+ int getPriority();
+
+ int getCallTimeout();
+
+ void setCallTimeout(int callTimeout);
+
+ boolean hasCallTimeout();
+
+ /**
+ * Mark the call as failed with an exception to pass on. For use in async rpc clients.
+ * @param e exception to set with
+ */
+ void setFailed(IOException e);
+
+ /**
+ * Return the failed exception, null if not failed.
+ */
+ IOException getFailed();
+
+ /**
+ * <b>IMPORTANT:</b> always call this method if the call finished without any exception to tell
+ * the {@code HBaseRpcController} that we are done.
+ */
+ void setDone(CellScanner cellScanner);
+
+ /**
+ * A little different from the basic RpcController:
+ * <ol>
+ * <li>You can register multiple callbacks to an {@code HBaseRpcController}.</li>
+ * <li>The callback will not be called if the rpc call is finished without any cancellation.</li>
+ * <li>It can also be called on the client side.</li>
+ * </ol>
+ */
+ @Override
+ void notifyOnCancel(RpcCallback<Object> callback);
+
+ interface CancellationCallback {
+ void run(boolean cancelled) throws IOException;
+ }
+
+ /**
+ * If not cancelled, add the callback to the cancellation callback list. Then execute the action
+ * with the cancellation state as a parameter. The implementation should guarantee that the
+ * cancellation state does not change during this call.
+ */
+ void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
new file mode 100644
index 0000000..a976473
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
+ * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
+ * having to protobuf them (for performance reasons). This class is used for ferrying data across
+ * the proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public class HBaseRpcControllerImpl implements HBaseRpcController {
+ /**
+ * The time, in ms, before the call should expire.
+ */
+ private Integer callTimeout;
+
+ private boolean done = false;
+
+ private boolean cancelled = false;
+
+ private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
+
+ private IOException exception;
+
+ /**
+ * Priority to set on this request. Set it here in the controller so it is available when
+ * composing the request. This is the ordained way of setting priorities going forward. We will
+ * be undoing the old annotation-based mechanism.
+ */
+ private int priority = PRIORITY_UNSET;
+
+ /**
+ * They are optionally set on construction, cleared after we make the call, and then optionally
+ * set on response with the result. We use this lowest common denominator access to Cells because
+ * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded
+ * block that implements CellScanner.
+ */
+ private CellScanner cellScanner;
+
+ public HBaseRpcControllerImpl() {
+ this((CellScanner) null);
+ }
+
+ public HBaseRpcControllerImpl(final CellScanner cellScanner) {
+ this.cellScanner = cellScanner;
+ }
+
+ public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) {
+ this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
+ }
+
+ /**
+ * @return One-shot cell scanner (you cannot back it up and restart)
+ */
+ @Override
+ public CellScanner cellScanner() {
+ return cellScanner;
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "The only possible race method is startCancel")
+ @Override
+ public void setCellScanner(final CellScanner cellScanner) {
+ this.cellScanner = cellScanner;
+ }
+
+ @Override
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ @Override
+ public void setPriority(final TableName tn) {
+ setPriority(
+ tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "The only possible race method is startCancel")
+ @Override
+ public void reset() {
+ priority = 0;
+ cellScanner = null;
+ exception = null;
+ callTimeout = null;
+ // In the implementations of some callable with replicas, rpc calls are executed in an executor
+ // and we could cancel the operation from outside which means there could be a race between
+ // reset and startCancel. Although I think the race should be handled by the callable since the
+ // reset may clear the cancel state...
+ synchronized (this) {
+ done = false;
+ cancelled = false;
+ cancellationCbs.clear();
+ }
+ }
+
+ @Override
+ public int getCallTimeout() {
+ if (callTimeout != null) {
+ return callTimeout.intValue();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public void setCallTimeout(int callTimeout) {
+ this.callTimeout = callTimeout;
+ }
+
+ @Override
+ public boolean hasCallTimeout() {
+ return callTimeout != null;
+ }
+
+ @Override
+ public synchronized String errorText() {
+ if (!done || exception == null) {
+ return null;
+ }
+ return exception.getMessage();
+ }
+
+ @Override
+ public synchronized boolean failed() {
+ return done && this.exception != null;
+ }
+
+ @Override
+ public synchronized boolean isCanceled() {
+ return cancelled;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> callback) {
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ if (!cancelled) {
+ cancellationCbs.add(callback);
+ return;
+ }
+ }
+ // run it directly as we have already been cancelled.
+ callback.run(null);
+ }
+
+ @Override
+ public synchronized void setFailed(String reason) {
+ if (done) {
+ return;
+ }
+ done = true;
+ exception = new IOException(reason);
+ }
+
+ @Override
+ public synchronized void setFailed(IOException e) {
+ if (done) {
+ return;
+ }
+ done = true;
+ exception = e;
+ }
+
+ @Override
+ public synchronized IOException getFailed() {
+ return done ? exception : null;
+ }
+
+ @Override
+ public synchronized void setDone(CellScanner cellScanner) {
+ if (done) {
+ return;
+ }
+ done = true;
+ this.cellScanner = cellScanner;
+ }
+
+ @Override
+ public void startCancel() {
+ // As said above in the comment of reset, the cancellationCbs may be cleared by reset, so we need
+ // to copy it.
+ List<RpcCallback<Object>> cbs;
+ synchronized (this) {
+ if (done) {
+ return;
+ }
+ done = true;
+ cancelled = true;
+ cbs = new ArrayList<>(cancellationCbs);
+ }
+ for (RpcCallback<?> cb : cbs) {
+ cb.run(null);
+ }
+ }
+
+ @Override
+ public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
+ throws IOException {
+ if (cancelled) {
+ action.run(true);
+ } else {
+ cancellationCbs.add(callback);
+ action.run(false);
+ }
+ }
+
+}
\ No newline at end of file