You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/06/04 19:14:13 UTC
svn commit: r544188 [1/3] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
conf/ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Author: jimk
Date: Mon Jun 4 10:14:10 2007
New Revision: 544188
URL: http://svn.apache.org/viewvc?view=rev&rev=544188
Log:
HADOOP-1445 Support updates across region splits and compactions
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/UnknownScannerException.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/WrongRegionException.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompare.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-site.xml
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/EvaluationClient.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Mon Jun 4 10:14:10 2007
@@ -22,3 +22,4 @@
add/remove column.
12. HADOOP-1392. Part2: includes table compaction by merging adjacent regions
that have shrunk in size.
+ 13. HADOOP-1445 Support updates across region splits and compactions
Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml Mon Jun 4 10:14:10 2007
@@ -21,21 +21,18 @@
</description>
</property>
<property>
- <name>hbase.client.timeout.length</name>
- <value>10000</value>
- <description>Client timeout in milliseconds</description>
- </property>
- <property>
- <name>hbase.client.timeout.number</name>
- <value>5</value>
- <description>Try this many timeouts before giving up.
- </description>
+ <name>hbase.client.pause</name>
+ <value>30000</value>
+ <description>General client pause value. Used mostly as value to wait
+ before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
- <value>2</value>
- <description>Count of maximum retries fetching the root region from root
- region server.
+ <value>5</value>
+ <description>Maximum retries. Used as maximum for all retryable
+ operations such as fetching of the root region from root region
+ server, getting a cell's value, starting a row update, etc.
+ Default: 5.
</description>
</property>
<property>
@@ -52,6 +49,12 @@
30 seconds.</description>
</property>
<property>
+ <name>hbase.regionserver.lease.period</name>
+ <value>180000</value>
+ <description>HRegion server lease period in milliseconds. Default is
+ 180 seconds.</description>
+ </property>
+ <property>
<name>hbase.server.thread.wakefrequency</name>
<value>10000</value>
<description>Time to sleep in between searches for work (in milliseconds).
@@ -59,12 +62,6 @@
</description>
</property>
<property>
- <name>hbase.regionserver.lease.period</name>
- <value>30000</value>
- <description>HRegion server lease period in milliseconds. Default is
- 30 seconds.</description>
- </property>
- <property>
<name>hbase.regionserver.handler.count</name>
<value>10</value>
<description>Count of RPC Server instances spun up on RegionServers
@@ -80,5 +77,27 @@
tests to be responsive.
</description>
</property>
-
+ <property>
+ <name>hbase.regionserver.maxlogentries</name>
+ <value>30000</value>
+ <description>Rotate the logs when count of entries exceeds this value.
+ Default: 30,000
+ </description>
+ </property>
+ <property>
+ <name>hbase.hregion.maxunflushed</name>
+ <value>10000</value>
+ <description>
+ Memcache will be flushed to disk if number of Memcache writes
+ are in excess of this number.
+ </description>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>134217728</value>
+ <description>
+ Maximum desired file size for an HRegion. If filesize exceeds
+ value + (value / 2), the HRegion is split in two. Default: 128M.
+ </description>
+ </property>
</configuration>
Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-site.xml?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-site.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-site.xml Mon Jun 4 10:14:10 2007
@@ -1,4 +1,54 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
+ <property>
+ <name>hbase.regiondir</name>
+ <value>hbase</value>
+ <description>The directory shared by region servers.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.msginterval</name>
+ <value>1000</value>
+ <description>Interval between messages from the RegionServer to HMaster
+ in milliseconds. Default is 15. Set this value low if you want unit
+ tests to be responsive.
+ </description>
+ </property>
+ <!--
+ <property>
+ <name>hbase.master.meta.thread.rescanfrequency</name>
+ <value>600000</value>
+ <description>How long the HMaster sleeps (in milliseconds) between scans of
+ the root and meta tables.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.lease.period</name>
+ <value>360000</value>
+ <description>HMaster server lease period in milliseconds. Default is
+ 180 seconds.</description>
+ </property>
+ <property>
+ <name>hbase.regionserver.lease.period</name>
+ <value>360000</value>
+ <description>HMaster server lease period in milliseconds. Default is
+ 180 seconds.</description>
+ </property>
+ -->
+ <!--
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>3421223</value>
+ <description>
+ Maximum desired file size for an HRegion. If filesize exceeds
+ value + (value / 2), the HRegion is split in two. Default: 128M.
+ </description>
+ </property>
+ <property>
+ <name>hbase.client.timeout.length</name>
+ <value>10000</value>
+ <description>Client timeout in milliseconds</description>
+ </property>
+ -->
</configuration>
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Mon Jun 4 10:14:10 2007
@@ -21,6 +21,8 @@
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.BytesWritable;
@@ -31,10 +33,11 @@
* Used by the concrete HMemcacheScanner and HStoreScanners
******************************************************************************/
public abstract class HAbstractScanner implements HInternalScannerInterface {
+ final Log LOG = LogFactory.getLog(this.getClass().getName());
// Pattern to determine if a column key is a regex
- private static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
+ static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
// The kind of match we are doing on a column:
@@ -42,7 +45,7 @@
FAMILY_ONLY, // Just check the column family name
REGEX, // Column family + matches regex
SIMPLE // Literal matching
- };
+ }
// This class provides column matching functions that are more sophisticated
// than a simple string compare. There are three types of matching:
@@ -89,15 +92,15 @@
// Matching method
- boolean matches(Text col) throws IOException {
+ boolean matches(Text c) throws IOException {
if(this.matchType == MATCH_TYPE.SIMPLE) {
- return col.equals(this.col);
+ return c.equals(this.col);
} else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) {
- return col.toString().startsWith(this.family);
+ return c.toString().startsWith(this.family);
} else if(this.matchType == MATCH_TYPE.REGEX) {
- return this.columnMatcher.matcher(col.toString()).matches();
+ return this.columnMatcher.matcher(c.toString()).matches();
} else {
throw new IOException("Invalid match type: " + this.matchType);
@@ -201,20 +204,19 @@
public boolean isMultipleMatchScanner() {
return this.multipleMatchers;
}
+
/**
* Get the next set of values for this scanner.
*
- * @param key - The key that matched
- * @param results - all the results for that key.
- * @return - true if a match was found
+ * @param key The key that matched
+ * @param results All the results for <code>key</code>
+ * @return true if a match was found
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
- throws IOException {
-
+ throws IOException {
// Find the next row label (and timestamp)
-
Text chosenRow = null;
long chosenTimestamp = -1;
for(int i = 0; i < keys.length; i++) {
@@ -232,7 +234,6 @@
}
// Grab all the values that match this row/timestamp
-
boolean insertedItem = false;
if(chosenRow != null) {
key.setRow(chosenRow);
@@ -241,7 +242,6 @@
for(int i = 0; i < keys.length; i++) {
// Fetch the data
-
while((keys[i] != null)
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
@@ -255,10 +255,8 @@
break;
}
- if(columnMatch(i)) {
-
+ if(columnMatch(i)) {
// We only want the first result for any specific family member
-
if(!results.containsKey(keys[i].getColumn())) {
results.put(new Text(keys[i].getColumn()), vals[i]);
insertedItem = true;
@@ -277,7 +275,6 @@
&& ((keys[i].getRow().compareTo(chosenRow) <= 0)
|| (keys[i].getTimestamp() > this.timestamp)
|| (! columnMatch(i)))) {
-
getNext(i);
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Mon Jun 4 10:14:10 2007
@@ -18,11 +18,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +33,9 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -37,7 +43,7 @@
* HClient manages a connection to a single HRegionServer.
*/
public class HClient implements HConstants {
- private final Log LOG = LogFactory.getLog(this.getClass().getName());
+ final Log LOG = LogFactory.getLog(this.getClass().getName());
private static final Text[] META_COLUMNS = {
COLUMN_FAMILY
@@ -49,51 +55,55 @@
private static final Text EMPTY_START_ROW = new Text();
- private long clientTimeout;
- private int numTimeouts;
- private int numRetries;
+ long pause;
+ int numRetries;
private HMasterInterface master;
private final Configuration conf;
- private static class TableInfo {
+ /*
+ * Data structure that holds current location for a region and its info.
+ */
+ static class RegionLocation {
public HRegionInfo regionInfo;
public HServerAddress serverAddress;
- TableInfo(HRegionInfo regionInfo, HServerAddress serverAddress) {
+ RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
this.regionInfo = regionInfo;
this.serverAddress = serverAddress;
}
+
+ @Override
+ public String toString() {
+ return "address: " + this.serverAddress.toString() + ", regioninfo: " +
+ this.regionInfo;
+ }
}
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
-
- private TreeMap<Text, SortedMap<Text, TableInfo>> tablesToServers;
+ private TreeMap<Text, SortedMap<Text, RegionLocation>> tablesToServers;
// For the "current" table: Map startRow -> (HRegionInfo, HServerAddress)
+ private SortedMap<Text, RegionLocation> tableServers;
- private SortedMap<Text, TableInfo> tableServers;
-
- // Known region HServerAddress.toString() -> HRegionInterface
-
+ // Known region HServerAddress.toString() -> HRegionInterface
private TreeMap<String, HRegionInterface> servers;
// For row mutation operations
- private Text currentRegion;
- private HRegionInterface currentServer;
- private Random rand;
- private long clientid;
+ Text currentRegion;
+ HRegionInterface currentServer;
+ Random rand;
+ long clientid;
/** Creates a new HClient */
public HClient(Configuration conf) {
this.conf = conf;
- this.clientTimeout = conf.getLong("hbase.client.timeout.length", 30 * 1000);
- this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
- this.numRetries = conf.getInt("hbase.client.retries.number", 2);
+ this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
+ this.numRetries = conf.getInt("hbase.client.retries.number", 5);
this.master = null;
- this.tablesToServers = new TreeMap<Text, SortedMap<Text, TableInfo>>();
+ this.tablesToServers = new TreeMap<Text, SortedMap<Text, RegionLocation>>();
this.tableServers = null;
this.servers = new TreeMap<String, HRegionInterface>();
@@ -129,11 +139,13 @@
}
}
- /* Find the address of the master and connect to it */
+ /* Find the address of the master and connect to it
+ */
private void checkMaster() throws MasterNotRunningException {
if (this.master != null) {
return;
}
+
for(int tries = 0; this.master == null && tries < numRetries; tries++) {
HServerAddress masterLocation =
new HServerAddress(this.conf.get(MASTER_ADDRESS,
@@ -142,9 +154,8 @@
try {
HMasterInterface tryMaster =
(HMasterInterface)RPC.getProxy(HMasterInterface.class,
- HMasterInterface.versionID, masterLocation.getInetSocketAddress(),
- this.conf);
-
+ HMasterInterface.versionID, masterLocation.getInetSocketAddress(),
+ this.conf);
if(tryMaster.isMasterRunning()) {
this.master = tryMaster;
break;
@@ -154,16 +165,18 @@
// This was our last chance - don't bother sleeping
break;
}
+ LOG.info("Attempt " + tries + " of " + this.numRetries +
+ " failed with <" + e + ">. Retrying after sleep of " + this.pause);
}
- // We either cannot connect to the master or it is not running.
- // Sleep and retry
-
+ // We either cannot connect to master or it is not running. Sleep & retry
try {
- Thread.sleep(this.clientTimeout);
+ Thread.sleep(this.pause);
} catch(InterruptedException e) {
+ // continue
}
}
+
if(this.master == null) {
throw new MasterNotRunningException();
}
@@ -210,7 +223,7 @@
// Save the current table
- SortedMap<Text, TableInfo> oldServers = this.tableServers;
+ SortedMap<Text, RegionLocation> oldServers = this.tableServers;
try {
// Wait for new table to come on-line
@@ -229,23 +242,21 @@
public synchronized void deleteTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
- TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
+ RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.deleteTable(tableName);
-
} catch(RemoteException e) {
handleRemoteException(e);
}
// Wait until first region is deleted
-
- HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress);
-
+ HRegionInterface server =
+ getHRegionConnection(firstMetaServer.serverAddress);
DataInputBuffer inbuf = new DataInputBuffer();
HStoreKey key = new HStoreKey();
HRegionInfo info = new HRegionInfo();
- for(int tries = 0; tries < numRetries; tries++) {
+ for (int tries = 0; tries < numRetries; tries++) {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
@@ -258,7 +269,8 @@
for(int j = 0; j < values.length; j++) {
if(values[j].getLabel().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
- System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
+ System.arraycopy(values[j].getData().get(), 0, bytes, 0,
+ bytes.length);
inbuf.reset(bytes, bytes.length);
info.readFields(inbuf);
if(info.tableDesc.getName().equals(tableName)) {
@@ -274,27 +286,19 @@
if(scannerId != -1L) {
try {
server.close(scannerId);
-
} catch(Exception e) {
LOG.warn(e);
}
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sleep. Waiting for first region to be deleted from " + tableName);
- }
+
try {
- Thread.sleep(clientTimeout);
-
+ Thread.sleep(pause);
} catch(InterruptedException e) {
+ // continue
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Wake. Waiting for first region to be deleted from " + tableName);
- }
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("table deleted " + tableName);
}
+ LOG.info("table " + tableName + " deleted");
}
public synchronized void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
@@ -322,7 +326,7 @@
public synchronized void enableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
- TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
+ RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.enableTable(tableName);
@@ -379,20 +383,22 @@
LOG.debug("Sleep. Waiting for first region to be enabled from " + tableName);
}
try {
- Thread.sleep(clientTimeout);
+ Thread.sleep(pause);
} catch(InterruptedException e) {
+ // continue
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for first region to be enabled from " + tableName);
}
}
+ LOG.info("Enabled table " + tableName);
}
public synchronized void disableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
- TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
+ RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.disableTable(tableName);
@@ -449,14 +455,15 @@
LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName);
}
try {
- Thread.sleep(clientTimeout);
-
+ Thread.sleep(pause);
} catch(InterruptedException e) {
+ // continue
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for first region to be disabled from " + tableName);
}
}
+ LOG.info("Disabled table " + tableName);
}
public synchronized void shutdown() throws IOException {
@@ -477,8 +484,8 @@
}
}
- private TableInfo getFirstMetaServerForTable(Text tableName) throws IOException {
- SortedMap<Text, TableInfo> metaservers = findMetaServersForTable(tableName);
+ private RegionLocation getFirstMetaServerForTable(Text tableName) throws IOException {
+ SortedMap<Text, RegionLocation> metaservers = findMetaServersForTable(tableName);
return metaservers.get(metaservers.firstKey());
}
@@ -497,7 +504,10 @@
throw new IllegalArgumentException("table name cannot be null or zero length");
}
this.tableServers = tablesToServers.get(tableName);
- if(this.tableServers == null ) {
+ if (this.tableServers == null ) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No servers for " + tableName + ". Doing a find...");
+ }
// We don't know where the table is.
// Load the information from meta.
this.tableServers = findServersForTable(tableName);
@@ -511,23 +521,25 @@
* @return - map of first row to table info for all regions in the table
* @throws IOException
*/
- private SortedMap<Text, TableInfo> findServersForTable(Text tableName)
+ private SortedMap<Text, RegionLocation> findServersForTable(Text tableName)
throws IOException {
-
- SortedMap<Text, TableInfo> servers = null;
+ SortedMap<Text, RegionLocation> servers = null;
if(tableName.equals(ROOT_TABLE_NAME)) {
servers = locateRootRegion();
-
} else if(tableName.equals(META_TABLE_NAME)) {
servers = loadMetaFromRoot();
-
} else {
- servers = new TreeMap<Text, TableInfo>();
- for(TableInfo t: findMetaServersForTable(tableName).values()) {
+ servers = new TreeMap<Text, RegionLocation>();
+ for(RegionLocation t: findMetaServersForTable(tableName).values()) {
servers.putAll(scanOneMetaRegion(t, tableName));
}
this.tablesToServers.put(tableName, servers);
}
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Text, RegionLocation> e: servers.entrySet()) {
+ LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue());
+ }
+ }
return servers;
}
@@ -537,18 +549,15 @@
* @return - returns a SortedMap of the meta servers
* @throws IOException
*/
- private SortedMap<Text, TableInfo> findMetaServersForTable(Text tableName)
- throws IOException {
-
- SortedMap<Text, TableInfo> metaServers =
+ private SortedMap<Text, RegionLocation> findMetaServersForTable(final Text tableName)
+ throws IOException {
+ SortedMap<Text, RegionLocation> metaServers =
this.tablesToServers.get(META_TABLE_NAME);
-
if(metaServers == null) { // Don't know where the meta is
metaServers = loadMetaFromRoot();
}
Text firstMetaRegion = (metaServers.containsKey(tableName)) ?
- tableName : metaServers.headMap(tableName).lastKey();
-
+ tableName : metaServers.headMap(tableName).lastKey();
return metaServers.tailMap(firstMetaRegion);
}
@@ -558,10 +567,9 @@
* @return map of first row to TableInfo for all meta regions
* @throws IOException
*/
- private TreeMap<Text, TableInfo> loadMetaFromRoot() throws IOException {
- SortedMap<Text, TableInfo> rootRegion =
+ private TreeMap<Text, RegionLocation> loadMetaFromRoot() throws IOException {
+ SortedMap<Text, RegionLocation> rootRegion =
this.tablesToServers.get(ROOT_TABLE_NAME);
-
if(rootRegion == null) {
rootRegion = locateRootRegion();
}
@@ -570,34 +578,34 @@
/*
* Repeatedly try to find the root region by asking the master for where it is
- *
* @return TreeMap<Text, TableInfo> for root regin if found
* @throws NoServerForRegionException - if the root region can not be located after retrying
* @throws IOException
*/
- private TreeMap<Text, TableInfo> locateRootRegion() throws IOException {
+ private TreeMap<Text, RegionLocation> locateRootRegion() throws IOException {
checkMaster();
HServerAddress rootRegionLocation = null;
- for(int tries = 0; rootRegionLocation == null && tries < numRetries; tries++){
+ for(int tries = 0; tries < numRetries; tries++) {
int localTimeouts = 0;
- while(rootRegionLocation == null && localTimeouts < numTimeouts) {
+ while(rootRegionLocation == null && localTimeouts < numRetries) {
rootRegionLocation = master.findRootRegion();
-
if(rootRegionLocation == null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping. Waiting for root region.");
}
- Thread.sleep(this.clientTimeout);
+ Thread.sleep(this.pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch(InterruptedException iex) {
+ // continue
}
localTimeouts++;
}
}
+
if(rootRegionLocation == null) {
throw new NoServerForRegionException(
"Timed out trying to locate root region");
@@ -608,7 +616,6 @@
try {
rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName);
break;
-
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// Don't bother sleeping. We've run out of retries.
@@ -616,16 +623,16 @@
}
// Sleep and retry finding root region.
-
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Root region location changed. Sleeping.");
}
- Thread.sleep(this.clientTimeout);
+ Thread.sleep(this.pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch(InterruptedException iex) {
+ // continue
}
}
rootRegionLocation = null;
@@ -633,12 +640,12 @@
if (rootRegionLocation == null) {
throw new NoServerForRegionException(
- "unable to locate root region server");
+ "unable to locate root region server");
}
- TreeMap<Text, TableInfo> rootServer = new TreeMap<Text, TableInfo>();
+ TreeMap<Text, RegionLocation> rootServer = new TreeMap<Text, RegionLocation>();
rootServer.put(EMPTY_START_ROW,
- new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
+ new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation));
this.tablesToServers.put(ROOT_TABLE_NAME, rootServer);
return rootServer;
@@ -649,10 +656,9 @@
* @return - TreeMap of meta region servers
* @throws IOException
*/
- private TreeMap<Text, TableInfo> scanRoot(TableInfo rootRegion)
+ private TreeMap<Text, RegionLocation> scanRoot(RegionLocation rootRegion)
throws IOException {
-
- TreeMap<Text, TableInfo> metaservers =
+ TreeMap<Text, RegionLocation> metaservers =
scanOneMetaRegion(rootRegion, META_TABLE_NAME);
this.tablesToServers.put(META_TABLE_NAME, metaservers);
return metaservers;
@@ -663,16 +669,16 @@
* @param t the meta region we're going to scan
* @param tableName the name of the table we're looking for
* @return returns a map of startingRow to TableInfo
- * @throws NoSuchElementException - if table does not exist
+ * @throws RegionNotFoundException - if table does not exist
* @throws IllegalStateException - if table is offline
* @throws NoServerForRegionException - if table can not be found after retrying
* @throws IOException
*/
- private TreeMap<Text, TableInfo> scanOneMetaRegion(TableInfo t, Text tableName)
- throws IOException {
-
+ private TreeMap<Text, RegionLocation> scanOneMetaRegion(final RegionLocation t,
+ final Text tableName)
+ throws IOException {
HRegionInterface server = getHRegionConnection(t.serverAddress);
- TreeMap<Text, TableInfo> servers = new TreeMap<Text, TableInfo>();
+ TreeMap<Text, RegionLocation> servers = new TreeMap<Text, RegionLocation>();
for(int tries = 0; servers.size() == 0 && tries < this.numRetries;
tries++) {
@@ -690,13 +696,15 @@
if(values.length == 0) {
if(servers.size() == 0) {
// If we didn't find any servers then the table does not exist
-
- throw new NoSuchElementException("table '" + tableName
- + "' does not exist");
+ throw new RegionNotFoundException("table '" + tableName +
+ "' does not exist in " + t);
}
// We found at least one server for the table and now we're done.
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found " + servers.size() + " server(s) for " +
+ "location: " + t + " for tablename " + tableName);
+ }
break;
}
@@ -714,6 +722,9 @@
if(!regionInfo.tableDesc.getName().equals(tableName)) {
// We're done
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found " + tableName);
+ }
break;
}
@@ -724,7 +735,6 @@
bytes = results.get(COL_SERVER);
if(bytes == null || bytes.length == 0) {
// We need to rescan because the table we want is unassigned.
-
if(LOG.isDebugEnabled()) {
LOG.debug("no server address for " + regionInfo.toString());
}
@@ -732,15 +742,13 @@
break;
}
serverAddress = new String(bytes, UTF8_ENCODING);
-
servers.put(regionInfo.startKey,
- new TableInfo(regionInfo, new HServerAddress(serverAddress)));
+ new RegionLocation(regionInfo, new HServerAddress(serverAddress)));
}
} finally {
if(scannerId != -1L) {
try {
server.close(scannerId);
-
} catch(Exception e) {
LOG.warn(e);
}
@@ -752,19 +760,20 @@
+ tableName + " after " + this.numRetries + " retries");
}
- // The table is not yet being served. Sleep and retry.
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sleeping. Table " + tableName
- + " not currently being served.");
- }
- try {
- Thread.sleep(this.clientTimeout);
-
- } catch(InterruptedException e) {
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Wake. Retry finding table " + tableName);
+ if (servers.size() <= 0) {
+ // The table is not yet being served. Sleep and retry.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping. Table " + tableName +
+ " not currently being served.");
+ }
+ try {
+ Thread.sleep(this.pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake. Retry finding table " + tableName);
+ }
}
}
return servers;
@@ -804,7 +813,7 @@
throws IOException {
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
- SortedMap<Text, TableInfo> metaTables =
+ SortedMap<Text, RegionLocation> metaTables =
this.tablesToServers.get(META_TABLE_NAME);
if(metaTables == null) {
@@ -812,7 +821,7 @@
metaTables = loadMetaFromRoot();
}
- for (TableInfo t: metaTables.values()) {
+ for (RegionLocation t: metaTables.values()) {
HRegionInterface server = getHRegionConnection(t.serverAddress);
long scannerId = -1L;
try {
@@ -846,11 +855,15 @@
}
}
}
- return (HTableDescriptor[])uniqueTables.
- toArray(new HTableDescriptor[uniqueTables.size()]);
+ return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
}
- private synchronized TableInfo getTableInfo(Text row) {
+ /*
+ * Find region location hosting passed row using cached info
+ * @param row Row to find.
+ * @return Location of row.
+ */
+ synchronized RegionLocation getRegionLocation(Text row) {
if(row == null || row.getLength() == 0) {
throw new IllegalArgumentException("row key cannot be null or zero length");
}
@@ -859,41 +872,42 @@
}
// Only one server will have the row we are looking for
-
- Text serverKey = null;
- if(this.tableServers.containsKey(row)) {
- serverKey = row;
-
- } else {
- serverKey = this.tableServers.headMap(row).lastKey();
- }
+ Text serverKey = (this.tableServers.containsKey(row))? row:
+ this.tableServers.headMap(row).lastKey();
return this.tableServers.get(serverKey);
}
- private synchronized void findRegion(TableInfo info) throws IOException {
-
+ /*
+ * Clear caches of passed region location, reload servers for the passed
+ * region's table and then ensure region location can be found.
+ * @param info Region location to find.
+ * @throws IOException
+ */
+ synchronized void findRegion(final RegionLocation info) throws IOException {
// Wipe out everything we know about this table
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wiping out all we know of " + info);
+ }
this.tablesToServers.remove(info.regionInfo.tableDesc.getName());
this.tableServers.clear();
// Reload information for the whole table
-
this.tableServers = findServersForTable(info.regionInfo.tableDesc.getName());
-
- if(this.tableServers.get(info.regionInfo.startKey) == null ) {
- throw new IOException("region " + info.regionInfo.regionName
- + " does not exist");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Result of findRegion: " + this.tableServers.toString());
+ }
+ if (this.tableServers.get(info.regionInfo.startKey) == null) {
+ throw new RegionNotFoundException(info.regionInfo.regionName.toString());
}
}
/** Get a single value for the specified row and column */
public byte[] get(Text row, Text column) throws IOException {
- TableInfo info = null;
+ RegionLocation info = null;
BytesWritable value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
- info = getTableInfo(row);
+ info = getRegionLocation(row);
try {
value = getHRegionConnection(info.serverAddress).get(
@@ -919,11 +933,11 @@
/** Get the specified number of versions of the specified row and column */
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
- TableInfo info = null;
+ RegionLocation info = null;
BytesWritable[] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
- info = getTableInfo(row);
+ info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
@@ -956,11 +970,11 @@
* the specified timestamp.
*/
public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException {
- TableInfo info = null;
+ RegionLocation info = null;
BytesWritable[] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
- info = getTableInfo(row);
+ info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
@@ -990,11 +1004,11 @@
/** Get all the data for the specified row */
public LabelledData[] getRow(Text row) throws IOException {
- TableInfo info = null;
+ RegionLocation info = null;
LabelledData[] value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
- info = getTableInfo(row);
+ info = getRegionLocation(row);
try {
value = getHRegionConnection(info.serverAddress).getRow(
@@ -1023,38 +1037,81 @@
}
return new ClientScanner(columns, startRow);
}
-
- /** Start an atomic row insertion or update */
- public long startUpdate(Text row) throws IOException {
- TableInfo info = null;
- long lockid = -1L;
+
+ /*
+ * @return General HClient RetryPolicy instance.
+ */
+ RetryPolicy getRetryPolicy() {
+ Map<Class <? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class <? extends Exception>, RetryPolicy>();
+ // Pass numRetries - 1 because it does less-than-equal internally rather
+ // than the less-than we do elsewhere where we use numRetries.
+ RetryPolicy rp =
+ RetryPolicies.retryUpToMaximumCountWithProportionalSleep(numRetries,
+ this.pause, TimeUnit.MILLISECONDS);
+ exceptionToPolicyMap.put(NotServingRegionException.class, rp);
+ exceptionToPolicyMap.put(WrongRegionException.class, rp);
+ exceptionToPolicyMap.put(RegionNotFoundException.class, rp);
+ return RetryPolicies.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ exceptionToPolicyMap);
- for(int tries = 0; tries < numRetries && info == null; tries++) {
- info = getTableInfo(row);
-
- try {
- this.currentServer = getHRegionConnection(info.serverAddress);
- this.currentRegion = info.regionInfo.regionName;
- this.clientid = rand.nextLong();
- lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
-
- } catch(NotServingRegionException e) {
- if(tries == numRetries - 1) {
- // No more tries
- throw e;
- }
- findRegion(info);
- info = null;
+ }
+
+ /*
+ * Interface for {@link #startUpate()} used by the
+ * {@link org.apache.hadoop.io.retry} mechanism.
+ */
+ private interface StartUpdateInterface {
+ /**
+ * @return row lockid for the update
+ * @throws IOException
+ */
+ long startUpdate() throws IOException;
+ }
- } catch(IOException e) {
- this.currentServer = null;
- this.currentRegion = null;
- throw e;
- }
-
- }
-
- return lockid;
+ /* Start an atomic row insertion or update
+ * @param row Name of row to start update against.
+ * @return Row lockid.
+ */
+ public long startUpdate(final Text row) throws IOException {
+ // Implemention of the StartUpdate interface.
+ StartUpdateInterface implementation = new StartUpdateInterface() {
+ private RegionLocation info = null;
+ private int attempts = 0;
+
+ /*
+ * Wrapped method. Proxy wrapper is configured to judge whether
+ * exception merits retry.
+ * @return lockid
+ * @throws IOException
+ */
+ public long startUpdate() throws IOException {
+ this.attempts++;
+ if (this.info != null) {
+ LOG.info("Retry of startUpdate. Attempt " + this.attempts +
+ " for row " + row);
+ // If a retry. Something wrong w/ region we have. Refind.
+ try {
+ findRegion(info);
+ } catch (RegionNotFoundException e) {
+ // continue. If no longer exists, perhaps we just came through
+ // a split and region is now gone. Below getRegionLocation should
+ // recalibrate client.
+ }
+ }
+ this.info = getRegionLocation(row);
+ currentServer = getHRegionConnection(info.serverAddress);
+ currentRegion = info.regionInfo.regionName;
+ clientid = rand.nextLong();
+ return currentServer.startUpdate(currentRegion, clientid, row);
+ }
+ };
+
+ // Get retry proxy wrapper around 'implementation'.
+ StartUpdateInterface retryProxy = (StartUpdateInterface)RetryProxy.
+ create(StartUpdateInterface.class, implementation, getRetryPolicy());
+ // Run retry.
+ return retryProxy.startUpdate();
}
/** Change a value for the specified column */
@@ -1062,12 +1119,11 @@
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
new BytesWritable(val));
-
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-
} catch(IOException e2) {
+ LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
@@ -1078,13 +1134,13 @@
/** Delete the value for a column */
public void delete(long lockid, Text column) throws IOException {
try {
- this.currentServer.delete(this.currentRegion, this.clientid, lockid, column);
-
+ this.currentServer.delete(this.currentRegion, this.clientid, lockid,
+ column);
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-
} catch(IOException e2) {
+ LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
@@ -1107,7 +1163,6 @@
public void commit(long lockid) throws IOException {
try {
this.currentServer.commit(this.currentRegion, this.clientid, lockid);
-
} finally {
this.currentServer = null;
this.currentRegion = null;
@@ -1123,7 +1178,7 @@
private Text[] columns;
private Text startRow;
private boolean closed;
- private TableInfo[] regions;
+ private RegionLocation[] regions;
private int currentRegion;
private HRegionInterface server;
private long scannerId;
@@ -1139,8 +1194,8 @@
} else {
firstServer = tableServers.headMap(startRow).lastKey();
}
- Collection<TableInfo> info = tableServers.tailMap(firstServer).values();
- this.regions = info.toArray(new TableInfo[info.size()]);
+ Collection<RegionLocation> info = tableServers.tailMap(firstServer).values();
+ this.regions = info.toArray(new RegionLocation[info.size()]);
}
public ClientScanner(Text[] columns, Text startRow) throws IOException {
@@ -1173,7 +1228,7 @@
this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
for(int tries = 0; tries < numRetries; tries++) {
- TableInfo info = this.regions[currentRegion];
+ RegionLocation info = this.regions[currentRegion];
try {
this.scannerId = this.server.openScanner(info.regionInfo.regionName,
@@ -1247,15 +1302,37 @@
System.err.println(" address is read from configuration.");
System.err.println("Commands:");
System.err.println(" shutdown Shutdown the HBase cluster.");
- System.err.println(" createTable Takes table name, column families,... ");
- System.err.println(" deleteTable Takes a table name.");
- System.err.println(" iistTables List all tables.");
+ System.err.println(" createTable Create named table.");
+ System.err.println(" deleteTable Delete named table.");
+ System.err.println(" listTables List all tables.");
System.err.println("Example Usage:");
System.err.println(" % java " + this.getClass().getName() + " shutdown");
System.err.println(" % java " + this.getClass().getName() +
" createTable webcrawl contents: anchors: 10");
}
+ private void printCreateTableUsage(final String message) {
+ if (message != null && message.length() > 0) {
+ System.err.println(message);
+ }
+ System.err.println("Usage: java " + this.getClass().getName() +
+ " [options] createTable <name> <colfamily1> ... <max_versions>");
+ System.err.println("Example Usage:");
+ System.err.println(" % java " + this.getClass().getName() +
+ " createTable testtable column_x column_y column_z 3");
+ }
+
+ private void printDeleteTableUsage(final String message) {
+ if (message != null && message.length() > 0) {
+ System.err.println(message);
+ }
+ System.err.println("Usage: java " + this.getClass().getName() +
+ " [options] deleteTable <name>");
+ System.err.println("Example Usage:");
+ System.err.println(" % java " + this.getClass().getName() +
+ " deleteTable testtable");
+ }
+
public int doCommandLine(final String args[]) {
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
@@ -1296,8 +1373,10 @@
if (cmd.equals("createTable")) {
if (i + 2 > args.length) {
- throw new IllegalArgumentException("Must supply a table name " +
- "and at least one column family");
+ printCreateTableUsage("Error: Supply a table name," +
+ " at least one column family, and maximum versions");
+ errCode = 1;
+ break;
}
HTableDescriptor desc = new HTableDescriptor(args[i + 1]);
boolean addedFamily = false;
@@ -1316,7 +1395,9 @@
if (cmd.equals("deleteTable")) {
if (i + 1 > args.length) {
- throw new IllegalArgumentException("Must supply a table name");
+ printDeleteTableUsage("Error: Must supply a table name");
+ errCode = 1;
+ break;
}
deleteTable(new Text(args[i + 1]));
errCode = 0;
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java Mon Jun 4 10:14:10 2007
@@ -49,6 +49,8 @@
// TODO: Someone may try to name a column family 'log'. If they
// do, it will clash with the HREGION log dir subdirectory. FIX.
static final String HREGION_LOGDIR_NAME = "log";
+
+ static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
// Always store the location of the root table's HRegion.
// This HRegion is never split.
@@ -72,7 +74,6 @@
// Other constants
- static final long DESIRED_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
static final String UTF8_ENCODING = "UTF-8";
static final BytesWritable DELETE_BYTES =
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Mon Jun 4 10:14:10 2007
@@ -50,7 +50,8 @@
public class HMaster implements HConstants, HMasterInterface,
HMasterRegionInterface, Runnable {
- public long getProtocolVersion(String protocol, long clientVersion)
+ public long getProtocolVersion(String protocol,
+ @SuppressWarnings("unused") long clientVersion)
throws IOException {
if (protocol.equals(HMasterInterface.class.getName())) {
return HMasterInterface.versionID;
@@ -61,43 +62,41 @@
}
}
- private static final Log LOG =
+ static final Log LOG =
LogFactory.getLog(org.apache.hadoop.hbase.HMaster.class.getName());
- private volatile boolean closed;
- private Path dir;
+ volatile boolean closed;
+ Path dir;
private Configuration conf;
- private FileSystem fs;
- private Random rand;
+ FileSystem fs;
+ Random rand;
private long threadWakeFrequency;
private int numRetries;
private long maxRegionOpenTime;
- // The 'msgQueue' is used to assign work to the client processor thread
-
- private Vector<PendingOperation> msgQueue;
+ Vector<PendingOperation> msgQueue;
private Leases serverLeases;
private Server server;
private HServerAddress address;
- private HClient client;
+ HClient client;
- private long metaRescanInterval;
+ long metaRescanInterval;
private HServerAddress rootRegionLocation;
/**
* Columns in the 'meta' ROOT and META tables.
*/
- private static final Text METACOLUMNS[] = {
+ static final Text METACOLUMNS[] = {
COLUMN_FAMILY
};
static final String MASTER_NOT_RUNNING = "Master not running";
- private boolean rootScanned;
- private int numMetaRegions;
+ boolean rootScanned;
+ int numMetaRegions;
/**
* Base HRegion scanner class. Holds utilty common to <code>ROOT</code> and
@@ -146,116 +145,80 @@
* <p>A <code>META</code> region is not 'known' until it has been scanned
* once.
*/
- private abstract class BaseScanner implements Runnable {
+ abstract class BaseScanner implements Runnable {
private final Text FIRST_ROW = new Text();
+ /**
+ * @param region Region to scan
+ * @return True if scan completed.
+ * @throws IOException
+ */
protected boolean scanRegion(final MetaRegion region)
throws IOException {
boolean scannedRegion = false;
- HRegionInterface server = null;
+ HRegionInterface regionServer = null;
long scannerId = -1L;
if (LOG.isDebugEnabled()) {
- LOG.debug("scanning meta region " + region.regionName);
+ LOG.debug(Thread.currentThread().getName() + " scanning meta region " +
+ region.regionName);
}
try {
- server = client.getHRegionConnection(region.server);
- scannerId = server.openScanner(region.regionName, METACOLUMNS, FIRST_ROW);
-
- DataInputBuffer inbuf = new DataInputBuffer();
+ regionServer = client.getHRegionConnection(region.server);
+ scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
+ FIRST_ROW);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
HStoreKey key = new HStoreKey();
-
- LabelledData[] values = server.next(scannerId, key);
-
+ LabelledData[] values = regionServer.next(scannerId, key);
if (values.length == 0) {
break;
}
-
+
for (int i = 0; i < values.length; i++) {
byte[] bytes = new byte[values[i].getData().getSize()];
- System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
+ System.arraycopy(values[i].getData().get(), 0, bytes, 0,
+ bytes.length);
results.put(values[i].getLabel(), bytes);
}
- HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf);
- String serverName = getServerName(COL_SERVER, results);
- long startCode = getStartCode(COL_STARTCODE, results);
+ HRegionInfo info = HRegion.getRegionInfo(results);
+ String serverName = HRegion.getServerName(results);
+ long startCode = HRegion.getStartCode(results);
if(LOG.isDebugEnabled()) {
- LOG.debug("row: " + info.toString() + ", server: " + serverName
- + ", startCode: " + startCode);
+ LOG.debug(Thread.currentThread().getName() + " scanner: " +
+ Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
+ "}, server: " + serverName + ", startCode: " + startCode);
}
// Note Region has been assigned.
checkAssigned(info, serverName, startCode);
-
scannedRegion = true;
}
+ } catch (UnknownScannerException e) {
+ // Reset scannerId so we do not try closing a scanner the other side
+ // has lost account of: prevents duplicated stack trace out of the
+ // below close in the finally.
+ scannerId = -1L;
} finally {
try {
if (scannerId != -1L) {
- server.close(scannerId);
+ if (regionServer != null) {
+ regionServer.close(scannerId);
+ }
}
} catch (IOException e) {
LOG.error(e);
}
- scannerId = -1L;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("scan of meta region " + region.regionName + " complete");
+ LOG.debug(Thread.currentThread().getName() + " scan of meta region " +
+ region.regionName + " complete");
}
return scannedRegion;
}
- protected HRegionInfo getRegionInfo(final Text key,
- final TreeMap<Text, byte[]> data, final DataInputBuffer in)
- throws IOException {
- byte[] bytes = data.get(key);
- if (bytes == null || bytes.length == 0) {
- throw new IOException("no value for " + key);
- }
- in.reset(bytes, bytes.length);
- HRegionInfo info = new HRegionInfo();
- info.readFields(in);
- return info;
- }
-
- protected String getServerName(final Text key,
- final TreeMap<Text, byte[]> data) {
-
- byte [] bytes = data.get(key);
- String name = null;
- try {
- name = (bytes != null && bytes.length != 0) ?
- new String(bytes, UTF8_ENCODING): null;
-
- } catch(UnsupportedEncodingException e) {
- assert(false);
- }
- return (name != null)? name.trim(): name;
- }
-
- protected long getStartCode(final Text key,
- final TreeMap<Text, byte[]> data) {
-
- long startCode = -1L;
- byte [] bytes = data.get(key);
- if(bytes != null && bytes.length != 0) {
- try {
- startCode = Long.valueOf(new String(bytes, UTF8_ENCODING).trim());
-
- } catch(NumberFormatException e) {
- assert(false);
-
- } catch(UnsupportedEncodingException e) {
- assert(false);
- }
- }
- return startCode;
- }
-
protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) {
@@ -327,23 +290,17 @@
rootScanned = true;
}
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("RootScanner going to sleep");
- }
Thread.sleep(metaRescanInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("RootScanner woke up");
- }
} catch(InterruptedException e) {
// Catch and go around again. If interrupt, its spurious or we're
// being shutdown. Go back up to the while test.
}
}
} catch(IOException e) {
- LOG.error(e);
+ LOG.error("ROOT scanner", e);
closed = true;
}
- LOG.debug("ROOT scanner exiting");
+ LOG.info("ROOT scanner exiting");
}
}
@@ -369,7 +326,6 @@
}
// Comparable
-
public int compareTo(Object o) {
MetaRegion other = (MetaRegion)o;
@@ -383,11 +339,11 @@
}
/** Work for the meta scanner is queued up here */
- private Vector<MetaRegion> metaRegionsToScan;
+ Vector<MetaRegion> metaRegionsToScan;
- private SortedMap<Text, MetaRegion> knownMetaRegions;
+ SortedMap<Text, MetaRegion> knownMetaRegions;
- private boolean allMetaRegionsScanned;
+ boolean allMetaRegionsScanned;
/**
* MetaScanner <code>META</code> table.
@@ -399,6 +355,7 @@
* action would prevent other work from getting done.
*/
private class MetaScanner extends BaseScanner {
+ @SuppressWarnings("null")
public void run() {
while (!closed) {
if (LOG.isDebugEnabled()) {
@@ -412,13 +369,7 @@
}
if (region == null) {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("MetaScanner going into wait");
- }
metaRegionsToScan.wait();
- if (LOG.isDebugEnabled()) {
- LOG.debug("MetaScanner woke up");
- }
} catch (InterruptedException e) {
// Catch and go around again. We've been woken because there
// are new meta regions available or because we are being
@@ -445,13 +396,7 @@
do {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleep for meta rescan interval");
- }
Thread.sleep(metaRescanInterval);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleep for meta rescan interval");
- }
} catch(InterruptedException ex) {
// Catch and go around again.
}
@@ -472,13 +417,11 @@
} while(true);
} catch(IOException e) {
- LOG.error(e);
+ LOG.error("META scanner", e);
closed = true;
}
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("META scanner exiting");
- }
+ LOG.info("META scanner exiting");
}
private synchronized void metaRegionsScanned() {
@@ -488,25 +431,17 @@
public synchronized void waitForMetaScan() {
while(!closed && !allMetaRegionsScanned) {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wait for all meta regions scanned");
- }
wait();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wake from wait for all meta regions scanned");
- }
} catch(InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wake from wait for all meta regions scanned (IE)");
- }
+ // continue
}
}
}
}
- private MetaScanner metaScanner;
+ MetaScanner metaScanner;
private Thread metaScannerThread;
- private Integer metaScannerLock = 0;
+ Integer metaScannerLock = 0;
// The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
// which includes the region's table, its id, and its start/end keys.
@@ -514,31 +449,28 @@
// We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
// set of all known valid regions.
- private SortedMap<Text, HRegionInfo> unassignedRegions;
+ SortedMap<Text, HRegionInfo> unassignedRegions;
// The 'assignAttempts' table maps from regions to a timestamp that indicates
// the last time we *tried* to assign the region to a RegionServer. If the
// timestamp is out of date, then we can try to reassign it.
- private SortedMap<Text, Long> assignAttempts;
-
- // 'killList' indicates regions that we hope to close and not reopen
- // (because we're merging them, or taking the table offline, for example).
+ SortedMap<Text, Long> assignAttempts;
- private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
+ SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
// 'killedRegions' contains regions that are in the process of being closed
- private SortedSet<Text> killedRegions;
+ SortedSet<Text> killedRegions;
// 'regionsToDelete' contains regions that need to be deleted, but cannot be
// until the region server closes it
- private SortedSet<Text> regionsToDelete;
+ SortedSet<Text> regionsToDelete;
// A map of known server names to server info
- private SortedMap<String, HServerInfo> serversToServerInfo =
+ SortedMap<String, HServerInfo> serversToServerInfo =
Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
/** Build the HMaster out of a raw configuration item. */
@@ -576,18 +508,16 @@
if(! fs.exists(rootRegionDir)) {
LOG.info("bootstrap: creating ROOT and first META regions");
try {
- HRegion root = HRegion.createNewHRegion(fs, dir, conf,
- HGlobals.rootTableDesc, 0L, null, null);
- HRegion meta = HRegion.createNewHRegion(fs, dir, conf,
- HGlobals.metaTableDesc, 1L, null, null);
-
- HRegion.addRegionToMeta(root, meta);
-
+ HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc,
+ this.dir, this.conf);
+ HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc,
+ this.dir, this.conf);
+ // Add first region from the META table to the ROOT region.
+ HRegion.addRegionToMETA(root, meta);
root.close();
root.getLog().close();
meta.close();
meta.getLog().close();
-
} catch(IOException e) {
LOG.error(e);
}
@@ -690,6 +620,7 @@
try {
msgQueue.wait(threadWakeFrequency);
} catch(InterruptedException iex) {
+ // continue
}
}
if(closed) {
@@ -751,9 +682,7 @@
LOG.warn(iex);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("HMaster main thread exiting");
- }
+ LOG.info("HMaster main thread exiting");
}
/**
@@ -1085,7 +1014,7 @@
}
}
}
- return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]);
+ return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
}
private synchronized void rootRegionIsAvailable() {
@@ -1195,8 +1124,8 @@
long startCode = -1L;
try {
- startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
-
+ startCode = Long.valueOf(new String(bytes, UTF8_ENCODING)).
+ longValue();
} catch(UnsupportedEncodingException e) {
LOG.error(e);
break;
@@ -1558,7 +1487,7 @@
}
}
- private synchronized void waitForRootRegion() {
+ synchronized void waitForRootRegion() {
while (rootRegionLocation == null) {
try {
if (LOG.isDebugEnabled()) {
@@ -1625,8 +1554,8 @@
// 2. Create the HRegion
- HRegion r = HRegion.createNewHRegion(fs, dir, conf, desc,
- newRegion.regionId, null, null);
+ HRegion r = HRegion.createHRegion(newRegion.regionId, desc, this.dir,
+ this.conf);
// 3. Insert into meta
@@ -1874,7 +1803,6 @@
protected boolean isBeingServed(String serverName, long startCode) {
boolean result = false;
-
if(serverName != null && startCode != -1L) {
HServerInfo s = serversToServerInfo.get(serverName);
result = s != null && s.getStartCode() == startCode;
@@ -1889,29 +1817,30 @@
protected abstract void processScanItem(String serverName, long startCode,
HRegionInfo info) throws IOException;
- protected abstract void postProcessMeta(MetaRegion m,
- HRegionInterface server) throws IOException;
+ protected abstract void postProcessMeta(MetaRegion m,
+ HRegionInterface server)
+ throws IOException;
}
private class ChangeTableState extends TableOperation {
private boolean online;
- protected TreeMap<String, TreeSet<HRegionInfo>> servedRegions;
+ protected TreeMap<String, TreeSet<HRegionInfo>> servedRegions =
+ new TreeMap<String, TreeSet<HRegionInfo>>();
protected long lockid;
protected long clientId;
public ChangeTableState(Text tableName, boolean onLine) throws IOException {
super(tableName);
this.online = onLine;
- this.servedRegions = new TreeMap<String, TreeSet<HRegionInfo>>();
}
protected void processScanItem(String serverName, long startCode,
- HRegionInfo info) throws IOException {
-
- if(isBeingServed(serverName, startCode)) {
+ HRegionInfo info)
+ throws IOException {
+ if (isBeingServed(serverName, startCode)) {
TreeSet<HRegionInfo> regions = servedRegions.get(serverName);
- if(regions == null) {
+ if (regions == null) {
regions = new TreeSet<HRegionInfo>();
}
regions.add(info);
@@ -1921,16 +1850,12 @@
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
-
// Process regions not being served
-
if(LOG.isDebugEnabled()) {
LOG.debug("processing unserved regions");
}
for(HRegionInfo i: unservedRegions) {
-
// Update meta table
-
if(LOG.isDebugEnabled()) {
LOG.debug("updating columns in row: " + i.regionName);
}
@@ -1986,13 +1911,12 @@
}
for(Map.Entry<String, TreeSet<HRegionInfo>> e: servedRegions.entrySet()) {
String serverName = e.getKey();
-
- if(online) {
+ if (online) {
+ LOG.debug("Already online");
continue; // Already being served
}
- // Cause regions being served to be take off-line and disabled
-
+ // Cause regions being served to be taken off-line and disabled
TreeMap<Text, HRegionInfo> localKillList = killList.get(serverName);
if(localKillList == null) {
localKillList = new TreeMap<Text, HRegionInfo>();
@@ -2003,9 +1927,10 @@
}
localKillList.put(i.regionName, i);
}
- if(localKillList != null && localKillList.size() > 0) {
+ if(localKillList.size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug("inserted local kill list into kill list for server " + serverName);
+ LOG.debug("inserted local kill list into kill list for server " +
+ serverName);
}
killList.put(serverName, localKillList);
}
@@ -2036,23 +1961,18 @@
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
-
- // For regions that are being served, mark them for deletion
-
- for(TreeSet<HRegionInfo> s: servedRegions.values()) {
- for(HRegionInfo i: s) {
+ // For regions that are being served, mark them for deletion
+ for (TreeSet<HRegionInfo> s: servedRegions.values()) {
+ for (HRegionInfo i: s) {
regionsToDelete.add(i.regionName);
}
}
// Unserved regions we can delete now
-
- for(HRegionInfo i: unservedRegions) {
+ for (HRegionInfo i: unservedRegions) {
// Delete the region
-
try {
HRegion.deleteRegion(fs, dir, i.regionName);
-
} catch(IOException e) {
LOG.error("failed to delete region " + i.regionName);
LOG.error(e);
@@ -2062,22 +1982,23 @@
}
@Override
- protected void updateRegionInfo(HRegionInterface server, Text regionName,
- HRegionInfo i) throws IOException {
-
+ protected void updateRegionInfo(
+ @SuppressWarnings("hiding") HRegionInterface server, Text regionName,
+ @SuppressWarnings("unused") HRegionInfo i)
+ throws IOException {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
}
}
private abstract class ColumnOperation extends TableOperation {
-
protected ColumnOperation(Text tableName) throws IOException {
super(tableName);
}
- protected void processScanItem(String serverName, long startCode,
- HRegionInfo info) throws IOException {
-
+ protected void processScanItem(
+ @SuppressWarnings("unused") String serverName,
+ @SuppressWarnings("unused") long startCode, final HRegionInfo info)
+ throws IOException {
if(isEnabled(info)) {
throw new TableNotDisabledException(tableName.toString());
}
@@ -2196,10 +2117,7 @@
}
public void leaseExpired() {
- if(LOG.isDebugEnabled()) {
- LOG.debug(server + " lease expired");
- }
-
+ LOG.info(server + " lease expired");
HServerInfo storedInfo = serversToServerInfo.remove(server);
synchronized(msgQueue) {
msgQueue.add(new PendingServerShutdown(storedInfo));
@@ -2218,7 +2136,7 @@
System.exit(0);
}
- public static void main(String [] args) throws IOException {
+ public static void main(String [] args) {
if (args.length < 1) {
printUsageAndExit();
}
@@ -2261,4 +2179,4 @@
printUsageAndExit();
}
}
-}
\ No newline at end of file
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Mon Jun 4 10:14:10 2007
@@ -27,12 +27,12 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-/*******************************************************************************
+/**
* The HMemcache holds in-memory modifications to the HRegion. This is really a
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
- ******************************************************************************/
+ */
public class HMemcache {
- private static final Log LOG = LogFactory.getLog(HMemcache.class);
+ private final Log LOG = LogFactory.getLog(this.getClass().getName());
TreeMap<HStoreKey, BytesWritable> memcache
= new TreeMap<HStoreKey, BytesWritable>();
@@ -42,7 +42,7 @@
TreeMap<HStoreKey, BytesWritable> snapshot = null;
- private final HLocking lock = new HLocking();
+ final HLocking lock = new HLocking();
public HMemcache() {
super();
@@ -147,7 +147,8 @@
*
* Operation uses a write lock.
*/
- public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
+ public void add(final Text row, final TreeMap<Text, BytesWritable> columns,
+ final long timestamp) {
this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
@@ -239,7 +240,6 @@
Vector<BytesWritable> result = new Vector<BytesWritable>();
HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
-
for (Map.Entry<HStoreKey, BytesWritable> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
@@ -257,9 +257,9 @@
/**
* Return a scanner over the keys in the HMemcache
*/
- public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
- throws IOException {
-
+ public HInternalScannerInterface getScanner(long timestamp,
+ Text targetCols[], Text firstRow)
+ throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
}
@@ -295,16 +295,11 @@
this.vals = new BytesWritable[backingMaps.length];
// Generate list of iterators
-
HStoreKey firstKey = new HStoreKey(firstRow);
for(int i = 0; i < backingMaps.length; i++) {
- if(firstRow.getLength() != 0) {
- keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
-
- } else {
- keyIterators[i] = backingMaps[i].keySet().iterator();
- }
-
+ keyIterators[i] = (firstRow.getLength() != 0)?
+ backingMaps[i].tailMap(firstKey).keySet().iterator():
+ backingMaps[i].keySet().iterator();
while(getNext(i)) {
if(! findFirstRow(i, firstRow)) {
continue;
@@ -314,7 +309,6 @@
}
}
}
-
} catch(IOException ex) {
LOG.error(ex);
close();
@@ -326,9 +320,9 @@
* The user didn't want to start scanning at the first row. This method
* seeks to the requested row.
*
- * @param i - which iterator to advance
- * @param firstRow - seek to this row
- * @return - true if this is the first row
+ * @param i which iterator to advance
+ * @param firstRow seek to this row
+ * @return true if this is the first row
*/
boolean findFirstRow(int i, Text firstRow) {
return ((firstRow.getLength() == 0)
@@ -338,11 +332,11 @@
/**
* Get the next value from the specified iterater.
*
- * @param i - which iterator to fetch next value from
- * @return - true if there is more data available
+ * @param i Which iterator to fetch next value from
+ * @return true if there is more data available
*/
boolean getNext(int i) {
- if(! keyIterators[i].hasNext()) {
+ if (!keyIterators[i].hasNext()) {
closeSubScanner(i);
return false;
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?view=diff&rev=544188&r1=544187&r2=544188
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Mon Jun 4 10:14:10 2007
@@ -146,7 +146,9 @@
nextSize = nextRegion.largestHStore();
- if((currentSize + nextSize) <= (DESIRED_MAX_FILE_SIZE / 2)) {
+ long maxFilesize =
+ conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
+ if((currentSize + nextSize) <= (maxFilesize / 2)) {
// We merge two adjacent regions if their total size is less than
// one half of the desired maximum size