You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC
svn commit: r1449950 [9/35] - in /hbase/trunk: ./ hbase-client/
hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/
hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/
hbase-client/src/main/java/org/apache/hadoop/ hb...
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,2493 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterAdminProtocol;
+import org.apache.hadoop.hbase.MasterMonitorProtocol;
+import org.apache.hadoop.hbase.MasterProtocol;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.SoftValueSortedMap;
+import org.apache.hadoop.hbase.util.Triple;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A non-instantiable class that manages {@link HConnection}s.
+ * This class has a static Map of {@link HConnection} instances keyed by
+ * {@link HConnectionKey}, which is built from the values of the
+ * connection-related properties of a {@link Configuration} plus the current
+ * user; all invocations of {@link #getConnection(Configuration)} that pass a
+ * {@link Configuration} with the same values for those properties will be
+ * returned the same {@link HConnection} instance. (Note that changing one of
+ * those properties on a Configuration after obtaining a connection changes its
+ * key, so a later {@link #deleteConnection(Configuration)} will not find the
+ * connection.) Sharing {@link HConnection}
+ * instances is usually what you want; all clients of the {@link HConnection}
+ * instances share the HConnection's cache of Region locations rather than each
+ * having to discover for itself the location of meta, root, etc. It makes
+ * sense for the likes of the pool of HTables class {@link HTablePool}, for
+ * instance. (If you are concerned that a single {@link HConnection} is
+ * insufficient for sharing amongst clients in, say, a heavily-multithreaded
+ * environment: in practice it has not proven to be an issue. Besides,
+ * {@link HConnection} is implemented atop Hadoop RPC and as of this writing,
+ * Hadoop RPC does a connection per cluster-member, exclusively.)
+ *
+ * <p>But sharing connections
+ * makes clean up of {@link HConnection} instances a little awkward. Currently,
+ * clients clean up by calling
+ * {@link #deleteConnection(Configuration)}. This will shut down the
+ * zookeeper connection the HConnection was using and clean up all
+ * HConnection resources as well as stopping proxies to servers out on the
+ * cluster. Not running the cleanup will not end the world; it'll
+ * just stall the closeup some and spew some zookeeper connection failed
+ * messages into the log. Running the cleanup on a {@link HConnection} that is
+ * subsequently used by another will cause breakage so be careful running
+ * cleanup.
+ * <p>To create a {@link HConnection} that is not shared by others, you can
+ * create a new {@link Configuration} instance, pass this new instance to
+ * {@link #getConnection(Configuration)}, and then when done, close it up by
+ * doing something like the following:
+ * <pre>
+ * {@code
+ * Configuration newConfig = new Configuration(originalConf);
+ * HConnection connection = HConnectionManager.getConnection(newConfig);
+ * // Use the connection to your heart's content and then when done...
+ * HConnectionManager.deleteConnection(newConfig);
+ * }
+ * </pre>
+ * <p>Cleanup used to be done inside a shutdown hook. On startup we'd
+ * register a shutdown hook that called {@link #deleteAllConnections()}
+ * on its way out, but the order in which shutdown hooks run is not defined, so
+ * it was problematic for clients of HConnection that wanted to register their
+ * own shutdown hooks. We therefore removed ours, though this shifts the onus
+ * for cleanup to the client.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HConnectionManager {
+ // An LRU Map of HConnectionKey -> HConnection (TableServer). All
+ // access must be synchronized. This map is not private because tests
+ // need to be able to tinker with it.
+ // It is created in the static initializer below as an access-ordered
+ // LinkedHashMap whose removeEldestEntry drops the least recently used entry
+ // once size() exceeds MAX_CACHED_HBASE_INSTANCES. Code in this class
+ // synchronizes on the map object itself.
+ static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
+
+ // Capacity of HBASE_INSTANCES: the value of
+ // HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS plus one, read once at class-load
+ // time from HBaseConfiguration.create() (not from any per-connection conf).
+ public static final int MAX_CACHED_HBASE_INSTANCES;
+
+ /** Parameter name for what client protocol to use. */
+ public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
+
+ /** Default client protocol class name. */
+ public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
+
+ /** Parameter name for what admin protocol to use. */
+ public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
+
+ /** Default admin protocol class name. */
+ public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+
+ private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+
+ static {
+ // We set instances to one more than the value specified for {@link
+ // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
+ // connections to the ensemble from the one client is 30, so in that case we
+ // should run into zk issues before the LRU hit this value of 31.
+ MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
+ HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
+ HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+ // Initial capacity is sized so MAX_CACHED_HBASE_INSTANCES entries fit under
+ // the 0.75 load factor; accessOrder=true (third argument) makes the
+ // "eldest" entry the least recently accessed one rather than the first
+ // inserted, which is what turns this map into an LRU cache.
+ HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+ (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
+ // NOTE(review): an evicted connection is only dropped from the map;
+ // nothing here calls internalClose() on it, so eviction by itself does
+ // not release its resources. Confirm this is intended.
+ return size() > MAX_CACHED_HBASE_INSTANCES;
+ }
+ };
+ }
+
+ /*
+ * Non-instantiable.
+ */
+ protected HConnectionManager() {
+ super();
+ }
+
+ /**
+ * Get the connection that goes with the passed <code>conf</code>
+ * configuration instance.
+ * If no current connection exists, method creates a new connection for the
+ * passed <code>conf</code> instance.
+ * @param conf configuration
+ * @return HConnection object for <code>conf</code>
+ * @throws ZooKeeperConnectionException
+ */
+ public static HConnection getConnection(Configuration conf)
+ throws ZooKeeperConnectionException {
+ HConnectionKey connectionKey = new HConnectionKey(conf);
+ synchronized (HBASE_INSTANCES) {
+ HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
+ if (connection == null) {
+ connection = new HConnectionImplementation(conf, true);
+ HBASE_INSTANCES.put(connectionKey, connection);
+ } else if (connection.isClosed()) {
+ HConnectionManager.deleteConnection(connectionKey, true);
+ connection = new HConnectionImplementation(conf, true);
+ HBASE_INSTANCES.put(connectionKey, connection);
+ }
+ connection.incCount();
+ return connection;
+ }
+ }
+
+  /**
+   * Creates a brand-new, unshared {@link HConnection} for {@code conf}.
+   * <p>
+   * The connection is created with {@code managed == false} and is not put
+   * into the shared connection cache, so it bypasses the usual HConnection
+   * life cycle management: the caller must close it.
+   *
+   * @param conf configuration for the new connection
+   * @return a new, unmanaged {@link HConnection}
+   * @throws ZooKeeperConnectionException if constructing the connection
+   *     fails with this exception
+   */
+  public static HConnection createConnection(Configuration conf)
+      throws ZooKeeperConnectionException {
+    final boolean managed = false;
+    return new HConnectionImplementation(conf, managed);
+  }
+
+ /**
+ * Delete connection information for the instance specified by configuration.
+ * If there are no more references to it, this will then close connection to
+ * the zookeeper ensemble and let go of all resources.
+ *
+ * @param conf
+ * configuration whose identity is used to find {@link HConnection}
+ * instance.
+ */
+ public static void deleteConnection(Configuration conf) {
+ deleteConnection(new HConnectionKey(conf), false);
+ }
+
+  /**
+   * Forcibly removes {@code connection} from the connection cache and closes
+   * it, whatever its reference count. Meant for connections that have gone
+   * stale. Does nothing if {@code connection} is not in the cache.
+   *
+   * @param connection the connection to discard
+   */
+  public static void deleteStaleConnection(HConnection connection) {
+    final boolean stale = true;
+    deleteConnection(connection, stale);
+  }
+
+  /**
+   * Releases one reference to every cached connection, then empties the
+   * connection cache.
+   * <p>
+   * Connections whose reference count drops to zero are closed. Note that a
+   * connection still referenced elsewhere is removed from the cache
+   * <em>without</em> being closed, after which this class no longer tracks it.
+   */
+  public static void deleteAllConnections() {
+    synchronized (HBASE_INSTANCES) {
+      // Iterate over a snapshot: deleteConnection may remove map entries.
+      Set<HConnectionKey> connectionKeys =
+          new HashSet<HConnectionKey>(HBASE_INSTANCES.keySet());
+      for (HConnectionKey connectionKey : connectionKeys) {
+        deleteConnection(connectionKey, false);
+      }
+      HBASE_INSTANCES.clear();
+    }
+  }
+
+  /**
+   * Looks up {@code connection} in the cache by identity and, if found,
+   * releases it via {@link #deleteConnection(HConnectionKey, boolean)}.
+   * Unknown connections are ignored.
+   *
+   * @param connection the connection instance to release
+   * @param staleConnection if {@code true}, remove and close the connection
+   *     regardless of its reference count
+   */
+  private static void deleteConnection(HConnection connection, boolean staleConnection) {
+    synchronized (HBASE_INSTANCES) {
+      HConnectionKey matchingKey = null;
+      for (Entry<HConnectionKey, HConnectionImplementation> entry : HBASE_INSTANCES.entrySet()) {
+        if (entry.getValue() == connection) {
+          matchingKey = entry.getKey();
+          break;
+        }
+      }
+      if (matchingKey != null) {
+        deleteConnection(matchingKey, staleConnection);
+      }
+    }
+  }
+
+  /**
+   * Decrements the reference count of the connection cached under
+   * {@code connectionKey}. Removes and closes it when the count reaches zero
+   * or when {@code staleConnection} is {@code true}. Logs an error if no
+   * connection is cached under the key.
+   *
+   * @param connectionKey key of the cached connection
+   * @param staleConnection if {@code true}, remove and close regardless of
+   *     the reference count
+   */
+  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
+    synchronized (HBASE_INSTANCES) {
+      HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
+      if (connection == null) {
+        LOG.error("Connection not found in the list, can't delete it "
+            + "(connection key=" + connectionKey + "). May be the key was modified?");
+        return;
+      }
+      connection.decCount();
+      boolean closeIt = connection.isZeroReference() || staleConnection;
+      if (closeIt) {
+        HBASE_INSTANCES.remove(connectionKey);
+        connection.internalClose();
+      }
+    }
+  }
+
+  /**
+   * Returns the number of region locations cached for {@code tableName} by
+   * the managed connection associated with {@code conf}. Provided for unit
+   * tests that verify the behavior of region location cache prefetch.
+   *
+   * @param conf configuration identifying the managed connection; if
+   *     {@code null}, no connection is consulted and 0 is returned
+   * @param tableName table whose cached region locations are counted
+   * @return number of cached region locations for the table
+   * @throws IOException if the connection cannot be obtained or released
+   */
+  static int getCachedRegionCount(Configuration conf,
+      final byte[] tableName)
+      throws IOException {
+    Integer count = execute(new HConnectable<Integer>(conf) {
+      @Override
+      public Integer connect(HConnection connection) {
+        return ((HConnectionImplementation) connection)
+            .getNumberOfCachedRegionLocations(tableName);
+      }
+    });
+    // execute() returns null when conf is null; unboxing it would throw NPE.
+    return count == null ? 0 : count;
+  }
+
+  /**
+   * Tells whether the managed connection associated with {@code conf} has a
+   * cached location for the region holding {@code row} of {@code tableName}.
+   * Provided for unit tests that verify the behavior of region location cache
+   * prefetch.
+   *
+   * @param conf configuration identifying the managed connection; if
+   *     {@code null}, no connection is consulted and {@code false} is returned
+   * @param tableName table of the region
+   * @param row row whose region is looked up
+   * @return true if the region where the table and row reside is cached
+   * @throws IOException if the connection cannot be obtained or released
+   */
+  static boolean isRegionCached(Configuration conf,
+      final byte[] tableName, final byte[] row) throws IOException {
+    Boolean cached = execute(new HConnectable<Boolean>(conf) {
+      @Override
+      public Boolean connect(HConnection connection) {
+        return ((HConnectionImplementation) connection).isRegionCached(tableName, row);
+      }
+    });
+    // execute() returns null when conf is null; unboxing it would throw NPE.
+    return cached != null && cached;
+  }
+
+  /**
+   * A unit of work to run against a {@link HConnection} derived from a given
+   * {@link Configuration}, via {@link HConnectionManager#execute(HConnectable)}.
+   *
+   * <p>
+   * Use this when you need a {@link HConnection} only briefly and do not want
+   * to manage acquiring and releasing it yourself.
+   *
+   * @param <T> the type returned by {@link HConnectable#connect(HConnection)}
+   */
+  public static abstract class HConnectable<T> {
+    /** Configuration used to look up the connection the work runs against. */
+    public Configuration conf;
+
+    /**
+     * @param conf configuration used to look up the connection
+     */
+    protected HConnectable(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Performs the work.
+     *
+     * @param connection the connection to use; it is released by the caller
+     * @return the result of the work
+     * @throws IOException if the work fails
+     */
+    public abstract T connect(HConnection connection) throws IOException;
+  }
+
+  /**
+   * Runs {@link HConnectable#connect} with a managed {@link HConnection}
+   * obtained for the connectable's configuration, and closes that connection
+   * when the call returns or fails.
+   *
+   * @param <T> the return type of the connect method
+   * @param connectable the work to run; may be {@code null}
+   * @return the value returned by the connect method, or {@code null} if
+   *     {@code connectable} or its configuration is {@code null}
+   * @throws IOException if the connection cannot be obtained, if
+   *     {@code connect} fails, or if closing the connection fails after a
+   *     successful {@code connect}
+   */
+  public static <T> T execute(HConnectable<T> connectable) throws IOException {
+    if (connectable == null || connectable.conf == null) {
+      return null;
+    }
+    Configuration conf = connectable.conf;
+    HConnection connection = HConnectionManager.getConnection(conf);
+    boolean connectSucceeded = false;
+    try {
+      T returnValue = connectable.connect(connection);
+      connectSucceeded = true;
+      return returnValue;
+    } finally {
+      try {
+        connection.close();
+      } catch (Exception e) {
+        if (connectSucceeded) {
+          throw new IOException("The connection to " + connection
+              + " could not be deleted.", e);
+        }
+        // connect() already failed and its exception is propagating; do not
+        // mask it, but do not silently lose this one either.
+        LOG.warn("The connection to " + connection
+            + " could not be deleted after a failed connect", e);
+      }
+    }
+  }
+
+  /**
+   * Denotes a unique key to a {@link HConnection} instance.
+   *
+   * In essence, this class captures the properties in {@link Configuration}
+   * that may be used in the process of establishing a connection, together
+   * with the name of the current user. In light of that, if any new such
+   * properties are introduced into the mix, they must be added to the
+   * {@link HConnectionKey#CONNECTION_PROPERTIES} list.
+   * <p>
+   * Instances are used as hash-map keys, so they are immutable: the property
+   * values are copied at construction time, and later changes to the
+   * {@link Configuration} do not affect an existing key.
+   */
+  public static class HConnectionKey {
+    final static String[] CONNECTION_PROPERTIES = new String[] {
+        HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
+        HConstants.ZOOKEEPER_CLIENT_PORT,
+        HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+        HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+        HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+        HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+        HConstants.HBASE_META_SCANNER_CACHING,
+        HConstants.HBASE_CLIENT_INSTANCE_ID };
+
+    /** Unmodifiable snapshot of the non-null CONNECTION_PROPERTIES values. */
+    private final Map<String, String> properties;
+    /** Name of the current user, or null if it could not be determined. */
+    private final String username;
+
+    /**
+     * @param conf configuration to snapshot; {@code null} yields a key with
+     *     no properties
+     */
+    public HConnectionKey(Configuration conf) {
+      Map<String, String> m = new HashMap<String, String>();
+      if (conf != null) {
+        for (String property : CONNECTION_PROPERTIES) {
+          String value = conf.get(property);
+          if (value != null) {
+            m.put(property, value);
+          }
+        }
+      }
+      this.properties = Collections.unmodifiableMap(m);
+
+      String currentUserName = null;
+      try {
+        User currentUser = User.getCurrent();
+        if (currentUser != null) {
+          currentUserName = currentUser.getName();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Error obtaining current user, skipping username in HConnectionKey",
+            ioe);
+      }
+      this.username = currentUserName;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      if (username != null) {
+        result = username.hashCode();
+      }
+      for (String property : CONNECTION_PROPERTIES) {
+        String value = properties.get(property);
+        if (value != null) {
+          result = prime * result + value.hashCode();
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Two keys are equal when they have the same user name (both possibly
+     * null) and the same value (or absence) for every CONNECTION_PROPERTIES
+     * entry.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      HConnectionKey that = (HConnectionKey) obj;
+      if (this.username == null ? that.username != null
+          : !this.username.equals(that.username)) {
+        return false;
+      }
+      // properties is never null: it is always assigned in the constructor.
+      for (String property : CONNECTION_PROPERTIES) {
+        String thisValue = this.properties.get(property);
+        String thatValue = that.properties.get(property);
+        if (thisValue == null ? thatValue != null : !thisValue.equals(thatValue)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HConnectionKey{" +
+          "properties=" + properties +
+          ", username='" + username + '\'' +
+          '}';
+    }
+  }
+
+ /** Encapsulates connection to zookeeper and regionservers.*/
+ static class HConnectionImplementation implements HConnection, Closeable {
+ static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
+ // Protocol classes and client settings; all assigned from conf in the
+ // constructor.
+ private final Class<? extends AdminProtocol> adminClass;
+ private final Class<? extends ClientProtocol> clientClass;
+ private final long pause;
+ private final int numRetries;
+ private final int maxRPCAttempts;
+ private final int rpcTimeout;
+ private final int prefetchRegionLimit;
+
+ // volatile: read across threads without holding a lock (e.g. the retry
+ // loop in createMasterWithRetries checks closed).
+ private volatile boolean closed;
+ private volatile boolean aborted;
+
+ // NOTE(review): users of these two locks are outside this view; presumably
+ // they guard meta/user region lookups respectively. Confirm.
+ private final Object metaRegionLock = new Object();
+ private final Object userRegionLock = new Object();
+
+ // We have a single lock for master & zk to prevent deadlocks. Having
+ // one lock for ZK and one lock for master is not possible:
+ // When creating a connection to master, we need a connection to ZK to get
+ // its address. But another thread could have taken the ZK lock, and could
+ // be waiting for the master lock => deadlock.
+ private final Object masterAndZKLock = new Object();
+
+ // NOTE(review): read/written outside this view; presumably a deadline for
+ // keeping the shared ZooKeeper watcher open. Confirm.
+ private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+ // NOTE(review): this is a field initializer, so createAndStart(this) runs
+ // before the constructor body assigns conf and the other finals, and it
+ // publishes a partially constructed 'this'. Nothing visible stops it if the
+ // constructor later throws. Confirm DelayedClosing tolerates this.
+ private final DelayedClosing delayedClosing =
+ DelayedClosing.createAndStart(this);
+
+
+ // Configuration this connection was created with; retrieveClusterId()
+ // writes the cluster id back into it.
+ private final Configuration conf;
+
+ // client RPC
+ // Assigned in the constructor after the cluster id has been resolved.
+ private RpcClientEngine rpcEngine;
+
+ // Known region ServerName.toString() -> RegionClient/Admin
+ private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
+ new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
+ // NOTE(review): used outside this view; presumably per-server lock objects
+ // for creating entries in 'servers'. Confirm.
+ private final ConcurrentHashMap<String, String> connectionLock =
+ new ConcurrentHashMap<String, String>();
+
+ /**
+ * Map of table to table {@link HRegionLocation}s. The table key is made
+ * by doing a {@link Bytes#mapKey(byte[])} of the table's name.
+ * NOTE(review): a plain HashMap; its synchronization is handled outside
+ * this view.
+ */
+ private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
+ cachedRegionLocations =
+ new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
+
+ // The presence of a server in the map implies it's likely that there is an
+ // entry in cachedRegionLocations that map to this server; but the absence
+ // of a server in this map guarantees that there is no entry in cache that
+ // maps to the absent server.
+ private final Set<String> cachedServers =
+ new HashSet<String>();
+
+ // region cache prefetch is enabled by default. this set contains all
+ // tables whose region cache prefetch are disabled.
+ private final Set<Integer> regionCachePrefetchDisabledTables =
+ new CopyOnWriteArraySet<Integer>();
+
+ // NOTE(review): presumably maintained by incCount()/decCount() (called by
+ // HConnectionManager); those methods are outside this view.
+ private int refCount;
+
+ // indicates whether this connection's life cycle is managed (by us)
+ // true for connections from HConnectionManager.getConnection, false for
+ // HConnectionManager.createConnection.
+ private final boolean managed;
+ /**
+ * constructor
+ * @param conf Configuration object
+ */
+ @SuppressWarnings("unchecked")
+ public HConnectionImplementation(Configuration conf, boolean managed)
+ throws ZooKeeperConnectionException {
+ this.conf = conf;
+ this.managed = managed;
+ String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
+ DEFAULT_ADMIN_PROTOCOL_CLASS);
+ this.closed = false;
+ try {
+ this.adminClass =
+ (Class<? extends AdminProtocol>) Class.forName(adminClassName);
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find region server interface " + adminClassName, e);
+ }
+ String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
+ DEFAULT_CLIENT_PROTOCOL_CLASS);
+ try {
+ this.clientClass =
+ (Class<? extends ClientProtocol>) Class.forName(clientClassName);
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find client protocol " + clientClassName, e);
+ }
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.maxRPCAttempts = conf.getInt(
+ HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
+ this.rpcTimeout = conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.prefetchRegionLimit = conf.getInt(
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+
+ retrieveClusterId();
+ // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
+ // but we maintain access through an interface to allow overriding for tests
+ // RPC engine setup must follow obtaining the cluster ID for token authentication to work
+ this.rpcEngine = new ProtobufRpcClientEngine(this.conf);
+ }
+
+ /**
+ * An identifier that will remain the same for a given connection.
+ * @return
+ */
+ public String toString(){
+ return "hconnection 0x" + Integer.toHexString( hashCode() );
+ }
+
+    /** Resolved cluster id: from the configuration, ZooKeeper, or "default". */
+    private String clusterId = null;
+
+    /**
+     * Makes sure {@link HConstants#CLUSTER_ID} is set in this connection's
+     * configuration and mirrored in {@link #clusterId}.
+     * <p>
+     * If the configuration already carries a cluster id, that value is used.
+     * Otherwise the id is read from ZooKeeper. If the read fails or yields
+     * nothing, {@code "default"} is used. The result is written back into the
+     * configuration.
+     */
+    public final void retrieveClusterId() {
+      String configuredId = conf.get(HConstants.CLUSTER_ID);
+      if (configuredId != null) {
+        // Keep the field consistent with the configuration.
+        this.clusterId = configuredId;
+        return;
+      }
+
+      // No synchronization here; in the worst case the id is retrieved
+      // twice, which is not an issue.
+      if (this.clusterId == null) {
+        ZooKeeperKeepAliveConnection zkw = null;
+        try {
+          zkw = getKeepAliveZooKeeperWatcher();
+          this.clusterId = ZKClusterId.readClusterIdZNode(zkw);
+          if (clusterId == null) {
+            LOG.info("ClusterId read in ZooKeeper is null");
+          }
+        } catch (KeeperException e) {
+          LOG.warn("Can't retrieve clusterId from Zookeeper", e);
+        } catch (IOException e) {
+          LOG.warn("Can't retrieve clusterId from Zookeeper", e);
+        } finally {
+          if (zkw != null) {
+            zkw.close();
+          }
+        }
+        if (this.clusterId == null) {
+          this.clusterId = "default";
+        }
+
+        LOG.info("ClusterId is " + clusterId);
+      }
+
+      conf.set(HConstants.CLUSTER_ID, clusterId);
+    }
+
+    /**
+     * @return the configuration this connection was created with. It is the
+     *     same instance, not a copy, so it reflects the cluster id written by
+     *     {@link #retrieveClusterId()}.
+     */
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Bookkeeping for one kind of master proxy: the proxy itself, a user
+     * counter and a keep-alive deadline. The fields are managed by code
+     * outside this view.
+     */
+    private static class MasterProtocolState {
+      /** The master proxy; initially {@code null}. */
+      public MasterProtocol protocol;
+      /** Counter of users of the proxy; initially 0. */
+      public int userCount;
+      /** Keep-alive deadline; initially {@link Long#MAX_VALUE}. */
+      public long keepAliveUntil = Long.MAX_VALUE;
+      /** Protocol interface the proxy is created for. */
+      public final Class<? extends MasterProtocol> protocolClass;
+
+      public MasterProtocolState(final Class<? extends MasterProtocol> protocolClass) {
+        this.protocolClass = protocolClass;
+      }
+    }
+
+ /**
+ * Create a new Master proxy. Try once only.
+ */
+ private MasterProtocol createMasterInterface(
+ MasterProtocolState masterProtocolState)
+ throws IOException, KeeperException, ServiceException {
+
+ ZooKeeperKeepAliveConnection zkw;
+ try {
+ zkw = getKeepAliveZooKeeperWatcher();
+ } catch (IOException e) {
+ throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+ }
+
+ try {
+
+ checkIfBaseNodeAvailable(zkw);
+ ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
+ if (sn == null) {
+ String msg =
+ "ZooKeeper available but no active master location found";
+ LOG.info(msg);
+ throw new MasterNotRunningException(msg);
+ }
+
+
+ InetSocketAddress isa =
+ new InetSocketAddress(sn.getHostname(), sn.getPort());
+ MasterProtocol tryMaster = rpcEngine.getProxy(
+ masterProtocolState.protocolClass,
+ isa, this.conf, this.rpcTimeout);
+
+ if (tryMaster.isMasterRunning(
+ null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
+ return tryMaster;
+ } else {
+ String msg = "Can create a proxy to master, but it is not running";
+ LOG.info(msg);
+ throw new MasterNotRunningException(msg);
+ }
+ } finally {
+ zkw.close();
+ }
+ }
+
+    /**
+     * Creates a master proxy, retrying up to {@code numRetries} attempts with
+     * a pause between attempts computed by
+     * {@code ConnectionUtils.getPauseTime(pause, attempt)}.
+     * <p>
+     * {@code masterAndZKLock} is held for the whole operation, including the
+     * sleeps. This prevents concurrent callers from creating, and leaking,
+     * several proxies.
+     *
+     * @param masterProtocolState supplies the protocol class to proxy
+     * @return a proxy to a running master
+     * @throws MasterNotRunningException if every attempt failed, or if the
+     *     connection was closed while trying
+     * @throws RuntimeException if the thread is interrupted while pausing
+     *     between attempts (the interrupt flag is restored)
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
+    private MasterProtocol createMasterWithRetries(
+        MasterProtocolState masterProtocolState) throws MasterNotRunningException {
+
+      // The lock must be at the beginning to prevent multiple master creation
+      // (and leaks) in a multithread context
+      synchronized (this.masterAndZKLock) {
+        MasterProtocol master = null;
+        int tries = 0;
+        while (!this.closed && master == null) {
+          tries++;
+          // Reset per attempt: a failure in an earlier attempt must not make
+          // a later successful attempt look like a failure (which would sleep
+          // needlessly or throw and leak the freshly created proxy).
+          Exception exceptionCaught = null;
+          try {
+            master = createMasterInterface(masterProtocolState);
+          } catch (IOException e) {
+            exceptionCaught = e;
+          } catch (KeeperException e) {
+            exceptionCaught = e;
+          } catch (ServiceException e) {
+            exceptionCaught = e;
+          }
+
+          if (exceptionCaught == null) {
+            // Success: master is set, so the loop condition ends the loop.
+            continue;
+          }
+
+          if (tries < numRetries) {
+            // tries at this point is 1 or more; decrement to start from 0.
+            long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
+            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+                " failed; retrying after sleep of " + pauseTime, exceptionCaught);
+            try {
+              Thread.sleep(pauseTime);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(
+                  "Thread was interrupted while trying to connect to master.", e);
+            }
+          } else {
+            // Enough tries, we stop now
+            LOG.info("getMaster attempt " + tries + " of " + numRetries +
+                " failed; no more retrying.", exceptionCaught);
+            throw new MasterNotRunningException(exceptionCaught);
+          }
+        }
+
+        if (master == null) {
+          // implies this.closed true
+          throw new MasterNotRunningException(
+              "Connection was closed while trying to get master");
+        }
+
+        return master;
+      }
+    }
+
+ private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
+ throws MasterNotRunningException {
+ String errorMsg;
+ try {
+ if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
+ errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
+ + "It should have been written by the master. "
+ + "Check the value configured in 'zookeeper.znode.parent'. "
+ + "There could be a mismatch with the one configured in the master.";
+ LOG.error(errorMsg);
+ throw new MasterNotRunningException(errorMsg);
+ }
+ } catch (KeeperException e) {
+ errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
+ LOG.error(errorMsg);
+ throw new MasterNotRunningException(errorMsg, e);
+ }
+ }
+
+    /**
+     * Checks that the master is running by obtaining, and immediately
+     * releasing, a keep-alive master monitor proxy. Obtaining the proxy fails
+     * with an exception when the master is not running.
+     *
+     * @return always {@code true}; failure is reported by exception
+     * @throws MasterNotRunningException if the master is not running
+     * @throws ZooKeeperConnectionException if ZooKeeper cannot be reached
+     */
+    @Override
+    public boolean isMasterRunning()
+        throws MasterNotRunningException, ZooKeeperConnectionException {
+      // Getting the proxy performs the is-running check; no exception means
+      // we reached a running master, so the proxy can be released right away.
+      getKeepAliveMasterMonitor().close();
+      return true;
+    }
+
+    /**
+     * Returns the location of the region holding {@code row} of table
+     * {@code name}. When {@code reload} is set the cache is bypassed
+     * ({@link #relocateRegion}); otherwise {@link #locateRegion} is used.
+     */
+    @Override
+    public HRegionLocation getRegionLocation(final byte [] name,
+        final byte [] row, boolean reload)
+        throws IOException {
+      if (reload) {
+        return relocateRegion(name, row);
+      }
+      return locateRegion(name, row);
+    }
+
+    /** @return whether ZooKeeper records {@code tableName} as enabled */
+    @Override
+    public boolean isTableEnabled(byte[] tableName) throws IOException {
+      final boolean checkEnabled = true;
+      return testTableOnlineState(tableName, checkEnabled);
+    }
+
+    /** @return whether ZooKeeper records {@code tableName} as disabled */
+    @Override
+    public boolean isTableDisabled(byte[] tableName) throws IOException {
+      final boolean checkEnabled = false;
+      return testTableOnlineState(tableName, checkEnabled);
+    }
+
+    /**
+     * A table is available when meta lists at least one region for it and
+     * every such region seen during the meta scan has a server assigned.
+     *
+     * @param tableName table to check
+     * @return whether the table is available
+     * @throws IOException if scanning meta fails
+     */
+    @Override
+    public boolean isTableAvailable(final byte[] tableName) throws IOException {
+      final AtomicBoolean allAssigned = new AtomicBoolean(true);
+      final AtomicInteger regionsSeen = new AtomicInteger(0);
+      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
+        @Override
+        public boolean processRow(Result row) throws IOException {
+          HRegionInfo info = MetaScanner.getHRegionInfo(row);
+          if (info == null || !Bytes.equals(tableName, info.getTableName())) {
+            // Not a region of this table: keep going.
+            return true;
+          }
+          if (HRegionInfo.getServerName(row) == null) {
+            // Unassigned region: table is not available. Returning false
+            // presumably stops the scan (MetaScannerVisitor contract).
+            allAssigned.set(false);
+            return false;
+          }
+          regionsSeen.incrementAndGet();
+          return true;
+        }
+      };
+      MetaScanner.metaScan(conf, visitor);
+      return allAssigned.get() && regionsSeen.get() > 0;
+    }
+
+    /**
+     * Checks a table's enabled/disabled state as recorded in ZooKeeper.
+     *
+     * @param tableName table to check
+     * @param online {@code true} to test whether the table is enabled,
+     *     {@code false} to test whether it is disabled
+     * @return whether the table is in the requested state; the root table is
+     *     always treated as enabled
+     * @throws IOException if ZooKeeper cannot be reached or queried
+     */
+    private boolean testTableOnlineState(byte [] tableName, boolean online)
+        throws IOException {
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        // The root region is always enabled.
+        return online;
+      }
+      String tableNameStr = Bytes.toString(tableName);
+      ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
+      try {
+        return online
+            ? ZKTableReadOnly.isEnabledTable(zkw, tableNameStr)
+            : ZKTableReadOnly.isDisabledTable(zkw, tableNameStr);
+      } catch (KeeperException e) {
+        throw new IOException("Enable/Disable failed", e);
+      } finally {
+        zkw.close();
+      }
+    }
+
+ @Override
+ public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
+   // Decode table and start key from the region name, then look the region
+   // up without the cache, with retries.
+   final byte[] table = HRegionInfo.getTableName(regionName);
+   final byte[] startKey = HRegionInfo.getStartKey(regionName);
+   return locateRegion(table, startKey, false, true);
+ }
+
+ @Override
+ public List<HRegionLocation> locateRegions(final byte[] tableName)
+     throws IOException {
+   // Defaults: do not use the location cache; pass offlined=true to the
+   // catalog enumeration.
+   final boolean useCache = false;
+   final boolean offlined = true;
+   return locateRegions(tableName, useCache, offlined);
+ }
+
+ @Override
+ public List<HRegionLocation> locateRegions(final byte[] tableName, final boolean useCache,
+     final boolean offlined) throws IOException {
+   // Enumerate the table's regions from the catalog, then resolve the
+   // location of each one by its start key.
+   NavigableMap<HRegionInfo, ServerName> regions =
+       MetaScanner.allTableRegions(conf, tableName, offlined);
+   final List<HRegionLocation> locations =
+       new ArrayList<HRegionLocation>(regions.size());
+   for (HRegionInfo region : regions.keySet()) {
+     byte[] startKey = region.getStartKey();
+     locations.add(locateRegion(tableName, startKey, useCache, true));
+   }
+   return locations;
+ }
+
+ @Override
+ public HRegionLocation locateRegion(final byte [] tableName,
+     final byte [] row)
+     throws IOException{
+   // Cache-first lookup, with retries.
+   final boolean useCache = true;
+   final boolean retry = true;
+   return locateRegion(tableName, row, useCache, retry);
+ }
+
+ @Override
+ public HRegionLocation relocateRegion(final byte [] tableName,
+     final byte [] row)
+     throws IOException{
+   // This is an explicit request to skip the cache, so locating a disabled
+   // table is not wanted: fail with a non-retriable error the first time a
+   // disabled table is interacted with.
+   boolean disabled = isTableDisabled(tableName);
+   if (disabled) {
+     throw new DoNotRetryIOException(Bytes.toString(tableName) + " is disabled.");
+   }
+   final boolean useCache = false;
+   final boolean retry = true;
+   return locateRegion(tableName, row, useCache, retry);
+ }
+
+ /*
+  * Locates the region of tableName containing row.
+  * -ROOT- is looked up in ZooKeeper (null if unavailable or interrupted),
+  * .META. regions are searched in -ROOT-, and user-table regions in .META.
+  */
+ private HRegionLocation locateRegion(final byte [] tableName,
+     final byte [] row, boolean useCache, boolean retry)
+     throws IOException {
+   if (this.closed) {
+     throw new IOException(toString() + " closed");
+   }
+   if (tableName == null || tableName.length == 0) {
+     throw new IllegalArgumentException(
+         "table name cannot be null or zero length");
+   }
+
+   if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+     // .META. regions are listed in -ROOT-.
+     return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
+         useCache, metaRegionLock, retry);
+   }
+   if (!Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+     // User table: its regions are listed in .META.
+     return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
+         useCache, userRegionLock, retry);
+   }
+
+   // -ROOT-: its location is read from ZooKeeper.
+   ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();
+   try {
+     LOG.debug("Looking up root region location in ZK," +
+         " connection=" + this);
+     ServerName rootServer =
+         RootRegionTracker.blockUntilAvailable(zkw, this.rpcTimeout);
+     LOG.debug("Looked up root region location, connection=" + this +
+         "; serverName=" + ((rootServer == null) ? "null" : rootServer));
+     return (rootServer == null)
+         ? null
+         : new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer, 0);
+   } catch (InterruptedException e) {
+     // Keep the interrupt status; the caller sees a null location.
+     Thread.currentThread().interrupt();
+     return null;
+   } finally {
+     zkw.close();
+   }
+ }
+
+ /*
+  * Scans .META. from the given row of tableName and caches the location of
+  * each online, assigned region of that table it encounters, using
+  * this.prefetchRegionLimit as the scan's limit. Best effort: scan failures
+  * are only logged.
+  */
+ private void prefetchRegionCache(final byte[] tableName,
+     final byte[] row) {
+   MetaScannerVisitor cachingVisitor = new MetaScannerVisitorBase() {
+     @Override
+     public boolean processRow(Result result) throws IOException {
+       try {
+         HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
+         if (regionInfo == null) {
+           return true;
+         }
+         if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
+           // Presumably the scan has moved past this table: stop scanning.
+           return false;
+         }
+         // Offline or unassigned regions are skipped (not cached).
+         if (regionInfo.isOffline()) {
+           return true;
+         }
+         ServerName serverName = HRegionInfo.getServerName(result);
+         if (serverName == null) {
+           return true;
+         }
+         cacheLocation(tableName, null, new HRegionLocation(regionInfo, serverName,
+             HRegionInfo.getSeqNumDuringOpen(result)));
+         return true;
+       } catch (RuntimeException e) {
+         throw new IOException(e);
+       }
+     }
+   };
+   try {
+     MetaScanner.metaScan(conf, cachingVisitor, tableName, row,
+         this.prefetchRegionLimit);
+   } catch (IOException e) {
+     // Prefetching is an optimization; a failure only costs later cache misses.
+     LOG.warn("Encountered problems when prefetch META table: ", e);
+   }
+ }
+
+ /*
+  * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
+  * info that contains the table and row we're seeking.
+  *
+  * @param parentTable catalog table to search: -ROOT- or .META.
+  * @param tableName table whose region is wanted
+  * @param row row of tableName to locate
+  * @param useCache if true a cached location may be returned; if false any
+  *     cached location for the row is evicted before reading the catalog
+  * @param regionLockObject lock held while reading the catalog, so a thread
+  *     waiting on it can reuse the location another thread just cached
+  * @param retry if true make up to numRetries attempts, otherwise exactly one
+  * @return the located region; never null
+  * @throws TableNotFoundException if the catalog has no region of tableName
+  * @throws IOException if the region cannot be located within the allowed
+  *     attempts, or the thread is interrupted while pausing between them
+  */
+ private HRegionLocation locateRegionInMeta(final byte [] parentTable,
+     final byte [] tableName, final byte [] row, boolean useCache,
+     Object regionLockObject, boolean retry)
+     throws IOException {
+   HRegionLocation location;
+   // If we are supposed to be using the cache, look in the cache to see if
+   // we already have the region.
+   if (useCache) {
+     location = getCachedLocation(tableName, row);
+     if (location != null) {
+       return location;
+     }
+   }
+   // Attempts allowed for THIS call. Every retry decision and message below
+   // must use this value, not this.numRetries, or retry == false would not
+   // be honored.
+   int localNumRetries = retry ? numRetries : 1;
+   // build the key of the meta region we should be looking for.
+   // the extra 9's on the end are necessary to allow "exact" matches
+   // without knowing the precise region names.
+   byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
+       HConstants.NINES, false);
+   for (int tries = 0; true; tries++) {
+     if (tries >= localNumRetries) {
+       throw new NoServerForRegionException("Unable to find region for "
+           + Bytes.toStringBinary(row) + " after " + localNumRetries + " tries.");
+     }
+
+     HRegionLocation metaLocation = null;
+     try {
+       // locate the root or meta region
+       metaLocation = locateRegion(parentTable, metaKey, true, false);
+       // If null still, go around again. Note: this skips the pause at the
+       // bottom of the loop.
+       if (metaLocation == null) continue;
+       ClientProtocol server =
+           getClient(metaLocation.getServerName());
+
+       Result regionInfoRow = null;
+       // This block guards against two threads trying to load the meta
+       // region at the same time. The first will load the meta region and
+       // the second will use the value that the first one found.
+       synchronized (regionLockObject) {
+         // If the parent table is META, we may want to pre-fetch some
+         // region info into the global region cache for this table.
+         if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
+             (getRegionCachePrefetch(tableName)) ) {
+           prefetchRegionCache(tableName, row);
+         }
+
+         // Check the cache again for a hit in case some other thread made the
+         // same query while we were waiting on the lock. If not supposed to
+         // be using the cache, delete any existing cached location so it won't
+         // interfere.
+         if (useCache) {
+           location = getCachedLocation(tableName, row);
+           if (location != null) {
+             return location;
+           }
+         } else {
+           forceDeleteCachedLocation(tableName, row);
+         }
+
+         // Query the root or meta region for the location of the meta region
+         regionInfoRow = ProtobufUtil.getRowOrBefore(server,
+             metaLocation.getRegionInfo().getRegionName(), metaKey,
+             HConstants.CATALOG_FAMILY);
+       }
+       if (regionInfoRow == null) {
+         throw new TableNotFoundException(Bytes.toString(tableName));
+       }
+
+       // convert the row result into the HRegionLocation we need!
+       HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
+       if (regionInfo == null) {
+         throw new IOException("HRegionInfo was null or empty in " +
+             Bytes.toString(parentTable) + ", row=" + regionInfoRow);
+       }
+
+       // possible we got a region of a different table...
+       if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
+         throw new TableNotFoundException(
+             "Table '" + Bytes.toString(tableName) + "' was not found, got: " +
+             Bytes.toString(regionInfo.getTableName()) + ".");
+       }
+       if (regionInfo.isSplit()) {
+         throw new RegionOfflineException("the only available region for" +
+             " the required row is a split parent," +
+             " the daughters should be online soon: " +
+             regionInfo.getRegionNameAsString());
+       }
+       if (regionInfo.isOffline()) {
+         throw new RegionOfflineException("the region is offline, could" +
+             " be caused by a disable table call: " +
+             regionInfo.getRegionNameAsString());
+       }
+
+       ServerName serverName = HRegionInfo.getServerName(regionInfoRow);
+       if (serverName == null) {
+         throw new NoServerForRegionException("No server address listed " +
+             "in " + Bytes.toString(parentTable) + " for region " +
+             regionInfo.getRegionNameAsString() + " containing row " +
+             Bytes.toStringBinary(row));
+       }
+
+       // Instantiate the location
+       location = new HRegionLocation(regionInfo, serverName,
+           HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
+       cacheLocation(tableName, null, location);
+       return location;
+     } catch (TableNotFoundException e) {
+       // if we got this error, probably means the table just plain doesn't
+       // exist. rethrow the error immediately. this should always be coming
+       // from the HTable constructor.
+       throw e;
+     } catch (IOException e) {
+       if (e instanceof RemoteException) {
+         e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+       }
+       // On the last allowed attempt, surface the real error. Comparing with
+       // numRetries here would swallow it when retry == false and replace it
+       // with a generic NoServerForRegionException after a pointless sleep.
+       if (tries < localNumRetries - 1) {
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("locateRegionInMeta parentTable=" +
+               Bytes.toString(parentTable) + ", metaLocation=" +
+               ((metaLocation == null)? "null": "{" + metaLocation + "}") +
+               ", attempt=" + tries + " of " +
+               localNumRetries + " failed; retrying after sleep of " +
+               ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
+         }
+       } else {
+         throw e;
+       }
+       // Only relocate the parent region if necessary
+       if(!(e instanceof RegionOfflineException ||
+           e instanceof NoServerForRegionException)) {
+         relocateRegion(parentTable, metaKey);
+       }
+     }
+     try{
+       Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+       throw new IOException("Giving up trying to location region in " +
+           "meta: thread is interrupted.", e);
+     }
+   }
+ }
+
+ /*
+  * Looks up, in the client-side location cache, the region of tableName
+  * that contains row. TODO: synchronization note
+  *
+  * <p>TODO: This method during writing consumes 15% of CPU doing lookup
+  * into the Soft Reference SortedMap. Improve.
+  *
+  * @param tableName the table
+  * @param row the row to locate
+  * @return Null or region location found in cache.
+  */
+ HRegionLocation getCachedLocation(final byte [] tableName,
+     final byte [] row) {
+   SoftValueSortedMap<byte [], HRegionLocation> cached =
+       getTableLocations(tableName);
+   // Nothing cached for this table.
+   if (cached.isEmpty()) {
+     return null;
+   }
+   // Exact hit: row is the start key of a cached region.
+   HRegionLocation candidate = cached.get(row);
+   if (candidate != null) {
+     return candidate;
+   }
+   // Otherwise take the cached region whose start key precedes row...
+   candidate = cached.lowerValueByKey(row);
+   if (candidate == null) {
+     return null;
+   }
+   // ...and accept it only if it ends after row. An empty end key marks the
+   // table's last region.
+   byte[] endKey = candidate.getRegionInfo().getEndKey();
+   if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)) {
+     return candidate;
+   }
+   boolean rowBeforeEnd = KeyValue.getRowComparator(tableName).compareRows(
+       endKey, 0, endKey.length, row, 0, row.length) > 0;
+   // Otherwise the row belongs to a region that is not cached: a miss.
+   return rowBeforeEnd ? candidate : null;
+ }
+
+ /**
+  * Evicts the cached location covering the given row, whatever it is.
+  * Called when we were told to not use cache.
+  * @param tableName tableName
+  * @param row the row whose cached region location is dropped
+  */
+ void forceDeleteCachedLocation(final byte [] tableName, final byte [] row) {
+   HRegionLocation removed = null;
+   synchronized (this.cachedRegionLocations) {
+     Map<byte[], HRegionLocation> tableLocations =
+         getTableLocations(tableName);
+     // Only an existing per-table cache can hold something to evict.
+     if (!tableLocations.isEmpty()) {
+       removed = getCachedLocation(tableName, row);
+       if (removed != null) {
+         tableLocations.remove(removed.getRegionInfo().getStartKey());
+       }
+     }
+   }
+   if (removed == null || !LOG.isDebugEnabled()) {
+     return;
+   }
+   LOG.debug("Removed " + removed.getHostname() + ":" + removed.getPort()
+       + " as a location of " + removed.getRegionInfo().getRegionNameAsString() +
+       " for tableName=" + Bytes.toString(tableName) +
+       " from cache to make sure we don't use cache for " + Bytes.toStringBinary(row));
+ }
+
+ /**
+  * Drops every cached region location that points at the given server.
+  * @param sn the server, in the form produced by HRegionLocation#getHostnamePort()
+  */
+ @Override
+ public void clearCaches(String sn) {
+   final String server = sn;
+   clearCachedLocationForServer(server);
+ }
+
+ /*
+  * Delete all cached region locations, across every table, that map to the
+  * given server.
+  *
+  * @param server the server, compared against
+  *     HRegionLocation#getHostnamePort() of each cached location
+  */
+ private void clearCachedLocationForServer(final String server) {
+   boolean deletedSomething = false;
+   synchronized (this.cachedRegionLocations) {
+     // cachedServers records every server we ever cached a location for;
+     // if it is not there, nothing can match.
+     if (!cachedServers.contains(server)) {
+       return;
+     }
+     for (Map<byte[], HRegionLocation> tableLocations :
+         cachedRegionLocations.values()) {
+       // Collect matching keys first and remove afterwards: the Map contract
+       // leaves modification during entrySet() iteration undefined, so do
+       // not depend on the implementation returning a copy.
+       List<byte[]> staleKeys = new ArrayList<byte[]>();
+       for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
+         HRegionLocation loc = e.getValue();
+         // NOTE(review): the maps hold soft values; guard against a value
+         // that has presumably been collected.
+         if (loc != null && loc.getHostnamePort().equals(server)) {
+           staleKeys.add(e.getKey());
+         }
+       }
+       for (byte[] key : staleKeys) {
+         tableLocations.remove(key);
+       }
+       if (!staleKeys.isEmpty()) {
+         deletedSomething = true;
+       }
+     }
+     cachedServers.remove(server);
+   }
+   if (deletedSomething && LOG.isDebugEnabled()) {
+     LOG.debug("Removed all cached region locations that map to " + server);
+   }
+ }
+
+ /*
+ * @param tableName
+ * @return Map of cached locations for passed <code>tableName</code>
+ */
+ private SoftValueSortedMap<byte [], HRegionLocation> getTableLocations(
+ final byte [] tableName) {
+ // find the map of cached locations for this table
+ Integer key = Bytes.mapKey(tableName);
+ SoftValueSortedMap<byte [], HRegionLocation> result;
+ synchronized (this.cachedRegionLocations) {
+ result = this.cachedRegionLocations.get(key);
+ // if tableLocations for this table isn't built yet, make one
+ if (result == null) {
+ result = new SoftValueSortedMap<byte [], HRegionLocation>(
+ Bytes.BYTES_COMPARATOR);
+ this.cachedRegionLocations.put(key, result);
+ }
+ }
+ return result;
+ }
+
+ /** Drops every cached region location and the set of servers they point to. */
+ @Override
+ public void clearRegionCache() {
+   synchronized (this.cachedRegionLocations) {
+     this.cachedServers.clear();
+     this.cachedRegionLocations.clear();
+   }
+ }
+
+ /** Drops the whole location cache of one table. */
+ @Override
+ public void clearRegionCache(final byte [] tableName) {
+   final Integer key = Bytes.mapKey(tableName);
+   synchronized (this.cachedRegionLocations) {
+     this.cachedRegionLocations.remove(key);
+   }
+ }
+
+ /**
+ * Put a newly discovered HRegionLocation into the cache.
+ * If a location is already cached for the same start key, it is replaced
+ * unless the update is stale: a location read from meta (source == null) is
+ * stale when the cached seqNum is greater; a redirect from another server is
+ * stale when the cached seqNum is greater or equal. An update whose source
+ * equals the cached location is always applied.
+ * @param tableName The table name.
+ * @param source the source of the new location, if it's not coming from meta
+ * @param location the new location
+ */
+ private void cacheLocation(final byte [] tableName, final HRegionLocation source,
+ final HRegionLocation location) {
+ // A null source means the location was read from the catalog (meta).
+ boolean isFromMeta = (source == null);
+ byte [] startKey = location.getRegionInfo().getStartKey();
+ Map<byte [], HRegionLocation> tableLocations =
+ getTableLocations(tableName);
+ boolean isNewCacheEntry = false;
+ boolean isStaleUpdate = false;
+ HRegionLocation oldLocation = null;
+ synchronized (this.cachedRegionLocations) {
+ // Record the server (even for stale updates) so that
+ // clearCachedLocationForServer can find locations pointing at it.
+ cachedServers.add(location.getHostnamePort());
+ oldLocation = tableLocations.get(startKey);
+ isNewCacheEntry = (oldLocation == null);
+ // If the server in cache sends us a redirect, assume it's always valid.
+ if (!isNewCacheEntry && !oldLocation.equals(source)) {
+ long newLocationSeqNum = location.getSeqNum();
+ // Meta record is stale - some (probably the same) server has closed the region
+ // with later seqNum and told us about the new location.
+ boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
+ // Same as above for redirect. However, in this case, if the number is equal to previous
+ // record, the most common case is that first the region was closed with seqNum, and then
+ // opened with the same seqNum; hence we will ignore the redirect.
+ // There are so many corner cases with various combinations of opens and closes that
+ // an additional counter on top of seqNum would be necessary to handle them all.
+ boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
+ isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
+ }
+ if (!isStaleUpdate) {
+ tableLocations.put(startKey, location);
+ }
+ }
+ // Logging is done after the lock is released.
+ if (isNewCacheEntry) {
+ LOG.debug("Cached location for " +
+ location.getRegionInfo().getRegionNameAsString() +
+ " is " + location.getHostnamePort());
+ } else if (isStaleUpdate && !location.equals(oldLocation)) {
+ LOG.debug("Ignoring stale location update for "
+ + location.getRegionInfo().getRegionNameAsString() + ": "
+ + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
+ + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
+ }
+ }
+
+ @Override
+ @Deprecated
+ public AdminProtocol getAdmin(final String hostname, final int port) throws IOException {
+ return getAdmin(new ServerName(hostname, port, 0L));
+ }
+
+ @Override
+ public AdminProtocol getAdmin(final ServerName serverName)
+     throws IOException {
+   final boolean master = false;
+   return getAdmin(serverName, master);
+ }
+
+ @Override
+ @Deprecated
+ public ClientProtocol getClient(final String hostname, final int port)
+     throws IOException {
+   IpcProtocol proxy = getProtocol(hostname, port, clientClass);
+   return (ClientProtocol) proxy;
+ }
+
+ @Override
+ public ClientProtocol getClient(final ServerName serverName)
+ throws IOException {
+ return (ClientProtocol)
+ getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
+ }
+
+ @Override
+ @Deprecated
+ public AdminProtocol getAdmin(final String hostname, final int port,
+     final boolean master)
+     throws IOException {
+   // The master flag does not influence which proxy is returned.
+   IpcProtocol proxy = getProtocol(hostname, port, adminClass);
+   return (AdminProtocol) proxy;
+ }
+
+ @Override
+ public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
+ throws IOException {
+ return (AdminProtocol)getProtocol(
+ serverName.getHostname(), serverName.getPort(), adminClass);
+ }
+
+ /**
+  * Returns the cached RPC proxy of the given protocol for a server,
+  * creating it on first use. Proxies are cached per "host:port" and per
+  * protocol class name.
+  * @param hostname the server's host name
+  * @param port the server's port
+  * @param protocolClass the RPC protocol interface to obtain a proxy for
+  * @return Proxy.
+  * @throws IOException if the proxy cannot be created; a RemoteException is
+  *     unwrapped before being rethrown
+  */
+ IpcProtocol getProtocol(final String hostname,
+     final int port, final Class <? extends IpcProtocol> protocolClass)
+     throws IOException {
+   String rsName = Addressing.createHostAndPortStr(hostname, port);
+   // See if we already have a connection (common case)
+   Map<String, IpcProtocol> protocols = this.servers.get(rsName);
+   if (protocols == null) {
+     // This map is read below without any lock and written under a lock
+     // that is per *protocol*, so threads fetching different protocols of
+     // the same server may write it concurrently: it must be thread-safe.
+     protocols = new java.util.concurrent.ConcurrentHashMap<String, IpcProtocol>();
+     Map<String, IpcProtocol> existingProtocols =
+         this.servers.putIfAbsent(rsName, protocols);
+     if (existingProtocols != null) {
+       protocols = existingProtocols;
+     }
+   }
+   String protocol = protocolClass.getName();
+   IpcProtocol server = protocols.get(protocol);
+   if (server == null) {
+     // create a unique lock for this RS + protocol (if necessary)
+     String lockKey = protocol + "@" + rsName;
+     this.connectionLock.putIfAbsent(lockKey, lockKey);
+     // get the RS lock
+     synchronized (this.connectionLock.get(lockKey)) {
+       // do one more lookup in case we were stalled above
+       server = protocols.get(protocol);
+       if (server == null) {
+         try {
+           // Only create isa when we need to.
+           InetSocketAddress address = new InetSocketAddress(hostname, port);
+           // definitely a cache miss. establish an RPC for this RS
+           server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
+               this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
+           protocols.put(protocol, server);
+         } catch (RemoteException e) {
+           LOG.warn("RemoteException connecting to RS", e);
+           // Throw what the RemoteException was carrying.
+           throw e.unwrapRemoteException();
+         }
+       }
+     }
+   }
+   return server;
+ }
+
+ @Override
+ @Deprecated
+ public ZooKeeperWatcher getZooKeeperWatcher()
+     throws ZooKeeperConnectionException {
+   // Callers of this deprecated method keep the watcher without releasing
+   // it, so from now on the delayed-closing chore must not close it.
+   canCloseZKW = false;
+   try {
+     return getKeepAliveZooKeeperWatcher();
+   } catch (IOException e) {
+     if (e instanceof ZooKeeperConnectionException) {
+       throw (ZooKeeperConnectionException) e;
+     }
+     // Encapsulate exception to keep interface
+     throw new ZooKeeperConnectionException(
+         "Can't create a zookeeper connection", e);
+   }
+ }
+
+
+ // Shared ZooKeeper watcher, created lazily by getKeepAliveZooKeeperWatcher()
+ // and closed by closeZooKeeperWatcher(); accessed under masterAndZKLock in
+ // the code visible here.
+ private ZooKeeperKeepAliveConnection keepAliveZookeeper;
+ // Number of outstanding users of keepAliveZookeeper (++ on get, -- on release).
+ private int keepAliveZookeeperUserCount;
+ // Cleared by the deprecated getZooKeeperWatcher(); while false the
+ // delayed-closing chore never closes the shared watcher.
+ private boolean canCloseZKW = true;
+
+ // keepAlive time, in ms (5 minutes). No reason to make it configurable.
+ private static final long keepAlive = 5 * 60 * 1000;
+
+ /**
+  * Retrieve a shared ZooKeeperWatcher. You must close it once you have
+  * finished with it.
+  * @return The shared instance. Never returns null.
+  * @throws IOException if the shared watcher has to be created and that fails
+  */
+ public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
+     throws IOException {
+   synchronized (masterAndZKLock) {
+     ZooKeeperKeepAliveConnection shared = keepAliveZookeeper;
+     if (shared == null) {
+       // The link to ZooKeeper is not validated here; the ZooKeeperWatcher
+       // has its own retry mechanism.
+       shared = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
+       keepAliveZookeeper = shared;
+     }
+     // One more user; cancel any pending delayed close.
+     keepAliveZookeeperUserCount++;
+     keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+     return shared;
+   }
+ }
+
+ /**
+  * Releases one use of the shared watcher. When no user is left, sets its
+  * keep-alive deadline to now + keepAlive, after which the delayed-closing
+  * chore may close it.
+  * @param zkw the watcher being released; null is ignored
+  */
+ void releaseZooKeeperWatcher(ZooKeeperWatcher zkw) {
+   if (zkw != null) {
+     synchronized (masterAndZKLock) {
+       keepAliveZookeeperUserCount--;
+       if (keepAliveZookeeperUserCount <= 0) {
+         keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
+       }
+     }
+   }
+ }
+
+
+ /**
+ * Creates a Chore thread to check the connections to master & zookeeper
+ * and close them when they reach their closing time (
+ * {@link MasterProtocolState#keepAliveUntil} and
+ * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
+ * managed by the release functions and the variable {@link #keepAlive}
+ */
+ private static class DelayedClosing extends Chore implements Stoppable {
+ private HConnectionImplementation hci;
+ Stoppable stoppable;
+
+ private DelayedClosing(
+ HConnectionImplementation hci, Stoppable stoppable){
+ super(
+ "ZooKeeperWatcher and Master delayed closing for connection "+hci,
+ 60*1000, // We check every minutes
+ stoppable);
+ this.hci = hci;
+ this.stoppable = stoppable;
+ }
+
+ static DelayedClosing createAndStart(HConnectionImplementation hci){
+ Stoppable stoppable = new Stoppable() {
+ private volatile boolean isStopped = false;
+ @Override public void stop(String why) { isStopped = true;}
+ @Override public boolean isStopped() {return isStopped;}
+ };
+
+ return new DelayedClosing(hci, stoppable);
+ }
+
+ protected void closeMasterProtocol(MasterProtocolState protocolState) {
+ if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
+ hci.closeMasterProtocol(protocolState);
+ protocolState.keepAliveUntil = Long.MAX_VALUE;
+ }
+ }
+
+ @Override
+ protected void chore() {
+ synchronized (hci.masterAndZKLock) {
+ if (hci.canCloseZKW) {
+ if (System.currentTimeMillis() >
+ hci.keepZooKeeperWatcherAliveUntil) {
+
+ hci.closeZooKeeperWatcher();
+ hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+ }
+ }
+ closeMasterProtocol(hci.masterAdminProtocol);
+ closeMasterProtocol(hci.masterMonitorProtocol);
+ }
+ }
+
+ @Override
+ public void stop(String why) {
+ stoppable.stop(why);
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stoppable.isStopped();
+ }
+ }
+
+ /** Immediately closes the shared ZooKeeper watcher, if any, and resets its user count. */
+ private void closeZooKeeperWatcher() {
+   synchronized (masterAndZKLock) {
+     ZooKeeperKeepAliveConnection zk = keepAliveZookeeper;
+     if (zk != null) {
+       LOG.info("Closing zookeeper sessionid=0x" +
+           Long.toHexString(zk.getRecoverableZooKeeper().getSessionId()));
+       zk.internalClose();
+       keepAliveZookeeper = null;
+     }
+     keepAliveZookeeperUserCount = 0;
+   }
+ }
+
+ private static class MasterProtocolHandler implements InvocationHandler {
+ private HConnectionImplementation connection;
+ private MasterProtocolState protocolStateTracker;
+
+ protected MasterProtocolHandler(HConnectionImplementation connection,
+ MasterProtocolState protocolStateTracker) {
+ this.connection = connection;
+ this.protocolStateTracker = protocolStateTracker;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ if (method.getName().equals("close") &&
+ method.getParameterTypes().length == 0) {
+ release(connection, protocolStateTracker);
+ return null;
+ } else {
+ try {
+ return method.invoke(protocolStateTracker.protocol, args);
+ }catch (InvocationTargetException e){
+ // We will have this for all the exception, checked on not, sent
+ // by any layer, including the functional exception
+ Throwable cause = e.getCause();
+ if (cause == null){
+ throw new RuntimeException(
+ "Proxy invocation failed and getCause is null", e);
+ }
+ if (cause instanceof UndeclaredThrowableException) {
+ cause = cause.getCause();
+ }
+ throw cause;
+ }
+ }
+ }
+
+ private void release(
+ HConnectionImplementation connection,
+ MasterProtocolState target) {
+ connection.releaseMaster(target);
+ }
+ }
+
+ // Shared state (protocol instance, user count, keep-alive deadline) of the
+ // master admin connection; used by getKeepAliveMasterProtocol and closed by
+ // DelayedClosing once its deadline passes.
+ MasterProtocolState masterAdminProtocol =
+ new MasterProtocolState(MasterAdminProtocol.class);
+ // Same, for the master monitor connection.
+ MasterProtocolState masterMonitorProtocol =
+ new MasterProtocolState(MasterMonitorProtocol.class);
+
+ /**
+  * This function allows HBaseAdmin and potentially others
+  * to get a shared master connection.
+  * (Re)creates the shared protocol when it is missing or the master no
+  * longer reports itself as running, registers one more user, and returns
+  * a new proxy over it; calling close() on that proxy releases the user.
+  *
+  * @param protocolState shared state of the master protocol to use
+  * @param connectionClass keep-alive interface the returned proxy implements
+  * @return a proxy over the shared instance. Never returns null.
+  * @throws MasterNotRunningException propagated from createMasterWithRetries
+  */
+ private Object getKeepAliveMasterProtocol(
+     MasterProtocolState protocolState, Class<?> connectionClass)
+     throws MasterNotRunningException {
+   synchronized (masterAndZKLock) {
+     if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
+       // Drop the dead instance first so a failed re-creation leaves null
+       // behind rather than a stale protocol.
+       protocolState.protocol = null;
+       protocolState.protocol = createMasterWithRetries(protocolState);
+     }
+     protocolState.userCount++;
+     // In use again: cancel any pending delayed close.
+     protocolState.keepAliveUntil = Long.MAX_VALUE;
+
+     return Proxy.newProxyInstance(
+         connectionClass.getClassLoader(),
+         new Class<?>[] {connectionClass},
+         new MasterProtocolHandler(this, protocolState)
+     );
+   }
+ }
+
+ @Override
+ public MasterAdminProtocol getMasterAdmin() throws MasterNotRunningException {
+ return getKeepAliveMasterAdmin();
+ }
+
+ @Override
+ public MasterMonitorProtocol getMasterMonitor() throws MasterNotRunningException {
+ return getKeepAliveMasterMonitor();
+ }
+
+ @Override
+ public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin()
+     throws MasterNotRunningException {
+   Object proxy = getKeepAliveMasterProtocol(
+       masterAdminProtocol, MasterAdminKeepAliveConnection.class);
+   return (MasterAdminKeepAliveConnection) proxy;
+ }
+
+ @Override
+ public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor()
+     throws MasterNotRunningException {
+   Object proxy = getKeepAliveMasterProtocol(
+       masterMonitorProtocol, MasterMonitorKeepAliveConnection.class);
+   return (MasterMonitorKeepAliveConnection) proxy;
+ }
+
+ /*
+  * Pings the shared master protocol. A missing protocol, an undeclared
+  * transport exception (e.g. java.net.ConnectException) or a
+  * ServiceException all count as "not running"; the exceptions are logged.
+  */
+ private boolean isKeepAliveMasterConnectedAndRunning(MasterProtocolState protocolState){
+   if (protocolState.protocol != null) {
+     try {
+       return protocolState.protocol.isMasterRunning(
+           null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning();
+     } catch (UndeclaredThrowableException e) {
+       LOG.info("Master connection is not running anymore",
+           e.getUndeclaredThrowable());
+     } catch (ServiceException se) {
+       LOG.warn("Checking master connection", se);
+     }
+   }
+   return false;
+ }
+
+ /*
+  * Releases one user of a shared master protocol. When none is left, sets
+  * its keep-alive deadline to now + keepAlive. No-op if the protocol is null.
+  */
+ private void releaseMaster(MasterProtocolState protocolState) {
+   if (protocolState.protocol != null) {
+     synchronized (masterAndZKLock) {
+       protocolState.userCount--;
+       if (protocolState.userCount <= 0) {
+         protocolState.keepAliveUntil = System.currentTimeMillis() + keepAlive;
+       }
+     }
+   }
+ }
+
+ /*
+  * Forgets the shared master protocol (drops the reference; no explicit RPC
+  * close is made here) and resets its user count.
+  */
+ private void closeMasterProtocol(MasterProtocolState protocolState) {
+   boolean open = protocolState.protocol != null;
+   if (open) {
+     LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
+     protocolState.protocol = null;
+   }
+   protocolState.userCount = 0;
+ }
+
+ /**
+  * Immediate close of the shared master protocols (admin, then monitor).
+  * Used by the delayed close or when closing the connection itself.
+  */
+ private void closeMaster() {
+   synchronized (masterAndZKLock) {
+     MasterProtocolState[] shared = { masterAdminProtocol, masterMonitorProtocol };
+     for (MasterProtocolState state : shared) {
+       closeMasterProtocol(state);
+     }
+   }
+ }
+
+ @Override
+ public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
+     throws IOException, RuntimeException {
+   // Retry policy is entirely the callable's.
+   T result = callable.withRetries();
+   return result;
+ }
+
+ @Override
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+     throws IOException, RuntimeException {
+   // Single attempt, delegated to the callable.
+   T result = callable.withoutRetries();
+   return result;
+ }
+
+ /*
+  * Wraps a multi-action for the server in loc into a Callable that makes a
+  * single attempt (no retries) against that server.
+  */
+ @Deprecated
+ private <R> Callable<MultiResponse> createCallable(
+     final HRegionLocation loc, final MultiAction<R> multi,
+     final byte [] tableName) {
+   // TODO: This does not belong in here!!! St.Ack HConnections should
+   // not be dealing in Callables; Callables have HConnections, not other
+   // way around.
+   final HConnection connection = this;
+   return new Callable<MultiResponse>() {
+     @Override
+     public MultiResponse call() throws IOException {
+       ServerCallable<MultiResponse> multiCall =
+           new ServerCallable<MultiResponse>(connection, tableName, null) {
+         @Override
+         public MultiResponse call() throws IOException {
+           return ProtobufUtil.multi(server, multi);
+         }
+
+         // Always targets the server recorded in loc; reload is ignored.
+         @Override
+         public void connect(boolean reload) throws IOException {
+           server = connection.getClient(loc.getServerName());
+         }
+       };
+       return multiCall.withoutRetries();
+     }
+   };
+ }
+
+ /**
+  * Caches serverName/seqNum as the new location of region hri.
+  * @param source the location that reported the change; passed through to
+  *     cacheLocation, which treats null as "read from meta"
+  */
+ void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
+     ServerName serverName, long seqNum) {
+   synchronized (this.cachedRegionLocations) {
+     cacheLocation(hri.getTableName(), source,
+         new HRegionLocation(hri, serverName, seqNum));
+   }
+ }
+
+ /**
+  * Deletes the cached location of the region if necessary, based on some error from source.
+  * The entry is kept when source is non-null and differs from the cached
+  * location (the error came from a server we no longer route to).
+  * @param hri The region in question.
+  * @param source The source of the error that prompts us to invalidate cache.
+  */
+ void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
+   boolean isStaleDelete = false;
+   HRegionLocation oldLocation;
+   synchronized (this.cachedRegionLocations) {
+     Map<byte[], HRegionLocation> tableLocations =
+         getTableLocations(hri.getTableName());
+     byte[] startKey = hri.getStartKey();
+     oldLocation = tableLocations.get(startKey);
+     if (oldLocation != null) {
+       isStaleDelete = source != null && !oldLocation.equals(source);
+       if (!isStaleDelete) {
+         tableLocations.remove(startKey);
+       }
+     }
+   }
+   if (!isStaleDelete) {
+     return;
+   }
+   LOG.debug("Received an error from " + source.getHostnamePort() + " for region "
+       + hri.getRegionNameAsString() + "; not removing "
+       + oldLocation.getHostnamePort() + " from cache.");
+ }
+
+ /**
+  * Update the location with the new value (if the exception is a RegionMovedException)
+  * or delete it from the cache.
+  * @param tableName table of the failed operation; must not be null
+  * @param row the failed operation, whose row selects the cached region; must not be null
+  * @param exception an object (to simplify user code) on which we will try to find a nested
+  *  or wrapped or both RegionMovedException
+  * @param source server that is the source of the location update; may be null
+  */
+ private void updateCachedLocations(final byte[] tableName, Row row,
+     final Object exception, final HRegionLocation source) {
+   if (row == null || tableName == null) {
+     LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
+         ", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
+     return;
+   }
+
+   // Is it something we have already updated?
+   final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
+   if (oldLocation == null) {
+     // There is no such location in the cache => it's been removed already => nothing to do
+     return;
+   }
+
+   HRegionInfo regionInfo = oldLocation.getRegionInfo();
+   final RegionMovedException rme = RegionMovedException.find(exception);
+   if (rme != null) {
+     // source may be null (cacheLocation and deleteCachedLocation accept
+     // that); do not let the log line NPE and skip the cache update.
+     String reporter = (source == null) ? "null" : source.getHostnamePort();
+     LOG.info("Region " + regionInfo.getRegionNameAsString() + " moved to " +
+         rme.getHostname() + ":" + rme.getPort() + " according to " + reporter);
+     updateCachedLocation(
+         regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
+   } else {
+     deleteCachedLocation(regionInfo, source);
+   }
+ }
+
+ @Override
+ @Deprecated
+ public void processBatch(List<? extends Row> list,
+ final byte[] tableName,
+ ExecutorService pool,
+ Object[] results) throws IOException, InterruptedException {
+ // This belongs in HTable!!! Not in here. St.Ack
+
+ // results must be the same size as list
+ if (results.length != list.size()) {
+ throw new IllegalArgumentException(
+ "argument results must be the same size as argument list");
+ }
+ processBatchCallback(list, tableName, pool, results, null);
+ }
+
+ /**
+ * Send the queries in parallel on the different region servers. Retries on failures.
+ * If the method returns it means that there is no error, and the 'results' array will
+ * contain no exception. On error, an exception is thrown, and the 'results' array will
+ * contain results and exceptions.
+ * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
+ */
+ @Override
+ @Deprecated
+ public <R> void processBatchCallback(
+ List<? extends Row> list,
+ byte[] tableName,
+ ExecutorService pool,
+ Object[] results,
+ Batch.Callback<R> callback)
+ throws IOException, InterruptedException {
+
+ Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
+ p.processBatchCallback();
+ }
+
+
+ /**
+ * Methods and attributes to manage a batch process are grouped into this single class.
+ * This allows, by creating a Process<R> per batch process to ensure multithread safety.
+ *
+ * This code should be move to HTable once processBatchCallback is not supported anymore in
+ * the HConnection interface.
+ */
+ private static class Process<R> {
+ // Info on the queries and their context
+ private final HConnectionImplementation hci;
+ private final List<? extends Row> rows;
+ private final byte[] tableName;
+ private final ExecutorService pool;
+ private final Object[] results;
+ private final Batch.Callback<R> callback;
+
+ // Used during the batch process
+ private final List<Action<R>> toReplay;
+ private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
+ inProgress;
+ private int curNumRetries;
+
+ // Notified when a tasks is done
+ private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
+
+ private Process(HConnectionImplementation hci, List<? extends Row> list,
+ byte[] tableName, ExecutorService pool, Object[] results,
+ Batch.Callback<R> callback){
+ this.hci = hci;
+ this.rows = list;
+ this.tableName = tableName;
+ this.pool = pool;
+ this.results = results;
+ this.callback = callback;
+ this.toReplay = new ArrayList<Action<R>>();
+ this.inProgress =
+ new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
+ this.curNumRetries = 0;
+ }
+
+
+ /**
+ * Group a list of actions per region servers, and send them. The created MultiActions are
+ * added to the inProgress list.
+ * @param actionsList
+ * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+ * @throws IOException - if we can't locate a region after multiple retries.
+ */
+ private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+ // group per location => regions server
+ final Map<HRegionLocation, MultiAction<R>> actionsByServer =
+ new HashMap<HRegionLocation, MultiAction<R>>();
+ for (Action<R> aAction : actionsList) {
+ final Row row = aAction.getAction();
+
+ if (row != null) {
+ final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow());
+ if (loc == null) {
+ throw new IOException("No location found, aborting submit.");
+ }
+
+ final byte[] regionName = loc.getRegionInfo().getRegionName();
+ MultiAction<R> actions = actionsByServer.get(loc);
+ if (actions == null) {
+ actions = new MultiAction<R>();
+ actionsByServer.put(loc, actions);
+ }
+ actions.add(regionName, aAction);
+ }
+ }
+
+ // Send the queries and add them to the inProgress list
+ for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+ Callable<MultiResponse> callable =
+ createDelayedCallable(sleepTime, e.getKey(), e.getValue());
+ if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+ StringBuilder sb = new StringBuilder();
+ for (Action<R> action : e.getValue().allActions()) {
+ sb.append(Bytes.toStringBinary(action.getAction().getRow()) + ";");
+ }
+ LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
+ + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+ }
+ Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
+ new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
+ e.getValue(), e.getKey(), this.pool.submit(callable));
+ this.inProgress.addLast(p);
+ }
+ }
+
+ /**
+ * Resubmit the actions which have failed, after a sleep time.
+ * @throws IOException
+ */
+ private void doRetry() throws IOException{
+ // curNumRetries at this point is 1 or more; decrement to start from 0.
+ final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
+ submit(this.toReplay, sleepTime);
+ this.toReplay.clear();
+ }
+
+ /**
+ * Parameterized batch processing, allowing varying return types for
+ * different {@link Row} implementations.
+ * Throws an exception on error. If there are no exceptions, it means that the 'results'
+ * array is clean.
+ */
+ private void processBatchCallback() throws IOException, InterruptedException {
+ if (this.results.length != this.rows.size()) {
+ throw new IllegalArgumentException(
+ "argument results (size="+results.length+") must be the same size as " +
+ "argument list (size="+this.rows.size()+")");
+ }
+ if (this.rows.isEmpty()) {
+ return;
+ }
+
+ boolean isTraceEnabled = LOG.isTraceEnabled();
+ BatchErrors errors = new BatchErrors();
+ BatchErrors retriedErrors = null;
+ if (isTraceEnabled) {
+ retriedErrors = new BatchErrors();
+ }
+
+ // We keep the number of retry per action.
+ int[] nbRetries = new int[this.results.length];
+
+ // Build the action list. This list won't change after being created, hence the
+ // indexes will remain constant, allowing a direct lookup.
+ final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
+ for (int i = 0; i < this.rows.size(); i++) {
+ Action<R> action = new Action<R>(this.rows.get(i), i);
+ listActions.add(action);
+ }
+
+ // execute the actions. We will analyze and resubmit the actions in a 'while' loop.
+ submit(listActions, 0);
+
+ // LastRetry is true if, either:
+ // we had an exception 'DoNotRetry'
+ // we had more than numRetries for any action
+ // In this case, we will finish the current retries but we won't start new ones.
+ boolean lastRetry = false;
+ // despite its name numRetries means number of tries. So if numRetries == 1 it means we
+ // won't retry. And we compare vs. 2 in case someone set it to zero.
+ boolean noRetry = (hci.numRetries < 2);
+
+ // Analyze and resubmit until all actions are done successfully or failed after numRetries
+ while (!this.inProgress.isEmpty()) {
+ // We need the original multi action to find out what actions to replay if
+ // we have a 'total' failure of the Future<MultiResponse>
+ // We need the HRegionLocation as we give it back if we go out of retries
+ Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
+ removeFirstDone();
+
+ // Get the answer, keep the exception if any as we will use it for the analysis
+ MultiResponse responses = null;
+ ExecutionException exception = null;
+ try {
+ responses = currentTask.getThird().get();
+ } catch (ExecutionException e) {
+ exception = e;
+ }
+
+ // Error case: no result at all for this multi action. We need to redo all actions
+ if (responses == null) {
+ for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
+ for (Action<R> action : actions) {
+ Row row = action.getAction();
+ hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
+ if (noRetry) {
+ errors.add(exception, row, currentTask);
+ } else {
+ if (isTraceEnabled) {
+ retriedErrors.add(exception, row, currentTask);
+ }
+ lastRetry = addToReplay(nbRetries, action);
+ }
+ }
+ }
+ } else { // Success or partial success
+ // Analyze detailed results. We can still have individual failures to be redo.
+ // two specific exceptions are managed:
+ // - DoNotRetryIOException: we continue to retry for other actions
+ // - RegionMovedException: we update the cache with the new region location
+ for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
+ responses.getResults().entrySet()) {
+ for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
+ Action<R> correspondingAction = listActions.get(regionResult.getFirst());
+ Object result = regionResult.getSecond();
+ this.results[correspondingAction.getOriginalIndex()] = result;
+
+ // Failure: retry if it's make sense else update the errors lists
+ if (result == null || result instanceof Throwable) {
+ Row row = correspondingAction.getAction();
+ hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+ if (result instanceof DoNotRetryIOException || noRetry) {
+ errors.add((Exception)result, row, currentTask);
+ } else {
+ if (isTraceEnabled) {
+ retriedErrors.add((Exception)result, row, currentTask);
+ }
+ lastRetry = addToReplay(nbRetries, correspondingAction);
+ }
+ } else // success
+ if (callback != null) {
+ this.callback.update(resultsForRS.getKey(),
+ this.rows.get(regionResult.getFirst()).getRow(),
+ (R) result);
+ }
+ }
+ }
+ }
+
+ // Retry all actions in toReplay then clear it.
+ if (!noRetry && !toReplay.isEmpty()) {
+ if (isTraceEnabled) {
+ LOG.trace("Retrying due to errors: " + retriedErrors.getDescriptionAndClear());
+ }
+ doRetry();
+ if (lastRetry) {
+ if (isTraceEnabled) {
+ LOG.trace("No more retries");
+ }
+ noRetry = true;
+ }
+ }
+ }
+
+ errors.rethrowIfAny();
+ }
+
+
+ private class BatchErrors {
+ private List<Throwable> exceptions = new ArrayList<Throwable>();
+ private List<Row> actions = new ArrayList<Row>();
+ private List<String> addresses = new ArrayList<String>();
+
+ public void add(Exception ex, Row row,
+ Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+ exceptions.add(ex);
+ actions.add(row);
+ addresses.add(obj.getSecond().getHostnamePort());
+ }
+
+ public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
+ if (!exceptions.isEmpty()) {
+ throw makeException();
+ }
+ }
+
+ public String getDescriptionAndClear(){
+ if (exceptions.isEmpty()) {
+ return "";
+ }
+ String result = makeException().getMessage();
+ exceptions.clear();
+ actions.clear();
+ addresses.clear();
+ return result;
+ }
+
+ private RetriesExhaustedWithDetailsException makeException() {
+ return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);
+ }
+ }
+
+
+
+ /**
+ * Put the action that has to be retried in the Replay list.
+ * @return true if we're out of numRetries and it's the last retry.
+ */
+ private boolean addToReplay(int[] nbRetries, Action<R> action) {
+ this.toReplay.add(action);
+ nbRetries[action.getOriginalIndex()]++;
+ if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
+ this.curNumRetries = nbRetries[action.getOriginalIndex()];
+ }
+ // numRetries means number of tries, while curNumRetries means current number of retries. So
+ // we need to add 1 to make them comparable. And as we look for the last try we compare
+ // with '>=' and no '>'. And we need curNumRetries to means what it says as we don't want
+ // to initialize it to 1.
+ return ( (this.curNumRetries +1) >= hci.numRetries);
+ }
+
+ /**
+ * Wait for one of tasks to be done, and remove it from the list.
+ * @return the tasks done.
+ */
+ private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
+ removeFirstDone() throws InterruptedException {
+ while (true) {
+ synchronized (finishedTasks) {
+ if (!finishedTasks.isEmpty()) {
+ MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
+
+ // We now need to remove it from the inProgress part.
+ Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
+ inProgress.iterator();
+ while (it.hasNext()) {
+ Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
+ if (task.getFirst() == done) { // We have the exact object. No java equals here.
+ it.remove();
+ return task;
+ }
+ }
+ LOG.error("Development error: We didn't see a task in the list. " +
+ done.getRegions());
+ }
+ finishedTasks.wait(10);
+ }
+ }
+ }
+
+ private Callable<MultiResponse> createDelayedCallable(
+ final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
+
+ final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
+
+ return new Callable<MultiResponse>() {
+ private final long creationTime = System.currentTimeMillis();
+
+ @Override
+ public MultiResponse call() throws Exception {
+ try {
+ final long waitingTime = delay + creationTime - System.currentTimeMillis();
+ if (waitingTime > 0) {
+ Thread.sleep(waitingTime);
+ }
+ return delegate.call();
+ } finally {
+ synchronized (finishedTasks) {
+ finishedTasks.add(multi);
+ finishedTasks.notifyAll();
+ }
+ }
+ }
+ };
+ }
+ }
+
+ /*
+ * Return the number of cached region for a table. It will only be called
+ * from a unit test.
+ */
[... 255 lines stripped ...]