You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2011/10/26 06:05:17 UTC
svn commit: r1189019 [1/2] - in /hbase/branches/0.90: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/client/replication/ src/m...
Author: tedyu
Date: Wed Oct 26 04:05:15 2011
New Revision: 1189019
URL: http://svn.apache.org/viewvc?rev=1189019&view=rev
Log:
HBASE-4508 Backport HBASE-3777 to 0.90 branch (Bright Fulton)
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/branches/0.90/src/main/resources/hbase-default.xml
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java
hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java
Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Wed Oct 26 04:05:15 2011
@@ -85,6 +85,7 @@ Release 0.90.5 - Unreleased
(bluedavy via Lars H)
HBASE-4378 [hbck] Does not complain about regions with startkey==endkey.
(Jonathan Hsieh)
+ HBASE-4508 Backport HBASE-3777 to 0.90 branch (Bright Fulton)
IMPROVEMENT
HBASE-4205 Enhance HTable javadoc (Eric Charles)
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Oct 26 04:05:15 2011
@@ -93,14 +93,29 @@ public final class HConstants {
/** Name of ZooKeeper config file in conf/ directory. */
public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
- /** default client port that the zookeeper listens on */
+ /** Parameter name for the client port that the zookeeper listens on */
+ public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
+
+ /** Default client port that the zookeeper listens on */
public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
+ /** Parameter name for the wait time for the recoverable zookeeper */
+ public static final String ZOOKEEPER_RECOVERABLE_WAITTIME = "hbase.zookeeper.recoverable.waittime";
+
+ /** Default wait time for the recoverable zookeeper */
+ public static final long DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME = 10000;
+
/** Parameter name for the root dir in ZK for this cluster */
public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
+ /** Parameter name for the limit on concurrent client-side zookeeper connections */
+ public static final String ZOOKEEPER_MAX_CLIENT_CNXNS = "hbase.zookeeper.property.maxClientCnxns";
+
+ /** Default limit on concurrent client-side zookeeper connections */
+ public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30;
+
/** Parameter name for port region server listens on. */
public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
@@ -331,6 +346,80 @@ public final class HConstants {
*/
public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE;
+ /**
+ * Parameter name for client pause value, used mostly as value to wait
+ * before running a retry of a failed get, region lookup, etc.
+ */
+ public static String HBASE_CLIENT_PAUSE = "hbase.client.pause";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_PAUSE}.
+ */
+ public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
+
+ /**
+ * Parameter name for maximum retries, used as maximum for all retryable
+ * operations such as fetching of the root region from root region server,
+ * getting a cell's value, starting a row update, etc.
+ */
+ public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10;
+
+ /**
+ * Parameter name for maximum attempts, used to limit the number of times the
+ * client will try to obtain the proxy for a given region server.
+ */
+ public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1;
+
+ /**
+ * Parameter name for client prefetch limit, used as the maximum number of regions
+ * info that will be prefetched.
+ */
+ public static String HBASE_CLIENT_PREFETCH_LIMIT = "hbase.client.prefetch.limit";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_PREFETCH_LIMIT}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10;
+
+ /**
+ * Parameter name for number of rows that will be fetched when calling next on
+ * a scanner if it is not served from memory. Higher caching values will
+ * enable faster scanners but will eat up more memory and some calls of next
+ * may take longer and longer times when the cache is empty.
+ */
+ public static String HBASE_META_SCANNER_CACHING = "hbase.meta.scanner.caching";
+
+ /**
+ * Default value of {@link #HBASE_META_SCANNER_CACHING}.
+ */
+ public static int DEFAULT_HBASE_META_SCANNER_CACHING = 100;
+
+ /**
+ * Parameter name for unique identifier for this {@link Configuration}
+ * instance. If there are two or more {@link Configuration} instances that,
+ * for all intents and purposes, are the same except for their instance ids,
+ * then they will not be able to share the same {@link Connection} instance.
+ * On the other hand, even if the instance ids are the same, it could result
+ * in non-shared {@link Connection} instances if some of the other connection
+ * parameters differ.
+ */
+ public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id";
+
+ /**
+ * If this parameter is set true then {@link HConnectionManager} will not share
+ * {@link HConnection} instances with different {@link Configuration} instances.
+ */
+ public static String HBASE_CONNECTION_PER_CONFIG = "hbase.connection.per.config";
/**
* HRegion server lease period in milliseconds. Clients must report in within this period
@@ -339,12 +428,11 @@ public final class HConstants {
public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY =
"hbase.regionserver.lease.period";
-
/**
* Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}.
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
-
+
/**
* timeout for each RPC
*/
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java Wed Oct 26 04:05:15 2011
@@ -29,12 +29,14 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -58,6 +60,7 @@ import org.apache.hadoop.ipc.RemoteExcep
*/
public class CatalogTracker {
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
+ private final Configuration conf;
private final HConnection connection;
private final ZooKeeperWatcher zookeeper;
private final RootRegionTracker rootRegionTracker;
@@ -79,15 +82,18 @@ public class CatalogTracker {
HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
/**
- * Constructs a catalog tracker. Find current state of catalog tables and
- * begin active tracking by executing {@link #start()} post construction.
- * Does not timeout.
- * @param connection Server connection; if problem, this connections
- * {@link HConnection#abort(String, Throwable)} will be called.
- * @throws IOException
+ * Constructs a catalog tracker. Find current state of catalog tables and
+ * begin active tracking by executing {@link #start()} post construction. Does
+ * not timeout.
+ *
+ * @param conf
+ * the {@link Configuration} from which a {@link HConnection} will be
+ * obtained; if problem, this connection's
+ * {@link HConnection#abort(String, Throwable)} will be called.
+ * @throws IOException
*/
- public CatalogTracker(final HConnection connection) throws IOException {
- this(connection.getZooKeeperWatcher(), connection, connection);
+ public CatalogTracker(final Configuration conf) throws IOException {
+ this(null, conf, null);
}
/**
@@ -99,10 +105,10 @@ public class CatalogTracker {
* @param abortable if fatal exception
* @throws IOException
*/
- public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
+ public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable)
throws IOException {
- this(zk, connection, abortable, 0);
+ this(zk, conf, abortable, 0);
}
/**
@@ -115,11 +121,21 @@ public class CatalogTracker {
* ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
* @throws IOException
*/
- public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
- final Abortable abortable, final int defaultTimeout)
+ public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
+ Abortable abortable, final int defaultTimeout)
+ throws IOException {
+ this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout);
+ }
+
+ CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
+ HConnection connection, Abortable abortable, final int defaultTimeout)
throws IOException {
- this.zookeeper = zk;
+ this.conf = conf;
this.connection = connection;
+ this.zookeeper = (zk == null) ? this.connection.getZooKeeperWatcher() : zk;
+ if (abortable == null) {
+ abortable = this.connection;
+ }
this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable);
this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable);
this.defaultTimeout = defaultTimeout;
@@ -143,13 +159,24 @@ public class CatalogTracker {
* Interrupts any ongoing waits.
*/
public void stop() {
- LOG.debug("Stopping catalog tracker " + this);
- this.stopped = true;
- this.rootRegionTracker.stop();
- this.metaNodeTracker.stop();
- // Call this and it will interrupt any ongoing waits on meta.
- synchronized (this.metaAvailable) {
- this.metaAvailable.notifyAll();
+ if (!this.stopped) {
+ LOG.debug("Stopping catalog tracker " + this);
+ this.stopped = true;
+ this.rootRegionTracker.stop();
+ this.metaNodeTracker.stop();
+ try {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ } catch (IOException e) {
+ // Although the {@link Closeable} interface throws an {@link
+ // IOException}, in reality, the implementation would never do that.
+ LOG.error("Attempt to close catalog tracker's connection failed.", e);
+ }
+ // Call this and it will interrupt any ongoing waits on meta.
+ synchronized (this.metaAvailable) {
+ this.metaAvailable.notifyAll();
+ }
}
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed Oct 26 04:05:15 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
@@ -65,10 +66,10 @@ import org.apache.hadoop.util.StringUtil
* <p>Currently HBaseAdmin instances are not expected to be long-lived. For
* example, an HBaseAdmin instance will not ride over a Master restart.
*/
-public class HBaseAdmin implements Abortable {
+public class HBaseAdmin implements Abortable, Closeable {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
// private final HConnection connection;
- final HConnection connection;
+ private final HConnection connection;
private volatile Configuration conf;
private final long pause;
private final int numRetries;
@@ -105,9 +106,7 @@ public class HBaseAdmin implements Abort
throws ZooKeeperConnectionException, IOException {
CatalogTracker ct = null;
try {
- HConnection connection =
- HConnectionManager.getConnection(this.conf);
- ct = new CatalogTracker(connection);
+ ct = new CatalogTracker(this.conf);
ct.start();
} catch (InterruptedException e) {
// Let it out as an IOE for now until we redo all so tolerate IEs
@@ -1257,4 +1256,10 @@ public class HBaseAdmin implements Abort
HBaseAdmin admin = new HBaseAdmin(copyOfConf);
HConnectionManager.deleteConnection(admin.getConfiguration(), false);
}
+
+ public void close() throws IOException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Oct 26 04:05:15 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.zookeeper
*
* @see HConnectionManager
*/
-public interface HConnection extends Abortable {
+public interface HConnection extends Abortable, Closeable {
/**
* @return Configuration instance being used by this HConnection instance.
*/
@@ -296,4 +297,4 @@ public interface HConnection extends Abo
*/
public void prewarmRegionCache(final byte[] tableName,
final Map<HRegionInfo, HServerAddress> regions);
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Oct 26 04:05:15 2011
@@ -19,10 +19,12 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -43,6 +45,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
+import com.google.common.collect.ImmutableMap;
+
/**
* A non-instantiable class that manages {@link HConnection}s.
* This class has a static Map of {@link HConnection} instances keyed by
@@ -118,19 +123,30 @@ import org.apache.zookeeper.KeeperExcept
*/
@SuppressWarnings("serial")
public class HConnectionManager {
- static final int MAX_CACHED_HBASE_INSTANCES = 2001;
+ // A LRU Map of HConnectionKey -> HConnection (TableServer).
+ private static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
- // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31.
- // The zk default max connections to the ensemble from the one client is 30 so
- // should run into zk issues before hit this value of 31.
- private static final Map<Configuration, HConnectionImplementation> HBASE_INSTANCES =
- new LinkedHashMap<Configuration, HConnectionImplementation>
- ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) {
- @Override
- protected boolean removeEldestEntry(Map.Entry<Configuration, HConnectionImplementation> eldest) {
- return size() > MAX_CACHED_HBASE_INSTANCES;
- }
- };
+ public static final int MAX_CACHED_HBASE_INSTANCES;
+
+ static {
+ // We set instances to one more than the value specified for {@link
+ // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
+ // connections to the ensemble from the one client is 30, so in that case we
+ // should run into zk issues before the LRU hit this value of 31.
+ MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
+ HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
+ HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+ HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+ (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
+ return size() > MAX_CACHED_HBASE_INSTANCES;
+ }
+ };
+ }
+
+ static final Log LOG = LogFactory.getLog(HConnectionManager.class);
/*
* Non-instantiable.
@@ -150,33 +166,34 @@ public class HConnectionManager {
*/
public static HConnection getConnection(Configuration conf)
throws ZooKeeperConnectionException {
- HConnectionImplementation connection;
+ HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HBASE_INSTANCES) {
- connection = HBASE_INSTANCES.get(conf);
+ HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
if (connection == null) {
connection = new HConnectionImplementation(conf);
- HBASE_INSTANCES.put(conf, connection);
+ HBASE_INSTANCES.put(connectionKey, connection);
}
+ connection.incCount();
+ return connection;
}
- return connection;
}
/**
* Delete connection information for the instance specified by configuration.
- * This will close connection to the zookeeper ensemble and let go of all
- * resources.
- * @param conf configuration whose identity is used to find {@link HConnection}
- * instance.
- * @param stopProxy Shuts down all the proxy's put up to cluster members
- * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}.
+ * If there are no more references to it, this will then close connection to
+ * the zookeeper ensemble and let go of all resources.
+ *
+ * @param conf
+ * configuration whose identity is used to find {@link HConnection}
+ * instance.
+ * @param stopProxy
+ * Shuts down all the proxies put up to cluster members including to
+ * cluster HMaster. Calls
+ * {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}
+ * .
*/
public static void deleteConnection(Configuration conf, boolean stopProxy) {
- synchronized (HBASE_INSTANCES) {
- HConnectionImplementation t = HBASE_INSTANCES.remove(conf);
- if (t != null) {
- t.close(stopProxy);
- }
- }
+ deleteConnection(new HConnectionKey(conf), stopProxy);
}
/**
@@ -186,9 +203,38 @@ public class HConnectionManager {
*/
public static void deleteAllConnections(boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
- for (HConnectionImplementation t : HBASE_INSTANCES.values()) {
- if (t != null) {
- t.close(stopProxy);
+ Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
+ connectionKeys.addAll(HBASE_INSTANCES.keySet());
+ for (HConnectionKey connectionKey : connectionKeys) {
+ deleteConnection(connectionKey, stopProxy);
+ }
+ HBASE_INSTANCES.clear();
+ }
+ }
+
+ private static void deleteConnection(HConnection connection, boolean stopProxy) {
+ synchronized (HBASE_INSTANCES) {
+ for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
+ .entrySet()) {
+ if (connectionEntry.getValue() == connection) {
+ deleteConnection(connectionEntry.getKey(), stopProxy);
+ break;
+ }
+ }
+ }
+ }
+
+ private static void deleteConnection(HConnectionKey connectionKey, boolean stopProxy) {
+ synchronized (HBASE_INSTANCES) {
+ HConnectionImplementation connection = HBASE_INSTANCES
+ .get(connectionKey);
+ if (connection != null) {
+ connection.decCount();
+ if (connection.isZeroReference()) {
+ HBASE_INSTANCES.remove(connectionKey);
+ connection.close(stopProxy);
+ } else if (stopProxy) {
+ connection.stopProxyOnClose(stopProxy);
}
}
}
@@ -201,10 +247,15 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException
*/
static int getCachedRegionCount(Configuration conf,
- byte[] tableName)
- throws ZooKeeperConnectionException {
- HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
- return connection.getNumberOfCachedRegionLocations(tableName);
+ final byte[] tableName)
+ throws IOException {
+ return execute(new HConnectable<Integer>(conf) {
+ @Override
+ public Integer connect(HConnection connection) {
+ return ((HConnectionImplementation) connection)
+ .getNumberOfCachedRegionLocations(tableName);
+ }
+ });
}
/**
@@ -214,13 +265,162 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException
*/
static boolean isRegionCached(Configuration conf,
- byte[] tableName, byte[] row) throws ZooKeeperConnectionException {
- HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
- return connection.isRegionCached(tableName, row);
+ final byte[] tableName, final byte[] row) throws IOException {
+ return execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) {
+ return ((HConnectionImplementation) connection).isRegionCached(tableName, row);
+ }
+ });
+ }
+
+ /**
+ * This class makes it convenient for one to execute a command in the context
+ * of a {@link HConnection} instance based on the given {@link Configuration}.
+ *
+ * <p>
+ * If you find yourself wanting to use a {@link HConnection} for a relatively
+ * short duration of time, and do not want to deal with the hassle of creating
+ * and cleaning up that resource, then you should consider using this
+ * convenience class.
+ *
+ * @param <T>
+ * the return type of the {@link HConnectable#connect(HConnection)}
+ * method.
+ */
+ public static abstract class HConnectable<T> {
+ public Configuration conf;
+
+ public HConnectable(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public abstract T connect(HConnection connection) throws IOException;
+ }
+
+ /**
+ * This convenience method invokes the given {@link HConnectable#connect}
+ * implementation using a {@link HConnection} instance that lasts just for the
+ * duration of that invocation.
+ *
+ * @param <T> the return type of the connect method
+ * @param connectable the {@link HConnectable} instance
+ * @return the value returned by the connect method
+ * @throws IOException
+ */
+ public static <T> T execute(HConnectable<T> connectable) throws IOException {
+ if (connectable == null || connectable.conf == null) {
+ return null;
+ }
+ Configuration conf = connectable.conf;
+ HConnection connection = HConnectionManager.getConnection(conf);
+ boolean connectSucceeded = false;
+ try {
+ T returnValue = connectable.connect(connection);
+ connectSucceeded = true;
+ return returnValue;
+ } finally {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ if (connectSucceeded) {
+ throw new IOException("The connection to " + connection
+ + " could not be deleted.", e);
+ } else {
+ LOG.warn("Masking close error as the connectable block threw one itself.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Denotes a unique key to a {@link HConnection} instance.
+ *
+ * In essence, this class captures the properties in {@link Configuration}
+ * that may be used in the process of establishing a connection. In light of
+ * that, if any new such properties are introduced into the mix, they must be
+ * added to the {@link HConnectionKey#CONNECTION_PROPERTIES} list.
+ *
+ */
+ static class HConnectionKey {
+ public static String[] CONNECTION_PROPERTIES = new String[] {
+ HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.ZOOKEEPER_CLIENT_PORT,
+ HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+ HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.HBASE_META_SCANNER_CACHING,
+ HConstants.HBASE_CLIENT_INSTANCE_ID };
+
+ private Map<String, String> properties;
+
+ public HConnectionKey(Configuration conf) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (conf != null) {
+ if (conf.getBoolean(HConstants.HBASE_CONNECTION_PER_CONFIG, false)) {
+ builder.put(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(System.identityHashCode(conf)));
+ } else {
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = conf.get(property);
+ if (value != null) {
+ builder.put(property, value);
+ }
+ }
+ }
+ }
+ this.properties = builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = properties.get(property);
+ if (value != null) {
+ result = prime * result + value.hashCode();
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HConnectionKey that = (HConnectionKey) obj;
+ if (this.properties == null) {
+ if (that.properties != null) {
+ return false;
+ }
+ } else {
+ if (that.properties == null) {
+ return false;
+ }
+ for (String property : CONNECTION_PROPERTIES) {
+ String thisValue = this.properties.get(property);
+ String thatValue = that.properties.get(property);
+ if (thisValue == thatValue) {
+ continue;
+ }
+ if (thisValue == null || !thisValue.equals(thatValue)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
/* Encapsulates connection to zookeeper and regionservers.*/
- static class HConnectionImplementation implements HConnection {
+ static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final long pause;
@@ -263,6 +463,10 @@ public class HConnectionManager {
private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>();
+ private boolean stopProxy;
+ private int refCount;
+
+
/**
* constructor
* @param conf Configuration object
@@ -282,15 +486,19 @@ public class HConnectionManager {
"Unable to find region server interface " + serverClassName, e);
}
- this.pause = conf.getLong("hbase.client.pause", 1000);
- this.numRetries = conf.getInt("hbase.client.retries.number", 10);
- this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.maxRPCAttempts = conf.getInt(
+ HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
- this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
- 10);
+ this.prefetchRegionLimit = conf.getInt(
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
setupZookeeperTrackers();
@@ -1050,28 +1258,6 @@ public class HConnectionManager {
}
}
- void close(boolean stopProxy) {
- if (master != null) {
- if (stopProxy) {
- HBaseRPC.stopProxy(master);
- }
- master = null;
- masterChecked = false;
- }
- if (stopProxy) {
- for (HRegionInterface i: servers.values()) {
- HBaseRPC.stopProxy(i);
- }
- }
- if (this.zooKeeper != null) {
- LOG.info("Closed zookeeper sessionid=0x" +
- Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
- this.zooKeeper.close();
- this.zooKeeper = null;
- }
- this.closed = true;
- }
-
private Callable<MultiResponse> createCallable(
final HServerAddress address,
final MultiAction multi,
@@ -1352,5 +1538,85 @@ public class HConnectionManager {
else LOG.fatal(msg);
this.closed = true;
}
+
+ public void stopProxyOnClose(boolean stopProxy) {
+ this.stopProxy = stopProxy;
+ }
+
+ /**
+ * Increment this client's reference count.
+ */
+ void incCount() {
+ ++refCount;
+ }
+
+ /**
+ * Decrement this client's reference count.
+ */
+ void decCount() {
+ if (refCount > 0) {
+ --refCount;
+ }
+ }
+
+ /**
+ * Return if this client has no reference
+ *
+ * @return true if this client has no reference; false otherwise
+ */
+ boolean isZeroReference() {
+ return refCount == 0;
+ }
+
+ void close(boolean stopProxy) {
+ if (this.closed) {
+ return;
+ }
+ if (master != null) {
+ if (stopProxy) {
+ HBaseRPC.stopProxy(master);
+ }
+ master = null;
+ masterChecked = false;
+ }
+ if (stopProxy) {
+ for (HRegionInterface i : servers.values()) {
+ HBaseRPC.stopProxy(i);
+ }
+ }
+ this.servers.clear();
+ if (this.zooKeeper != null) {
+ LOG.info("Closed zookeeper sessionid=0x"
+ + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+ this.zooKeeper.close();
+ this.zooKeeper = null;
+ }
+ this.closed = true;
+ }
+
+ public void close() {
+ HConnectionManager.deleteConnection(this, stopProxy);
+ LOG.debug("The connection to " + this.zooKeeper + " has been closed.");
+ }
+
+ /**
+ * Close the connection for good, regardless of what the current value of
+ * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
+ * point, which would be the case if all of its consumers close the
+ * connection. However, on the off chance that someone is unable to close
+ * the connection, perhaps because it bailed out prematurely, the method
+ * below will ensure that this {@link Connection} instance is cleaned up.
+ * Caveat: The JVM may take an unknown amount of time to call finalize on an
+ * unreachable object, so our hope is that every consumer cleans up after
+ * itself, like any good citizen.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ // Pretend as if we are about to release the last remaining reference
+ refCount = 1;
+ close();
+ LOG.debug("The connection to " + this.zooKeeper
+ + " was closed by the finalize method.");
+ }
}
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Oct 26 04:05:15 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -89,13 +91,17 @@ import org.apache.zookeeper.KeeperExcept
* {@link HTable} passing a new {@link Configuration} instance that has the
* new configuration.
*
+ * <p>Note that this class implements the {@link Closeable} interface. When a
+ * HTable instance is no longer required, it *should* be closed in order to ensure
+ * that the underlying resources are promptly released.
+ *
* @see HBaseAdmin for create, drop, list, enable and disable of tables.
* @see HConnection
* @see HConnectionManager
*/
-public class HTable implements HTableInterface {
+public class HTable implements HTableInterface, Closeable {
private static final Log LOG = LogFactory.getLog(HTable.class);
- private final HConnection connection;
+ private HConnection connection;
private final byte [] tableName;
protected final int scannerTimeout;
private volatile Configuration configuration;
@@ -108,6 +114,7 @@ public class HTable implements HTableInt
private int maxKeyValueSize;
private ExecutorService pool; // For Multi
private long maxScannerResultSize;
+ private boolean closed;
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
/**
@@ -205,6 +212,7 @@ public class HTable implements HTableInt
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory());
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
+ this.closed = false;
}
/**
@@ -268,9 +276,14 @@ public class HTable implements HTableInt
* @return {@code true} if table is online.
* @throws IOException if a remote or network exception occurs
*/
- public static boolean isTableEnabled(Configuration conf, byte[] tableName)
- throws IOException {
- return HConnectionManager.getConnection(conf).isTableEnabled(tableName);
+ public static boolean isTableEnabled(Configuration conf,
+ final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.isTableEnabled(tableName);
+ }
+ });
}
/**
@@ -858,8 +871,15 @@ public class HTable implements HTableInt
@Override
public void close() throws IOException {
+ if (this.closed) {
+ return;
+ }
flushCommits();
this.pool.shutdown();
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ this.closed = true;
}
// validate for well-formedness
@@ -1346,12 +1366,18 @@ public class HTable implements HTableInt
* @param tableName name of table to configure.
* @param enable Set to true to enable region cache prefetch. Or set to
* false to disable it.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static void setRegionCachePrefetch(final byte[] tableName,
- boolean enable) throws ZooKeeperConnectionException {
- HConnectionManager.getConnection(HBaseConfiguration.create()).
- setRegionCachePrefetch(tableName, enable);
+ final boolean enable) throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
+ .create()) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ connection.setRegionCachePrefetch(tableName, enable);
+ return null;
+ }
+ });
}
/**
@@ -1362,12 +1388,17 @@ public class HTable implements HTableInt
* @param tableName name of table to configure.
* @param enable Set to true to enable region cache prefetch. Or set to
* false to disable it.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static void setRegionCachePrefetch(final Configuration conf,
- final byte[] tableName, boolean enable) throws ZooKeeperConnectionException {
- HConnectionManager.getConnection(conf).setRegionCachePrefetch(
- tableName, enable);
+ final byte[] tableName, final boolean enable) throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ connection.setRegionCachePrefetch(tableName, enable);
+ return null;
+ }
+ });
}
/**
@@ -1376,12 +1407,16 @@ public class HTable implements HTableInt
* @param tableName name of table to check
* @return true if table's region cache prefecth is enabled. Otherwise
* it is disabled.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static boolean getRegionCachePrefetch(final Configuration conf,
- final byte[] tableName) throws ZooKeeperConnectionException {
- return HConnectionManager.getConnection(conf).getRegionCachePrefetch(
- tableName);
+ final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.getRegionCachePrefetch(tableName);
+ }
+ });
}
/**
@@ -1389,10 +1424,14 @@ public class HTable implements HTableInt
* @param tableName name of table to check
* @return true if table's region cache prefecth is enabled. Otherwise
* it is disabled.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
- public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException {
- return HConnectionManager.getConnection(HBaseConfiguration.create()).
- getRegionCachePrefetch(tableName);
+ public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(HBaseConfiguration.create()) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.getRegionCachePrefetch(tableName);
+ }
+ });
}
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Wed Oct 26 04:05:15 2011
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +43,7 @@ import org.apache.hadoop.hbase.util.Byte
*
* <p>Pool will manage its own cluster to the cluster. See {@link HConnectionManager}.
*/
-public class HTablePool {
+public class HTablePool implements Closeable {
private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
private final Configuration config;
@@ -68,7 +70,7 @@ public class HTablePool {
final HTableInterfaceFactory tableFactory) {
// Make a new configuration instance so I can safely cleanup when
// done with the pool.
- this.config = config == null? new Configuration(): new Configuration(config);
+ this.config = config == null? new Configuration(): config;
this.maxSize = maxSize;
this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
}
@@ -151,7 +153,6 @@ public class HTablePool {
table = queue.poll();
}
}
- HConnectionManager.deleteConnection(this.config, true);
}
/**
@@ -163,10 +164,21 @@ public class HTablePool {
closeTablePool(Bytes.toString(tableName));
}
+ /**
+ * Closes all the HTable instances belonging to all tables in the table pool.
+ * <p>
+ * Note: this is a 'shutdown' of all the table pools.
+ */
+ public void close() throws IOException {
+ for (String tableName : tables.keySet()) {
+ closeTablePool(tableName);
+ }
+ }
+
int getCurrentPoolSize(String tableName) {
Queue<HTableInterface> queue = tables.get(tableName);
synchronized(queue) {
return queue.size();
}
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Wed Oct 26 04:05:15 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
@@ -113,12 +114,25 @@ public class MetaScanner {
* @throws IOException e
*/
public static void metaScan(Configuration configuration,
+ final MetaScannerVisitor visitor, final byte[] tableName,
+ final byte[] row, final int rowLimit, final byte[] metaTableName)
+ throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(configuration) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ metaScan(conf, connection, visitor, tableName, row, rowLimit,
+ metaTableName);
+ return null;
+ }
+ });
+ }
+
+ private static void metaScan(Configuration configuration, HConnection connection,
MetaScannerVisitor visitor, byte [] tableName, byte[] row,
int rowLimit, final byte [] metaTableName)
throws IOException {
int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE;
- HConnection connection = HConnectionManager.getConnection(configuration);
// if row is not null, we want to use the startKey of the row's region as
// the startRow for the meta scan.
byte[] startRow;
@@ -158,8 +172,9 @@ public class MetaScanner {
// Scan over each meta region
ScannerCallable callable;
- int rows = Math.min(rowLimit,
- configuration.getInt("hbase.meta.scanner.caching", 100));
+ int rows = Math.min(rowLimit, configuration.getInt(
+ HConstants.HBASE_META_SCANNER_CACHING,
+ HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
do {
final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
if (LOG.isDebugEnabled()) {
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Wed Oct 26 04:05:15 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client.replication;
+import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
@@ -63,9 +64,10 @@ import org.apache.zookeeper.KeeperExcept
* <code>replication</code>.
* </p>
*/
-public class ReplicationAdmin {
+public class ReplicationAdmin implements Closeable {
private final ReplicationZookeeper replicationZk;
+ private final Configuration configuration;
private final HConnection connection;
/**
@@ -79,6 +81,7 @@ public class ReplicationAdmin {
throw new RuntimeException("hbase.replication isn't true, please " +
"enable it in order to use replication");
}
+ this.configuration = conf;
this.connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
try {
@@ -169,4 +172,11 @@ public class ReplicationAdmin {
ReplicationZookeeper getReplicationZk() {
return replicationZk;
}
+
+ @Override
+ public void close() throws IOException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Wed Oct 26 04:05:15 2011
@@ -66,7 +66,7 @@ FileOutputFormat<ImmutableBytesWritable,
public void close(Reporter reporter)
throws IOException {
- m_table.flushCommits();
+ m_table.close();
}
public void write(ImmutableBytesWritable key,
@@ -103,4 +103,4 @@ FileOutputFormat<ImmutableBytesWritable,
throw new IOException("Must specify table name");
}
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Wed Oct 26 04:05:15 2011
@@ -104,13 +104,7 @@ implements Configurable {
@Override
public void close(TaskAttemptContext context)
throws IOException {
- table.flushCommits();
- // The following call will shutdown all connections to the cluster from
- // this JVM. It will close out our zk session otherwise zk wil log
- // expired sessions rather than closed ones. If any other HTable instance
- // running in this JVM, this next call will cause it damage. Presumption
- // is that the above this.table is only instance.
- HConnectionManager.deleteAllConnections(true);
+ table.close();
}
/**
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Wed Oct 26 04:05:15 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -87,12 +88,12 @@ public class VerifyReplication {
* @throws IOException When something is broken with the data.
*/
@Override
- public void map(ImmutableBytesWritable row, Result value,
+ public void map(ImmutableBytesWritable row, final Result value,
Context context)
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
- Scan scan = new Scan();
+ final Scan scan = new Scan();
scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
long startTime = conf.getLong(NAME + ".startTime", 0);
long endTime = conf.getLong(NAME + ".endTime", 0);
@@ -107,18 +108,23 @@ public class VerifyReplication {
scan.setTimeRange(startTime,
endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
}
- try {
- HConnection conn = HConnectionManager.getConnection(conf);
- ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
- conn.getZooKeeperWatcher());
- ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
- HTable replicatedTable = new HTable(peer.getConfiguration(),
- conf.get(NAME+".tableName"));
- scan.setStartRow(value.getRow());
- replicatedScanner = replicatedTable.getScanner(scan);
- } catch (KeeperException e) {
- throw new IOException("Got a ZK exception", e);
- }
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection conn) throws IOException {
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
+ ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
+ HTable replicatedTable = new HTable(peer.getConfiguration(),
+ conf.get(NAME+".tableName"));
+ scan.setStartRow(value.getRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
+ } catch (KeeperException e) {
+ throw new IOException("Got a ZK exception", e);
+ }
+ return null;
+ }
+ });
}
Result res = replicatedScanner.next();
try {
@@ -151,20 +157,25 @@ public class VerifyReplication {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new IOException("Replication needs to be enabled to verify it.");
}
- try {
- HConnection conn = HConnectionManager.getConnection(conf);
- ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
- conn.getZooKeeperWatcher());
- // Just verifying it we can connect
- ReplicationPeer peer = zk.getPeer(peerId);
- if (peer == null) {
- throw new IOException("Couldn't get access to the slave cluster," +
- "please see the log");
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection conn) throws IOException {
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
+ // Just verifying that we can connect
+ ReplicationPeer peer = zk.getPeer(peerId);
+ if (peer == null) {
+ throw new IOException("Couldn't get access to the slave cluster," +
+ "please see the log");
+ }
+ } catch (KeeperException ex) {
+ throw new IOException("Couldn't get access to the slave cluster" +
+ " because: ", ex);
+ }
+ return null;
}
- } catch (KeeperException ex) {
- throw new IOException("Couldn't get access to the slave cluster" +
- " because: ", ex);
- }
+ });
conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Oct 26 04:05:15 2011
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.UnknownRe
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -137,8 +136,6 @@ implements HMasterInterface, HMasterRegi
// file system manager for the master FS operations
private MasterFileSystem fileSystemManager;
- private HConnection connection;
-
// server manager to deal with region server info
private ServerManager serverManager;
@@ -298,7 +295,6 @@ implements HMasterInterface, HMasterRegi
if (this.catalogTracker != null) this.catalogTracker.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
- HConnectionManager.deleteConnection(this.conf, true);
this.zooKeeper.close();
}
LOG.info("HMaster main thread exiting");
@@ -344,12 +340,11 @@ implements HMasterInterface, HMasterRegi
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
this.fileSystemManager = new MasterFileSystem(this, metrics);
- this.connection = HConnectionManager.getConnection(conf);
this.executorService = new ExecutorService(getServerName());
this.serverManager = new ServerManager(this, this, metrics);
- this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
+ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
this.catalogTracker.start();
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Oct 26 04:05:15 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HServerLo
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -83,6 +84,7 @@ public class ServerManager {
private final Server master;
private final MasterServices services;
+ private final HConnection connection;
// Reporting to track master metrics.
private final MasterMetrics metrics;
@@ -96,9 +98,15 @@ public class ServerManager {
* @param master
* @param services
* @param metrics
+ * @throws ZooKeeperConnectionException
*/
- public ServerManager(final Server master, final MasterServices services,
- MasterMetrics metrics) {
+ public ServerManager(final Server master, final MasterServices services, MasterMetrics metrics)
+ throws ZooKeeperConnectionException {
+ this(master, services, metrics, true);
+ }
+
+ ServerManager(final Server master, final MasterServices services, MasterMetrics metrics, final boolean connect)
+ throws ZooKeeperConnectionException {
this.master = master;
this.services = services;
this.metrics = metrics;
@@ -106,6 +114,7 @@ public class ServerManager {
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
this.deadservers =
new DeadServer(c.getInt("hbase.master.maxdeadservers", 100));
+ this.connection = connect ? HConnectionManager.getConnection(c) : null;
}
/**
@@ -612,12 +621,10 @@ public class ServerManager {
*/
private HRegionInterface getServerConnection(HServerInfo info)
throws IOException {
- HConnection connection =
- HConnectionManager.getConnection(this.master.getConfiguration());
HRegionInterface hri = serverConnections.get(info.getServerName());
if (hri == null) {
LOG.debug("New connection to " + info.getServerName());
- hri = connection.getHRegionConnection(info.getServerAddress(), false);
+ hri = this.connection.getHRegionConnection(info.getServerAddress(), false);
this.serverConnections.put(info.getServerName(), hri);
}
return hri;
@@ -701,9 +708,15 @@ public class ServerManager {
}
/**
- * Stop the ServerManager. Currently does nothing.
+ * Stop the ServerManager. Currently closes the connection to the master.
*/
public void stop() {
-
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.error("Attempt to close connection to master failed", e);
+ }
+ }
}
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Oct 26 04:05:15 2011
@@ -82,8 +82,6 @@ import org.apache.hadoop.hbase.catalog.R
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -168,7 +166,6 @@ public class HRegionServer implements HR
protected HServerInfo serverInfo;
protected final Configuration conf;
- private final HConnection connection;
protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
private FileSystem fs;
private Path rootDir;
@@ -283,7 +280,6 @@ public class HRegionServer implements HR
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
this.fsOk = true;
this.conf = conf;
- this.connection = HConnectionManager.getConnection(conf);
this.isOnline = false;
// check to see if the codec list is available:
@@ -503,7 +499,7 @@ public class HRegionServer implements HR
blockAndCheckIfStopped(this.clusterStatusTracker);
// Create the catalog tracker and start it;
- this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
+ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start();
}
@@ -679,7 +675,6 @@ public class HRegionServer implements HR
this.hbaseMaster = null;
}
this.leases.close();
- HConnectionManager.deleteConnection(conf, true);
this.zooKeeper.close();
if (!killed) {
join();
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Wed Oct 26 04:05:15 2011
@@ -154,6 +154,7 @@ public class ReplicationLogCleaner imple
LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
this.zkHelper.getZookeeperWatcher().close();
}
+ // Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.conf, true);
}
@@ -167,4 +168,4 @@ public class ReplicationLogCleaner imple
LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
stop(why);
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Oct 26 04:05:15 2011
@@ -341,6 +341,13 @@ public class ReplicationSource extends T
shipEdits();
}
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ } catch (IOException e) {
+ LOG.debug("Attempt to close connection failed", e);
+ }
+ }
LOG.debug("Source exiting " + peerClusterId);
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Wed Oct 26 04:05:15 2011
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ZooKeeper
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -244,15 +245,23 @@ public class HBaseFsck {
* Load the list of disabled tables in ZK into local set.
* @throws ZooKeeperConnectionException
* @throws IOException
- * @throws KeeperException
*/
private void loadDisabledTables()
- throws ZooKeeperConnectionException, IOException, KeeperException {
- ZooKeeperWatcher zkw =
- HConnectionManager.getConnection(conf).getZooKeeperWatcher();
- for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
- disabledTables.add(Bytes.toBytes(tableName));
- }
+ throws ZooKeeperConnectionException, IOException {
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ try {
+ for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
+ disabledTables.add(Bytes.toBytes(tableName));
+ }
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ return null;
+ }
+ });
}
/**
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java Wed Oct 26 04:05:15 2011
@@ -28,7 +28,9 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@@ -78,31 +80,58 @@ public class HBaseFsckRepair {
forceOfflineInZK(conf, actualRegion);
}
- private static void forceOfflineInZK(Configuration conf, HRegionInfo region)
+ private static void forceOfflineInZK(Configuration conf, final HRegionInfo region)
throws ZooKeeperConnectionException, KeeperException, IOException {
- ZKAssign.createOrForceNodeOffline(
- HConnectionManager.getConnection(conf).getZooKeeperWatcher(),
- region, HConstants.HBCK_CODE_NAME);
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ try {
+ ZKAssign.createOrForceNodeOffline(
+ connection.getZooKeeperWatcher(),
+ region, HConstants.HBCK_CODE_NAME);
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ return null;
+ }
+ });
}
protected static void closeRegionSilentlyAndWait(Configuration conf,
HServerAddress server, HRegionInfo region)
throws IOException, InterruptedException {
- HRegionInterface rs =
- HConnectionManager.getConnection(conf).getHRegionConnection(server);
- rs.closeRegion(region, false);
- long timeout = conf.getLong("hbase.hbck.close.timeout", 120000);
- long expiration = timeout + System.currentTimeMillis();
- while (System.currentTimeMillis() < expiration) {
+
+ HConnection connection = HConnectionManager.getConnection(conf);
+ boolean success = false;
+
+ try {
+ HRegionInterface rs = connection.getHRegionConnection(server);
+ rs.closeRegion(region, false);
+ long timeout = conf.getLong("hbase.hbck.close.timeout", 120000);
+ long expiration = timeout + System.currentTimeMillis();
+ while (System.currentTimeMillis() < expiration) {
+ try {
+ HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName());
+ if (rsRegion == null)
+ throw new NotServingRegionException();
+ } catch (Exception e) {
+ success = true;
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ throw new IOException("Region " + region + " failed to close within"
+ + " timeout " + timeout);
+
+ } finally {
try {
- HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName());
- if (rsRegion == null) throw new NotServingRegionException();
- } catch (Exception e) {
- return;
+ connection.close();
+ } catch (IOException ioe) {
+ if (success) {
+ throw ioe;
+ }
}
- Thread.sleep(1000);
}
- throw new IOException("Region " + region + " failed to close within" +
- " timeout " + timeout);
}
-}
\ No newline at end of file
+
+}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Wed Oct 26 04:05:15 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -103,10 +104,14 @@ class HMerge {
throws IOException {
boolean masterIsRunning = false;
if (testMasterRunning) {
- HConnection connection = HConnectionManager.getConnection(conf);
- masterIsRunning = connection.isMasterRunning();
+ masterIsRunning = HConnectionManager
+ .execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.isMasterRunning();
+ }
+ });
}
- HConnectionManager.deleteConnection(conf, true);
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
if (masterIsRunning) {
throw new IllegalStateException(
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Oct 26 04:05:15 2011
@@ -169,7 +169,7 @@ public class ZKUtil {
"[\\t\\n\\x0B\\f\\r]", ""));
StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
- builder.append(conf.get("hbase.zookeeper.property.clientPort"));
+ builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Wed Oct 26 04:05:15 2011
@@ -125,7 +125,8 @@ public class ZooKeeperWatcher implements
// Apparently this is recoverable. Retry a while.
// See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
// TODO: Generalize out in ZKUtil.
- long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000);
+ long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+ HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME);
long finished = System.currentTimeMillis() + wait;
KeeperException ke = null;
do {
Modified: hbase/branches/0.90/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/resources/hbase-default.xml?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.90/src/main/resources/hbase-default.xml Wed Oct 26 04:05:15 2011
@@ -124,6 +124,14 @@
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
+ <name>hbase.connection.per.config</name>
+ <value>true</value>
+ <description>Disallows sharing of connections for configuration instances
+ with equivalent settings.
+ Default: true (expected to be false in future releases).
+ </description>
+ </property>
+ <property>
<name>hbase.client.retries.number</name>
<value>10</value>
<description>Maximum retries. Used as maximum for all retryable
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Wed Oct 26 04:05:15 2011
@@ -98,7 +98,8 @@ public class TestCatalogTracker {
private CatalogTracker constructAndStartCatalogTracker(final HConnection c)
throws IOException, InterruptedException {
- CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable);
+ CatalogTracker ct = new CatalogTracker(this.watcher, null, c,
+ this.abortable, 0);
ct.start();
return ct;
}
@@ -353,4 +354,4 @@ public class TestCatalogTracker {
this.ct.waitForRoot();
}
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java Wed Oct 26 04:05:15 2011
@@ -70,8 +70,7 @@ public class TestMetaReaderEditor {
@Before public void setup() throws IOException, InterruptedException {
Configuration c = new Configuration(UTIL.getConfiguration());
zkw = new ZooKeeperWatcher(c, "TestMetaReaderEditor", ABORTABLE);
- HConnection connection = HConnectionManager.getConnection(c);
- ct = new CatalogTracker(zkw, connection, ABORTABLE);
+ ct = new CatalogTracker(zkw, c, ABORTABLE);
ct.start();
}
@@ -160,4 +159,4 @@ public class TestMetaReaderEditor {
}
assertEquals(1, MetaReader.getTableRegions(ct, greaterName).size());
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Oct 26 04:05:15 2011
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -42,6 +43,7 @@ import org.apache.commons.logging.LogFac
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* This class is for testing HCM features
@@ -71,10 +73,7 @@ public class TestHCM {
* @throws SecurityException
* @see https://issues.apache.org/jira/browse/HBASE-2925
*/
- // Disabling. Of course this test will OOME using new Configuration each time
- // St.Ack 20110428
- // @Test
- public void testManyNewConnectionsDoesnotOOME()
+ @Test public void testManyNewConnectionsDoesnotOOME()
throws SecurityException, IllegalArgumentException,
ZooKeeperConnectionException, NoSuchFieldException, IllegalAccessException,
InterruptedException {
@@ -86,13 +85,16 @@ public class TestHCM {
public static void createNewConfigurations() throws SecurityException,
IllegalArgumentException, NoSuchFieldException,
IllegalAccessException, InterruptedException, ZooKeeperConnectionException {
+ int startingHConnectionManagerCacheSize = getHConnectionManagerCacheSize();
HConnection last = null;
- for (int i = 0; i <= (HConnectionManager.MAX_CACHED_HBASE_INSTANCES * 2); i++) {
+ for (int i = 0; i <= 100; i++) {
// set random key to differentiate the connection from previous ones
Configuration configuration = HBaseConfiguration.create();
+ configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false");
configuration.set("somekey", String.valueOf(_randy.nextInt()));
System.out.println("Hash Code: " + configuration.hashCode());
- HConnection connection = HConnectionManager.getConnection(configuration);
+ HConnection connection =
+ HConnectionManager.getConnection(configuration);
if (last != null) {
if (last == connection) {
System.out.println("!! Got same connection for once !!");
@@ -104,14 +106,11 @@ public class TestHCM {
configuration.set("someotherkey", String.valueOf(_randy.nextInt()));
last = connection;
LOG.info("Cache Size: "
- + getHConnectionManagerCacheSize() + ", Valid Keys: "
- + getValidKeyCount());
+ + getHConnectionManagerCacheSize());
Thread.sleep(100);
}
- Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES,
+ Assert.assertEquals(startingHConnectionManagerCacheSize + 1,
getHConnectionManagerCacheSize());
- Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES,
- getValidKeyCount());
}
private static int getHConnectionManagerCacheSize()
@@ -124,21 +123,6 @@ public class TestHCM {
return cache.size();
}
- private static int getValidKeyCount() throws SecurityException,
- NoSuchFieldException, IllegalArgumentException,
- IllegalAccessException {
- Field cacheField =
- HConnectionManager.class.getDeclaredField("HBASE_INSTANCES");
- cacheField.setAccessible(true);
- Map<?, ?> cache = (Map<?, ?>) cacheField.get(null);
- List<Object> keys = new ArrayList<Object>(cache.keySet());
- Set<Object> values = new HashSet<Object>();
- for (Object key : keys) {
- values.add(cache.get(key));
- }
- return values.size();
- }
-
/**
* Test that when we delete a location using the first row of a region
* that we really delete it.
@@ -158,4 +142,70 @@ public class TestHCM {
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
assertNull("What is this location?? " + rl, rl);
}
+
+ /**
+ * Make sure that {@link Configuration} instances that are essentially the
+ * same map to the same {@link HConnection} instance.
+ */
+ @Test
+ public void testConnectionSameness() throws Exception {
+ HConnection previousConnection = null;
+ for (int i = 0; i < 2; i++) {
+ // set a random key that is not part of the connection key; the connection must stay the same
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false");
+ configuration.set("some_key", String.valueOf(_randy.nextInt()));
+ LOG.info("The hash code of the current configuration is: "
+ + configuration.hashCode());
+ HConnection currentConnection = HConnectionManager
+ .getConnection(configuration);
+ if (previousConnection != null) {
+ assertTrue(
+ "Did not get the same connection even though its key didn't change",
+ previousConnection == currentConnection);
+ }
+ previousConnection = currentConnection;
+ // change the configuration, so that it is no longer reachable from the
+ // client's perspective. However, since it's part of the LRU doubly linked
+ // list, it will eventually get thrown out, at which time it should also
+ // close the corresponding {@link HConnection}.
+ configuration.set("other_key", String.valueOf(_randy.nextInt()));
+ }
+ }
+
+ /**
+ * Makes sure that there is no leaking of
+ * {@link HConnectionManager.TableServers} in the {@link HConnectionManager}
+ * class.
+ */
+ @Test
+ public void testConnectionUniqueness() throws Exception {
+ HConnection previousConnection = null;
+ for (int i = 0; i < 50; i++) {
+ // set random key to differentiate the connection from previous ones
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ configuration.set(HConstants.HBASE_CONNECTION_PER_CONFIG, "false");
+ configuration.set("some_key", String.valueOf(_randy.nextInt()));
+ configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
+ String.valueOf(_randy.nextInt()));
+ LOG.info("The hash code of the current configuration is: "
+ + configuration.hashCode());
+ HConnection currentConnection = HConnectionManager
+ .getConnection(configuration);
+ if (previousConnection != null) {
+ assertTrue("Got the same connection even though its key changed!",
+ previousConnection != currentConnection);
+ }
+ // change the configuration, so that it is no longer reachable from the
+ // client's perspective. However, since it's part of the LRU doubly linked
+ // list, it will eventually get thrown out, at which time it should also
+ // close the corresponding {@link HConnection}.
+ configuration.set("other_key", String.valueOf(_randy.nextInt()));
+
+ previousConnection = currentConnection;
+ LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
+ + getHConnectionManagerCacheSize());
+ Thread.sleep(50);
+ }
+ }
}
Modified: hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java?rev=1189019&r1=1189018&r2=1189019&view=diff
==============================================================================
--- hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (original)
+++ hbase/branches/0.90/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java Wed Oct 26 04:05:15 2011
@@ -139,6 +139,7 @@ public class TestTableMapReduce extends
// verify map-reduce results
verify(Bytes.toString(table.getTableName()));
} finally {
+ table.close();
mrCluster.shutdown();
if (job != null) {
FileUtil.fullyDelete(
@@ -170,6 +171,7 @@ public class TestTableMapReduce extends
}
}
assertTrue(verified);
+ table.close();
}
/**