You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/03 22:34:30 UTC
svn commit: r525267 [2/5] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/
src/contrib/hbase/src/ src/contrib/hbase/src/java/
src/contrib/hbase/src/java/org/ src/contrib/hbase/src/java/org/apache/
src/contrib/hbase/src/java/org/apache/hadoop/ src/contr...
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,1275 @@
+/**
+ * Copyright 2006-7 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.ipc.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HMaster is the "master server" for an HBase deployment.
+ * There is only one HMaster for a single HBase deployment.
+ ******************************************************************************/
+public class HMaster extends HGlobals
+    implements HConstants, HMasterInterface, HMasterRegionInterface {
+
+  // Set once close() begins; all worker loops poll this flag to exit.
+  private boolean closed;
+  // Base directory for all HBase data in the filesystem.
+  private Path dir;
+  private Configuration conf;
+  private FileSystem fs;
+  // Source of client ids and new region ids.
+  private Random rand;
+  // How long (ms) to wait for a region to open before re-assigning it.
+  private long maxRegionOpenTime;
+
+  // The 'msgQueue' is used to assign work to the client processor thread
+
+  private Vector<PendingOperation> msgQueue;
+
+  // Leases granted to region servers; lease expiry means the server died.
+  private Leases serverLeases;
+  // RPC server answering HMasterInterface/HMasterRegionInterface calls.
+  private Server server;
+
+  private HClient client;
+
+  // How often (ms) the root/meta scanner threads rescan.
+  private long metaRescanInterval;
+
+  // Address of the server currently hosting the ROOT region, or null
+  // while the ROOT region is unassigned.
+  private HServerAddress rootRegionLocation;
+
+ //////////////////////////////////////////////////////////////////////////////
+ // The ROOT/META scans, performed each time a meta region comes on-line
+ // Since both have identical structure, we only need one class to get the work
+ // done, however we have to create separate objects for each.
+ //////////////////////////////////////////////////////////////////////////////
+
+ private boolean rootScanned;
+ private int numMetaRegions;
+
+ /**
+ * How do we know if all regions are assigned?
+ *
+ * 1. After the initial scan of the root and meta regions, all regions known
+ * at that time will have been or are in the process of being assigned.
+ *
+ * 2. When a region is split the region server notifies the master of the split
+ * and the new regions are assigned. But suppose the master loses the split
+ * message? We need to periodically rescan the root and meta regions.
+ *
+ * - If we rescan, any regions that are new but not assigned will have no
+ * server info. Any regions that are not being served by the same server
+ * will get re-assigned.
+ *
+ * - Thus a periodic rescan of the root region will find any new meta
+ * regions where we missed the meta split message or we failed to detect
+ * a server death and consequently need to assign the region to a new
+ * server.
+ *
+ * - if we keep track of all the known meta regions, then we can rescan
+   * them periodically. If we do this then we can detect any regions for
+ * which we missed a region split message.
+ *
+ * Thus just keeping track of all the meta regions permits periodic rescanning
+ * which will detect unassigned regions (new or otherwise) without the need to
+ * keep track of every region.
+ *
+ * So the root region scanner needs to wake up
+ * 1. when the master receives notification that the root region has been opened.
+ * 2. periodically after the first scan
+ *
+ * The meta scanner needs to wake up:
+ * 1. when a meta region comes on line
+ * 2. periodically to rescan the known meta regions
+ *
+ * A meta region is not 'known' until it has been scanned once.
+ *
+ */
+  /**
+   * Scans the ROOT region to discover META regions, and queues for
+   * (re-)assignment any META region that is unassigned or assigned to a
+   * dead/restarted server. Runs until the master is closed.
+   */
+  private class RootScanner implements Runnable {
+    public RootScanner() {
+    }
+
+    public void run() {
+      Text cols[] = {
+        ROOT_COLUMN_FAMILY
+      };
+      Text firstRow = new Text();
+
+      while((! closed)) {
+        int metaRegions = 0;
+
+        // Wait for the ROOT region to come on-line. The previous code
+        // called rootRegionLocation.wait() while the reference was still
+        // null, which throws NullPointerException (and would also need the
+        // monitor held); poll instead.
+        while(rootRegionLocation == null && (! closed)) {
+          try {
+            Thread.sleep(1000);
+
+          } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+        if(closed) {
+          break;
+        }
+
+        HRegionInterface server = null;
+        HScannerInterface scanner = null;
+
+        try {
+          server = client.getHRegionConnection(rootRegionLocation);
+          scanner = server.openScanner(rootRegionInfo.regionName, cols, firstRow);
+
+        } catch(IOException iex) {
+          // Cannot reach the ROOT region server: shut the master down.
+          try {
+            close();
+
+          } catch(IOException iex2) {
+          }
+          break;
+        }
+        try {
+          HStoreKey key = new HStoreKey();
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          DataInputBuffer inbuf = new DataInputBuffer();
+
+          while(scanner.next(key, results)) {
+            byte hRegionInfoBytes[] = results.get(ROOT_COL_REGIONINFO);
+            inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+            HRegionInfo info = new HRegionInfo();
+            info.readFields(inbuf);
+
+            byte serverBytes[] = results.get(ROOT_COL_SERVER);
+            String serverName = new String(serverBytes, UTF8_ENCODING);
+
+            byte startCodeBytes[] = results.get(ROOT_COL_STARTCODE);
+            long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+
+            // Note META region to load.
+
+            HServerInfo storedInfo = null;
+            synchronized(serversToServerInfo) {
+              storedInfo = serversToServerInfo.get(serverName);
+              if(storedInfo == null
+                  || storedInfo.getStartCode() != startCode) {
+
+                // The current assignment is no good; load the region.
+
+                synchronized(unassignedRegions) {
+                  unassignedRegions.put(info.regionName, info);
+                  assignAttempts.put(info.regionName, 0L);
+                }
+              }
+            }
+            results.clear();
+            metaRegions += 1;
+          }
+
+        } catch(Exception iex) {
+          // Best-effort scan; a failure here is retried on the next pass.
+        } finally {
+          try {
+            scanner.close();
+
+          } catch(IOException iex2) {
+          }
+        }
+        rootScanned = true;
+        numMetaRegions = metaRegions;
+        try {
+          Thread.sleep(metaRescanInterval);
+
+        } catch(InterruptedException e) {
+          // close() interrupts us; the loop condition re-checks 'closed'.
+        }
+      }
+    }
+  }
+
+  private RootScanner rootScanner;
+  private Thread rootScannerThread;
+
+  /** Contains information the meta scanner needs to process a "new" meta region */
+  private class MetaRegion {
+    public HServerAddress server;
+    public Text regionName;
+    public Text startKey;
+  }
+
+  /** Work for the meta scanner is queued up here */
+  private Vector<MetaRegion> metaRegionsToScan;
+
+  // META regions that have been scanned at least once, keyed by start key.
+  private TreeMap<Text, MetaRegion> knownMetaRegions;
+  // True once the root region and every known META region have been
+  // scanned. NOTE: this Boolean object is also used as a wait/notify
+  // rendezvous by the pending operations.
+  private Boolean allMetaRegionsScanned;
+
+ /**
+   * MetaScanner scans a region of the META table.
+ *
+ * When a meta server comes on line, a MetaRegion object is queued up by
+ * regionServerReport() and this thread wakes up.
+ *
+ * It's important to do this work in a separate thread, or else the blocking
+ * action would prevent other work from getting done.
+ */
+ private class MetaScanner implements Runnable {
+ private final Text cols[] = {
+ META_COLUMN_FAMILY
+ };
+ private final Text firstRow = new Text();
+
+ public MetaScanner() {
+ }
+
+ private void scanRegion(MetaRegion region) {
+ HRegionInterface server = null;
+ HScannerInterface scanner = null;
+
+ try {
+ server = client.getHRegionConnection(region.server);
+ scanner = server.openScanner(region.regionName, cols, firstRow);
+
+ } catch(IOException iex) {
+ try {
+ close();
+
+ } catch(IOException iex2) {
+ }
+ return;
+ }
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ DataInputBuffer inbuf = new DataInputBuffer();
+
+ while(scanner.next(key, results)) {
+ byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
+ inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+ HRegionInfo info = new HRegionInfo();
+ info.readFields(inbuf);
+
+ byte serverBytes[] = results.get(META_COL_SERVER);
+ String serverName = new String(serverBytes, UTF8_ENCODING);
+
+ byte startCodeBytes[] = results.get(META_COL_STARTCODE);
+ long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+
+ // Note HRegion to load.
+
+ HServerInfo storedInfo = null;
+ synchronized(serversToServerInfo) {
+ storedInfo = serversToServerInfo.get(serverName);
+ if(storedInfo == null
+ || storedInfo.getStartCode() != startCode) {
+
+ // The current assignment is no good; load the region.
+
+ synchronized(unassignedRegions) {
+ unassignedRegions.put(info.regionName, info);
+ assignAttempts.put(info.regionName, 0L);
+ }
+ }
+ }
+ results.clear();
+ }
+ } catch(Exception iex) {
+ } finally {
+ try {
+ scanner.close();
+
+ } catch(IOException iex2) {
+ }
+ }
+ }
+
+ public void run() {
+ while((! closed)) {
+ MetaRegion region = null;
+
+ while(region == null) {
+ synchronized(metaRegionsToScan) {
+ if(metaRegionsToScan.size() != 0) {
+ region = metaRegionsToScan.remove(0);
+ }
+ }
+ if(region == null) {
+ try {
+ metaRegionsToScan.wait();
+
+ } catch(InterruptedException e) {
+ }
+ }
+ }
+
+ scanRegion(region);
+
+ synchronized(knownMetaRegions) {
+ knownMetaRegions.put(region.startKey, region);
+ if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
+ allMetaRegionsScanned = true;
+ allMetaRegionsScanned.notifyAll();
+ }
+ }
+
+ do {
+ try {
+ Thread.sleep(metaRescanInterval);
+
+ } catch(InterruptedException ex) {
+ }
+ if(! allMetaRegionsScanned) {
+ break; // A region must have split
+ }
+
+ // Rescan the known meta regions every so often
+
+ Vector<MetaRegion> v = new Vector<MetaRegion>();
+ v.addAll(knownMetaRegions.values());
+
+ for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
+ scanRegion(i.next());
+ }
+ } while(true);
+ }
+ }
+ }
+
+  private MetaScanner metaScanner;
+  private Thread metaScannerThread;
+
+  // Client processing
+
+  // Drains 'msgQueue', applying deferred meta-table updates.
+  private ClientProcessor clientProcessor;
+  private Thread clientProcessorThread;
+
+  // The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
+  // which includes the region's table, its id, and its start/end keys.
+  //
+  // We fill 'unassignedRegions' by scanning ROOT and META tables, learning the
+  // set of all known valid regions.
+
+  private TreeMap<Text, HRegionInfo> unassignedRegions;
+
+  // The 'assignAttempts' table maps from regions to a timestamp that indicates
+  // the last time we *tried* to assign the region to a RegionServer. If the
+  // timestamp is out of date, then we can try to reassign it.
+
+  private TreeMap<Text, Long> assignAttempts;
+
+  // 'killList' indicates regions that we hope to close and then never reopen
+  // (because we're merging them, say).
+
+  private TreeMap<String, TreeMap<Text, HRegionInfo>> killList;
+
+  // 'serversToServerInfo' maps from a server address string to its HServerInfo
+
+  private TreeMap<String, HServerInfo> serversToServerInfo;
+  /**
+   * Build the HMaster out of a raw configuration item.
+   *
+   * @param conf - configuration supplying the region directory and the
+   *               master's bind address
+   * @throws IOException
+   */
+  public HMaster(Configuration conf) throws IOException {
+    this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
+        new HServerAddress(conf.get(MASTER_DEFAULT_NAME)),
+        conf);
+  }
+
+  /**
+   * Build the HMaster
+   * @param dir - base directory
+   * @param address - server address and port number
+   * @param conf - configuration
+   *
+   * @throws IOException
+   */
+  public HMaster(Path dir, HServerAddress address, Configuration conf) throws IOException {
+    // Stay 'closed' until every component below is constructed.
+    this.closed = true;
+    this.dir = dir;
+    this.conf = conf;
+    this.fs = FileSystem.get(conf);
+    this.rand = new Random();
+
+    // Make sure the root directory exists!
+
+    if(! fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+
+    Path rootRegionDir = HStoreFile.getHRegionDir(dir, rootRegionInfo.regionName);
+    if(! fs.exists(rootRegionDir)) {
+
+      // Bootstrap! Need to create the root region and the first meta region.
+      //TODO is the root region self referential?
+
+      HRegion root = createNewHRegion(rootTableDesc, 0L);
+      HRegion meta = createNewHRegion(metaTableDesc, 1L);
+
+      addTableToMeta(root, meta);
+    }
+
+    this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
+    this.msgQueue = new Vector<PendingOperation>();
+    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 15 * 1000),
+        conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+    this.server = RPC.getServer(this, address.getBindAddress(),
+        address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+    this.client = new HClient(conf);
+
+    this.metaRescanInterval
+      = conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
+
+    // The root region
+
+    this.rootRegionLocation = null;
+    this.rootScanned = false;
+    this.rootScanner = new RootScanner();
+    this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
+
+    // Scans the meta table
+
+    this.numMetaRegions = 0;
+    this.metaRegionsToScan = new Vector<MetaRegion>();
+    this.knownMetaRegions = new TreeMap<Text, MetaRegion>();
+    this.allMetaRegionsScanned = new Boolean(false);
+    this.metaScanner = new MetaScanner();
+    this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
+
+    // Process updates to meta asynchronously
+
+    this.clientProcessor = new ClientProcessor();
+    this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor");
+
+    // The ROOT region is unassigned at startup; seed the assignment tables
+    // so it gets handed to the first region server that reports in.
+    this.unassignedRegions = new TreeMap<Text, HRegionInfo>();
+    this.unassignedRegions.put(rootRegionInfo.regionName, rootRegionInfo);
+
+    this.assignAttempts = new TreeMap<Text, Long>();
+    this.assignAttempts.put(rootRegionInfo.regionName, 0L);
+
+    this.killList = new TreeMap<String, TreeMap<Text, HRegionInfo>>();
+    this.serversToServerInfo = new TreeMap<String, HServerInfo>();
+
+    // We're almost open for business
+
+    this.closed = false;
+
+    try {
+      // Start things up
+
+      this.rootScannerThread.start();
+      this.metaScannerThread.start();
+      this.clientProcessorThread.start();
+
+      // Start the server last so everything else is running before we start
+      // receiving requests
+
+      this.server.start();
+
+    } catch(IOException e) {
+      // Something happened during startup. Shut things down.
+
+      this.closed = true;
+      throw e;
+    }
+  }
+
+  /** Turn off the HMaster. Turn off all the threads, close files, etc. */
+  public void close() throws IOException {
+    closed = true;
+
+    try {
+      client.close();
+
+    } catch(IOException iex) {
+      // Best effort; continue shutting down the remaining components.
+    }
+
+    try {
+      server.stop();
+      server.join();
+
+    } catch(InterruptedException iex) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Interrupt the worker threads so that any of them sleeping or waiting
+    // notices 'closed' promptly; without this each join below can block for
+    // up to a full rescan interval.
+
+    clientProcessorThread.interrupt();
+    metaScannerThread.interrupt();
+    rootScannerThread.interrupt();
+
+    try {
+      clientProcessorThread.join();
+
+    } catch(Exception iex) {
+    }
+    try {
+      metaScannerThread.join();
+
+    } catch(Exception iex) {
+    }
+    try {
+      rootScannerThread.join();
+
+    } catch(Exception iex) {
+    }
+  }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // HMasterRegionInterface
+ //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * HRegionServers call this method upon startup.
+   *
+   * Registers the new server and grants it a lease; if a server with the
+   * same address is already registered (a restart), the old registration
+   * is expired immediately and its regions are queued for re-assignment.
+   */
+  public void regionServerStartup(HServerInfo serverInfo) throws IOException {
+    String server = serverInfo.getServerAddress().toString();
+    HServerInfo storedInfo = null;
+
+    // If we get the startup message but there's an old server by that
+    // name, then we can timeout the old one right away and register
+    // the new one.
+
+    synchronized(serversToServerInfo) {
+      storedInfo = serversToServerInfo.get(server);
+
+      if(storedInfo != null) {
+        serversToServerInfo.remove(server);
+
+        // Hand the dead incarnation's cleanup to the client processor.
+        synchronized(msgQueue) {
+          msgQueue.add(new PendingServerShutdown(storedInfo));
+          msgQueue.notifyAll();
+        }
+
+      }
+
+      // Either way, record the new server
+
+      serversToServerInfo.put(server, serverInfo);
+
+      // Lease expiry (via ServerExpirer) is how we detect server death.
+      Text serverLabel = new Text(server);
+      serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
+    }
+  }
+
+  /**
+   * HRegionServers call this method repeatedly.
+   *
+   * Validates the reporting server against our registration table, renews
+   * its lease and processes its messages, returning the instructions the
+   * server should carry out next.
+   */
+  public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException {
+    String server = serverInfo.getServerAddress().toString();
+
+    synchronized(serversToServerInfo) {
+      HServerInfo storedInfo = serversToServerInfo.get(server);
+
+      if(storedInfo == null) {
+
+        // The HBaseMaster may have been restarted.
+        // Tell the RegionServer to start over and call regionServerStartup()
+
+        HMsg returnMsgs[] = new HMsg[1];
+        returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
+        return returnMsgs;
+
+      } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) {
+
+        // This state is reachable if:
+        //
+        // 1) RegionServer A started
+        // 2) RegionServer B started on the same machine, then
+        //    clobbered A in regionServerStartup.
+        // 3) RegionServer A returns, expecting to work as usual.
+        //
+        // The answer is to ask A to shut down for good.
+
+        HMsg returnMsgs[] = new HMsg[1];
+        returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
+        return returnMsgs;
+
+      } else {
+
+        // All's well. Renew the server's lease.
+        // This will always succeed; otherwise, the fetch of serversToServerInfo
+        // would have failed above.
+
+        Text serverLabel = new Text(server);
+        serverLeases.renewLease(serverLabel, serverLabel);
+
+        // Refresh the info object
+        serversToServerInfo.put(server, serverInfo);
+
+        // Next, process messages for this server
+        return processMsgs(serverInfo, msgs);
+      }
+    }
+  }
+
+  /**
+   * Process all the incoming messages from a server that's contacted us,
+   * and compute the list of instructions (region opens/closes) to send
+   * back to it.
+   */
+  HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException {
+    Vector<HMsg> returnMsgs = new Vector<HMsg>();
+
+    // Process the kill list
+
+    TreeMap<Text, HRegionInfo> regionsToKill = killList.get(info.toString());
+    if(regionsToKill != null) {
+      for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
+          i.hasNext(); ) {
+
+        returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
+      }
+    }
+
+    // Get reports on what the RegionServer did.
+
+    synchronized(unassignedRegions) {
+      for(int i = 0; i < incomingMsgs.length; i++) {
+        HRegionInfo region = incomingMsgs[i].getRegionInfo();
+
+        switch(incomingMsgs[i].getMsg()) {
+
+        case HMsg.MSG_REPORT_OPEN:
+          HRegionInfo regionInfo = unassignedRegions.get(region.regionName);
+
+          if(regionInfo == null) {
+
+            // This Region should not have been opened.
+            // Ask the server to shut it down, but don't report it as closed.
+            // Otherwise the HMaster will think the Region was closed on purpose,
+            // and then try to reopen it elsewhere; that's not what we want.
+
+            returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
+
+          } else {
+
+            // Remove from unassigned list so we don't assign it to someone else
+
+            unassignedRegions.remove(region.regionName);
+            assignAttempts.remove(region.regionName);
+
+            if(region.regionName.compareTo(rootRegionInfo.regionName) == 0) {
+
+              // Store the Root Region location (in memory)
+
+              rootRegionLocation = new HServerAddress(info.getServerAddress());
+
+              // Wake up the root scanner. notifyAll() is only legal while
+              // holding the monitor of the object being notified; the
+              // previous code called it bare, which throws
+              // IllegalMonitorStateException.
+
+              synchronized(rootRegionLocation) {
+                rootRegionLocation.notifyAll();
+              }
+
+              // The root region's location is kept in memory only, so
+              // there is no PendingOpenReport to queue for it.
+
+              break;
+
+            } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+
+              // It's a meta region. Put it on the queue to be scanned.
+
+              MetaRegion r = new MetaRegion();
+              r.server = info.getServerAddress();
+              r.regionName = region.regionName;
+              r.startKey = region.startKey;
+
+              synchronized(metaRegionsToScan) {
+                metaRegionsToScan.add(r);
+                metaRegionsToScan.notifyAll();
+              }
+            }
+
+            // Queue up an update to note the region location.
+
+            synchronized(msgQueue) {
+              msgQueue.add(new PendingOpenReport(info, region.regionName));
+              msgQueue.notifyAll();
+            }
+          }
+          break;
+
+        case HMsg.MSG_REPORT_CLOSE:
+          if(region.regionName.compareTo(rootRegionInfo.regionName) == 0) { // Root region
+            rootRegionLocation = null;
+            unassignedRegions.put(region.regionName, region);
+            assignAttempts.put(region.regionName, 0L);
+
+          } else {
+            boolean reassignRegion = true;
+
+            // 'regionsToKill' is null when no regions on this server were
+            // scheduled for deletion; the previous code dereferenced it
+            // unconditionally here and could throw NullPointerException.
+
+            if(regionsToKill != null
+                && regionsToKill.containsKey(region.regionName)) {
+              regionsToKill.remove(region.regionName);
+
+              if(regionsToKill.size() > 0) {
+                killList.put(info.toString(), regionsToKill);
+
+              } else {
+                killList.remove(info.toString());
+              }
+              reassignRegion = false;
+            }
+
+            synchronized(msgQueue) {
+              msgQueue.add(new PendingCloseReport(region, reassignRegion));
+              msgQueue.notifyAll();
+            }
+
+            // NOTE: we cannot put the region into unassignedRegions as that
+            // could create a race with the pending close if it gets
+            // reassigned before the close is processed.
+
+          }
+          break;
+
+        case HMsg.MSG_NEW_REGION:
+          if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+            // A meta region has split.
+
+            allMetaRegionsScanned = false;
+          }
+
+          // We already hold the lock on unassignedRegions (the enclosing
+          // synchronized block), so no nested synchronized is needed.
+
+          unassignedRegions.put(region.regionName, region);
+          assignAttempts.put(region.regionName, 0L);
+          break;
+
+        default:
+          throw new IOException("Impossible state during msg processing. Instruction: "
+              + incomingMsgs[i].getMsg());
+        }
+      }
+
+      // Figure out what the RegionServer ought to do, and write back.
+
+      if(unassignedRegions.size() > 0) {
+
+        // Open new regions as necessary
+
+        int targetForServer = (int) Math.ceil(unassignedRegions.size()
+            / (1.0 * serversToServerInfo.size()));
+
+        int counter = 0;
+        long now = System.currentTimeMillis();
+
+        for(Iterator<Text> it = unassignedRegions.keySet().iterator();
+            it.hasNext(); ) {
+
+          Text curRegionName = it.next();
+          HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
+          long assignedTime = assignAttempts.get(curRegionName);
+
+          // Only hand out a region whose previous assignment attempt has
+          // timed out (or that has never been assigned: timestamp 0).
+          if(now - assignedTime > maxRegionOpenTime) {
+            returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+
+            assignAttempts.put(curRegionName, now);
+            counter++;
+          }
+
+          if(counter >= targetForServer) {
+            break;
+          }
+        }
+      }
+    }
+    return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]);
+  }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Some internal classes to manage msg-passing and client operations
+ //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Drains msgQueue, executing each PendingOperation, until the master is
+   * closed. Note: operations are taken from the *tail* of the queue, and a
+   * failed operation is re-inserted at the *head*, so a failed operation is
+   * retried only after everything else currently queued has been attempted.
+   */
+  private class ClientProcessor implements Runnable {
+    public ClientProcessor() {
+    }
+
+    public void run() {
+      while(! closed) {
+        PendingOperation op = null;
+
+        synchronized(msgQueue) {
+          while(msgQueue.size() == 0) {
+            try {
+              msgQueue.wait();
+
+            } catch(InterruptedException iex) {
+              // NOTE(review): 'closed' is not re-checked here, so this
+              // thread can block forever on an empty queue at shutdown —
+              // confirm and fix when the shutdown path is reworked.
+            }
+          }
+          op = msgQueue.elementAt(msgQueue.size()-1);
+          msgQueue.removeElementAt(msgQueue.size()-1);
+        }
+        try {
+          op.process();
+
+        } catch(Exception ex) {
+          // The operation failed; requeue it for a later retry.
+          synchronized(msgQueue) {
+            msgQueue.insertElementAt(op, 0);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Base class for deferred master operations (server shutdowns and region
+   * open/close reports) that are queued on msgQueue and executed by the
+   * ClientProcessor thread.
+   */
+  abstract class PendingOperation {
+    // Column family scanned/updated by the concrete operations.
+    protected final Text[] columns = {
+      META_COLUMN_FAMILY
+    };
+    protected final Text startRow = new Text();
+    // Random id identifying this operation's updates to the meta regions.
+    protected long clientId;
+
+    public PendingOperation() {
+      this.clientId = rand.nextLong();
+    }
+
+    public abstract void process() throws IOException;
+  }
+
+ class PendingServerShutdown extends PendingOperation {
+ String deadServer;
+ long oldStartCode;
+
+ public PendingServerShutdown(HServerInfo serverInfo) {
+ super();
+ this.deadServer = serverInfo.getServerAddress().toString();
+ this.oldStartCode = serverInfo.getStartCode();
+ }
+
+ private void scanMetaRegion(HRegionInterface server, HScannerInterface scanner,
+ Text regionName) throws IOException {
+
+ Vector<HStoreKey> toDoList = new Vector<HStoreKey>();
+ TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
+
+ DataInputBuffer inbuf = new DataInputBuffer();
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+ while(scanner.next(key, results)) {
+ byte serverBytes[] = results.get(META_COL_SERVER);
+ String serverName = new String(serverBytes, UTF8_ENCODING);
+
+ if(deadServer.compareTo(serverName) != 0) {
+ // This isn't the server you're looking for - move along
+ continue;
+ }
+
+ byte startCodeBytes[] = results.get(META_COL_STARTCODE);
+ long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+
+ if(oldStartCode != startCode) {
+ // Close but no cigar
+ continue;
+ }
+
+ // Bingo! Found it.
+
+ byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
+ inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+ HRegionInfo info = new HRegionInfo();
+ info.readFields(inbuf);
+
+ // Add to our to do lists
+
+ toDoList.add(key);
+ regions.put(info.regionName, info);
+ }
+
+ } finally {
+ scanner.close();
+ }
+
+ // Remove server from root/meta entries
+
+ for(int i = 0; i < toDoList.size(); i++) {
+ long lockid = server.startUpdate(regionName, clientId, toDoList.get(i).getRow());
+ server.delete(regionName, clientId, lockid, META_COL_SERVER);
+ server.delete(regionName, clientId, lockid, META_COL_STARTCODE);
+ server.commit(regionName, clientId, lockid);
+ }
+
+ // Put all the regions we found on the unassigned region list
+
+ for(Iterator<Map.Entry<Text, HRegionInfo>> i = regions.entrySet().iterator();
+ i.hasNext(); ) {
+
+ Map.Entry<Text, HRegionInfo> e = i.next();
+ Text region = e.getKey();
+ HRegionInfo regionInfo = e.getValue();
+
+ synchronized(unassignedRegions) {
+ unassignedRegions.put(region, regionInfo);
+ assignAttempts.put(region, 0L);
+ }
+ }
+ }
+
+ public void process() throws IOException {
+
+ // We can not scan every meta region if they have not already been assigned
+ // and scanned.
+
+ while(!allMetaRegionsScanned) {
+ try {
+ allMetaRegionsScanned.wait();
+
+ } catch(InterruptedException e) {
+ }
+ }
+
+ // First scan the ROOT region
+
+ HRegionInterface server = client.getHRegionConnection(rootRegionLocation);
+ HScannerInterface scanner = server.openScanner(rootRegionInfo.regionName,
+ columns, startRow);
+
+ scanMetaRegion(server, scanner, rootRegionInfo.regionName);
+ for(Iterator<MetaRegion> i = knownMetaRegions.values().iterator();
+ i.hasNext(); ) {
+
+ MetaRegion r = i.next();
+
+ server = client.getHRegionConnection(r.server);
+ scanner = server.openScanner(r.regionName, columns, startRow);
+ scanMetaRegion(server, scanner, r.regionName);
+ }
+ }
+ }
+
+  /**
+   * PendingCloseReport is a close message that is processed asynchronously:
+   * it clears the closed region's server columns in the appropriate meta
+   * table and, when requested, queues the region for re-assignment.
+   */
+  class PendingCloseReport extends PendingOperation {
+    HRegionInfo regionInfo;
+    boolean reassignRegion;
+    // True when the region being closed is itself a META region, in which
+    // case the update goes to the ROOT table instead of a META table.
+    boolean rootRegion;
+
+    public PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion) {
+      super();
+
+      this.regionInfo = regionInfo;
+      this.reassignRegion = reassignRegion;
+
+      // If the region closing down is a meta region then we need to update
+      // the ROOT table
+
+      if(this.regionInfo.regionName.find(metaTableDesc.getName().toString()) == 0) {
+        this.rootRegion = true;
+
+      } else {
+        this.rootRegion = false;
+      }
+    }
+
+    public void process() throws IOException {
+
+      // We can not access any meta region until they have all been assigned
+      // and scanned, so block until the meta scanner is done. wait() is
+      // only legal while holding the monitor of the object waited on (the
+      // previous code waited bare, which throws
+      // IllegalMonitorStateException); the flag object is replaced when it
+      // is set, so use a timed wait on a snapshot and re-check.
+
+      while(! allMetaRegionsScanned) {
+        Boolean flag = allMetaRegionsScanned;
+        synchronized(flag) {
+          try {
+            flag.wait(1000);
+
+          } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      // Mark the Region as unavailable in the appropriate meta table
+
+      Text metaRegionName;
+      HRegionInterface server;
+      if(rootRegion) {
+        metaRegionName = rootRegionInfo.regionName;
+        server = client.getHRegionConnection(rootRegionLocation);
+
+      } else {
+        // Find the META region whose key range covers this region's name.
+        Text metaStartRow = knownMetaRegions.headMap(regionInfo.regionName).lastKey();
+        MetaRegion r = knownMetaRegions.get(metaStartRow);
+        metaRegionName = r.regionName;
+        server = client.getHRegionConnection(r.server);
+      }
+      long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
+      server.delete(metaRegionName, clientId, lockid, META_COL_SERVER);
+      server.delete(metaRegionName, clientId, lockid, META_COL_STARTCODE);
+      server.commit(metaRegionName, clientId, lockid);
+
+      if(reassignRegion) {
+        synchronized(unassignedRegions) {
+          unassignedRegions.put(regionInfo.regionName, regionInfo);
+          assignAttempts.put(regionInfo.regionName, 0L);
+        }
+      }
+    }
+  }
+
+  /**
+   * PendingOpenReport is an open message that is processed asynchronously:
+   * it records the newly opened region's server address and start code in
+   * the appropriate meta table.
+   */
+  class PendingOpenReport extends PendingOperation {
+    // True when the opened region is a META region (recorded in ROOT).
+    boolean rootRegion;
+    Text regionName;
+    BytesWritable serverAddress;
+    BytesWritable startCode;
+
+    public PendingOpenReport(HServerInfo info, Text regionName) {
+      if(regionName.find(metaTableDesc.getName().toString()) == 0) {
+
+        // The region which just came on-line is a META region.
+        // We need to look in the ROOT region for its information.
+
+        this.rootRegion = true;
+
+      } else {
+
+        // Just an ordinary region. Look for it in the META table.
+
+        this.rootRegion = false;
+      }
+      this.regionName = regionName;
+
+      try {
+        this.serverAddress = new BytesWritable(
+            info.getServerAddress().toString().getBytes(UTF8_ENCODING));
+
+        this.startCode = new BytesWritable(
+            String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
+
+      } catch(UnsupportedEncodingException e) {
+        // Cannot happen: UTF-8 support is mandated by the Java platform.
+      }
+
+    }
+
+    public void process() throws IOException {
+
+      // We can not access any meta region until they have all been assigned
+      // and scanned, so block until the meta scanner is done. wait() is
+      // only legal while holding the monitor of the object waited on (the
+      // previous code waited bare, which throws
+      // IllegalMonitorStateException); the flag object is replaced when it
+      // is set, so use a timed wait on a snapshot and re-check.
+
+      while(! allMetaRegionsScanned) {
+        Boolean flag = allMetaRegionsScanned;
+        synchronized(flag) {
+          try {
+            flag.wait(1000);
+
+          } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      // Register the newly-available Region's location.
+
+      Text metaRegionName;
+      HRegionInterface server;
+      if(rootRegion) {
+        metaRegionName = rootRegionInfo.regionName;
+        server = client.getHRegionConnection(rootRegionLocation);
+
+      } else {
+        // Find the META region whose key range covers this region's name.
+        Text metaStartRow = knownMetaRegions.headMap(regionName).lastKey();
+        MetaRegion r = knownMetaRegions.get(metaStartRow);
+        metaRegionName = r.regionName;
+        server = client.getHRegionConnection(r.server);
+      }
+      long lockid = server.startUpdate(metaRegionName, clientId, regionName);
+      server.put(metaRegionName, clientId, lockid, META_COL_SERVER, serverAddress);
+      server.put(metaRegionName, clientId, lockid, META_COL_STARTCODE, startCode);
+      server.commit(metaRegionName, clientId, lockid);
+    }
+  }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // HMasterInterface
+ //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Create a new table: verify it does not already exist, create its first
+   * HRegion, record the region in the appropriate META region and queue it
+   * for assignment to a region server.
+   *
+   * @param desc - descriptor of the table to create
+   * @throws IOException if the table already exists or creation fails
+   */
+  public void createTable(HTableDescriptor desc) throws IOException {
+    HRegionInfo newRegion = new HRegionInfo(rand.nextLong(), desc, null, null);
+
+    // We can not access any meta region until they have all been assigned
+    // and scanned, so block until the meta scanner is done. wait() is only
+    // legal while holding the monitor of the object waited on (the previous
+    // code waited bare, which throws IllegalMonitorStateException); the
+    // flag object is replaced when it is set, so use a timed wait on a
+    // snapshot and re-check.
+
+    while(! allMetaRegionsScanned) {
+      Boolean flag = allMetaRegionsScanned;
+      synchronized(flag) {
+        try {
+          flag.wait(1000);
+
+        } catch(InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    // 1. Check to see if table already exists
+
+    Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey();
+    MetaRegion m = knownMetaRegions.get(metaStartRow);
+    Text metaRegionName = m.regionName;
+    HRegionInterface server = client.getHRegionConnection(m.server);
+
+    BytesWritable bytes = server.get(metaRegionName, desc.getName(), META_COL_REGIONINFO);
+    if(bytes != null && bytes.getSize() != 0) {
+      byte[] infoBytes = bytes.get();
+      DataInputBuffer inbuf = new DataInputBuffer();
+      inbuf.reset(infoBytes, infoBytes.length);
+      HRegionInfo info = new HRegionInfo();
+      info.readFields(inbuf);
+      if(info.tableDesc.getName().compareTo(desc.getName()) == 0) {
+        throw new IOException("table already exists");
+      }
+    }
+
+    // 2. Create the HRegion
+
+    HRegion r = createNewHRegion(desc, newRegion.regionId);
+
+    // 3. Insert into meta
+
+    HRegionInfo info = r.getRegionInfo();
+    Text regionName = r.getRegionName();
+    ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
+    DataOutputStream s = new DataOutputStream(byteValue);
+    info.write(s);
+
+    long clientId = rand.nextLong();
+    long lockid = server.startUpdate(metaRegionName, clientId, regionName);
+    server.put(metaRegionName, clientId, lockid, META_COL_REGIONINFO,
+        new BytesWritable(byteValue.toByteArray()));
+    server.commit(metaRegionName, clientId, lockid);
+
+    // 4. Get it assigned to a server
+
+    synchronized(unassignedRegions) {
+      unassignedRegions.put(regionName, info);
+      assignAttempts.put(regionName, 0L);
+    }
+  }
+
  /**
   * Internal method to create a new HRegion. Used by createTable and by the
   * bootstrap code in the HMaster constructor
   *
   * @param desc - table descriptor
   * @param regionId - region id
   * @return - new HRegion
   *
   * @throws IOException
   */
  private HRegion createNewHRegion(HTableDescriptor desc, long regionId)
      throws IOException {

    // The HRegionInfo is built with null start/end keys; presumably this
    // means the new region covers the whole key space -- TODO confirm
    // against HRegionInfo.
    HRegionInfo info = new HRegionInfo(regionId, desc, null, null);
    Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
    fs.mkdirs(regionDir);   // create the region's directory in the filesystem

    // Open the region with a fresh log rooted under the region directory.
    return new HRegion(dir, new HLog(fs, new Path(regionDir, "log"), conf), fs,
        conf, info, null, null);
  }
+
  /**
   * Inserts a new table's meta information into the meta table. Used by
   * the HMaster bootstrap code.
   *
   * Writes three columns for the row: the serialized region info, an empty
   * server address, and a zero start code (the region has no hosting
   * server yet).
   *
   * @param meta - HRegion to be updated
   * @param table - HRegion of new table
   *
   * @throws IOException
   */
  private void addTableToMeta(HRegion meta, HRegion table) throws IOException {

    // The row key is the region name

    long writeid = meta.startUpdate(table.getRegionName());

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream s = new DataOutputStream(bytes);

    table.getRegionInfo().write(s);

    // NOTE(review): a trailing region-id long is appended after the
    // serialized HRegionInfo here, while createTable writes only the
    // HRegionInfo for this column -- confirm readers tolerate the extra
    // bytes or make the two writers consistent.
    s.writeLong(table.getRegionId());
    meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray());

    // reset() discards the buffer contents; the DataOutputStream keeps
    // writing into the same underlying buffer, so it can be reused for
    // each column value.
    bytes.reset();
    new HServerAddress().write(s);   // default (empty) address: unassigned
    meta.put(writeid, META_COL_SERVER, bytes.toByteArray());

    bytes.reset();
    s.writeLong(0L);                 // start code 0: no hosting server yet
    meta.put(writeid, META_COL_STARTCODE, bytes.toByteArray());

    meta.commit(writeid);
  }
+
+ public void deleteTable(Text tableName) throws IOException {
+ Text[] columns = {
+ META_COLUMN_FAMILY
+ };
+
+ // We can not access any meta region if they have not already been assigned
+ // and scanned.
+
+ while(!allMetaRegionsScanned) {
+ try {
+ allMetaRegionsScanned.wait();
+
+ } catch(InterruptedException e) {
+ }
+ }
+
+ for(Iterator<MetaRegion> i = knownMetaRegions.tailMap(tableName).values().iterator();
+ i.hasNext(); ) {
+
+ // Find all the regions that make up this table
+
+ long clientId = rand.nextLong();
+ MetaRegion m = i.next();
+ HRegionInterface server = client.getHRegionConnection(m.server);
+ try {
+ HScannerInterface scanner
+ = server.openScanner(m.regionName, columns, tableName);
+
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ DataInputBuffer inbuf = new DataInputBuffer();
+
+ Vector<Text> rowsToDelete = new Vector<Text>();
+
+ while(scanner.next(key, results)) {
+ byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
+ inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+ HRegionInfo info = new HRegionInfo();
+ info.readFields(inbuf);
+
+ if(info.tableDesc.getName().compareTo(tableName) > 0) {
+ break; // Beyond any more entries for this table
+ }
+
+ // Is it being served?
+
+ byte serverBytes[] = results.get(META_COL_SERVER);
+ String serverName = new String(serverBytes, UTF8_ENCODING);
+
+ byte startCodeBytes[] = results.get(META_COL_STARTCODE);
+ long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+
+ synchronized(serversToServerInfo) {
+ HServerInfo s = serversToServerInfo.get(serverName);
+ if(s != null && s.getStartCode() == startCode) {
+
+ // It is being served. Tell the server to stop it and not report back
+
+ TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
+ if(regionsToKill == null) {
+ regionsToKill = new TreeMap<Text, HRegionInfo>();
+ }
+ regionsToKill.put(info.regionName, info);
+ killList.put(serverName, regionsToKill);
+ }
+ }
+ }
+ for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
+ long lockid = server.startUpdate(m.regionName, clientId, row.next());
+ server.delete(m.regionName, clientId, lockid, columns[0]);
+ server.commit(m.regionName, clientId, lockid);
+ }
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
  /**
   * Returns the current location of the root region. Presumably null until
   * the root region has been assigned and located -- TODO confirm against
   * the field's initialization, which is outside this view.
   */
  public HServerAddress findRootRegion() {
    return rootRegionLocation;
  }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Managing leases
+ //////////////////////////////////////////////////////////////////////////////
+
+ class ServerExpirer extends LeaseListener {
+ String server = null;
+
+ public ServerExpirer(String server) {
+ this.server = new String(server);
+ }
+
+ public void leaseExpired() {
+ HServerInfo storedInfo = null;
+
+ synchronized(serversToServerInfo) {
+ storedInfo = serversToServerInfo.remove(server);
+ }
+ synchronized(msgQueue) {
+ msgQueue.add(new PendingServerShutdown(storedInfo));
+ msgQueue.notifyAll();
+ }
+ }
+ }
+}
+
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.IOException;
+
/*******************************************************************************
 * Clients interact with the HMasterInterface to gain access to meta-level HBase
 * functionality, like finding an HRegionServer and creating/destroying tables.
 ******************************************************************************/
public interface HMasterInterface {
  /** RPC interface version; bump when method signatures change. */
  public static final long versionID = 1L; // initial version

  //////////////////////////////////////////////////////////////////////////////
  // Admin tools would use these cmds
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Creates a new table from the given descriptor.
   * @throws IOException if the table already exists or creation fails
   */
  public void createTable(HTableDescriptor desc) throws IOException;

  /**
   * Deletes the named table and the meta entries of its regions.
   * @throws IOException if the meta regions cannot be updated
   */
  public void deleteTable(Text tableName) throws IOException;

  //////////////////////////////////////////////////////////////////////////////
  // These are the method calls of last resort when trying to find an HRegion
  //////////////////////////////////////////////////////////////////////////////

  /** Returns the address of the server currently hosting the root region. */
  public HServerAddress findRootRegion();
}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,27 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.*;
/*******************************************************************************
 * HRegionServers interact with the HMasterRegionInterface to report on local
 * goings-on and to obtain data-handling instructions from the HMaster.
 *********************************************/
public interface HMasterRegionInterface {
  // RPC interface version; bump when method signatures change.
  // NOTE(review): spelled versionId here but versionID in HMasterInterface;
  // consider making the two consistent.
  public static final long versionId = 1L;

  /** Called once when a region server comes on line. */
  public void regionServerStartup(HServerInfo info) throws IOException;

  /**
   * Periodic report from a region server: it sends its outstanding status
   * messages and receives the master's instructions in return.
   */
  public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException;
}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,368 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.*;
+
/*******************************************************************************
 * The HMemcache holds in-memory modifications to the HRegion. This is really a
 * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
 ******************************************************************************/
public class HMemcache {
  private static final Log LOG = LogFactory.getLog(HMemcache.class);

  // Live cache of edits, keyed by (row, column, timestamp).
  TreeMap<HStoreKey, BytesWritable> memcache
      = new TreeMap<HStoreKey, BytesWritable>();

  // Frozen earlier generations of the cache, in creation order (newest
  // last); reads consult these when the live cache misses.
  Vector<TreeMap<HStoreKey, BytesWritable>> history
      = new Vector<TreeMap<HStoreKey, BytesWritable>>();

  // The history member currently being flushed to disk, or null when no
  // flush is in progress.
  TreeMap<HStoreKey, BytesWritable> snapshot = null;

  // Read/write lock guarding memcache, history and snapshot.
  HLocking locking = new HLocking();

  public HMemcache() {
  }

  /** Pairs a frozen cache generation with the HLog sequence id captured
   *  when the snapshot was taken. */
  public static class Snapshot {
    public TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = null;
    public long sequenceId = 0;

    public Snapshot() {
    }
  }

  /**
   * We want to return a snapshot of the current HMemcache with a known HLog
   * sequence number at the same time.
   *
   * Return both the frozen HMemcache TreeMap, as well as the HLog seq number.
   *
   * We need to prevent any writing to the cache during this time, so we obtain
   * a write lock for the duration of the operation.
   *
   * @param log - log whose cache-flush sequence id is captured atomically
   *              with the snapshot
   * @return - Snapshot whose memcacheSnapshot field is null when the cache
   *           was empty (no flush needed)
   * @throws IOException if a snapshot is already in progress
   */
  public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
    Snapshot retval = new Snapshot();

    locking.obtainWriteLock();
    try {
      if(snapshot != null) {
        throw new IOException("Snapshot in progress!");
      }
      if(memcache.size() == 0) {
        LOG.debug("memcache empty. Skipping snapshot");
        return retval;
      }

      LOG.debug("starting memcache snapshot");

      // Freeze the current cache: it becomes both the returned snapshot
      // and the newest history generation, and a fresh empty map takes
      // over as the live cache.
      retval.memcacheSnapshot = memcache;
      this.snapshot = memcache;
      history.add(memcache);
      memcache = new TreeMap<HStoreKey, BytesWritable>();
      retval.sequenceId = log.startCacheFlush();

      LOG.debug("memcache snapshot complete");

      return retval;

    } finally {
      locking.releaseWriteLock();
    }
  }

  /**
   * Delete the snapshot, remove from history.
   *
   * Modifying the structure means we need to obtain a writelock.
   *
   * @throws IOException if no snapshot is in progress
   */
  public void deleteSnapshot() throws IOException {
    locking.obtainWriteLock();

    try {
      if(snapshot == null) {
        throw new IOException("Snapshot not present!");
      }
      LOG.debug("deleting snapshot");

      // Remove the snapshot's generation from history by identity; it has
      // been flushed to disk and no longer needs to serve reads.
      for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
          it.hasNext(); ) {

        TreeMap<HStoreKey, BytesWritable> cur = it.next();
        if(snapshot == cur) {
          it.remove();
          break;
        }
      }
      this.snapshot = null;

      LOG.debug("snapshot deleted");

    } finally {
      locking.releaseWriteLock();
    }
  }

  /**
   * Store a value.
   *
   * Operation uses a write lock.
   *
   * @param row - row key shared by all the column updates
   * @param columns - map of column name to value
   * @param timestamp - timestamp applied to every (row, column) pair
   */
  public void add(Text row, TreeMap<Text, byte[]> columns, long timestamp) {
    locking.obtainWriteLock();
    try {
      for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
        Text column = it.next();
        byte[] val = columns.get(column);

        HStoreKey key = new HStoreKey(row, column, timestamp);
        memcache.put(key, new BytesWritable(val));
      }

    } finally {
      locking.releaseWriteLock();
    }
  }

  /**
   * Look back through all the backlog TreeMaps to find the target.
   *
   * We only need a readlock here.
   *
   * @param key - (row, column, timestamp) to search for
   * @param numVersions - maximum versions to return; values <= 0 return all
   * @return - values newest-first (live cache first, then history newest to
   *           oldest), or null when nothing matched
   */
  public byte[][] get(HStoreKey key, int numVersions) {
    Vector<byte[]> results = new Vector<byte[]>();
    locking.obtainReadLock();
    try {
      // Live cache first, then history generations from newest to oldest.
      Vector<byte[]> result = get(memcache, key, numVersions-results.size());
      results.addAll(0, result);

      for(int i = history.size()-1; i >= 0; i--) {
        if(numVersions > 0 && results.size() >= numVersions) {
          break;
        }

        result = get(history.elementAt(i), key, numVersions-results.size());
        results.addAll(results.size(), result);
      }

      if(results.size() == 0) {
        return null;

      } else {
        return (byte[][]) results.toArray(new byte[results.size()][]);
      }

    } finally {
      locking.releaseReadLock();
    }
  }

  /**
   * Return all the available columns for the given key. The key indicates a
   * row and timestamp, but not a column name.
   *
   * The returned object should map column names to byte arrays (byte[]).
   */
  public TreeMap<Text, byte[]> getFull(HStoreKey key) throws IOException {
    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
    locking.obtainReadLock();
    try {
      // Newest generation wins: results filled from the live cache first;
      // internalGetFull only stores a column when it is not already present.
      internalGetFull(memcache, key, results);
      for(int i = history.size()-1; i >= 0; i--) {
        TreeMap<HStoreKey, BytesWritable> cur = history.elementAt(i);
        internalGetFull(cur, key, results);
      }
      return results;

    } finally {
      locking.releaseReadLock();
    }
  }

  /**
   * Scans one generation for all columns matching key's row/timestamp and
   * adds them to results unless the column is already present (a newer
   * generation already supplied it). No locking; callers hold the lock.
   */
  void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key,
      TreeMap<Text, byte[]> results) {

    SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);

    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
      HStoreKey itKey = it.next();
      Text itCol = itKey.getColumn();

      if(results.get(itCol) == null
          && key.matchesWithoutColumn(itKey)) {
        BytesWritable val = tailMap.get(itKey);
        results.put(itCol, val.get());

      } else if(key.getRow().compareTo(itKey.getRow()) > 0) {
        // NOTE(review): tailMap(key) only yields keys >= key, so
        // key.row > itKey.row can never hold and this early exit never
        // fires -- the loop scans the whole tail. Looks like the operands
        // are reversed (itKey.getRow().compareTo(key.getRow()) > 0);
        // confirm before changing. Correctness is preserved because
        // matchesWithoutColumn filters non-matching rows.
        break;
      }
    }
  }

  /**
   * Examine a single map for the desired key.
   *
   * We assume that all locking is done at a higher-level. No locking within
   * this method.
   *
   * TODO - This is kinda slow. We need a data structure that allows for
   * proximity-searches, not just precise-matches.
   *
   * @param map - one cache generation to search
   * @param key - target (row, column, timestamp)
   * @param numVersions - maximum versions to collect; values <= 0 mean all
   * @return - matching values, newest first
   */
  Vector<byte[]> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
    Vector<byte[]> result = new Vector<byte[]>();
    HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
    SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);

    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
      HStoreKey itKey = it.next();

      if(itKey.matchesRowCol(curKey)) {
        result.add(tailMap.get(itKey).get());
        // Step the search key past the version just collected so the next
        // match is a strictly older version of the same row/column.
        curKey.setVersion(itKey.getTimestamp() - 1);
      }

      if(numVersions > 0 && result.size() >= numVersions) {
        break;
      }
    }
    return result;
  }

  /**
   * Return a scanner over the keys in the HMemcache
   */
  public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
      throws IOException {

    return new HMemcacheScanner(timestamp, targetCols, firstRow);
  }

  //////////////////////////////////////////////////////////////////////////////
  // HMemcacheScanner implements the HScannerInterface.
  // It lets the caller scan the contents of the Memcache.
  //////////////////////////////////////////////////////////////////////////////

  class HMemcacheScanner extends HAbstractScanner {
    // One backing map per generation: history[0..n-1] then the live cache.
    TreeMap<HStoreKey, BytesWritable> backingMaps[];
    // Parallel iterators over the backing maps; a null entry means that
    // sub-scanner is exhausted and closed.
    Iterator<HStoreKey> keyIterators[];

    @SuppressWarnings("unchecked")
    public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
        throws IOException {

      super(timestamp, targetCols);

      // Hold the read lock for the scanner's whole lifetime; it is
      // released in close().
      locking.obtainReadLock();
      try {
        this.backingMaps = new TreeMap[history.size() + 1];
        int i = 0;
        for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
            it.hasNext(); ) {

          backingMaps[i++] = it.next();
        }
        backingMaps[backingMaps.length - 1] = memcache;

        this.keyIterators = new Iterator[backingMaps.length];
        this.keys = new HStoreKey[backingMaps.length];
        this.vals = new BytesWritable[backingMaps.length];

        // Generate list of iterators

        HStoreKey firstKey = new HStoreKey(firstRow);
        for(i = 0; i < backingMaps.length; i++) {
          if(firstRow.getLength() != 0) {
            keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();

          } else {
            keyIterators[i] = backingMaps[i].keySet().iterator();
          }

          // Advance each sub-scanner to its first key that is on the
          // requested row and matches the requested columns.
          while(getNext(i)) {
            if(! findFirstRow(i, firstRow)) {
              continue;
            }
            if(columnMatch(i)) {
              break;
            }
          }
        }

      } catch(Exception ex) {
        // NOTE(review): the failure is swallowed; callers receive a
        // half-initialized scanner instead of the exception. Consider
        // rethrowing as IOException after close().
        close();
      }
    }

    /**
     * The user didn't want to start scanning at the first row. This method
     * seeks to the requested row.
     *
     * @param i - which iterator to advance
     * @param firstRow - seek to this row
     * @return - true if this is the first row
     */
    boolean findFirstRow(int i, Text firstRow) {
      return ((firstRow.getLength() == 0) || (keys[i].getRow().equals(firstRow)));
    }

    /**
     * Get the next value from the specified iterater.
     *
     * @param i - which iterator to fetch next value from
     * @return - true if there is more data available
     */
    boolean getNext(int i) {
      if(! keyIterators[i].hasNext()) {
        closeSubScanner(i);
        return false;
      }
      this.keys[i] = keyIterators[i].next();
      this.vals[i] = backingMaps[i].get(keys[i]);
      return true;
    }

    /** Shut down an individual map iterator. */
    void closeSubScanner(int i) {
      keyIterators[i] = null;
      keys[i] = null;
      vals[i] = null;
      backingMaps[i] = null;
    }

    /** Shut down map iterators, and release the lock */
    public void close() throws IOException {
      if(! scannerClosed) {
        try {
          for(int i = 0; i < keys.length; i++) {
            if(keyIterators[i] != null) {
              closeSubScanner(i);
            }
          }

        } finally {
          // Release the read lock taken in the constructor exactly once.
          locking.releaseReadLock();
          scannerClosed = true;
        }
      }
    }
  }
}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2006-7 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
/*******************************************************************************
 * HMsg is for communicating instructions between the HMaster and the
 * HRegionServers.
 ******************************************************************************/
public class HMsg implements Writable {
  // Instruction codes (values 1-7); the names indicate these are the
  // master's instructions to a region server.
  public static final byte MSG_REGION_OPEN = 1;
  public static final byte MSG_REGION_CLOSE = 2;
  public static final byte MSG_REGION_MERGE = 3;
  public static final byte MSG_CALL_SERVER_STARTUP = 4;
  public static final byte MSG_REGIONSERVER_ALREADY_RUNNING = 5;
  public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
  public static final byte MSG_REGION_CLOSE_AND_DELETE = 7;

  // Report codes (values 100+); the names indicate these are a region
  // server's reports back to the master.
  public static final byte MSG_REPORT_OPEN = 100;
  public static final byte MSG_REPORT_CLOSE = 101;
  public static final byte MSG_REGION_SPLIT = 102;
  public static final byte MSG_NEW_REGION = 103;

  byte msg;          // one of the MSG_* codes above
  HRegionInfo info;  // region the message refers to; a default HRegionInfo
                     // when the message carries no region

  /** Default constructor, used for Writable deserialization. */
  public HMsg() {
    this.info = new HRegionInfo();
  }

  /** Construct a message that carries no region information. */
  public HMsg(byte msg) {
    this.msg = msg;
    this.info = new HRegionInfo();
  }

  /** Construct a message about the given region. */
  public HMsg(byte msg, HRegionInfo info) {
    this.msg = msg;
    this.info = info;
  }

  /** @return - the MSG_* code of this message */
  public byte getMsg() {
    return msg;
  }

  /** @return - the region this message refers to */
  public HRegionInfo getRegionInfo() {
    return info;
  }


  //////////////////////////////////////////////////////////////////////////////
  // Writable
  //////////////////////////////////////////////////////////////////////////////

  /** Wire format: one byte of message code, then the serialized region. */
  public void write(DataOutput out) throws IOException {
    out.writeByte(msg);
    info.write(out);
  }

  /** Reads fields in the same order write() emits them. */
  public void readFields(DataInput in) throws IOException {
    this.msg = in.readByte();
    this.info.readFields(in);
  }
}