You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/12/09 00:40:28 UTC
[4/4] hbase git commit: HBASE-12597
HBASE-12597
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8e64618
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8e64618
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8e64618
Branch: refs/heads/master
Commit: a8e646185569ac121993c49cea0cfebfadac1b5c
Parents: 1a27cb7
Author: Jurriaan Mous <ju...@jurmo.us>
Authored: Sat Dec 6 13:29:15 2014 +0100
Committer: stack <st...@apache.org>
Committed: Mon Dec 8 15:40:09 2014 -0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ConnectionManager.java | 19 +-
.../client/PreemptiveFastFailInterceptor.java | 2 +-
.../hadoop/hbase/ipc/AbstractRpcClient.java | 177 ++
.../java/org/apache/hadoop/hbase/ipc/Call.java | 127 ++
.../hadoop/hbase/ipc/CallTimeoutException.java | 35 +
.../apache/hadoop/hbase/ipc/ConnectionId.java | 78 +
.../hadoop/hbase/ipc/FailedServerException.java | 37 +
.../apache/hadoop/hbase/ipc/FailedServers.java | 79 +
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 22 +-
.../org/apache/hadoop/hbase/ipc/RpcClient.java | 1749 +-----------------
.../hadoop/hbase/ipc/RpcClientFactory.java | 70 +
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 1383 ++++++++++++++
.../hbase/zookeeper/MetaTableLocator.java | 4 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 2 +-
.../hadoop/hbase/master/AssignmentManager.java | 2 +-
.../hbase/regionserver/HRegionServer.java | 11 +-
.../client/TestClientScannerRPCTimeout.java | 4 +-
.../hadoop/hbase/client/TestClientTimeouts.java | 52 +-
.../hbase/client/TestFromClientSideNoCodec.java | 4 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 2 +-
.../hbase/filter/FilterTestingCluster.java | 4 +-
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 15 +-
.../hadoop/hbase/ipc/TestHBaseClient.java | 2 +-
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 20 +-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 4 +-
.../hbase/master/TestHMasterRPCException.java | 5 +-
.../hadoop/hbase/security/TestSecureRPC.java | 6 +-
.../security/token/TestTokenAuthentication.java | 5 +-
.../snapshot/TestFlushSnapshotFromClient.java | 4 +-
29 files changed, 2113 insertions(+), 1811 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index f822709..7f599da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -619,7 +620,7 @@ class ConnectionManager {
this.registry = setupRegistry();
retrieveClusterId();
- this.rpcClient = new RpcClient(this.conf, this.clusterId);
+ this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
// Do we publish the status?
@@ -639,7 +640,7 @@ class ConnectionManager {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
- rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+ rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
}
@@ -785,18 +786,6 @@ class ConnectionManager {
/**
* For tests only.
- * @param rpcClient Client we should use instead.
- * @return Previous rpcClient
- */
- @VisibleForTesting
- RpcClient setRpcClient(final RpcClient rpcClient) {
- RpcClient oldRpcClient = this.rpcClient;
- this.rpcClient = rpcClient;
- return oldRpcClient;
- }
-
- /**
- * For tests only.
*/
@VisibleForTesting
RpcClient getRpcClient() {
@@ -2336,7 +2325,7 @@ class ConnectionManager {
clusterStatusListener.close();
}
if (rpcClient != null) {
- rpcClient.stop();
+ rpcClient.close();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
index 4256120..6fb2de3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
new file mode 100644
index 0000000..766ad8f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.net.SocketAddress;
+
+/**
+ * Provides the basics for a RpcClient implementation like configuration and Logging.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractRpcClient implements RpcClient {
+ public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
+
+ protected final Configuration conf;
+ protected String clusterId;
+ protected final SocketAddress localAddr;
+
+ protected UserProvider userProvider;
+ protected final IPCUtil ipcUtil;
+
+ protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
+ // time (in ms), it will be closed at any moment.
+ protected final int maxRetries; //the max. no. of retries for socket connections
+ protected final long failureSleep; // Time to sleep before retry on failure.
+ protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ protected final boolean tcpKeepAlive; // if T then use keepalives
+ protected final Codec codec;
+ protected final CompressionCodec compressor;
+ protected final boolean fallbackAllowed;
+
+ protected final int connectTO;
+ protected final int readTO;
+ protected final int writeTO;
+
+ /**
+ * Construct an IPC client for the cluster <code>clusterId</code>
+ *
+ * @param conf configuration
+ * @param clusterId the cluster id
+ * @param localAddr client socket bind address.
+ */
+ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+ this.userProvider = UserProvider.instantiate(conf);
+ this.localAddr = localAddr;
+ this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
+ this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
+ this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
+ this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
+ this.ipcUtil = new IPCUtil(conf);
+
+ this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
+ this.conf = conf;
+ this.codec = getCodec();
+ this.compressor = getCompressor(conf);
+ this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
+ this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
+ this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+
+ // login the server principal (if using secure Hadoop)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
+ ", tcpKeepAlive=" + this.tcpKeepAlive +
+ ", tcpNoDelay=" + this.tcpNoDelay +
+ ", connectTO=" + this.connectTO +
+ ", readTO=" + this.readTO +
+ ", writeTO=" + this.writeTO +
+ ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
+ ", maxRetries=" + this.maxRetries +
+ ", fallbackAllowed=" + this.fallbackAllowed +
+ ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
+ }
+ }
+
+ @VisibleForTesting
+ public static String getDefaultCodec(final Configuration c) {
+ // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
+ // Configuration will complain -- then no default codec (and we'll pb everything). Else
+ // default is KeyValueCodec
+ return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
+ }
+
+ /**
+ * Encapsulate the ugly casting and RuntimeException conversion in private method.
+ * @return Codec to use on this client.
+ */
+ Codec getCodec() {
+ // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
+ // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
+ String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
+ if (className == null || className.length() == 0) return null;
+ try {
+ return (Codec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed getting codec " + className, e);
+ }
+ }
+
+ /**
+ * Encapsulate the ugly casting and RuntimeException conversion in private method.
+ * @param conf configuration
+ * @return The compressor to use on this client.
+ */
+ private static CompressionCodec getCompressor(final Configuration conf) {
+ String className = conf.get("hbase.client.rpc.compressor", null);
+ if (className == null || className.isEmpty()) return null;
+ try {
+ return (CompressionCodec)Class.forName(className).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed getting compressor " + className, e);
+ }
+ }
+
+ /**
+ * Return the pool type specified in the configuration, which must be set to
+ * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
+ * otherwise default to the former.
+ *
+ * For applications with many user threads, use a small round-robin pool. For
+ * applications with few user threads, you may want to try using a
+ * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
+ * instances should not exceed the operating system's hard limit on the number of
+ * connections.
+ *
+ * @param config configuration
+ * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
+ * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
+ */
+ protected static PoolMap.PoolType getPoolType(Configuration config) {
+ return PoolMap.PoolType
+ .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
+ PoolMap.PoolType.ThreadLocal);
+ }
+
+ /**
+ * Return the pool size specified in the configuration, which is applicable only if
+ * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
+ *
+ * @param config configuration
+ * @return the maximum pool size
+ */
+ protected static int getPoolSize(Configuration config) {
+ return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
new file mode 100644
index 0000000..df32730
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+
+/** A call waiting for a value. */
+@InterfaceAudience.Private
+public class Call {
+ final int id; // call id
+ final Message param; // rpc request method param object
+ /**
+ * Optionally has cells when making call. Optionally has cells set on response. Used
+ * passing cells to the rpc and receiving the response.
+ */
+ CellScanner cells;
+ Message response; // value, null if error
+ // The return type. Used to create shell into which we deserialize the response if any.
+ Message responseDefaultType;
+ IOException error; // exception, null if value
+ volatile boolean done; // true when call is done
+ long startTime;
+ final Descriptors.MethodDescriptor md;
+ final int timeout; // timeout in millisecond for this call; 0 means infinite.
+
+ protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
+ final CellScanner cells, final Message responseDefaultType, int timeout) {
+ this.param = param;
+ this.md = md;
+ this.cells = cells;
+ this.startTime = EnvironmentEdgeManager.currentTime();
+ this.responseDefaultType = responseDefaultType;
+ this.id = id;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
+ * @return true if the call is on timeout, false otherwise.
+ */
+ public boolean checkAndSetTimeout() {
+ if (timeout == 0){
+ return false;
+ }
+
+ long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
+ if (waitTime >= timeout) {
+ IOException ie = new CallTimeoutException("Call id=" + id +
+ ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
+ setException(ie); // includes a notify
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public int remainingTime() {
+ if (timeout == 0) {
+ return Integer.MAX_VALUE;
+ }
+
+ int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
+ return remaining > 0 ? remaining : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
+ (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
+ }
+
+ /** Indicate when the call is complete and the
+ * value or error are available. Notifies by default. */
+ protected synchronized void callComplete() {
+ this.done = true;
+ notify(); // notify caller
+ }
+
+ /** Set the exception when there is an error.
+ * Notify the caller the call is done.
+ *
+ * @param error exception thrown by the call; either local or remote
+ */
+ public void setException(IOException error) {
+ this.error = error;
+ callComplete();
+ }
+
+ /**
+ * Set the return value when there is no error.
+ * Notify the caller the call is done.
+ *
+ * @param response return value of the call.
+ * @param cells Can be null
+ */
+ public void setResponse(Message response, final CellScanner cells) {
+ this.response = response;
+ this.cells = cells;
+ callComplete();
+ }
+
+ public long getStartTime() {
+ return this.startTime;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
new file mode 100644
index 0000000..a81e5d1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * Client-side call timeout
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallTimeoutException extends IOException {
+ public CallTimeoutException(final String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
new file mode 100644
index 0000000..a62d415
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.security.User;
+
+import java.net.InetSocketAddress;
+
+/**
+ * This class holds the address and the user ticket, etc. The client connections
+ * to servers are uniquely identified by <remoteAddress, ticket, serviceName>
+ */
+@InterfaceAudience.Private
+public class ConnectionId {
+ final InetSocketAddress address;
+ final User ticket;
+ private static final int PRIME = 16777619;
+ final String serviceName;
+
+ public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
+ this.address = address;
+ this.ticket = ticket;
+ this.serviceName = serviceName;
+ }
+
+ public String getServiceName() {
+ return this.serviceName;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ public User getTicket() {
+ return ticket;
+ }
+
+ @Override
+ public String toString() {
+ return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ConnectionId) {
+ ConnectionId id = (ConnectionId) obj;
+ return address.equals(id.address) &&
+ ((ticket != null && ticket.equals(id.ticket)) ||
+ (ticket == id.ticket)) &&
+ this.serviceName == id.serviceName;
+ }
+ return false;
+ }
+
+ @Override // simply use the default Object#hashcode() ?
+ public int hashCode() {
+ int hashcode = (address.hashCode() +
+ PRIME * (PRIME * this.serviceName.hashCode() ^
+ (ticket == null ? 0 : ticket.hashCode())));
+ return hashcode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
new file mode 100644
index 0000000..12f6451
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that we're trying to connect to a already known as dead server. We will want to
+ * retry: we're getting this because the region location was wrong, or because
+ * the server just died, in which case the retry loop will help us to wait for the
+ * regions to recover.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FailedServerException extends HBaseIOException {
+ public FailedServerException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
new file mode 100644
index 0000000..16ec16c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * A class to manage a list of servers that failed recently.
+ */
+@InterfaceAudience.Private
+public class FailedServers {
+ private final LinkedList<Pair<Long, String>> failedServers = new
+ LinkedList<Pair<Long, String>>();
+ private final int recheckServersTimeout;
+
+ public FailedServers(Configuration conf) {
+ this.recheckServersTimeout = conf.getInt(
+ RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ }
+
+ /**
+ * Add an address to the list of the failed servers list.
+ */
+ public synchronized void addToFailedServers(InetSocketAddress address) {
+ final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
+ failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+ }
+
+ /**
+ * Check if the server should be considered as bad. Clean the old entries of the list.
+ *
+ * @return true if the server is in the failed servers list
+ */
+ public synchronized boolean isFailedServer(final InetSocketAddress address) {
+ if (failedServers.isEmpty()) {
+ return false;
+ }
+
+ final String lookup = address.toString();
+ final long now = EnvironmentEdgeManager.currentTime();
+
+ // iterate, looking for the search entry and cleaning expired entries
+ Iterator<Pair<Long, String>> it = failedServers.iterator();
+ while (it.hasNext()) {
+ Pair<Long, String> cur = it.next();
+ if (cur.getFirst() < now) {
+ it.remove();
+ } else {
+ if (lookup.equals(cur.getSecond())) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a8e64618/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 67e2524..b7e7728 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -51,7 +51,7 @@ import com.google.protobuf.Message;
* Utility to help ipc'ing.
*/
@InterfaceAudience.Private
-class IPCUtil {
+public class IPCUtil {
public static final Log LOG = LogFactory.getLog(IPCUtil.class);
/**
* How much we think the decompressor will expand the original compressed content.
@@ -60,7 +60,7 @@ class IPCUtil {
private final int cellBlockBuildingInitialBufferSize;
private final Configuration conf;
- IPCUtil(final Configuration conf) {
+ public IPCUtil(final Configuration conf) {
super();
this.conf = conf;
this.cellBlockDecompressionMultiplier =
@@ -81,14 +81,14 @@ class IPCUtil {
* <code>compressor</code>.
* @param codec
* @param compressor
- * @Param cellScanner
+ * @param cellScanner
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size.
* @throws IOException
*/
@SuppressWarnings("resource")
- ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner)
throws IOException {
if (cellScanner == null) return null;
@@ -145,7 +145,7 @@ class IPCUtil {
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException
*/
- CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte [] cellBlock)
throws IOException {
return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
@@ -159,7 +159,7 @@ class IPCUtil {
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException
*/
- CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte [] cellBlock, final int offset, final int length)
throws IOException {
// If compressed, decompress it first before passing it on else we will leak compression
@@ -200,7 +200,7 @@ class IPCUtil {
* @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
* @throws IOException
*/
- static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+ public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
if (m == null) return null;
int serializedSize = m.getSerializedSize();
int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
@@ -223,7 +223,7 @@ class IPCUtil {
* @return Total number of bytes written.
* @throws IOException
*/
- static int write(final OutputStream dos, final Message header, final Message param,
+ public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock)
throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
@@ -255,7 +255,7 @@ class IPCUtil {
* @param len
* @throws IOException
*/
- static void readChunked(final DataInput in, byte[] dest, int offset, int len)
+ public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
throws IOException {
int maxRead = 8192;
@@ -265,11 +265,9 @@ class IPCUtil {
}
/**
- * @param header
- * @param body
* @return Size on the wire when the two messages are written with writeDelimitedTo
*/
- static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+ public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
int totalSize = 0;
for (Message m: messages) {
if (m == null) continue;