You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/03 05:58:35 UTC
svn commit: r1478639 [2/10] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/
hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java...
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1478639&r1=1478638&r2=1478639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri May 3 03:58:33 2013
@@ -20,15 +20,9 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -59,12 +53,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterAdminProtocol;
-import org.apache.hadoop.hbase.MasterMonitorProtocol;
-import org.apache.hadoop.hbase.MasterProtocol;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -77,26 +66,95 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
-import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
-import org.apache.hadoop.hbase.ipc.RpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Triple;
-import org.apache.hadoop.hbase.zookeeper.*;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
@@ -105,7 +163,8 @@ import com.google.protobuf.ServiceExcept
* {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
* that pass the same {@link Configuration} instance will be returned the same
* {@link HConnection} instance (Adding properties to a Configuration
- * instance does not change its object identity). Sharing {@link HConnection}
+ * instance does not change its object identity; for more on how this is done see
+ * {@link HConnectionKey}). Sharing {@link HConnection}
* instances is usually what you want; all clients of the {@link HConnection}
* instances share the HConnections' cache of Region locations rather than each
* having to discover for itself the location of meta, etc. It makes
@@ -116,11 +175,9 @@ import com.google.protobuf.ServiceExcept
* implemented atop Hadoop RPC and as of this writing, Hadoop RPC does a
* connection per cluster-member, exclusively).
*
- * <p>But sharing connections
- * makes clean up of {@link HConnection} instances a little awkward. Currently,
- * clients cleanup by calling
- * {@link #deleteConnection(Configuration)}. This will shutdown the
- * zookeeper connection the HConnection was using and clean up all
+ * <p>But sharing connections makes clean up of {@link HConnection} instances a little awkward.
+ * Currently, clients cleanup by calling {@link #deleteConnection(Configuration)}. This will
+ * shutdown the zookeeper connection the HConnection was using and clean up all
* HConnection resources as well as stopping proxies to servers out on the
* cluster. Not running the cleanup will not end the world; it'll
* just stall the closeup some and spew some zookeeper connection failed
@@ -150,43 +207,30 @@ import com.google.protobuf.ServiceExcept
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HConnectionManager {
+ static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+
+ public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
+
// An LRU Map of HConnectionKey -> HConnection (TableServer). All
// access must be synchronized. This map is not private because tests
// need to be able to tinker with it.
- static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
-
- public static final int MAX_CACHED_HBASE_INSTANCES;
-
- /** Parameter name for what client protocol to use. */
- public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
-
- /** Default client protocol class name. */
- public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
-
- /** Parameter name for what admin protocol to use. */
- public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
-
- /** Default admin protocol class name. */
- public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+ static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
- public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
-
- private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+ public static final int MAX_CACHED_CONNECTION_INSTANCES;
static {
// We set instances to one more than the value specified for {@link
// HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
// connections to the ensemble from the one client is 30, so in that case we
// should run into zk issues before the LRU hit this value of 31.
- MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
- HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
- HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
- HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
- (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
- @Override
+ MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
+ HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+ CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+ (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
+ @Override
protected boolean removeEldestEntry(
Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
- return size() > MAX_CACHED_HBASE_INSTANCES;
+ return size() > MAX_CACHED_CONNECTION_INSTANCES;
}
};
}
@@ -194,31 +238,31 @@ public class HConnectionManager {
/*
* Non-instantiable.
*/
- protected HConnectionManager() {
+ private HConnectionManager() {
super();
}
/**
- * Get the connection that goes with the passed <code>conf</code>
- * configuration instance.
- * If no current connection exists, method creates a new connection for the
- * passed <code>conf</code> instance.
+ * Get the connection that goes with the passed <code>conf</code> configuration instance.
+ * If no current connection exists, method creates a new connection and keys it using
+ * connection-specific properties from the passed {@link Configuration}; see
+ * {@link HConnectionKey}.
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
- public static HConnection getConnection(Configuration conf)
+ public static HConnection getConnection(final Configuration conf)
throws IOException {
HConnectionKey connectionKey = new HConnectionKey(conf);
- synchronized (HBASE_INSTANCES) {
- HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
+ synchronized (CONNECTION_INSTANCES) {
+ HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
if (connection == null) {
connection = new HConnectionImplementation(conf, true);
- HBASE_INSTANCES.put(connectionKey, connection);
+ CONNECTION_INSTANCES.put(connectionKey, connection);
} else if (connection.isClosed()) {
HConnectionManager.deleteConnection(connectionKey, true);
connection = new HConnectionImplementation(conf, true);
- HBASE_INSTANCES.put(connectionKey, connection);
+ CONNECTION_INSTANCES.put(connectionKey, connection);
}
connection.incCount();
return connection;
@@ -226,11 +270,10 @@ public class HConnectionManager {
}
/**
- * Create a new HConnection instance using the passed <code>conf</code>
- * instance.
- * Note: This bypasses the usual HConnection life cycle management!
- * Use this with caution, the caller is responsible for closing the
- * created connection.
+ * Create a new HConnection instance using the passed <code>conf</code> instance.
+ * <p>Note: This bypasses the usual HConnection life cycle management done by
+ * {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for
+ * calling {@link HConnection#close()} on the returned connection instance.
* @param conf configuration
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
@@ -241,22 +284,19 @@ public class HConnectionManager {
}
/**
- * Delete connection information for the instance specified by configuration.
- * If there are no more references to it, this will then close connection to
- * the zookeeper ensemble and let go of all resources.
+ * Delete connection information for the instance specified by passed configuration.
+ * If there are no more references to the designated connection, this method will
+ * then close connection to the zookeeper ensemble and let go of all associated resources.
*
- * @param conf
- * configuration whose identity is used to find {@link HConnection}
- * instance.
+ * @param conf configuration whose identity is used to find {@link HConnection} instance.
*/
public static void deleteConnection(Configuration conf) {
deleteConnection(new HConnectionKey(conf), false);
}
/**
- * Delete stale connection information for the instance specified by configuration.
- * This will then close connection to
- * the zookeeper ensemble and let go of all resources.
+ * Cleanup a known stale connection.
+ * This will then close connection to the zookeeper ensemble and let go of all resources.
*
* @param connection
*/
@@ -268,22 +308,21 @@ public class HConnectionManager {
* Delete information for all connections.
*/
public static void deleteAllConnections() {
- synchronized (HBASE_INSTANCES) {
+ synchronized (CONNECTION_INSTANCES) {
Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
- connectionKeys.addAll(HBASE_INSTANCES.keySet());
+ connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
for (HConnectionKey connectionKey : connectionKeys) {
deleteConnection(connectionKey, false);
}
- HBASE_INSTANCES.clear();
+ CONNECTION_INSTANCES.clear();
}
}
private static void deleteConnection(HConnection connection, boolean staleConnection) {
- synchronized (HBASE_INSTANCES) {
- for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
- .entrySet()) {
- if (connectionEntry.getValue() == connection) {
- deleteConnection(connectionEntry.getKey(), staleConnection);
+ synchronized (CONNECTION_INSTANCES) {
+ for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
+ if (e.getValue() == connection) {
+ deleteConnection(e.getKey(), staleConnection);
break;
}
}
@@ -291,18 +330,17 @@ public class HConnectionManager {
}
private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
- synchronized (HBASE_INSTANCES) {
- HConnectionImplementation connection = HBASE_INSTANCES
- .get(connectionKey);
+ synchronized (CONNECTION_INSTANCES) {
+ HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
if (connection != null) {
connection.decCount();
if (connection.isZeroReference() || staleConnection) {
- HBASE_INSTANCES.remove(connectionKey);
+ CONNECTION_INSTANCES.remove(connectionKey);
connection.internalClose();
}
} else {
LOG.error("Connection not found in the list, can't delete it "+
- "(connection key="+connectionKey+"). May be the key was modified?");
+ "(connection key=" + connectionKey + "). May be the key was modified?");
}
}
}
@@ -313,14 +351,12 @@ public class HConnectionManager {
* @return Number of cached regions for the table.
* @throws ZooKeeperConnectionException
*/
- static int getCachedRegionCount(Configuration conf,
- final byte[] tableName)
+ static int getCachedRegionCount(Configuration conf, final byte[] tableName)
throws IOException {
return execute(new HConnectable<Integer>(conf) {
@Override
public Integer connect(HConnection connection) {
- return ((HConnectionImplementation) connection)
- .getNumberOfCachedRegionLocations(tableName);
+ return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName);
}
});
}
@@ -331,8 +367,8 @@ public class HConnectionManager {
* @return true if the region where the table and row reside is cached.
* @throws ZooKeeperConnectionException
*/
- static boolean isRegionCached(Configuration conf,
- final byte[] tableName, final byte[] row) throws IOException {
+ static boolean isRegionCached(Configuration conf, final byte[] tableName, final byte[] row)
+ throws IOException {
return execute(new HConnectable<Boolean>(conf) {
@Override
public Boolean connect(HConnection connection) {
@@ -342,33 +378,9 @@ public class HConnectionManager {
}
/**
- * This class makes it convenient for one to execute a command in the context
- * of a {@link HConnection} instance based on the given {@link Configuration}.
- *
- * <p>
- * If you find yourself wanting to use a {@link HConnection} for a relatively
- * short duration of time, and do not want to deal with the hassle of creating
- * and cleaning up that resource, then you should consider using this
- * convenience class.
- *
- * @param <T>
- * the return type of the {@link HConnectable#connect(HConnection)}
- * method.
- */
- public static abstract class HConnectable<T> {
- public Configuration conf;
-
- protected HConnectable(Configuration conf) {
- this.conf = conf;
- }
-
- public abstract T connect(HConnection connection) throws IOException;
- }
-
- /**
* This convenience method invokes the given {@link HConnectable#connect}
* implementation using a {@link HConnection} instance that lasts just for the
- * duration of that invocation.
+ * duration of the invocation.
*
* @param <T> the return type of the connect method
* @param connectable the {@link HConnectable} instance
@@ -398,127 +410,14 @@ public class HConnectionManager {
}
}
- /**
- * Denotes a unique key to a {@link HConnection} instance.
- *
- * In essence, this class captures the properties in {@link Configuration}
- * that may be used in the process of establishing a connection. In light of
- * that, if any new such properties are introduced into the mix, they must be
- * added to the {@link HConnectionKey#properties} list.
- *
- */
- public static class HConnectionKey {
- final static String[] CONNECTION_PROPERTIES = new String[] {
- HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
- HConstants.ZOOKEEPER_CLIENT_PORT,
- HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
- HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
- HConstants.HBASE_META_SCANNER_CACHING,
- HConstants.HBASE_CLIENT_INSTANCE_ID };
-
- private Map<String, String> properties;
- private String username;
-
- public HConnectionKey(Configuration conf) {
- Map<String, String> m = new HashMap<String, String>();
- if (conf != null) {
- for (String property : CONNECTION_PROPERTIES) {
- String value = conf.get(property);
- if (value != null) {
- m.put(property, value);
- }
- }
- }
- this.properties = Collections.unmodifiableMap(m);
-
- try {
- User currentUser = User.getCurrent();
- if (currentUser != null) {
- username = currentUser.getName();
- }
- } catch (IOException ioe) {
- LOG.warn("Error obtaining current user, skipping username in HConnectionKey",
- ioe);
- }
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- if (username != null) {
- result = username.hashCode();
- }
- for (String property : CONNECTION_PROPERTIES) {
- String value = properties.get(property);
- if (value != null) {
- result = prime * result + value.hashCode();
- }
- }
-
- return result;
- }
-
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ",
- justification="Optimization")
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- HConnectionKey that = (HConnectionKey) obj;
- if (this.username != null && !this.username.equals(that.username)) {
- return false;
- } else if (this.username == null && that.username != null) {
- return false;
- }
- if (this.properties == null) {
- if (that.properties != null) {
- return false;
- }
- } else {
- if (that.properties == null) {
- return false;
- }
- for (String property : CONNECTION_PROPERTIES) {
- String thisValue = this.properties.get(property);
- String thatValue = that.properties.get(property);
- //noinspection StringEquality
- if (thisValue == thatValue) {
- continue;
- }
- if (thisValue == null || !thisValue.equals(thatValue)) {
- return false;
- }
- }
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "HConnectionKey{" +
- "properties=" + properties +
- ", username='" + username + '\'' +
- '}';
- }
- }
-
/** Encapsulates connection to zookeeper and regionservers.*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+ justification="Access to the conncurrent hash map is under a lock so should be fine.")
static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
- private final Class<? extends AdminProtocol> adminClass;
- private final Class<? extends ClientProtocol> clientClass;
private final long pause;
private final int numTries;
- private final int maxRPCAttempts;
private final int rpcTimeout;
private final int prefetchRegionLimit;
private final boolean useServerTrackerForRetries;
@@ -546,22 +445,15 @@ public class HConnectionManager {
private final Configuration conf;
- // client RPC
- private RpcClientEngine rpcEngine;
-
- // Known region ServerName.toString() -> RegionClient/Admin
- private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
- new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
- private final ConcurrentHashMap<String, String> connectionLock =
- new ConcurrentHashMap<String, String>();
+ // Client rpc instance.
+ private RpcClient rpcClient;
/**
* Map of table to table {@link HRegionLocation}s. The table key is made
* by doing a {@link Bytes#mapKey(byte[])} of the table's name.
*/
- private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
- cachedRegionLocations =
- new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
+ private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>> cachedRegionLocations =
+ new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
// The presence of a server in the map implies it's likely that there is an
// entry in cachedRegionLocations that map to this server; but the absence
@@ -579,47 +471,33 @@ public class HConnectionManager {
// indicates whether this connection's life cycle is managed (by us)
private final boolean managed;
+
/**
* constructor
* @param conf Configuration object
+ * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
+ * to zk and shutdown of all services; we just close down the resources this connection was
+ * responsible for and decrement usage counters. It is up to the caller to do the full
+ * cleanup. It is set when we want to have connection sharing going on -- reuse of zk connection,
+ * and cached region locations, established regionserver connections, etc. When connections
+ * are shared, we have reference counting going on and will only do full cleanup when there are
+ * no more users of an HConnectionImplementation instance.
*/
- @SuppressWarnings("unchecked")
- public HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+ HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this.conf = conf;
this.managed = managed;
- String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
- DEFAULT_ADMIN_PROTOCOL_CLASS);
this.closed = false;
- try {
- this.adminClass =
- (Class<? extends AdminProtocol>) Class.forName(adminClassName);
- } catch (ClassNotFoundException e) {
- throw new UnsupportedOperationException(
- "Unable to find region server interface " + adminClassName, e);
- }
- String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
- DEFAULT_CLIENT_PROTOCOL_CLASS);
- try {
- this.clientClass =
- (Class<? extends ClientProtocol>) Class.forName(clientClassName);
- } catch (ClassNotFoundException e) {
- throw new UnsupportedOperationException(
- "Unable to find client protocol " + clientClassName, e);
- }
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- this.maxRPCAttempts = conf.getInt(
- HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
- HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcTimeout = conf.getInt(
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.prefetchRegionLimit = conf.getInt(
- HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
- HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
- this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+ this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true);
long serverTrackerTimeout = 0;
if (this.useServerTrackerForRetries) {
// Server tracker allows us to do faster, and yet useful (hopefully), retries.
@@ -636,11 +514,7 @@ public class HConnectionManager {
this.serverTrackerTimeout = serverTrackerTimeout;
retrieveClusterId();
- // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
- // but we maintain access through an interface to allow overriding for tests
- // RPC engine setup must follow obtaining the cluster ID for token authentication to work
- this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId);
-
+ this.rpcClient = new RpcClient(this.conf, this.clusterId);
// Do we publish the status?
Class<? extends ClusterStatusListener.Listener> listenerClass =
@@ -654,14 +528,25 @@ public class HConnectionManager {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
- rpcEngine.getClient().cancelConnections(sn.getHostname(), sn.getPort(),
- new SocketException(sn.getServerName() + " is dead: closing its connection."));
+ rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
+ new SocketException(sn.getServerName() + " is dead: closing its connection."));
}
}, conf, listenerClass);
}
}
/**
+ * For tests only.
+ * @param rpcClient Client we should use instead.
+ * @return Previous rpcClient
+ */
+ RpcClient setRpcClient(final RpcClient rpcClient) {
+ RpcClient oldRpcClient = this.rpcClient;
+ this.rpcClient = rpcClient;
+ return oldRpcClient;
+ }
+
+ /**
* An identifier that will remain the same for a given connection.
* @return
*/
@@ -706,125 +591,6 @@ public class HConnectionManager {
return this.conf;
}
- private static class MasterProtocolState {
- public MasterProtocol protocol;
- public int userCount;
- public long keepAliveUntil = Long.MAX_VALUE;
- public final Class<? extends MasterProtocol> protocolClass;
-
- public MasterProtocolState (
- final Class<? extends MasterProtocol> protocolClass) {
- this.protocolClass = protocolClass;
- }
- }
-
- /**
- * Create a new Master proxy. Try once only.
- */
- private MasterProtocol createMasterInterface(
- MasterProtocolState masterProtocolState)
- throws IOException, KeeperException, ServiceException {
-
- ZooKeeperKeepAliveConnection zkw;
- try {
- zkw = getKeepAliveZooKeeperWatcher();
- } catch (IOException e) {
- throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
- }
-
- try {
-
- checkIfBaseNodeAvailable(zkw);
- ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
- if (sn == null) {
- String msg =
- "ZooKeeper available but no active master location found";
- LOG.info(msg);
- throw new MasterNotRunningException(msg);
- }
-
-
- InetSocketAddress isa =
- new InetSocketAddress(sn.getHostname(), sn.getPort());
- MasterProtocol tryMaster = rpcEngine.getProxy(
- masterProtocolState.protocolClass,
- isa, this.conf, this.rpcTimeout);
-
- if (tryMaster.isMasterRunning(
- null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
- return tryMaster;
- } else {
- String msg = "Can create a proxy to master, but it is not running";
- LOG.info(msg);
- throw new MasterNotRunningException(msg);
- }
- } finally {
- zkw.close();
- }
- }
-
- /**
- * Create a master, retries if necessary.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
- private MasterProtocol createMasterWithRetries(
- MasterProtocolState masterProtocolState) throws MasterNotRunningException {
-
- // The lock must be at the beginning to prevent multiple master creation
- // (and leaks) in a multithread context
-
- synchronized (this.masterAndZKLock) {
- Exception exceptionCaught = null;
- MasterProtocol master = null;
- int tries = 0;
- while (
- !this.closed && master == null
- ) {
- tries++;
- try {
- master = createMasterInterface(masterProtocolState);
- } catch (IOException e) {
- exceptionCaught = e;
- } catch (KeeperException e) {
- exceptionCaught = e;
- } catch (ServiceException e) {
- exceptionCaught = e;
- }
-
- if (exceptionCaught != null)
- // It failed. If it's not the last try, we're going to wait a little
- if (tries < numTries) {
- // tries at this point is 1 or more; decrement to start from 0.
- long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
- LOG.info("getMaster attempt " + tries + " of " + numTries +
- " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
-
- try {
- Thread.sleep(pauseTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(
- "Thread was interrupted while trying to connect to master.", e);
- }
-
- } else {
- // Enough tries, we stop now
- LOG.info("getMaster attempt " + tries + " of " + numTries +
- " failed; no more retrying.", exceptionCaught);
- throw new MasterNotRunningException(exceptionCaught);
- }
- }
-
- if (master == null) {
- // implies this.closed true
- throw new MasterNotRunningException(
- "Connection was closed while trying to get master");
- }
-
- return master;
- }
- }
-
private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
throws MasterNotRunningException {
String errorMsg;
@@ -851,11 +617,16 @@ public class HConnectionManager {
*/
@Override
public boolean isMasterRunning()
- throws MasterNotRunningException, ZooKeeperConnectionException {
- // When getting the master proxy connection, we check it's running,
+ throws MasterNotRunningException, ZooKeeperConnectionException {
+ // When getting the master connection, we check it's running,
// so if there is no exception, it means we've been able to get a
// connection on a running master
- getKeepAliveMasterMonitor().close();
+ MasterMonitorKeepAliveConnection m = getKeepAliveMasterMonitorService();
+ try {
+ m.close();
+ } catch (IOException e) {
+ throw new MasterNotRunningException("Failed close", e);
+ }
return true;
}
@@ -1142,7 +913,7 @@ public class HConnectionManager {
metaLocation = locateRegion(parentTable, metaKey, true, false);
// If null still, go around again.
if (metaLocation == null) continue;
- ClientProtocol server = getClient(metaLocation.getServerName());
+ ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
Result regionInfoRow;
// This block guards against two threads trying to load the meta
@@ -1172,7 +943,7 @@ public class HConnectionManager {
forceDeleteCachedLocation(tableName, row);
}
// Query the meta region for the location of the meta region
- regionInfoRow = ProtobufUtil.getRowOrBefore(server,
+ regionInfoRow = ProtobufUtil.getRowOrBefore(service,
metaLocation.getRegionInfo().getRegionName(), metaKey,
HConstants.CATALOG_FAMILY);
}
@@ -1231,7 +1002,7 @@ public class HConnectionManager {
throw e;
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = ((RemoteException)e).unwrapRemoteException();
}
if (tries < numTries - 1) {
if (LOG.isDebugEnabled()) {
@@ -1445,103 +1216,334 @@ public class HConnectionManager {
}
}
- @Override
- @Deprecated
- public AdminProtocol getAdmin(final String hostname, final int port) throws IOException {
- return getAdmin(new ServerName(hostname, port, 0L));
+ // Map keyed by service name + regionserver to service stub implementation
+ private final ConcurrentHashMap<String, Object> stubs =
+ new ConcurrentHashMap<String, Object>();
+ // Map of locks used creating service stubs per regionserver.
+ private final ConcurrentHashMap<String, String> connectionLock =
+ new ConcurrentHashMap<String, String>();
+
+ /**
+ * Maintains current state of MasterService instance.
+ */
+ static abstract class MasterServiceState {
+ HConnection connection;
+ int userCount;
+ long keepAliveUntil = Long.MAX_VALUE;
+
+ MasterServiceState (final HConnection connection) {
+ super();
+ this.connection = connection;
+ }
+
+ abstract Object getStub();
+ abstract void clearStub();
+ abstract boolean isMasterRunning() throws ServiceException;
}
- @Override
- public AdminProtocol getAdmin(final ServerName serverName)
- throws IOException {
- return getAdmin(serverName, false);
+ /**
+ * State of the MasterAdminService connection/setup.
+ */
+ static class MasterAdminServiceState extends MasterServiceState {
+ MasterAdminService.BlockingInterface stub;
+ MasterAdminServiceState(final HConnection connection) {
+ super(connection);
+ }
+
+ @Override
+ public String toString() {
+ return "MasterAdminService";
+ }
+
+ @Override
+ Object getStub() {
+ return this.stub;
+ }
+
+ @Override
+ void clearStub() {
+ this.stub = null;
+ }
+
+ @Override
+ boolean isMasterRunning() throws ServiceException {
+ MasterProtos.IsMasterRunningResponse response =
+ this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+ return response != null? response.getIsMasterRunning(): false;
+ }
}
- @Override
- @Deprecated
- public ClientProtocol getClient(final String hostname, final int port)
- throws IOException {
- return (ClientProtocol)getProtocol(hostname, port, clientClass);
+ /**
+ * State of the MasterMonitorService connection/setup.
+ */
+ static class MasterMonitorServiceState extends MasterServiceState {
+ MasterMonitorService.BlockingInterface stub;
+ MasterMonitorServiceState(final HConnection connection) {
+ super(connection);
+ }
+
+ @Override
+ public String toString() {
+ return "MasterMonitorService";
+ }
+
+ @Override
+ Object getStub() {
+ return this.stub;
+ }
+
+ @Override
+ void clearStub() {
+ this.stub = null;
+ }
+
+ @Override
+ boolean isMasterRunning() throws ServiceException {
+ MasterProtos.IsMasterRunningResponse response =
+ this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+ return response != null? response.getIsMasterRunning(): false;
+ }
}
- @Override
- public ClientProtocol getClient(final ServerName serverName)
- throws IOException {
- if (isDeadServer(serverName)){
- throw new RegionServerStoppedException("The server " + serverName + " is dead.");
+ /**
+ * Makes a client-side stub for master services. Sub-class to specialize.
+ * Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code
+ * when setting up the MasterMonitorService and MasterAdminService.
+ */
+ abstract class StubMaker {
+ /**
+ * Returns the name of the service stub being created.
+ */
+ protected abstract String getServiceName();
+
+ /**
+ * Make stub and cache it internally so it can be used later when doing the isMasterRunning call.
+ * @param channel
+ */
+ protected abstract Object makeStub(final BlockingRpcChannel channel);
+
+ /**
+ * Once setup, check it works by doing isMasterRunning check.
+ * @throws ServiceException
+ */
+ protected abstract void isMasterRunning() throws ServiceException;
+
+ /**
+ * Create a stub. Try once only. It is not typed because there is no common type to
+ * protobuf services nor their interfaces. Let the caller do appropriate casting.
+ * @return A stub for master services.
+ * @throws IOException
+ * @throws KeeperException
+ * @throws ServiceException
+ */
+ private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+ ZooKeeperKeepAliveConnection zkw;
+ try {
+ zkw = getKeepAliveZooKeeperWatcher();
+ } catch (IOException e) {
+ throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+ }
+ try {
+ checkIfBaseNodeAvailable(zkw);
+ ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
+ if (sn == null) {
+ String msg = "ZooKeeper available but no active master location found";
+ LOG.info(msg);
+ throw new MasterNotRunningException(msg);
+ }
+ if (isDeadServer(sn)) {
+ throw new MasterNotRunningException(sn + " is dead.");
+ }
+ // Use the service name plus the master's host and port as our stub key
+ String key = getStubKey(getServiceName(), sn.getHostAndPort());
+ connectionLock.putIfAbsent(key, key);
+ Object stub = null;
+ synchronized (connectionLock.get(key)) {
+ stub = stubs.get(key);
+ if (stub == null) {
+ BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
+ User.getCurrent(), rpcTimeout);
+ stub = makeStub(channel);
+ isMasterRunning();
+ stubs.put(key, stub);
+ }
+ }
+ return stub;
+ } finally {
+ zkw.close();
+ }
+ }
+
+ /**
+ * Create a stub against the master. Retry if necessary.
+ * @return A stub for the master service; the caller casts it to the specific service interface
+ * @throws MasterNotRunningException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
+ Object makeStub() throws MasterNotRunningException {
+ // The lock must be at the beginning to prevent multiple master creations
+ // (and leaks) in a multithread context
+ synchronized (masterAndZKLock) {
+ Exception exceptionCaught = null;
+ Object stub = null;
+ int tries = 0;
+ while (!closed && stub == null) {
+ tries++;
+ try {
+ stub = makeStubNoRetries();
+ } catch (IOException e) {
+ exceptionCaught = e;
+ } catch (KeeperException e) {
+ exceptionCaught = e;
+ } catch (ServiceException e) {
+ exceptionCaught = e;
+ }
+
+ if (exceptionCaught != null)
+ // It failed. If it's not the last try, we're going to wait a little
+ if (tries < numTries) {
+ // tries at this point is 1 or more; decrement to start from 0.
+ long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
+ LOG.info("getMaster attempt " + tries + " of " + numTries +
+ " failed; retrying after sleep of " +pauseTime + ", exception=" +
+ exceptionCaught);
+
+ try {
+ Thread.sleep(pauseTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(
+ "Thread was interrupted while trying to connect to master.", e);
+ }
+ } else {
+ // Enough tries, we stop now
+ LOG.info("getMaster attempt " + tries + " of " + numTries +
+ " failed; no more retrying.", exceptionCaught);
+ throw new MasterNotRunningException(exceptionCaught);
+ }
+ }
+
+ if (stub == null) {
+ // implies this.closed true
+ throw new MasterNotRunningException("Connection was closed while trying to get master");
+ }
+ return stub;
+ }
}
- return (ClientProtocol)
- getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
}
- @Override
- @Deprecated
- public AdminProtocol getAdmin(final String hostname, final int port,
- final boolean master)
- throws IOException {
- return (AdminProtocol)getProtocol(hostname, port, adminClass);
+ /**
+ * Class to make a MasterMonitorService stub.
+ */
+ class MasterMonitorServiceStubMaker extends StubMaker {
+ private MasterMonitorService.BlockingInterface stub;
+ @Override
+ protected String getServiceName() {
+ return MasterMonitorService.getDescriptor().getName();
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+ MasterMonitorService.BlockingInterface makeStub() throws MasterNotRunningException {
+ return (MasterMonitorService.BlockingInterface)super.makeStub();
+ }
+
+ @Override
+ protected Object makeStub(BlockingRpcChannel channel) {
+ this.stub = MasterMonitorService.newBlockingStub(channel);
+ return this.stub;
+ }
+
+ @Override
+ protected void isMasterRunning() throws ServiceException {
+ this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+ }
}
+ /**
+ * Class to make a MasterAdminService stub.
+ */
+ class MasterAdminServiceStubMaker extends StubMaker {
+ private MasterAdminService.BlockingInterface stub;
+
+ @Override
+ protected String getServiceName() {
+ return MasterAdminService.getDescriptor().getName();
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+ MasterAdminService.BlockingInterface makeStub() throws MasterNotRunningException {
+ return (MasterAdminService.BlockingInterface)super.makeStub();
+ }
+
+ @Override
+ protected Object makeStub(BlockingRpcChannel channel) {
+ this.stub = MasterAdminService.newBlockingStub(channel);
+ return this.stub;
+ }
+
+ @Override
+ protected void isMasterRunning() throws ServiceException {
+ this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+ }
+ };
+
@Override
- public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
+ public AdminService.BlockingInterface getAdmin(final ServerName serverName)
throws IOException {
- if (isDeadServer(serverName)){
- throw new RegionServerStoppedException("The server " + serverName + " is dead.");
+ return getAdmin(serverName, false);
+ }
+
+ @Override
+ // Nothing is done w/ the 'master' parameter. It is ignored.
+ public AdminService.BlockingInterface getAdmin(final ServerName serverName,
+ final boolean master)
+ throws IOException {
+ if (isDeadServer(serverName)) {
+ throw new RegionServerStoppedException(serverName + " is dead.");
+ }
+ String key = getStubKey(AdminService.BlockingInterface.class.getName(),
+ serverName.getHostAndPort());
+ this.connectionLock.putIfAbsent(key, key);
+ AdminService.BlockingInterface stub = null;
+ synchronized (this.connectionLock.get(key)) {
+ stub = (AdminService.BlockingInterface)this.stubs.get(key);
+ if (stub == null) {
+ BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
+ User.getCurrent(), this.rpcTimeout);
+ stub = AdminService.newBlockingStub(channel);
+ this.stubs.put(key, stub);
+ }
}
- return (AdminProtocol)getProtocol(
- serverName.getHostname(), serverName.getPort(), adminClass);
+ return stub;
}
- /**
- * Either the passed <code>isa</code> is null or <code>hostname</code>
- * can be but not both.
- * @param hostname
- * @param port
- * @param protocolClass
- * @return Proxy.
- * @throws IOException
- */
- IpcProtocol getProtocol(final String hostname,
- final int port, final Class <? extends IpcProtocol> protocolClass)
+ @Override
+ public ClientService.BlockingInterface getClient(final ServerName sn)
throws IOException {
- String rsName = Addressing.createHostAndPortStr(hostname, port);
- // See if we already have a connection (common case)
- Map<String, IpcProtocol> protocols = this.servers.get(rsName);
- if (protocols == null) {
- protocols = new HashMap<String, IpcProtocol>();
- Map<String, IpcProtocol> existingProtocols =
- this.servers.putIfAbsent(rsName, protocols);
- if (existingProtocols != null) {
- protocols = existingProtocols;
- }
- }
- String protocol = protocolClass.getName();
- IpcProtocol server = protocols.get(protocol);
- if (server == null) {
- // create a unique lock for this RS + protocol (if necessary)
- String lockKey = protocol + "@" + rsName;
- this.connectionLock.putIfAbsent(lockKey, lockKey);
- // get the RS lock
- synchronized (this.connectionLock.get(lockKey)) {
- // do one more lookup in case we were stalled above
- server = protocols.get(protocol);
- if (server == null) {
- try {
- // Only create isa when we need to.
- InetSocketAddress address = new InetSocketAddress(hostname, port);
- // definitely a cache miss. establish an RPC for this RS
- server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
- this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
- protocols.put(protocol, server);
- } catch (RemoteException e) {
- LOG.warn("RemoteException connecting to RS", e);
- // Throw what the RemoteException was carrying.
- throw e.unwrapRemoteException();
- }
- }
+ if (isDeadServer(sn)) {
+ throw new RegionServerStoppedException(sn + " is dead.");
+ }
+ String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
+ this.connectionLock.putIfAbsent(key, key);
+ ClientService.BlockingInterface stub = null;
+ synchronized (this.connectionLock.get(key)) {
+ stub = (ClientService.BlockingInterface)this.stubs.get(key);
+ if (stub == null) {
+ BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
+ User.getCurrent(), this.rpcTimeout);
+ stub = ClientService.newBlockingStub(channel);
+ // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
+ // Just fail on first actual call rather than in here on setup.
+ this.stubs.put(key, stub);
}
}
- return server;
+ return stub;
+ }
+
+ static String getStubKey(final String serviceName, final String rsHostnamePort) {
+ return serviceName + "@" + rsHostnamePort;
}
@Override
@@ -1570,14 +1572,12 @@ public class HConnectionManager {
private static final long keepAlive = 5 * 60 * 1000;
/**
- * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have
- * finished with it.
+ * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
* @return The shared instance. Never returns null.
*/
public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
throws IOException {
synchronized (masterAndZKLock) {
-
if (keepAliveZookeeper == null) {
// We don't check that our link to ZooKeeper is still valid
// But there is a retry mechanism in the ZooKeeperWatcher itself
@@ -1586,7 +1586,6 @@ public class HConnectionManager {
}
keepAliveZookeeperUserCount++;
keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
-
return keepAliveZookeeper;
}
}
@@ -1608,7 +1607,7 @@ public class HConnectionManager {
/**
* Creates a Chore thread to check the connections to master & zookeeper
* and close them when they reach their closing time (
- * {@link MasterProtocolState#keepAliveUntil} and
+ * {@link MasterServiceState#keepAliveUntil} and
* {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
* managed by the release functions and the variable {@link #keepAlive}
*/
@@ -1636,9 +1635,9 @@ public class HConnectionManager {
return new DelayedClosing(hci, stoppable);
}
- protected void closeMasterProtocol(MasterProtocolState protocolState) {
+ protected void closeMasterProtocol(MasterServiceState protocolState) {
if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
- hci.closeMasterProtocol(protocolState);
+ hci.closeMasterService(protocolState);
protocolState.keepAliveUntil = Long.MAX_VALUE;
}
}
@@ -1654,8 +1653,8 @@ public class HConnectionManager {
hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
}
}
- closeMasterProtocol(hci.masterAdminProtocol);
- closeMasterProtocol(hci.masterMonitorProtocol);
+ closeMasterProtocol(hci.adminMasterServiceState);
+ closeMasterProtocol(hci.monitorMasterServiceState);
}
}
@@ -1683,117 +1682,289 @@ public class HConnectionManager {
}
}
- private static class MasterProtocolHandler implements InvocationHandler {
- private HConnectionImplementation connection;
- private MasterProtocolState protocolStateTracker;
+ final MasterAdminServiceState adminMasterServiceState = new MasterAdminServiceState(this);
+ final MasterMonitorServiceState monitorMasterServiceState =
+ new MasterMonitorServiceState(this);
- protected MasterProtocolHandler(HConnectionImplementation connection,
- MasterProtocolState protocolStateTracker) {
- this.connection = connection;
- this.protocolStateTracker = protocolStateTracker;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- if (method.getName().equals("close") &&
- method.getParameterTypes().length == 0) {
- release(connection, protocolStateTracker);
- return null;
- } else {
- try {
- return method.invoke(protocolStateTracker.protocol, args);
- }catch (InvocationTargetException e){
- // We will have this for all the exception, checked on not, sent
- // by any layer, including the functional exception
- Throwable cause = e.getCause();
- if (cause == null){
- throw new RuntimeException(
- "Proxy invocation failed and getCause is null", e);
- }
- if (cause instanceof UndeclaredThrowableException) {
- cause = cause.getCause();
- }
- throw cause;
- }
- }
- }
+ @Override
+ public MasterAdminService.BlockingInterface getMasterAdmin() throws MasterNotRunningException {
+ return getKeepAliveMasterAdminService();
+ }
- private void release(
- HConnectionImplementation connection,
- MasterProtocolState target) {
- connection.releaseMaster(target);
- }
+ @Override
+ public MasterMonitorService.BlockingInterface getMasterMonitor()
+ throws MasterNotRunningException {
+ return getKeepAliveMasterMonitorService();
}
- MasterProtocolState masterAdminProtocol =
- new MasterProtocolState(MasterAdminProtocol.class);
- MasterProtocolState masterMonitorProtocol =
- new MasterProtocolState(MasterMonitorProtocol.class);
+ private void resetMasterServiceState(final MasterServiceState mss) {
+ mss.userCount++;
+ mss.keepAliveUntil = Long.MAX_VALUE;
+ }
- /**
- * This function allows HBaseAdmin and potentially others
- * to get a shared master connection.
- *
- * @return The shared instance. Never returns null.
- * @throws MasterNotRunningException
- */
- private Object getKeepAliveMasterProtocol(
- MasterProtocolState protocolState, Class connectionClass)
- throws MasterNotRunningException {
+ @Override
+ public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService()
+ throws MasterNotRunningException {
synchronized (masterAndZKLock) {
- if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
- protocolState.protocol = null;
- protocolState.protocol = createMasterWithRetries(protocolState);
+ if (!isKeepAliveMasterConnectedAndRunning(this.adminMasterServiceState)) {
+ MasterAdminServiceStubMaker stubMaker = new MasterAdminServiceStubMaker();
+ this.adminMasterServiceState.stub = stubMaker.makeStub();
+ }
+ resetMasterServiceState(this.adminMasterServiceState);
+ }
+ // Ugly delegation just so we can add in a Close method.
+ final MasterAdminService.BlockingInterface stub = this.adminMasterServiceState.stub;
+ return new MasterAdminKeepAliveConnection() {
+ MasterAdminServiceState mss = adminMasterServiceState;
+ @Override
+ public AddColumnResponse addColumn(RpcController controller,
+ AddColumnRequest request) throws ServiceException {
+ return stub.addColumn(controller, request);
}
- protocolState.userCount++;
- protocolState.keepAliveUntil = Long.MAX_VALUE;
- return Proxy.newProxyInstance(
- connectionClass.getClassLoader(),
- new Class[]{connectionClass},
- new MasterProtocolHandler(this, protocolState)
- );
- }
- }
+ @Override
+ public DeleteColumnResponse deleteColumn(RpcController controller,
+ DeleteColumnRequest request) throws ServiceException {
+ return stub.deleteColumn(controller, request);
+ }
- @Override
- public MasterAdminProtocol getMasterAdmin() throws MasterNotRunningException {
- return getKeepAliveMasterAdmin();
- }
+ @Override
+ public ModifyColumnResponse modifyColumn(RpcController controller,
+ ModifyColumnRequest request) throws ServiceException {
+ return stub.modifyColumn(controller, request);
+ }
- @Override
- public MasterMonitorProtocol getMasterMonitor() throws MasterNotRunningException {
- return getKeepAliveMasterMonitor();
+ @Override
+ public MoveRegionResponse moveRegion(RpcController controller,
+ MoveRegionRequest request) throws ServiceException {
+ return stub.moveRegion(controller, request);
+ }
+
+ @Override
+ public DispatchMergingRegionsResponse dispatchMergingRegions(
+ RpcController controller, DispatchMergingRegionsRequest request)
+ throws ServiceException {
+ return stub.dispatchMergingRegions(controller, request);
+ }
+
+ @Override
+ public AssignRegionResponse assignRegion(RpcController controller,
+ AssignRegionRequest request) throws ServiceException {
+ return stub.assignRegion(controller, request);
+ }
+
+ @Override
+ public UnassignRegionResponse unassignRegion(RpcController controller,
+ UnassignRegionRequest request) throws ServiceException {
+ return stub.unassignRegion(controller, request);
+ }
+
+ @Override
+ public OfflineRegionResponse offlineRegion(RpcController controller,
+ OfflineRegionRequest request) throws ServiceException {
+ return stub.offlineRegion(controller, request);
+ }
+
+ @Override
+ public DeleteTableResponse deleteTable(RpcController controller,
+ DeleteTableRequest request) throws ServiceException {
+ return stub.deleteTable(controller, request);
+ }
+
+ @Override
+ public EnableTableResponse enableTable(RpcController controller,
+ EnableTableRequest request) throws ServiceException {
+ return stub.enableTable(controller, request);
+ }
+
+ @Override
+ public DisableTableResponse disableTable(RpcController controller,
+ DisableTableRequest request) throws ServiceException {
+ return stub.disableTable(controller, request);
+ }
+
+ @Override
+ public ModifyTableResponse modifyTable(RpcController controller,
+ ModifyTableRequest request) throws ServiceException {
+ return stub.modifyTable(controller, request);
+ }
+
+ @Override
+ public CreateTableResponse createTable(RpcController controller,
+ CreateTableRequest request) throws ServiceException {
+ return stub.createTable(controller, request);
+ }
+
+ @Override
+ public ShutdownResponse shutdown(RpcController controller,
+ ShutdownRequest request) throws ServiceException {
+ return stub.shutdown(controller, request);
+ }
+
+ @Override
+ public StopMasterResponse stopMaster(RpcController controller,
+ StopMasterRequest request) throws ServiceException {
+ return stub.stopMaster(controller, request);
+ }
+
+ @Override
+ public BalanceResponse balance(RpcController controller,
+ BalanceRequest request) throws ServiceException {
+ return stub.balance(controller, request);
+ }
+
+ @Override
+ public SetBalancerRunningResponse setBalancerRunning(
+ RpcController controller, SetBalancerRunningRequest request)
+ throws ServiceException {
+ return stub.setBalancerRunning(controller, request);
+ }
+
+ @Override
+ public CatalogScanResponse runCatalogScan(RpcController controller,
+ CatalogScanRequest request) throws ServiceException {
+ return stub.runCatalogScan(controller, request);
+ }
+
+ @Override
+ public EnableCatalogJanitorResponse enableCatalogJanitor(
+ RpcController controller, EnableCatalogJanitorRequest request)
+ throws ServiceException {
+ return stub.enableCatalogJanitor(controller, request);
+ }
+
+ @Override
+ public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
+ RpcController controller, IsCatalogJanitorEnabledRequest request)
+ throws ServiceException {
+ return stub.isCatalogJanitorEnabled(controller, request);
+ }
+
+ @Override
+ public CoprocessorServiceResponse execMasterService(
+ RpcController controller, CoprocessorServiceRequest request)
+ throws ServiceException {
+ return stub.execMasterService(controller, request);
+ }
+
+ @Override
+ public TakeSnapshotResponse snapshot(RpcController controller,
+ TakeSnapshotRequest request) throws ServiceException {
+ return stub.snapshot(controller, request);
+ }
+
+ @Override
+ public ListSnapshotResponse getCompletedSnapshots(
+ RpcController controller, ListSnapshotRequest request)
+ throws ServiceException {
+ return stub.getCompletedSnapshots(controller, request);
+ }
+
+ @Override
+ public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
+ DeleteSnapshotRequest request) throws ServiceException {
+ return stub.deleteSnapshot(controller, request);
+ }
+
+ @Override
+ public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
+ IsSnapshotDoneRequest request) throws ServiceException {
+ return stub.isSnapshotDone(controller, request);
+ }
+
+ @Override
+ public RestoreSnapshotResponse restoreSnapshot(
+ RpcController controller, RestoreSnapshotRequest request)
+ throws ServiceException {
+ return stub.restoreSnapshot(controller, request);
+ }
+
+ @Override
+ public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
+ RpcController controller, IsRestoreSnapshotDoneRequest request)
+ throws ServiceException {
+ return stub.isRestoreSnapshotDone(controller, request);
+ }
+
+ @Override
+ public IsMasterRunningResponse isMasterRunning(
+ RpcController controller, IsMasterRunningRequest request)
+ throws ServiceException {
+ return stub.isMasterRunning(controller, request);
+ }
+
+ @Override
+ public void close() {
+ release(this.mss);
+ }
+ };
}
- @Override
- public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin()
- throws MasterNotRunningException {
- return (MasterAdminKeepAliveConnection)
- getKeepAliveMasterProtocol(masterAdminProtocol, MasterAdminKeepAliveConnection.class);
+ private static void release(MasterServiceState mss) {
+ if (mss != null && mss.connection != null) {
+ ((HConnectionImplementation)mss.connection).releaseMaster(mss);
+ }
}
@Override
- public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor()
- throws MasterNotRunningException {
- return (MasterMonitorKeepAliveConnection)
- getKeepAliveMasterProtocol(masterMonitorProtocol, MasterMonitorKeepAliveConnection.class);
+ public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService()
+ throws MasterNotRunningException {
+ synchronized (masterAndZKLock) {
+ if (!isKeepAliveMasterConnectedAndRunning(this.monitorMasterServiceState)) {
+ MasterMonitorServiceStubMaker stubMaker = new MasterMonitorServiceStubMaker();
+ this.monitorMasterServiceState.stub = stubMaker.makeStub();
+ }
+ resetMasterServiceState(this.monitorMasterServiceState);
+ }
+ // Ugly delegation just so we can implement close
+ final MasterMonitorService.BlockingInterface stub = this.monitorMasterServiceState.stub;
+ return new MasterMonitorKeepAliveConnection() {
+ final MasterMonitorServiceState mss = monitorMasterServiceState;
+ @Override
+ public GetSchemaAlterStatusResponse getSchemaAlterStatus(
+ RpcController controller, GetSchemaAlterStatusRequest request)
+ throws ServiceException {
+ return stub.getSchemaAlterStatus(controller, request);
+ }
+
+ @Override
+ public GetTableDescriptorsResponse getTableDescriptors(
+ RpcController controller, GetTableDescriptorsRequest request)
+ throws ServiceException {
+ return stub.getTableDescriptors(controller, request);
+ }
+
+ @Override
+ public GetClusterStatusResponse getClusterStatus(
+ RpcController controller, GetClusterStatusRequest request)
+ throws ServiceException {
+ return stub.getClusterStatus(controller, request);
+ }
+
+ @Override
+ public IsMasterRunningResponse isMasterRunning(
+ RpcController controller, IsMasterRunningRequest request)
+ throws ServiceException {
+ return stub.isMasterRunning(controller, request);
+ }
+
+ @Override
+ public void close() throws IOException {
+ release(this.mss);
+ }
+ };
}
- private boolean isKeepAliveMasterConnectedAndRunning(MasterProtocolState protocolState){
- if (protocolState.protocol == null){
+ private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
+ if (mss.getStub() == null){
return false;
}
try {
- return protocolState.protocol.isMasterRunning(
- null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning();
- }catch (UndeclaredThrowableException e){
+ return mss.isMasterRunning();
+ } catch (UndeclaredThrowableException e) {
// It's somehow messy, but we can receive exceptions such as
- // java.net.ConnectException but they're not declared. So we catch
- // it...
- LOG.info("Master connection is not running anymore",
- e.getUndeclaredThrowable());
+ // java.net.ConnectException but they're not declared. So we catch it...
+ LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
return false;
} catch (ServiceException se) {
LOG.warn("Checking master connection", se);
@@ -1801,35 +1972,32 @@ public class HConnectionManager {
}
}
- private void releaseMaster(MasterProtocolState protocolState) {
- if (protocolState.protocol == null){
- return;
- }
+ void releaseMaster(MasterServiceState mss) {
+ if (mss.getStub() == null) return;
synchronized (masterAndZKLock) {
- --protocolState.userCount;
- if (protocolState.userCount <= 0) {
- protocolState.keepAliveUntil =
- System.currentTimeMillis() + keepAlive;
+ --mss.userCount;
+ if (mss.userCount <= 0) {
+ mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
}
}
}
- private void closeMasterProtocol(MasterProtocolState protocolState) {
- if (protocolState.protocol != null){
- LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
- protocolState.protocol = null;
+ private void closeMasterService(MasterServiceState mss) {
+ if (mss.getStub() != null) {
+ LOG.info("Closing master protocol: " + mss);
+ mss.clearStub();
}
- protocolState.userCount = 0;
+ mss.userCount = 0;
}
/**
- * Immediate close of the shared master. Can be by the delayed close or
- * when closing the connection itself.
+ * Immediate close of the shared master. Can be triggered by the delayed close or when closing
+ * the connection itself.
*/
private void closeMaster() {
synchronized (masterAndZKLock) {
- closeMasterProtocol(masterAdminProtocol);
- closeMasterProtocol(masterMonitorProtocol);
+ closeMasterService(adminMasterServiceState);
+ closeMasterService(monitorMasterServiceState);
}
}
@@ -2473,8 +2641,7 @@ public class HConnectionManager {
delayedClosing.stop("Closing connection");
closeMaster();
closeZooKeeperWatcher();
- this.servers.clear();
- this.rpcEngine.close();
+ this.stubs.clear();
if (clusterStatusListener != null) {
clusterStatusListener.close();
}
@@ -2515,7 +2682,7 @@ public class HConnectionManager {
@Override
public HTableDescriptor[] listTables() throws IOException {
- MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+ MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
try {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(null);
@@ -2530,7 +2697,7 @@ public class HConnectionManager {
@Override
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
- MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+ MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
try {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
@@ -2556,7 +2723,7 @@ public class HConnectionManager {
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
return HTableDescriptor.META_TABLEDESC;
}
- MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+ MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
GetTableDescriptorsResponse htds;
try {
GetTableDescriptorsRequest req =
@@ -2576,14 +2743,6 @@ public class HConnectionManager {
}
/**
- * Override the RpcClientEngine implementation used by this connection.
- * <strong>FOR TESTING PURPOSES ONLY!</strong>
- */
- void setRpcEngine(RpcClientEngine engine) {
- this.rpcEngine = engine;
- }
-
- /**
* The record of errors for servers. Visible for testing.
*/
@VisibleForTesting
@@ -2685,17 +2844,15 @@ public class HConnectionManager {
* @param c The Configuration instance to set the retries into.
* @param log Used to log what we set in here.
*/
- public static void setServerSideHConnectionRetries(final Configuration c,
+ public static void setServerSideHConnectionRetries(final Configuration c, final String sn,
final Log log) {
int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
// Go big. Multiply by 10. If we can't get to meta after this many retries
// then something is seriously wrong.
- int serversideMultiplier =
- c.getInt("hbase.client.serverside.retries.multiplier", 10);
+ int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
int retries = hcRetries * serversideMultiplier;
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
- log.debug("HConnection retries=" + retries);
+ log.debug(sn + " HConnection server-to-server retries=" + retries);
}
}
-