You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/05/03 06:43:54 UTC
svn commit: r1098901 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/client/replication/
src/main/java/org/a...
Author: stack
Date: Tue May 3 04:43:53 2011
New Revision: 1098901
URL: http://svn.apache.org/viewvc?rev=1098901&view=rev
Log:
HBASE-3777 Redefine Identity Of HBase Configuration
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May 3 04:43:53 2011
@@ -96,6 +96,7 @@ Release 0.91.0 - Unreleased
HBASE-3838 RegionCoprocesorHost.preWALRestore throws npe in case there is
no RegionObserver registered (Himanshu Vashishtha)
HBASE-3847 Turn off DEBUG logging of RPCs in WriteableRPCEngine on TRUNK
+ HBASE-3777 Redefine Identity Of HBase Configuration (Karthick Sankarachary)
IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue May 3 04:43:53 2011
@@ -93,14 +93,29 @@ public final class HConstants {
/** Name of ZooKeeper config file in conf/ directory. */
public static final String ZOOKEEPER_CONFIG_NAME = "zoo.cfg";
- /** default client port that the zookeeper listens on */
+ /** Parameter name for the client port that the zookeeper listens on */
+ public static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
+
+ /** Default client port that the zookeeper listens on */
public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
+ /** Parameter name for the wait time for the recoverable zookeeper */
+ public static final String ZOOKEEPER_RECOVERABLE_WAITTIME = "hbase.zookeeper.recoverable.waittime";
+
+ /** Default wait time for the recoverable zookeeper */
+ public static final long DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME = 10000;
+
/** Parameter name for the root dir in ZK for this cluster */
public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
+ /** Parameter name for the limit on concurrent client-side zookeeper connections */
+ public static final String ZOOKEEPER_MAX_CLIENT_CNXNS = "hbase.zookeeper.property.maxClientCnxns";
+
+ /** Default limit on concurrent client-side zookeeper connections */
+ public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 30;
+
/** Parameter name for port region server listens on. */
public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
@@ -343,6 +358,74 @@ public final class HConstants {
*/
public static long DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE = Long.MAX_VALUE;
+ /**
+ * Parameter name for client pause value, used mostly as value to wait
+ * before running a retry of a failed get, region lookup, etc.
+ */
+ public static String HBASE_CLIENT_PAUSE = "hbase.client.pause";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_PAUSE}.
+ */
+ public static long DEFAULT_HBASE_CLIENT_PAUSE = 1000;
+
+ /**
+ * Parameter name for maximum retries, used as maximum for all retryable
+ * operations such as fetching of the root region from root region server,
+ * getting a cell's value, starting a row update, etc.
+ */
+ public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 10;
+
+ /**
+ * Parameter name for maximum attempts, used to limit the number of times the
+ * client will try to obtain the proxy for a given region server.
+ */
+ public static String HBASE_CLIENT_RPC_MAXATTEMPTS = "hbase.client.rpc.maxattempts";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_RPC_MAXATTEMPTS}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS = 1;
+
+ /**
+ * Parameter name for client prefetch limit, used as the maximum number of regions
+ * info that will be prefetched.
+ */
+ public static String HBASE_CLIENT_PREFETCH_LIMIT = "hbase.client.prefetch.limit";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_PREFETCH_LIMIT}.
+ */
+ public static int DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT = 10;
+
+ /**
+ * Parameter name for number of rows that will be fetched when calling next on
+ * a scanner if it is not served from memory. Higher caching values will
+ * enable faster scanners but will eat up more memory and some calls of next
+ * may take longer and longer times when the cache is empty.
+ */
+ public static String HBASE_META_SCANNER_CACHING = "hbase.meta.scanner.caching";
+
+ /**
+ * Default value of {@link #HBASE_META_SCANNER_CACHING}.
+ */
+ public static int DEFAULT_HBASE_META_SCANNER_CACHING = 100;
+
+ /**
+ * Parameter name for unique identifier for this {@link Configuration}
+ * instance. If there are two or more {@link Configuration} instances that,
+ * for all intents and purposes, are the same except for their instance ids,
+ * then they will not be able to share the same {@link HConnection} instance.
+ * On the other hand, even if the instance ids are the same, it could result
+ * in non-shared {@link HConnection} instances if some of the other connection
+ * parameters differ.
+ */
+ public static String HBASE_CLIENT_INSTANCE_ID = "hbase.client.instance.id";
/**
* HRegion server lease period in milliseconds. Clients must report in within this period
@@ -351,12 +434,11 @@ public final class HConstants {
public static String HBASE_REGIONSERVER_LEASE_PERIOD_KEY =
"hbase.regionserver.lease.period";
-
/**
* Default value of {@link #HBASE_REGIONSERVER_LEASE_PERIOD_KEY}.
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
-
+
/**
* timeout for each RPC
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java Tue May 3 04:43:53 2011
@@ -28,12 +28,14 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -57,6 +59,7 @@ import org.apache.hadoop.ipc.RemoteExcep
*/
public class CatalogTracker {
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
+ private final Configuration conf;
private final HConnection connection;
private final ZooKeeperWatcher zookeeper;
private final RootRegionTracker rootRegionTracker;
@@ -77,15 +80,18 @@ public class CatalogTracker {
HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
/**
- * Constructs a catalog tracker. Find current state of catalog tables and
- * begin active tracking by executing {@link #start()} post construction.
- * Does not timeout.
- * @param connection Server connection; if problem, this connections
- * {@link HConnection#abort(String, Throwable)} will be called.
- * @throws IOException
+ * Constructs a catalog tracker. Find current state of catalog tables and
+ * begin active tracking by executing {@link #start()} post construction. Does
+ * not timeout.
+ *
+ * @param conf
+ * the {@link Configuration} from which a {@link HConnection} will be
+ * obtained; if problem, this connection's
+ * {@link HConnection#abort(String, Throwable)} will be called.
+ * @throws IOException
*/
- public CatalogTracker(final HConnection connection) throws IOException {
- this(connection.getZooKeeperWatcher(), connection, connection);
+ public CatalogTracker(final Configuration conf) throws IOException {
+ this(null, conf, null);
}
/**
@@ -97,10 +103,10 @@ public class CatalogTracker {
* @param abortable if fatal exception
* @throws IOException
*/
- public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
+ public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
final Abortable abortable)
throws IOException {
- this(zk, connection, abortable, 0);
+ this(zk, conf, abortable, 0);
}
/**
@@ -113,11 +119,21 @@ public class CatalogTracker {
* ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
* @throws IOException
*/
- public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
- final Abortable abortable, final int defaultTimeout)
+ public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
+ Abortable abortable, final int defaultTimeout)
throws IOException {
- this.zookeeper = zk;
+ this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout);
+ }
+
+ CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
+ HConnection connection, Abortable abortable, final int defaultTimeout)
+ throws IOException {
+ this.conf = conf;
this.connection = connection;
+ this.zookeeper = (zk == null) ? this.connection.getZooKeeperWatcher() : zk;
+ if (abortable == null) {
+ abortable = this.connection;
+ }
this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable);
this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable);
this.defaultTimeout = defaultTimeout;
@@ -141,13 +157,24 @@ public class CatalogTracker {
* Interrupts any ongoing waits.
*/
public void stop() {
- LOG.debug("Stopping catalog tracker " + this);
- this.stopped = true;
- this.rootRegionTracker.stop();
- this.metaNodeTracker.stop();
- // Call this and it will interrupt any ongoing waits on meta.
- synchronized (this.metaAvailable) {
- this.metaAvailable.notifyAll();
+ if (!this.stopped) {
+ LOG.debug("Stopping catalog tracker " + this);
+ this.stopped = true;
+ this.rootRegionTracker.stop();
+ this.metaNodeTracker.stop();
+ try {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ } catch (IOException e) {
+ // Although the {@link Closeable} interface throws an {@link
+ // IOException}, in reality, the implementation would never do that.
+ LOG.error("Attempt to close catalog tracker's connection failed.", e);
+ }
+ // Call this and it will interrupt any ongoing waits on meta.
+ synchronized (this.metaAvailable) {
+ this.metaAvailable.notifyAll();
+ }
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue May 3 04:43:53 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -62,10 +63,10 @@ import org.apache.hadoop.util.StringUtil
* <p>Currently HBaseAdmin instances are not expected to be long-lived. For
* example, an HBaseAdmin instance will not ride over a Master restart.
*/
-public class HBaseAdmin implements Abortable {
+public class HBaseAdmin implements Abortable, Closeable {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
// private final HConnection connection;
- final HConnection connection;
+ private final HConnection connection;
private volatile Configuration conf;
private final long pause;
private final int numRetries;
@@ -102,9 +103,7 @@ public class HBaseAdmin implements Abort
throws ZooKeeperConnectionException, IOException {
CatalogTracker ct = null;
try {
- HConnection connection =
- HConnectionManager.getConnection(this.conf);
- ct = new CatalogTracker(connection);
+ ct = new CatalogTracker(this.conf);
ct.start();
} catch (InterruptedException e) {
// Let it out as an IOE for now until we redo all so tolerate IEs
@@ -1240,4 +1239,10 @@ public class HBaseAdmin implements Abort
copyOfConf.setInt("hbase.client.retries.number", 1);
new HBaseAdmin(copyOfConf);
}
+
+ public void close() throws IOException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Tue May 3 04:43:53 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
@@ -57,7 +58,7 @@ import org.apache.hadoop.hbase.zookeeper
*
* @see HConnectionManager
*/
-public interface HConnection extends Abortable {
+public interface HConnection extends Abortable, Closeable {
/**
* @return Configuration instance being used by this HConnection instance.
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue May 3 04:43:53 2011
@@ -19,12 +19,14 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +47,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -77,6 +80,8 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
+import com.google.common.collect.ImmutableMap;
+
/**
* A non-instantiable class that manages {@link HConnection}s.
* This class has a static Map of {@link HConnection} instances keyed by
@@ -126,19 +131,28 @@ import org.apache.zookeeper.KeeperExcept
*/
@SuppressWarnings("serial")
public class HConnectionManager {
- static final int MAX_CACHED_HBASE_INSTANCES = 2001;
+ // A LRU Map of HConnectionKey -> HConnection (TableServer).
+ private static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
- // A LRU Map of Configuration hashcode -> TableServers. We set instances to 31.
- // The zk default max connections to the ensemble from the one client is 30 so
- // should run into zk issues before hit this value of 31.
- private static final Map<Configuration, HConnectionImplementation> HBASE_INSTANCES =
- new LinkedHashMap<Configuration, HConnectionImplementation>
- ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) {
- @Override
- protected boolean removeEldestEntry(Map.Entry<Configuration, HConnectionImplementation> eldest) {
- return size() > MAX_CACHED_HBASE_INSTANCES;
- }
- };
+ public static final int MAX_CACHED_HBASE_INSTANCES;
+
+ static {
+ // We set instances to one more than the value specified for {@link
+ // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
+ // connections to the ensemble from the one client is 30, so in that case we
+ // should run into zk issues before the LRU hits this value of 31.
+ MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
+ HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
+ HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+ HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+ (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
+ return size() > MAX_CACHED_HBASE_INSTANCES;
+ }
+ };
+ }
/*
* Non-instantiable.
@@ -158,33 +172,34 @@ public class HConnectionManager {
*/
public static HConnection getConnection(Configuration conf)
throws ZooKeeperConnectionException {
- HConnectionImplementation connection;
+ HConnectionKey connectionKey = new HConnectionKey(conf);
synchronized (HBASE_INSTANCES) {
- connection = HBASE_INSTANCES.get(conf);
+ HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
if (connection == null) {
connection = new HConnectionImplementation(conf);
- HBASE_INSTANCES.put(conf, connection);
+ HBASE_INSTANCES.put(connectionKey, connection);
}
+ connection.incCount();
+ return connection;
}
- return connection;
}
/**
* Delete connection information for the instance specified by configuration.
- * This will close connection to the zookeeper ensemble and let go of all
- * resources.
- * @param conf configuration whose identity is used to find {@link HConnection}
- * instance.
- * @param stopProxy Shuts down all the proxy's put up to cluster members
- * including to cluster HMaster. Calls {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}.
+ * If there are no more references to it, this will then close connection to
+ * the zookeeper ensemble and let go of all resources.
+ *
+ * @param conf
+ * configuration whose identity is used to find {@link HConnection}
+ * instance.
+ * @param stopProxy
+ * Shuts down all the proxy's put up to cluster members including to
+ * cluster HMaster. Calls
+ * {@link HBaseRPC#stopProxy(org.apache.hadoop.ipc.VersionedProtocol)}
+ * .
*/
public static void deleteConnection(Configuration conf, boolean stopProxy) {
- synchronized (HBASE_INSTANCES) {
- HConnectionImplementation t = HBASE_INSTANCES.remove(conf);
- if (t != null) {
- t.close(stopProxy);
- }
- }
+ deleteConnection(new HConnectionKey(conf), stopProxy);
}
/**
@@ -194,9 +209,38 @@ public class HConnectionManager {
*/
public static void deleteAllConnections(boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
- for (HConnectionImplementation t : HBASE_INSTANCES.values()) {
- if (t != null) {
- t.close(stopProxy);
+ Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
+ connectionKeys.addAll(HBASE_INSTANCES.keySet());
+ for (HConnectionKey connectionKey : connectionKeys) {
+ deleteConnection(connectionKey, stopProxy);
+ }
+ HBASE_INSTANCES.clear();
+ }
+ }
+
+ private static void deleteConnection(HConnection connection, boolean stopProxy) {
+ synchronized (HBASE_INSTANCES) {
+ for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
+ .entrySet()) {
+ if (connectionEntry.getValue() == connection) {
+ deleteConnection(connectionEntry.getKey(), stopProxy);
+ break;
+ }
+ }
+ }
+ }
+
+ private static void deleteConnection(HConnectionKey connectionKey, boolean stopProxy) {
+ synchronized (HBASE_INSTANCES) {
+ HConnectionImplementation connection = HBASE_INSTANCES
+ .get(connectionKey);
+ if (connection != null) {
+ connection.decCount();
+ if (connection.isZeroReference()) {
+ HBASE_INSTANCES.remove(connectionKey);
+ connection.close(stopProxy);
+ } else if (stopProxy) {
+ connection.stopProxyOnClose(stopProxy);
}
}
}
@@ -209,10 +253,15 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException
*/
static int getCachedRegionCount(Configuration conf,
- byte[] tableName)
- throws ZooKeeperConnectionException {
- HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
- return connection.getNumberOfCachedRegionLocations(tableName);
+ final byte[] tableName)
+ throws IOException {
+ return execute(new HConnectable<Integer>(conf) {
+ @Override
+ public Integer connect(HConnection connection) {
+ return ((HConnectionImplementation) connection)
+ .getNumberOfCachedRegionLocations(tableName);
+ }
+ });
}
/**
@@ -222,13 +271,156 @@ public class HConnectionManager {
* @throws ZooKeeperConnectionException
*/
static boolean isRegionCached(Configuration conf,
- byte[] tableName, byte[] row) throws ZooKeeperConnectionException {
- HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
- return connection.isRegionCached(tableName, row);
+ final byte[] tableName, final byte[] row) throws IOException {
+ return execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) {
+ return ((HConnectionImplementation) connection).isRegionCached(tableName, row);
+ }
+ });
+ }
+
+ /**
+ * This class makes it convenient for one to execute a command in the context
+ * of a {@link HConnection} instance based on the given {@link Configuration}.
+ *
+ * <p>
+ * If you find yourself wanting to use a {@link HConnection} for a relatively
+ * short duration of time, and do not want to deal with the hassle of creating
+ * and cleaning up that resource, then you should consider using this
+ * convenience class.
+ *
+ * @param <T>
+ * the return type of the {@link HConnectable#connect(HConnection)}
+ * method.
+ */
+ public static abstract class HConnectable<T> {
+ public Configuration conf;
+
+ public HConnectable(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public abstract T connect(HConnection connection) throws IOException;
+ }
+
+ /**
+ * This convenience method invokes the given {@link HConnectable#connect}
+ * implementation using a {@link HConnection} instance that lasts just for the
+ * duration of that invocation.
+ *
+ * @param <T> the return type of the connect method
+ * @param connectable the {@link HConnectable} instance
+ * @return the value returned by the connect method
+ * @throws IOException
+ */
+ public static <T> T execute(HConnectable<T> connectable) throws IOException {
+ if (connectable == null || connectable.conf == null) {
+ return null;
+ }
+ Configuration conf = connectable.conf;
+ HConnection connection = HConnectionManager.getConnection(conf);
+ boolean connectSucceeded = false;
+ try {
+ T returnValue = connectable.connect(connection);
+ connectSucceeded = true;
+ return returnValue;
+ } finally {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ if (connectSucceeded) {
+ throw new IOException("The connection to " + connection
+ + " could not be deleted.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Denotes a unique key to a {@link HConnection} instance.
+ *
+ * In essence, this class captures the properties in {@link Configuration}
+ * that may be used in the process of establishing a connection. In light of
+ * that, if any new such properties are introduced into the mix, they must be
+ * added to the {@link HConnectionKey#properties} list.
+ *
+ */
+ static class HConnectionKey {
+ public static String[] CONNECTION_PROPERTIES = new String[] {
+ HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.ZOOKEEPER_CLIENT_PORT,
+ HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+ HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.HBASE_META_SCANNER_CACHING,
+ HConstants.HBASE_CLIENT_INSTANCE_ID };
+
+ private Map<String, String> properties;
+
+ public HConnectionKey(Configuration conf) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (conf != null) {
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = conf.get(property);
+ if (value != null) {
+ builder.put(property, value);
+ }
+ }
+ }
+ this.properties = builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ for (String property : CONNECTION_PROPERTIES) {
+ String value = properties.get(property);
+ if (value != null) {
+ result = prime * result + value.hashCode();
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ HConnectionKey that = (HConnectionKey) obj;
+ if (this.properties == null) {
+ if (that.properties != null) {
+ return false;
+ }
+ } else {
+ if (that.properties == null) {
+ return false;
+ }
+ for (String property : CONNECTION_PROPERTIES) {
+ String thisValue = this.properties.get(property);
+ String thatValue = that.properties.get(property);
+ if (thisValue == thatValue) {
+ continue;
+ }
+ if (thisValue == null || !thisValue.equals(thatValue)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
/* Encapsulates connection to zookeeper and regionservers.*/
- static class HConnectionImplementation implements HConnection {
+ static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final long pause;
@@ -273,6 +465,10 @@ public class HConnectionManager {
private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>();
+ private boolean stopProxy;
+ private int refCount;
+
+
/**
* constructor
* @param conf Configuration object
@@ -292,15 +488,19 @@ public class HConnectionManager {
"Unable to find region server interface " + serverClassName, e);
}
- this.pause = conf.getLong("hbase.client.pause", 1000);
- this.numRetries = conf.getInt("hbase.client.retries.number", 10);
- this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.maxRPCAttempts = conf.getInt(
+ HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
- this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
- 10);
+ this.prefetchRegionLimit = conf.getInt(
+ HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+ HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
setupZookeeperTrackers();
@@ -1098,28 +1298,6 @@ public class HConnectionManager {
}
}
- void close(boolean stopProxy) {
- if (master != null) {
- if (stopProxy) {
- HBaseRPC.stopProxy(master);
- }
- master = null;
- masterChecked = false;
- }
- if (stopProxy) {
- for (HRegionInterface i: servers.values()) {
- HBaseRPC.stopProxy(i);
- }
- }
- if (this.zooKeeper != null) {
- LOG.info("Closed zookeeper sessionid=0x" +
- Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
- this.zooKeeper.close();
- this.zooKeeper = null;
- }
- this.closed = true;
- }
-
private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
final MultiAction<R> multi, final byte [] tableName) {
final HConnection connection = this;
@@ -1516,5 +1694,85 @@ public class HConnectionManager {
throw new IOException("Unexpected ZooKeeper exception", ke);
}
}
+
+ public void stopProxyOnClose(boolean stopProxy) {
+ this.stopProxy = stopProxy;
+ }
+
+ /**
+ * Increment this client's reference count.
+ */
+ void incCount() {
+ ++refCount;
+ }
+
+ /**
+ * Decrement this client's reference count.
+ */
+ void decCount() {
+ if (refCount > 0) {
+ --refCount;
+ }
+ }
+
+ /**
+ * Return if this client has no reference
+ *
+ * @return true if this client has no reference; false otherwise
+ */
+ boolean isZeroReference() {
+ return refCount == 0;
+ }
+
+ void close(boolean stopProxy) {
+ if (this.closed) {
+ return;
+ }
+ if (master != null) {
+ if (stopProxy) {
+ HBaseRPC.stopProxy(master);
+ }
+ master = null;
+ masterChecked = false;
+ }
+ if (stopProxy) {
+ for (HRegionInterface i : servers.values()) {
+ HBaseRPC.stopProxy(i);
+ }
+ }
+ this.servers.clear();
+ if (this.zooKeeper != null) {
+ LOG.info("Closed zookeeper sessionid=0x"
+ + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+ this.zooKeeper.close();
+ this.zooKeeper = null;
+ }
+ this.closed = true;
+ }
+
+ public void close() {
+ HConnectionManager.deleteConnection(this, stopProxy);
+ LOG.debug("The connection to " + this.zooKeeper + " has been closed.");
+ }
+
+ /**
+ * Close the connection for good, regardless of what the current value of
+ * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
+ * point, which would be the case if all of its consumers close the
+ * connection. However, on the off chance that someone is unable to close
+ * the connection, perhaps because it bailed out prematurely, the method
+ * below will ensure that this {@link HConnection} instance is cleaned up.
+ * Caveat: The JVM may take an unknown amount of time to call finalize on an
+ * unreachable object, so our hope is that every consumer cleans up after
+ * itself, like any good citizen.
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ // Pretend as if we are about to release the last remaining reference
+ refCount = 1;
+ close();
+ LOG.debug("The connection to " + this.zooKeeper
+ + " was closed by the finalize method.");
+ }
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue May 3 04:43:53 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.NotServin
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
@@ -86,13 +88,17 @@ import org.apache.hadoop.hbase.util.Writ
* {@link HTable} passing a new {@link Configuration} instance that has the
* new configuration.
*
+ * <p>Note that this class implements the {@link Closeable} interface. When an
+ * HTable instance is no longer required, it *should* be closed in order to ensure
+ * that the underlying resources are promptly released.
+ *
* @see HBaseAdmin for create, drop, list, enable and disable of tables.
* @see HConnection
* @see HConnectionManager
*/
-public class HTable implements HTableInterface {
+public class HTable implements HTableInterface, Closeable {
private static final Log LOG = LogFactory.getLog(HTable.class);
- private final HConnection connection;
+ private HConnection connection;
private final byte [] tableName;
protected final int scannerTimeout;
private volatile Configuration configuration;
@@ -104,6 +110,7 @@ public class HTable implements HTableInt
private int maxKeyValueSize;
private ExecutorService pool; // For Multi
private long maxScannerResultSize;
+ private boolean closed;
/**
* Creates an object to access a HBase table.
@@ -199,6 +206,7 @@ public class HTable implements HTableInt
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory());
((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
+ this.closed = false;
}
public Configuration getConfiguration() {
@@ -250,9 +258,14 @@ public class HTable implements HTableInt
* @return {@code true} if table is online.
* @throws IOException if a remote or network exception occurs
*/
- public static boolean isTableEnabled(Configuration conf, byte[] tableName)
- throws IOException {
- return HConnectionManager.getConnection(conf).isTableEnabled(tableName);
+ public static boolean isTableEnabled(Configuration conf,
+ final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.isTableEnabled(tableName);
+ }
+ });
}
/**
@@ -839,8 +852,15 @@ public class HTable implements HTableInt
@Override
public void close() throws IOException {
+ if (this.closed) {
+ return;
+ }
flushCommits();
this.pool.shutdown();
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ this.closed = true;
}
// validate for well-formedness
@@ -1299,12 +1319,18 @@ public class HTable implements HTableInt
* @param tableName name of table to configure.
* @param enable Set to true to enable region cache prefetch. Or set to
* false to disable it.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static void setRegionCachePrefetch(final byte[] tableName,
- boolean enable) throws ZooKeeperConnectionException {
- HConnectionManager.getConnection(HBaseConfiguration.create()).
- setRegionCachePrefetch(tableName, enable);
+ final boolean enable) throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration
+ .create()) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ connection.setRegionCachePrefetch(tableName, enable);
+ return null;
+ }
+ });
}
/**
@@ -1315,12 +1341,17 @@ public class HTable implements HTableInt
* @param tableName name of table to configure.
* @param enable Set to true to enable region cache prefetch. Or set to
* false to disable it.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static void setRegionCachePrefetch(final Configuration conf,
- final byte[] tableName, boolean enable) throws ZooKeeperConnectionException {
- HConnectionManager.getConnection(conf).setRegionCachePrefetch(
- tableName, enable);
+ final byte[] tableName, final boolean enable) throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ connection.setRegionCachePrefetch(tableName, enable);
+ return null;
+ }
+ });
}
/**
@@ -1329,12 +1360,16 @@ public class HTable implements HTableInt
* @param tableName name of table to check
* @return true if table's region cache prefetch is enabled. Otherwise
* it is disabled.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
public static boolean getRegionCachePrefetch(final Configuration conf,
- final byte[] tableName) throws ZooKeeperConnectionException {
- return HConnectionManager.getConnection(conf).getRegionCachePrefetch(
- tableName);
+ final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.getRegionCachePrefetch(tableName);
+ }
+ });
}
/**
@@ -1342,12 +1377,17 @@ public class HTable implements HTableInt
* @param tableName name of table to check
* @return true if table's region cache prefetch is enabled. Otherwise
* it is disabled.
- * @throws ZooKeeperConnectionException
+ * @throws IOException
*/
- public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException {
- return HConnectionManager.getConnection(HBaseConfiguration.create()).
- getRegionCachePrefetch(tableName);
- }
+ public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
+ return HConnectionManager.execute(new HConnectable<Boolean>(
+ HBaseConfiguration.create()) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.getRegionCachePrefetch(tableName);
+ }
+ });
+ }
/**
* Explicitly clears the region cache to fetch the latest value from META.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Tue May 3 04:43:53 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
@@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.util.Byte
*
* <p>Pool will manage its own connections to the cluster. See {@link HConnectionManager}.
*/
-public class HTablePool {
+public class HTablePool implements Closeable {
private final Map<String, Queue<HTableInterface>> tables =
new ConcurrentHashMap<String, Queue<HTableInterface>>();
private final Configuration config;
@@ -69,7 +70,7 @@ public class HTablePool {
final HTableInterfaceFactory tableFactory) {
// Use the passed-in configuration directly; the pool no longer makes a
// private copy (connection lifetime is reference counted elsewhere).
- this.config = config == null? new Configuration(): new Configuration(config);
+ this.config = config == null? new Configuration(): config;
this.maxSize = maxSize;
this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
}
@@ -147,7 +148,6 @@ public class HTablePool {
table = queue.poll();
}
}
- HConnectionManager.deleteConnection(this.config, true);
}
/**
@@ -159,6 +159,17 @@ public class HTablePool {
closeTablePool(Bytes.toString(tableName));
}
+ /**
+ * Closes all the HTable instances belonging to all tables in the table pool.
+ * <p>
+ * Note: this is a 'shutdown' of all the table pools.
+ */
+ public void close() throws IOException {
+ for (String tableName : tables.keySet()) {
+ closeTablePool(tableName);
+ }
+ }
+
int getCurrentPoolSize(String tableName) {
Queue<HTableInterface> queue = tables.get(tableName);
return queue.size();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Tue May 3 04:43:53 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
@@ -120,12 +121,25 @@ public class MetaScanner {
* @throws IOException e
*/
public static void metaScan(Configuration configuration,
+ final MetaScannerVisitor visitor, final byte[] tableName,
+ final byte[] row, final int rowLimit, final byte[] metaTableName)
+ throws IOException {
+ HConnectionManager.execute(new HConnectable<Void>(configuration) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ metaScan(conf, connection, visitor, tableName, row, rowLimit,
+ metaTableName);
+ return null;
+ }
+ });
+ }
+
+ private static void metaScan(Configuration configuration, HConnection connection,
MetaScannerVisitor visitor, byte [] tableName, byte[] row,
int rowLimit, final byte [] metaTableName)
throws IOException {
int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE;
- HConnection connection = HConnectionManager.getConnection(configuration);
// if row is not null, we want to use the startKey of the row's region as
// the startRow for the meta scan.
byte[] startRow;
@@ -165,8 +179,9 @@ public class MetaScanner {
// Scan over each meta region
ScannerCallable callable;
- int rows = Math.min(rowLimit,
- configuration.getInt("hbase.meta.scanner.caching", 100));
+ int rows = Math.min(rowLimit, configuration.getInt(
+ HConstants.HBASE_META_SCANNER_CACHING,
+ HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
do {
final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
if (LOG.isDebugEnabled()) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Tue May 3 04:43:53 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.client.replication;
+import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
@@ -63,9 +64,10 @@ import org.apache.zookeeper.KeeperExcept
* <code>replication</code>.
* </p>
*/
-public class ReplicationAdmin {
+public class ReplicationAdmin implements Closeable {
private final ReplicationZookeeper replicationZk;
+ private final Configuration configuration;
private final HConnection connection;
/**
@@ -79,6 +81,7 @@ public class ReplicationAdmin {
throw new RuntimeException("hbase.replication isn't true, please " +
"enable it in order to use replication");
}
+ this.configuration = conf;
this.connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
try {
@@ -169,4 +172,11 @@ public class ReplicationAdmin {
ReplicationZookeeper getReplicationZk() {
return replicationZk;
}
+
+ @Override
+ public void close() throws IOException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Tue May 3 04:43:53 2011
@@ -66,7 +66,7 @@ FileOutputFormat<ImmutableBytesWritable,
public void close(Reporter reporter)
throws IOException {
- m_table.flushCommits();
+ m_table.close();
}
public void write(ImmutableBytesWritable key,
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java Tue May 3 04:43:53 2011
@@ -104,13 +104,7 @@ implements Configurable {
@Override
public void close(TaskAttemptContext context)
throws IOException {
- table.flushCommits();
- // The following call will shutdown all connections to the cluster from
- // this JVM. It will close out our zk session otherwise zk wil log
- // expired sessions rather than closed ones. If any other HTable instance
- // running in this JVM, this next call will cause it damage. Presumption
- // is that the above this.table is only instance.
- HConnectionManager.deleteAllConnections(true);
+ table.close();
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Tue May 3 04:43:53 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -87,12 +88,12 @@ public class VerifyReplication {
* @throws IOException When something is broken with the data.
*/
@Override
- public void map(ImmutableBytesWritable row, Result value,
+ public void map(ImmutableBytesWritable row, final Result value,
Context context)
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
- Scan scan = new Scan();
+ final Scan scan = new Scan();
scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
long startTime = conf.getLong(NAME + ".startTime", 0);
long endTime = conf.getLong(NAME + ".endTime", 0);
@@ -107,18 +108,23 @@ public class VerifyReplication {
scan.setTimeRange(startTime,
endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
}
- try {
- HConnection conn = HConnectionManager.getConnection(conf);
- ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
- conn.getZooKeeperWatcher());
- ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
- HTable replicatedTable = new HTable(peer.getConfiguration(),
- conf.get(NAME+".tableName"));
- scan.setStartRow(value.getRow());
- replicatedScanner = replicatedTable.getScanner(scan);
- } catch (KeeperException e) {
- throw new IOException("Got a ZK exception", e);
- }
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection conn) throws IOException {
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
+ ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
+ HTable replicatedTable = new HTable(peer.getConfiguration(),
+ conf.get(NAME+".tableName"));
+ scan.setStartRow(value.getRow());
+ replicatedScanner = replicatedTable.getScanner(scan);
+ } catch (KeeperException e) {
+ throw new IOException("Got a ZK exception", e);
+ }
+ return null;
+ }
+ });
}
Result res = replicatedScanner.next();
try {
@@ -151,20 +157,25 @@ public class VerifyReplication {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new IOException("Replication needs to be enabled to verify it.");
}
- try {
- HConnection conn = HConnectionManager.getConnection(conf);
- ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
- conn.getZooKeeperWatcher());
- // Just verifying it we can connect
- ReplicationPeer peer = zk.getPeer(peerId);
- if (peer == null) {
- throw new IOException("Couldn't get access to the slave cluster," +
- "please see the log");
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection conn) throws IOException {
+ try {
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
+ // Just verifying that we can connect
+ ReplicationPeer peer = zk.getPeer(peerId);
+ if (peer == null) {
+ throw new IOException("Couldn't get access to the slave cluster," +
+ "please see the log");
+ }
+ } catch (KeeperException ex) {
+ throw new IOException("Couldn't get access to the slave cluster" +
+ " because: ", ex);
+ }
+ return null;
}
- } catch (KeeperException ex) {
- throw new IOException("Couldn't get access to the slave cluster" +
- " because: ", ex);
- }
+ });
conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue May 3 04:43:53 2011
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.UnknownRe
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
-import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -141,8 +140,6 @@ implements HMasterInterface, HMasterRegi
// file system manager for the master FS operations
private MasterFileSystem fileSystemManager;
- private HConnection connection;
-
// server manager to deal with region server info
private ServerManager serverManager;
@@ -309,7 +306,6 @@ implements HMasterInterface, HMasterRegi
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
- HConnectionManager.deleteConnection(this.conf, true);
this.zooKeeper.close();
}
LOG.info("HMaster main thread exiting");
@@ -337,7 +333,7 @@ implements HMasterInterface, HMasterRegi
*/
private void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException {
- this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
+ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
this.catalogTracker.start();
@@ -407,7 +403,6 @@ implements HMasterInterface, HMasterRegi
ClusterId.setClusterId(this.zooKeeper,
fileSystemManager.getClusterId());
- this.connection = HConnectionManager.getConnection(conf);
this.executorService = new ExecutorService(getServerName().toString());
this.serverManager = new ServerManager(this, this);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue May 3 04:43:53 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.PleaseHol
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -80,6 +81,7 @@ public class ServerManager {
private final Server master;
private final MasterServices services;
+ private final HConnection connection;
private final DeadServer deadservers;
@@ -89,13 +91,21 @@ public class ServerManager {
* Constructor.
* @param master
* @param services
+ * @throws ZooKeeperConnectionException
*/
- public ServerManager(final Server master, final MasterServices services) {
+ public ServerManager(final Server master, final MasterServices services)
+ throws ZooKeeperConnectionException {
+ this(master, services, true);
+ }
+
+ ServerManager(final Server master, final MasterServices services,
+ final boolean connect) throws ZooKeeperConnectionException {
this.master = master;
this.services = services;
Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
this.deadservers = new DeadServer();
+ this.connection = connect ? HConnectionManager.getConnection(c) : null;
}
/**
@@ -443,12 +453,10 @@ public class ServerManager {
*/
private HRegionInterface getServerConnection(final ServerName sn)
throws IOException {
- HConnection connection =
- HConnectionManager.getConnection(this.master.getConfiguration());
HRegionInterface hri = this.serverConnections.get(sn.toString());
if (hri == null) {
LOG.debug("New connection to " + sn.toString());
- hri = connection.getHRegionConnection(sn.getHostname(), sn.getPort());
+ hri = this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
this.serverConnections.put(sn, hri);
}
return hri;
@@ -501,8 +509,15 @@ public class ServerManager {
}
/**
- * Stop the ServerManager. Currently does nothing.
+ * Stop the ServerManager. Currently closes the connection used to talk to
+ * region servers.
*/
public void stop() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.error("Attempt to close connection to master failed", e);
+ }
+ }
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue May 3 04:43:53 2011
@@ -78,8 +78,6 @@ import org.apache.hadoop.hbase.catalog.R
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -169,7 +167,6 @@ public class HRegionServer implements HR
protected final Configuration conf;
- private final HConnection connection;
protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
private FileSystem fs;
private Path rootDir;
@@ -305,7 +302,6 @@ public class HRegionServer implements HR
throws IOException, InterruptedException {
this.fsOk = true;
this.conf = conf;
- this.connection = HConnectionManager.getConnection(conf);
this.isOnline = false;
checkCodecs(this.conf);
@@ -534,7 +530,7 @@ public class HRegionServer implements HR
blockAndCheckIfStopped(this.clusterStatusTracker);
// Create the catalog tracker and start it;
- this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
+ this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start();
}
@@ -707,7 +703,6 @@ public class HRegionServer implements HR
} catch (KeeperException e) {
LOG.warn("Failed deleting my ephemeral node", e);
}
- HConnectionManager.deleteConnection(conf, true);
this.zooKeeper.close();
if (!killed) {
join();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue May 3 04:43:53 2011
@@ -154,6 +154,7 @@ public class ReplicationLogCleaner imple
LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
this.zkHelper.getZookeeperWatcher().close();
}
+ // Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.conf, true);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue May 3 04:43:53 2011
@@ -343,6 +343,13 @@ public class ReplicationSource extends T
shipEdits();
}
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ } catch (IOException e) {
+ LOG.debug("Attempt to close connection failed", e);
+ }
+ }
LOG.debug("Source exiting " + peerClusterId);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Tue May 3 04:43:53 2011
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.catalog.M
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.Result;
@@ -228,15 +229,23 @@ public class HBaseFsck {
* Load the list of disabled tables in ZK into local set.
* @throws ZooKeeperConnectionException
* @throws IOException
- * @throws KeeperException
*/
private void loadDisabledTables()
- throws ZooKeeperConnectionException, IOException, KeeperException {
- ZooKeeperWatcher zkw =
- HConnectionManager.getConnection(conf).getZooKeeperWatcher();
- for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
- disabledTables.add(Bytes.toBytes(tableName));
- }
+ throws ZooKeeperConnectionException, IOException {
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ try {
+ for (String tableName : ZKTable.getDisabledOrDisablingTables(zkw)) {
+ disabledTables.add(Bytes.toBytes(tableName));
+ }
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ return null;
+ }
+ });
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java Tue May 3 04:43:53 2011
@@ -29,7 +29,9 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@@ -79,31 +81,55 @@ public class HBaseFsckRepair {
forceOfflineInZK(conf, actualRegion);
}
- private static void forceOfflineInZK(Configuration conf, HRegionInfo region)
+ private static void forceOfflineInZK(Configuration conf, final HRegionInfo region)
throws ZooKeeperConnectionException, KeeperException, IOException {
- ZKAssign.createOrForceNodeOffline(
- HConnectionManager.getConnection(conf).getZooKeeperWatcher(),
- region, HConstants.HBCK_CODE_SERVERNAME);
+ HConnectionManager.execute(new HConnectable<Void>(conf) {
+ @Override
+ public Void connect(HConnection connection) throws IOException {
+ try {
+ ZKAssign.createOrForceNodeOffline(
+ connection.getZooKeeperWatcher(),
+ region, HConstants.HBCK_CODE_SERVERNAME);
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
+ }
+ return null;
+ }
+ });
}
private static void closeRegionSilentlyAndWait(Configuration conf,
- ServerName server, HRegionInfo region)
- throws IOException, InterruptedException {
- HRegionInterface rs =
- HConnectionManager.getConnection(conf).getHRegionConnection(new HServerAddress(server.getHostname(), server.getPort()));
- rs.closeRegion(region, false);
- long timeout = conf.getLong("hbase.hbck.close.timeout", 120000);
- long expiration = timeout + System.currentTimeMillis();
- while (System.currentTimeMillis() < expiration) {
+ ServerName server, HRegionInfo region) throws IOException,
+ InterruptedException {
+ HConnection connection = HConnectionManager.getConnection(conf);
+ boolean success = false;
+ try {
+ HRegionInterface rs = connection.getHRegionConnection(new HServerAddress(
+ server.getHostname(), server.getPort()));
+ rs.closeRegion(region, false);
+ long timeout = conf.getLong("hbase.hbck.close.timeout", 120000);
+ long expiration = timeout + System.currentTimeMillis();
+ while (System.currentTimeMillis() < expiration) {
+ try {
+ HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName());
+ if (rsRegion == null)
+ throw new NotServingRegionException();
+ } catch (Exception e) {
+ success = true;
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ throw new IOException("Region " + region + " failed to close within"
+ + " timeout " + timeout);
+ } finally {
try {
- HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName());
- if (rsRegion == null) throw new NotServingRegionException();
- } catch (Exception e) {
- return;
+ connection.close();
+ } catch (IOException ioe) {
+ if (success) {
+ throw ioe;
+ }
}
- Thread.sleep(1000);
}
- throw new IOException("Region " + region + " failed to close within" +
- " timeout " + timeout);
}
}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HMerge.java Tue May 3 04:43:53 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -103,10 +104,14 @@ class HMerge {
throws IOException {
boolean masterIsRunning = false;
if (testMasterRunning) {
- HConnection connection = HConnectionManager.getConnection(conf);
- masterIsRunning = connection.isMasterRunning();
+ masterIsRunning = HConnectionManager
+ .execute(new HConnectable<Boolean>(conf) {
+ @Override
+ public Boolean connect(HConnection connection) throws IOException {
+ return connection.isMasterRunning();
+ }
+ });
}
- HConnectionManager.deleteConnection(conf, true);
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
if (masterIsRunning) {
throw new IllegalStateException(
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue May 3 04:43:53 2011
@@ -154,7 +154,7 @@ public class ZKUtil {
"[\\t\\n\\x0B\\f\\r]", ""));
StringBuilder builder = new StringBuilder(ensemble);
builder.append(":");
- builder.append(conf.get("hbase.zookeeper.property.clientPort"));
+ builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
builder.append(":");
builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
if (name != null && !name.isEmpty()) {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue May 3 04:43:53 2011
@@ -129,7 +129,8 @@ public class ZooKeeperWatcher implements
// Apparently this is recoverable. Retry a while.
// See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
// TODO: Generalize out in ZKUtil.
- long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000);
+ long wait = conf.getLong(HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
+ HConstants.DEFAULT_ZOOKEPER_RECOVERABLE_WAITIME);
long finished = System.currentTimeMillis() + wait;
KeeperException ke = null;
do {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java Tue May 3 04:43:53 2011
@@ -100,8 +100,7 @@ public class TestRegionRebalancing exten
* @throws InterruptedException
*/
public void testRebalancing() throws IOException, InterruptedException {
- HConnection connection = HConnectionManager.getConnection(conf);
- CatalogTracker ct = new CatalogTracker(connection);
+ CatalogTracker ct = new CatalogTracker(conf);
ct.start();
Map<HRegionInfo, ServerName> regions = MetaReader.fullScan(ct);
for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Tue May 3 04:43:53 2011
@@ -98,7 +98,8 @@ public class TestCatalogTracker {
private CatalogTracker constructAndStartCatalogTracker(final HConnection c)
throws IOException, InterruptedException {
- CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable);
+ CatalogTracker ct = new CatalogTracker(this.watcher, null, c,
+ this.abortable, 0);
ct.start();
return ct;
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditor.java Tue May 3 04:43:53 2011
@@ -70,8 +70,7 @@ public class TestMetaReaderEditor {
@Before public void setup() throws IOException, InterruptedException {
Configuration c = new Configuration(UTIL.getConfiguration());
zkw = new ZooKeeperWatcher(c, "TestMetaReaderEditor", ABORTABLE);
- HConnection connection = HConnectionManager.getConnection(c);
- ct = new CatalogTracker(zkw, connection, ABORTABLE);
+ ct = new CatalogTracker(zkw, c, ABORTABLE);
ct.start();
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Tue May 3 04:43:53 2011
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.util.Bytes;
@@ -42,6 +43,7 @@ import org.apache.commons.logging.LogFac
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* This class is for testing HCM features
@@ -71,10 +73,7 @@ public class TestHCM {
* @throws SecurityException
* @see https://issues.apache.org/jira/browse/HBASE-2925
*/
- // Disabling. Of course this test will OOME using new Configuration each time
- // St.Ack 20110428
- // @Test
- public void testManyNewConnectionsDoesnotOOME()
+ @Test public void testManyNewConnectionsDoesnotOOME()
throws SecurityException, IllegalArgumentException,
ZooKeeperConnectionException, NoSuchFieldException, IllegalAccessException,
InterruptedException {
@@ -92,7 +91,8 @@ public class TestHCM {
Configuration configuration = HBaseConfiguration.create();
configuration.set("somekey", String.valueOf(_randy.nextInt()));
System.out.println("Hash Code: " + configuration.hashCode());
- HConnection connection = HConnectionManager.getConnection(configuration);
+ HConnection connection =
+ HConnectionManager.getConnection(configuration);
if (last != null) {
if (last == connection) {
System.out.println("!! Got same connection for once !!");
@@ -108,9 +108,9 @@ public class TestHCM {
+ getValidKeyCount());
Thread.sleep(100);
}
- Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES,
+ Assert.assertEquals(1,
getHConnectionManagerCacheSize());
- Assert.assertEquals(HConnectionManager.MAX_CACHED_HBASE_INSTANCES,
+ Assert.assertEquals(1,
getValidKeyCount());
}
@@ -158,4 +158,69 @@ public class TestHCM {
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
assertNull("What is this location?? " + rl, rl);
}
+
+ /**
+ * Make sure that {@link Configuration} instances that are essentially the
+ * same map to the same {@link HConnection} instance.
+ */
+ @Test
+ public void testConnectionSameness() throws Exception {
+ HConnection previousConnection = null;
+ for (int i = 0; i < 2; i++) {
+ // set random key to differentiate the connection from previous ones
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ configuration.set("some_key", String.valueOf(_randy.nextInt()));
+ LOG.info("The hash code of the current configuration is: "
+ + configuration.hashCode());
+ HConnection currentConnection = HConnectionManager
+ .getConnection(configuration);
+ if (previousConnection != null) {
+ assertTrue(
+ "Did not get the same connection even though its key didn't change",
+ previousConnection == currentConnection);
+ }
+ previousConnection = currentConnection;
+ // change the configuration, so that it is no longer reachable from the
+ // client's perspective. However, since it's part of the LRU doubly linked
+ // list, it will eventually get thrown out, at which time it should also
+ // close the corresponding {@link HConnection}.
+ configuration.set("other_key", String.valueOf(_randy.nextInt()));
+ }
+ }
+
+ /**
+ * Makes sure that there is no leaking of
+ * {@link HConnectionManager.TableServers} in the {@link HConnectionManager}
+ * class.
+ */
+ @Test
+ public void testConnectionUniqueness() throws Exception {
+ HConnection previousConnection = null;
+ for (int i = 0; i < HConnectionManager.MAX_CACHED_HBASE_INSTANCES + 10; i++) {
+ // set random key to differentiate the connection from previous ones
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ configuration.set("some_key", String.valueOf(_randy.nextInt()));
+ configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
+ String.valueOf(_randy.nextInt()));
+ LOG.info("The hash code of the current configuration is: "
+ + configuration.hashCode());
+ HConnection currentConnection = HConnectionManager
+ .getConnection(configuration);
+ if (previousConnection != null) {
+ assertTrue("Got the same connection even though its key changed!",
+ previousConnection != currentConnection);
+ }
+ // change the configuration, so that it is no longer reachable from the
+ // client's perspective. However, since it's part of the LRU doubly linked
+ // list, it will eventually get thrown out, at which time it should also
+ // close the corresponding {@link HConnection}.
+ configuration.set("other_key", String.valueOf(_randy.nextInt()));
+
+ previousConnection = currentConnection;
+ LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
+ + getHConnectionManagerCacheSize()
+ + ", and the number of valid keys is: " + getValidKeyCount());
+ Thread.sleep(50);
+ }
+ }
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java Tue May 3 04:43:53 2011
@@ -139,6 +139,7 @@ public class TestTableMapReduce extends
// verify map-reduce results
verify(Bytes.toString(table.getTableName()));
} finally {
+ table.close();
mrCluster.shutdown();
if (job != null) {
FileUtil.fullyDelete(
@@ -170,6 +171,7 @@ public class TestTableMapReduce extends
}
}
assertTrue(verified);
+ table.close();
}
/**
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestClockSkewDetection.java Tue May 3 04:43:53 2011
@@ -73,7 +73,7 @@ public class TestClockSkewDetection {
@Override
public void stop(String why) {
- }}, null);
+ }}, null, false);
LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost();
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java?rev=1098901&r1=1098900&r2=1098901&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java Tue May 3 04:43:53 2011
@@ -112,8 +112,7 @@ public class TestMergeTable {
LOG.info("Starting mini hbase cluster");
UTIL.startMiniHBaseCluster(1, 1);
Configuration c = new Configuration(UTIL.getConfiguration());
- HConnection connection = HConnectionManager.getConnection(c);
- CatalogTracker ct = new CatalogTracker(connection);
+ CatalogTracker ct = new CatalogTracker(c);
ct.start();
List<HRegionInfo> originalTableRegions =
MetaReader.getTableRegions(ct, desc.getName());