Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/03 22:34:30 UTC
svn commit: r525267 [4/5] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/
src/contrib/hbase/src/ src/contrib/hbase/src/java/
src/contrib/hbase/src/java/org/ src/contrib/hbase/src/java/org/apache/
src/contrib/hbase/src/java/org/apache/hadoop/ src/contr...
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,976 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HStore maintains a bunch of data files. It is responsible for maintaining
+ * the memory/file hierarchy and for periodic flushes to disk and compacting
+ * edits to the file.
+ *
+ * Locking and transactions are handled at a higher level. This API should not
+ * be called directly by any writer, but rather by an HRegion manager.
+ ******************************************************************************/
+public class HStore {
+ private static final Log LOG = LogFactory.getLog(HStore.class);
+
+ static final String COMPACTION_DIR = "compaction.tmp";
+ static final String WORKING_COMPACTION = "compaction.inprogress";
+ static final String COMPACTION_TO_REPLACE = "toreplace";
+ static final String COMPACTION_DONE = "done";
+
+ Path dir;
+ Text regionName;
+ Text colFamily;
+ int maxVersions;
+ FileSystem fs;
+ Configuration conf;
+ Path mapdir;
+ Path compactdir;
+ Path loginfodir;
+
+ Integer compactLock = new Integer(0);
+ Integer flushLock = new Integer(0);
+
+ HLocking locking = new HLocking();
+
+ TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
+ TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
+
+ Random rand = new Random();
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Constructors, destructors, etc
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * An HStore is a set of zero or more MapFiles, which stretch backwards over
+ * time. A given HStore is responsible for a certain set of columns for a row
+ * in the HRegion.
+ *
+ * The HRegion starts writing to its set of HStores when the HRegion's
+ * memcache is flushed. This results in a round of new MapFiles, one for
+ * each HStore.
+ *
+ * There's no reason to consider append-logging at this level; all logging and
+ * locking are handled at the HRegion level. HStore just provides services to
+ * manage sets of MapFiles. One of the most important of those services is
+ * MapFile-compaction services.
+ *
+ * The only thing having to do with logs that HStore needs to deal with is
+ * the reconstructionLog. This is a segment of an HRegion's log that might
+ * be present upon startup. If the param is NULL, there's nothing to do.
+ * If the param is non-NULL, we need to process the log to reconstruct
+ * a TreeMap that might not have been written to disk before the process died.
+ *
+ * It's assumed that after this constructor returns, the reconstructionLog file
+ * will be deleted (by whoever has instantiated the HStore).
+ */
+ public HStore(Path dir, Text regionName, Text colFamily, int maxVersions,
+ FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
+
+ this.dir = dir;
+ this.regionName = regionName;
+ this.colFamily = colFamily;
+ this.maxVersions = maxVersions;
+ this.fs = fs;
+ this.conf = conf;
+
+ this.mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+ fs.mkdirs(mapdir);
+ this.loginfodir = HStoreFile.getInfoDir(dir, regionName, colFamily);
+ fs.mkdirs(loginfodir);
+
+ LOG.debug("starting HStore for " + regionName + "/"+ colFamily);
+
+ // Either restart or get rid of any leftover compaction work. Either way,
+ // by the time processReadyCompaction() returns, we can get rid of the
+ // existing compaction-dir.
+
+ this.compactdir = new Path(dir, COMPACTION_DIR);
+ Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+ if(fs.exists(curCompactStore)) {
+ processReadyCompaction();
+ fs.delete(curCompactStore);
+ }
+
+ // Go through the 'mapdir' and 'loginfodir' together, make sure that all
+ // MapFiles are in a reliable state. Every entry in 'mapdir' must have a
+ // corresponding one in 'loginfodir'. Without a corresponding log info file,
+ // the entry in 'mapdir' must be deleted.
+
+ Vector<HStoreFile> hstoreFiles
+ = HStoreFile.loadHStoreFiles(conf, dir, regionName, colFamily, fs);
+
+ for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ mapFiles.put(hsf.loadInfo(fs), hsf);
+ }
+
+ // Now go through all the HSTORE_LOGINFOFILEs and figure out the most-recent
+ // log-seq-ID that's present. The most-recent such ID means we can ignore
+ // all log messages up to and including that ID (because they're already
+ // reflected in the TreeMaps).
+ //
+ // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That
+ // means it was built prior to the previous run of HStore, and so it cannot
+ // contain any updates also contained in the log.
+
+ long maxSeqID = -1;
+ for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ long seqid = hsf.loadInfo(fs);
+ if(seqid > 0) {
+ if(seqid > maxSeqID) {
+ maxSeqID = seqid;
+ }
+ }
+ }
+
+ // Read the reconstructionLog to see whether we need to build a brand-new
+ // MapFile out of non-flushed log entries.
+ //
+ // We can ignore any log message that has a sequence ID that's equal to or
+ // lower than maxSeqID. (Because we know such log messages are already
+ // reflected in the MapFiles.)
+
+ LOG.debug("reading reconstructionLog");
+
+ if(reconstructionLog != null && fs.exists(reconstructionLog)) {
+ long maxSeqIdInLog = -1;
+ TreeMap<HStoreKey, BytesWritable> reconstructedCache
+ = new TreeMap<HStoreKey, BytesWritable>();
+
+ SequenceFile.Reader login
+ = new SequenceFile.Reader(fs, reconstructionLog, conf);
+
+ try {
+ HLogKey key = new HLogKey();
+ HLogEdit val = new HLogEdit();
+ while(login.next(key, val)) {
+ maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+ if(key.getLogSeqNum() <= maxSeqID) {
+ continue;
+ }
+ reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(),
+ val.getTimestamp()), val.getVal());
+ }
+
+ } finally {
+ login.close();
+ }
+
+ if(reconstructedCache.size() > 0) {
+
+ // We create a "virtual flush" at maxSeqIdInLog+1.
+
+ LOG.debug("flushing reconstructionCache");
+
+ flushCacheHelper(reconstructedCache, maxSeqIdInLog+1, true);
+ }
+ }
+
+ // Compact all the MapFiles into a single file. The resulting MapFile
+ // should be "timeless"; that is, it should not have an associated seq-ID,
+ // because all log messages have been reflected in the TreeMaps at this point.
+
+ if(mapFiles.size() >= 1) {
+ compactHelper(true);
+ }
+
+ // Finally, start up all the map readers! (There should be just one at this
+ // point, as we've compacted them all.)
+
+ LOG.debug("starting map readers");
+
+ for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext(); ) {
+ Long key = it.next();
+ HStoreFile hsf = mapFiles.get(key);
+
+ //TODO - is this really necessary? Don't I do this inside compact()?
+
+ maps.put(key, new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf));
+ }
+
+ LOG.info("HStore online for " + this.regionName + "/" + this.colFamily);
+ }
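
The constructor's contract leaves deletion of the reconstruction log to the caller. A minimal sketch of that usage, with hypothetical paths and names (in practice an HRegion plays this role):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path parentDir = new Path("/hbase");                // hypothetical parent dir
    Path reconstructionLog = null;                      // none on a clean start
    HStore store = new HStore(parentDir, new Text("region1"),
        new Text("anchor"), 3, fs, reconstructionLog, conf);
    // Per the contract above, whoever instantiated the HStore deletes the
    // reconstruction log once the constructor has returned:
    if(reconstructionLog != null && fs.exists(reconstructionLog)) {
      fs.delete(reconstructionLog);
    }
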
+
+ /** Turn off all the MapFile readers */
+ public void close() throws IOException {
+ locking.obtainWriteLock();
+ LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
+
+ try {
+ for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
+ MapFile.Reader map = it.next();
+ map.close();
+ }
+ maps.clear();
+ mapFiles.clear();
+
+ LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
+
+ } finally {
+ locking.releaseWriteLock();
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Flush changes to disk
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Write out a brand-new set of items to the disk.
+ *
+ * We should only store key/vals that are appropriate for the data-columns
+ * stored in this HStore.
+ *
+ * Also, we are not expecting any reads of this MapFile just yet.
+ *
+ * Return the entire list of HStoreFiles currently used by the HStore.
+ */
+ public Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
+ long logCacheFlushId) throws IOException {
+
+ return flushCacheHelper(inputCache, logCacheFlushId, true);
+ }
+
+ Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
+ long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
+
+ synchronized(flushLock) {
+ LOG.debug("flushing HStore " + this.regionName + "/" + this.colFamily);
+
+ // A. Write the TreeMap out to the disk
+
+ HStoreFile flushedFile
+ = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, colFamily, fs);
+
+ Path mapfile = flushedFile.getMapFilePath();
+ MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(),
+ HStoreKey.class, BytesWritable.class);
+
+ try {
+ for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext(); ) {
+ HStoreKey curkey = it.next();
+ if(this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+ BytesWritable val = inputCache.get(curkey);
+ out.append(curkey, val);
+ }
+ }
+ LOG.debug("HStore " + this.regionName + "/" + this.colFamily + " flushed");
+
+ } finally {
+ out.close();
+ }
+
+ // B. Write out the log sequence number that corresponds to this output
+ // MapFile. The MapFile is current up to and including the log seq num.
+
+ LOG.debug("writing log cache flush id");
+ flushedFile.writeInfo(fs, logCacheFlushId);
+
+ // C. Finally, make the new MapFile available.
+
+ if(addToAvailableMaps) {
+ locking.obtainWriteLock();
+
+ try {
+ maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
+ mapFiles.put(logCacheFlushId, flushedFile);
+ LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily);
+
+ } finally {
+ locking.releaseWriteLock();
+ }
+ }
+ return getAllMapFiles();
+ }
+ }
+
+ public Vector<HStoreFile> getAllMapFiles() {
+ Vector<HStoreFile> flushedFiles = new Vector<HStoreFile>();
+ for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ flushedFiles.add(hsf);
+ }
+ return flushedFiles;
+ }
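
From the caller's side, a flush hands over a memcache snapshot plus the log sequence number it covers. A sketch, continuing the hypothetical setup above (note that column names must carry the "family:" prefix that HStoreKey.extractFamily() expects):

    TreeMap<HStoreKey, BytesWritable> memcache =
        new TreeMap<HStoreKey, BytesWritable>();
    memcache.put(
        new HStoreKey(new Text("row1"), new Text("anchor:link"),
            System.currentTimeMillis()),
        new BytesWritable("http://example.org".getBytes()));
    long logCacheFlushId = 42L;                         // hypothetical seq id
    Vector<HStoreFile> files = store.flushCache(memcache, logCacheFlushId);
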
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Compaction
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Compact the back-HStores. This method may take some time, so the calling
+ * thread must be able to block for long periods.
+ *
+ * During this time, the HStore can work as usual, getting values from MapFiles
+ * and writing new MapFiles from given memcaches.
+ *
+ * Existing MapFiles are not destroyed until the new compacted TreeMap is
+ * completely written-out to disk.
+ *
+ * The compactLock prevents multiple simultaneous compactions.
+ * The HLocking write-lock prevents us from interfering with other write
+ * operations.
+ *
+ * We don't want to hold the write-lock for the whole time, as a compact()
+ * can be lengthy and we want to allow cache-flushes during this period.
+ */
+ public void compact() throws IOException {
+ compactHelper(false);
+ }
+
+ void compactHelper(boolean deleteSequenceInfo) throws IOException {
+ synchronized(compactLock) {
+ LOG.debug("started compaction of " + this.regionName + "/" + this.colFamily);
+
+ Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+ fs.mkdirs(curCompactStore);
+
+ try {
+
+ // Grab a list of files to compact.
+
+ Vector<HStoreFile> toCompactFiles = null;
+ locking.obtainWriteLock();
+ try {
+ toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
+
+ } finally {
+ locking.releaseWriteLock();
+ }
+
+ // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
+
+ long maxSeenSeqID = -1;
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ long seqid = hsf.loadInfo(fs);
+ if(seqid > 0) {
+ if(seqid > maxSeenSeqID) {
+ maxSeenSeqID = seqid;
+ }
+ }
+ }
+ LOG.debug("max sequence id =" + maxSeenSeqID);
+
+ HStoreFile compactedOutputFile
+ = new HStoreFile(conf, compactdir, regionName, colFamily, -1);
+
+ if(toCompactFiles.size() == 1) {
+ LOG.debug("nothing to compact for " + this.regionName + "/" + this.colFamily);
+
+ HStoreFile hsf = toCompactFiles.elementAt(0);
+ if(hsf.loadInfo(fs) == -1) {
+ return;
+ }
+ }
+
+ // Step through them, writing to the brand-new TreeMap
+
+ MapFile.Writer compactedOut = new MapFile.Writer(conf, fs,
+ compactedOutputFile.getMapFilePath().toString(), HStoreKey.class,
+ BytesWritable.class);
+
+ try {
+
+ // We create a new set of MapFile.Reader objects so we don't screw up
+ // the caching associated with the currently-loaded ones.
+ //
+ // Our iteration-based access pattern is practically designed to ruin
+ // the cache.
+ //
+ // We work by opening a single MapFile.Reader for each file, and
+ // iterating through them in parallel. We always increment the
+ // lowest-ranked one. Updates to a single row/column will appear
+ // ranked by timestamp. This allows us to throw out deleted values or
+ // obsolete versions.
+
+ MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
+ HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
+ BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
+ boolean[] done = new boolean[toCompactFiles.size()];
+ int pos = 0;
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);
+ keys[pos] = new HStoreKey();
+ vals[pos] = new BytesWritable();
+ done[pos] = false;
+ pos++;
+ }
+
+ // Now, advance through the readers in order. This will have the
+ // effect of a run-time sort of the entire dataset.
+
+ LOG.debug("processing HStoreFile readers");
+
+ int numDone = 0;
+ for(int i = 0; i < readers.length; i++) {
+ readers[i].reset();
+ done[i] = ! readers[i].next(keys[i], vals[i]);
+ if(done[i]) {
+ numDone++;
+ }
+ }
+
+ int timesSeen = 0;
+ Text lastRow = new Text();
+ Text lastColumn = new Text();
+ while(numDone < done.length) {
+
+ // Find the reader with the smallest key
+
+ int smallestKey = -1;
+ for(int i = 0; i < readers.length; i++) {
+ if(done[i]) {
+ continue;
+ }
+
+ if(smallestKey < 0) {
+ smallestKey = i;
+
+ } else {
+ if(keys[i].compareTo(keys[smallestKey]) < 0) {
+ smallestKey = i;
+ }
+ }
+ }
+
+ // Reflect the current key/val in the output
+
+ HStoreKey sk = keys[smallestKey];
+ if(lastRow.equals(sk.getRow())
+ && lastColumn.equals(sk.getColumn())) {
+
+ timesSeen++;
+
+ } else {
+ timesSeen = 1;
+ }
+
+ if(timesSeen <= maxVersions) {
+
+ // Keep old versions until we have maxVersions worth.
+ // Then just skip them.
+
+ if(sk.getRow().getLength() != 0
+ && sk.getColumn().getLength() != 0) {
+
+ // Only write out objects which have a non-zero length key and value
+
+ compactedOut.append(sk, vals[smallestKey]);
+ }
+
+ }
+
+ //TODO: I don't know what to do about deleted values. I currently
+ // include the fact that the item was deleted as a legitimate
+ // "version" of the data. Maybe it should just drop the deleted val?
+
+ // Update last-seen items
+
+ lastRow.set(sk.getRow());
+ lastColumn.set(sk.getColumn());
+
+ // Advance the smallest key. If that reader's all finished, then
+ // mark it as done.
+
+ if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
+ done[smallestKey] = true;
+ readers[smallestKey].close();
+ numDone++;
+ }
+ }
+
+ LOG.debug("all HStores processed");
+
+ } finally {
+ compactedOut.close();
+ }
+
+ LOG.debug("writing new compacted HStore");
+
+ // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+
+ if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
+ compactedOutputFile.writeInfo(fs, maxSeenSeqID);
+
+ } else {
+ compactedOutputFile.writeInfo(fs, -1);
+ }
+
+ // Write out a list of data files that we're replacing
+
+ Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+ DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+ try {
+ out.writeInt(toCompactFiles.size());
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ hsf.write(out);
+ }
+
+ } finally {
+ out.close();
+ }
+
+ // Indicate that we're done.
+
+ Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+ fs.create(doneFile).close();
+
+ // Move the compaction into place.
+
+ processReadyCompaction();
+
+ LOG.debug("compaction complete for " + this.regionName + "/" + this.colFamily);
+
+ } finally {
+ fs.delete(compactdir);
+ }
+ }
+ }
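
The reader loop above is a k-way merge over already-sorted inputs: one cursor per input, and on every step only the cursor holding the smallest current key advances. The same shape in miniature, over plain int arrays instead of MapFiles:

    int[][] inputs = { {1, 4, 9}, {2, 3, 9}, {5} };
    int[] pos = new int[inputs.length];                 // one cursor per input
    while(true) {
      int smallest = -1;
      for(int i = 0; i < inputs.length; i++) {
        if(pos[i] >= inputs[i].length) {
          continue;                                     // this input is exhausted
        }
        if(smallest < 0
            || inputs[i][pos[i]] < inputs[smallest][pos[smallest]]) {
          smallest = i;
        }
      }
      if(smallest < 0) {
        break;                                          // every input is exhausted
      }
      System.out.println(inputs[smallest][pos[smallest]]);  // prints 1 2 3 4 5 9 9
      pos[smallest]++;                                  // advance only the winner
    }
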
+
+ /**
+ * It's assumed that the compactLock will be acquired prior to calling this
+ * method! Otherwise, it is not thread-safe!
+ *
+ * It works by processing a compaction that's been written to disk.
+ *
+ * It is usually invoked at the end of a compaction, but might also be invoked
+ * at HStore startup, if the prior execution died midway through.
+ */
+ void processReadyCompaction() throws IOException {
+
+ // Move the compacted TreeMap into place.
+ // That means:
+ // 1) Acquiring the write-lock
+ // 2) Figuring out what MapFiles are going to be replaced
+ // 3) Unloading all the replaced MapFiles.
+ // 4) Deleting all the old MapFile files.
+ // 5) Moving the new MapFile into place
+ // 6) Loading the new TreeMap.
+ // 7) Releasing the write-lock
+
+ // 1. Acquiring the write-lock
+
+ locking.obtainWriteLock();
+ Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
+ try {
+ Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+ if(! fs.exists(doneFile)) {
+
+ // The last execution didn't finish the compaction, so there's nothing
+ // we can do. We'll just have to redo it. Abandon it and return.
+
+ return;
+ }
+
+ // OK, there's actually compaction work that needs to be put into place.
+
+ LOG.debug("compaction starting");
+
+ // 2. Load in the files to be deleted.
+ // (Figuring out what MapFiles are going to be replaced)
+
+ Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
+ Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+ DataInputStream in = new DataInputStream(fs.open(filesToReplace));
+ try {
+ int numfiles = in.readInt();
+ for(int i = 0; i < numfiles; i++) {
+ HStoreFile hsf = new HStoreFile(conf);
+ hsf.readFields(in);
+ toCompactFiles.add(hsf);
+ }
+
+ } finally {
+ in.close();
+ }
+
+ LOG.debug("loaded files to be deleted");
+
+ // 3. Unload all the replaced MapFiles.
+
+ Iterator<HStoreFile> it2 = mapFiles.values().iterator();
+ for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
+ MapFile.Reader curReader = it.next();
+ HStoreFile curMapFile = it2.next();
+ if(toCompactFiles.contains(curMapFile)) {
+ curReader.close();
+ it.remove();
+ }
+ }
+
+ for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+ HStoreFile curMapFile = it.next();
+ if(toCompactFiles.contains(curMapFile)) {
+ it.remove();
+ }
+ }
+
+ LOG.debug("unloaded existing MapFiles");
+
+ // What if we crash at this point? No big deal; we will restart
+ // processReadyCompaction(), and nothing has been lost.
+
+ // 4. Delete all the old files, no longer needed
+
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ fs.delete(hsf.getMapFilePath());
+ fs.delete(hsf.getInfoFilePath());
+ }
+
+ LOG.debug("old files deleted");
+
+ // What if we fail now? The above deletes will fail silently. We'd better
+ // make sure not to write out any new files with the same names as
+ // something we delete, though.
+
+ // 5. Moving the new MapFile into place
+
+ LOG.debug("moving new MapFile into place");
+
+ HStoreFile compactedFile
+ = new HStoreFile(conf, compactdir, regionName, colFamily, -1);
+
+ HStoreFile finalCompactedFile
+ = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, colFamily, fs);
+
+ fs.rename(compactedFile.getMapFilePath(), finalCompactedFile.getMapFilePath());
+
+ // Fail here? No problem.
+
+ fs.rename(compactedFile.getInfoFilePath(), finalCompactedFile.getInfoFilePath());
+
+ // Fail here? No worries.
+
+ long orderVal = finalCompactedFile.loadInfo(fs);
+
+ // 6. Loading the new TreeMap.
+
+ LOG.debug("loading new TreeMap");
+
+ mapFiles.put(orderVal, finalCompactedFile);
+ maps.put(orderVal, new MapFile.Reader(fs,
+ finalCompactedFile.getMapFilePath().toString(), conf));
+
+ } finally {
+
+ // 7. Releasing the write-lock
+
+ locking.releaseWriteLock();
+ }
+ }
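
The crash-safety of this swap rests on a write-then-marker pattern: every output, including the 'toreplace' manifest, is written under the working directory first, the COMPACTION_DONE marker is created last, and the move-into-place step refuses to run unless the marker exists, so a re-run after a crash is always safe. The skeleton of the pattern, with a hypothetical working directory:

    Path work = new Path("/hbase/compaction.tmp");      // hypothetical
    // 1. Write all output files and the 'toreplace' manifest under 'work'.
    // 2. Only then create the marker:
    fs.create(new Path(work, COMPACTION_DONE)).close();
    // 3. Swap into place; idempotent because it checks the marker first.
    processReadyCompaction();
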
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Accessors.
+ // (This is the only section that is directly useful!)
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Return all the available columns for the given key. The key indicates a
+ * row and timestamp, but not a column name.
+ *
+ * The returned object should map column names to byte arrays (byte[]).
+ */
+ public void getFull(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
+ locking.obtainReadLock();
+ try {
+ MapFile.Reader[] maparray
+ = maps.values().toArray(new MapFile.Reader[maps.size()]);
+
+ for(int i = maparray.length-1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+
+ synchronized(map) {
+ BytesWritable readval = new BytesWritable();
+ map.reset();
+ HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+
+ do {
+ Text readcol = readkey.getColumn();
+ if(results.get(readcol) == null
+ && key.matchesWithoutColumn(readkey)) {
+ results.put(new Text(readcol), readval.get());
+ readval = new BytesWritable();
+
+ } else if(key.getRow().compareTo(readkey.getRow()) < 0) {
+ // The reader has moved past the requested row.
+ break;
+ }
+
+ } while(map.next(readkey, readval));
+ }
+ }
+
+ } finally {
+ locking.releaseReadLock();
+ }
+ }
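
A sketch of a whole-row read, reusing the hypothetical store from earlier: the key fixes the row and an upper-bound timestamp, and getFull() adds one entry per column found:

    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
    store.getFull(
        new HStoreKey(new Text("row1"), System.currentTimeMillis()), results);
    for(Map.Entry<Text, byte[]> e : results.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().length + " bytes");
    }
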
+
+ /**
+ * Get the value for the indicated HStoreKey. Grab the target value and the
+ * previous 'numVersions-1' values, as well.
+ *
+ * If 'numVersions' is negative, the method returns all available versions.
+ */
+ public byte[][] get(HStoreKey key, int numVersions) throws IOException {
+ if(numVersions == 0) {
+ throw new IllegalArgumentException("Must request at least one value.");
+ }
+
+ Vector<byte[]> results = new Vector<byte[]>();
+ locking.obtainReadLock();
+ try {
+ MapFile.Reader[] maparray
+ = maps.values().toArray(new MapFile.Reader[maps.size()]);
+
+ for(int i = maparray.length-1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+
+ synchronized(map) {
+ BytesWritable readval = new BytesWritable();
+ map.reset();
+ HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
+
+ if(readkey.matchesRowCol(key)) {
+ results.add(readval.get());
+ readval = new BytesWritable();
+
+ while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
+ if(numVersions > 0 && (results.size() >= numVersions)) {
+ break;
+
+ } else {
+ results.add(readval.get());
+ readval = new BytesWritable();
+ }
+ }
+ }
+ }
+ if(numVersions > 0 && results.size() >= numVersions) {
+ break;
+ }
+ }
+
+ if(results.size() == 0) {
+ return null;
+
+ } else {
+ return (byte[][]) results.toArray(new byte[results.size()][]);
+ }
+
+ } finally {
+ locking.releaseReadLock();
+ }
+ }
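
And a single-cell read asking for the two most recent versions, again with the hypothetical names from above:

    byte[][] values = store.get(
        new HStoreKey(new Text("row1"), new Text("anchor:link"),
            System.currentTimeMillis()), 2);
    if(values != null) {
      System.out.println("got " + values.length + " version(s)");
    }
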
+
+ /**
+ * Gets the size of the largest MapFile and its mid key.
+ *
+ * @param midKey - the middle key for the largest MapFile
+ * @return - size of the largest MapFile
+ * @throws IOException
+ */
+ public long getLargestFileSize(Text midKey) throws IOException {
+ long maxSize = 0L;
+ long mapIndex = 0L;
+
+ // Iterate through all the MapFiles
+
+ for(Iterator<Map.Entry<Long, HStoreFile>> it = mapFiles.entrySet().iterator();
+ it.hasNext(); ) {
+
+ Map.Entry<Long, HStoreFile> e = it.next();
+ HStoreFile curHSF = e.getValue();
+ long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME));
+
+ if(size > maxSize) { // This is the largest one so far
+ maxSize = size;
+ mapIndex = e.getKey();
+ }
+ }
+
+ MapFile.Reader r = maps.get(mapIndex);
+
+ synchronized(r) {
+ midKey.set(((HStoreKey)r.midKey()).getRow());
+ }
+
+ return maxSize;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // File administration
+ //////////////////////////////////////////////////////////////////////////////
+
+ /** Generate a random unique filename suffix */
+ String obtainFileLabel(Path prefix) throws IOException {
+ String testsuffix = String.valueOf(Math.abs(rand.nextInt()));
+ Path testpath = new Path(prefix.toString() + testsuffix);
+ while(fs.exists(testpath)) {
+ testsuffix = String.valueOf(Math.abs(rand.nextInt()));
+ testpath = new Path(prefix.toString() + testsuffix);
+ }
+ return testsuffix;
+ }
+
+ /**
+ * Return a set of MapFile.Readers, one for each HStore file.
+ * These should be closed after the user is done with them.
+ */
+ public HScannerInterface getScanner(long timestamp, Text targetCols[],
+ Text firstRow) throws IOException {
+
+ return new HStoreScanner(timestamp, targetCols, firstRow);
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // This class implements the HScannerInterface.
+ // It lets the caller scan the contents of this HStore.
+ //////////////////////////////////////////////////////////////////////////////
+
+ class HStoreScanner extends HAbstractScanner {
+ MapFile.Reader readers[];
+ Text lastRow = null;
+
+ public HStoreScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException {
+ super(timestamp, targetCols);
+
+ locking.obtainReadLock();
+ try {
+ this.readers = new MapFile.Reader[mapFiles.size()];
+ int i = 0;
+ for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
+ HStoreFile curHSF = it.next();
+ readers[i++] = new MapFile.Reader(fs, curHSF.getMapFilePath().toString(), conf);
+ }
+
+ this.keys = new HStoreKey[readers.length];
+ this.vals = new BytesWritable[readers.length];
+
+ // Advance the readers to the first pos.
+
+ for(i = 0; i < readers.length; i++) {
+ keys[i] = new HStoreKey();
+ vals[i] = new BytesWritable();
+
+ if(firstRow.getLength() != 0) {
+ if(findFirstRow(i, firstRow)) {
+ continue;
+ }
+ }
+
+ while(getNext(i)) {
+ if(columnMatch(i)) {
+ break;
+ }
+ }
+ }
+
+ } catch (Exception ex) {
+ close();
+ throw new IOException("HStoreScanner failed construction: " + ex.getMessage());
+ }
+ }
+
+ /**
+ * The user didn't want to start scanning at the first row. This method
+ * seeks to the requested row.
+ *
+ * @param i - which iterator to advance
+ * @param firstRow - seek to this row
+ * @return - true if this is the first row or if the row was not found
+ */
+ boolean findFirstRow(int i, Text firstRow) throws IOException {
+ HStoreKey firstKey
+ = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
+
+ if(firstKey == null) {
+
+ // Didn't find it. Close the scanner and return TRUE
+
+ closeSubScanner(i);
+ return true;
+ }
+ keys[i].setRow(firstKey.getRow());
+ keys[i].setColumn(firstKey.getColumn());
+ keys[i].setVersion(firstKey.getTimestamp());
+ return columnMatch(i);
+ }
+
+ /**
+ * Get the next value from the specified reader.
+ *
+ * @param i - which reader to fetch next value from
+ * @return - true if there is more data available
+ */
+ boolean getNext(int i) throws IOException {
+ if(! readers[i].next(keys[i], vals[i])) {
+ closeSubScanner(i);
+ return false;
+ }
+ return true;
+ }
+
+ /** Close down the indicated reader. */
+ void closeSubScanner(int i) throws IOException {
+ try {
+ if(readers[i] != null) {
+ readers[i].close();
+ }
+
+ } finally {
+ readers[i] = null;
+ keys[i] = null;
+ vals[i] = null;
+ }
+ }
+
+ /** Shut it down! */
+ public void close() throws IOException {
+ if(! scannerClosed) {
+ try {
+ for(int i = 0; i < readers.length; i++) {
+ if(readers[i] != null) {
+ readers[i].close();
+ }
+ }
+
+ } finally {
+ locking.releaseReadLock();
+ scannerClosed = true;
+ }
+ }
+ }
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,378 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * Each HStore maintains a bunch of different data files.
+ *
+ * The filename is a mix of the parent dir, the region name, the column name,
+ * and the file identifier.
+ *
+ * This class handles all that path-building stuff for you.
+ ******************************************************************************/
+public class HStoreFile implements HConstants, WritableComparable {
+ public static final byte INFO_SEQ_NUM = 0;
+ public static final String HSTORE_DATFILE_PREFIX = "mapfile.dat.";
+ public static final String HSTORE_INFOFILE_PREFIX = "mapfile.info.";
+ public static final String HSTORE_DATFILE_DIR = "mapfiles";
+ public static final String HSTORE_INFO_DIR = "info";
+ static Random rand = new Random();
+
+ Path dir;
+ Text regionName;
+ Text colFamily;
+ long fileId;
+ Configuration conf;
+
+ /**
+ * An HStoreFile tracks 4 things: its parent dir, the region identifier, the
+ * column family, and the file identifier. If you know those four things, you
+ * know how to obtain the right HStoreFile.
+ *
+ * When merging or splitting HRegions, we might want to modify one of the
+ * params for an HStoreFile (effectively moving it elsewhere).
+ */
+ public HStoreFile(Configuration conf) {
+ this.conf = conf;
+ this.dir = new Path("");
+ this.regionName = new Text();
+ this.colFamily = new Text();
+ this.fileId = 0;
+ }
+
+ public HStoreFile(Configuration conf, Path dir, Text regionName,
+ Text colFamily, long fileId) {
+
+ this.conf = conf;
+ this.dir = dir;
+ this.regionName = new Text(regionName);
+ this.colFamily = new Text(colFamily);
+ this.fileId = fileId;
+ }
+
+ // Get the individual components
+
+ public Path getDir() {
+ return dir;
+ }
+
+ public Text getRegionName() {
+ return regionName;
+ }
+
+ public Text getColFamily() {
+ return colFamily;
+ }
+
+ public long fileId() {
+ return fileId;
+ }
+
+ // Build full filenames from those components
+
+ public Path getMapFilePath() {
+ return new Path(HStoreFile.getMapDir(dir, regionName, colFamily),
+ HSTORE_DATFILE_PREFIX + fileId);
+ }
+
+ public Path getInfoFilePath() {
+ return new Path(HStoreFile.getInfoDir(dir, regionName, colFamily),
+ HSTORE_INFOFILE_PREFIX + fileId);
+ }
+
+ // Static methods to build partial paths to internal directories. Useful for
+ // HStore construction and log-rebuilding.
+
+ public static Path getMapDir(Path dir, Text regionName, Text colFamily) {
+ return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName,
+ new Path(colFamily.toString(), HSTORE_DATFILE_DIR)));
+ }
+
+ public static Path getInfoDir(Path dir, Text regionName, Text colFamily) {
+ return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName,
+ new Path(colFamily.toString(), HSTORE_INFO_DIR)));
+ }
+
+ public static Path getHStoreDir(Path dir, Text regionName, Text colFamily) {
+ return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName,
+ colFamily.toString()));
+ }
+
+ public static Path getHRegionDir(Path dir, Text regionName) {
+ return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
+ }
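
Taken together, these helpers pin down the on-disk layout. For example, assuming HREGIONDIR_PREFIX (defined in HConstants, which is not part of this excerpt) is something like "hregion_":

    Path mapdir = HStoreFile.getMapDir(
        new Path("/hbase"), new Text("region1"), new Text("anchor"));
    // -> /hbase/hregion_region1/anchor/mapfiles
    // An HStoreFile with fileId 7 under the same region and family resolves to:
    //   getMapFilePath()  -> .../anchor/mapfiles/mapfile.dat.7
    //   getInfoFilePath() -> .../anchor/info/mapfile.info.7
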
+
+ /**
+ * Obtain a brand-new randomly-named HStoreFile. Checks the filesystem to
+ * make sure the chosen file id does not collide with an existing file.
+ */
+ static HStoreFile obtainNewHStoreFile(Configuration conf, Path dir,
+ Text regionName, Text colFamily, FileSystem fs) throws IOException {
+
+ Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+ long fileId = Math.abs(rand.nextLong());
+
+ Path testpath1 = new Path(mapdir, HSTORE_DATFILE_PREFIX + fileId);
+ Path testpath2 = new Path(mapdir, HSTORE_INFOFILE_PREFIX + fileId);
+ while(fs.exists(testpath1) || fs.exists(testpath2)) {
+ fileId = Math.abs(rand.nextLong());
+ testpath1 = new Path(mapdir, HSTORE_DATFILE_PREFIX + fileId);
+ testpath2 = new Path(mapdir, HSTORE_INFOFILE_PREFIX + fileId);
+ }
+ return new HStoreFile(conf, dir, regionName, colFamily, fileId);
+ }
+
+ /**
+ * Create a series of HStoreFiles loaded from the given directory.
+ *
+ * There must be a matching 'mapdir' and 'loginfo' pair of files.
+ * If only one exists, we'll delete it.
+ */
+ static Vector<HStoreFile> loadHStoreFiles(Configuration conf, Path dir,
+ Text regionName, Text colFamily, FileSystem fs) throws IOException {
+
+ Vector<HStoreFile> results = new Vector<HStoreFile>();
+ Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily);
+
+ Path datfiles[] = fs.listPaths(mapdir);
+ for(int i = 0; i < datfiles.length; i++) {
+ String name = datfiles[i].getName();
+
+ if(name.startsWith(HSTORE_DATFILE_PREFIX)) {
+ long fileId = Long.parseLong(name.substring(HSTORE_DATFILE_PREFIX.length()));
+ HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId);
+ Path mapfile = curfile.getMapFilePath();
+ Path infofile = curfile.getInfoFilePath();
+
+ if(fs.exists(infofile)) {
+ results.add(curfile);
+
+ } else {
+ fs.delete(mapfile);
+ }
+ }
+ }
+
+ Path infodir = HStoreFile.getInfoDir(dir, regionName, colFamily);
+ Path infofiles[] = fs.listPaths(infodir);
+ for(int i = 0; i < infofiles.length; i++) {
+ String name = infofiles[i].getName();
+
+ if(name.startsWith(HSTORE_INFOFILE_PREFIX)) {
+ long fileId = Long.parseLong(name.substring(HSTORE_INFOFILE_PREFIX.length()));
+ HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId);
+ Path mapfile = curfile.getMapFilePath();
+
+ if(! fs.exists(mapfile)) {
+ fs.delete(curfile.getInfoFilePath());
+ }
+ }
+ }
+ return results;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // File handling
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Break this HStoreFile file into two new parts, which live in different
+ * brand-new HRegions.
+ */
+ public void splitStoreFile(Text midKey, HStoreFile dstA, HStoreFile dstB,
+ FileSystem fs, Configuration conf) throws IOException {
+
+ // Copy the appropriate tuples to one MapFile or the other.
+
+ MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf);
+ try {
+ MapFile.Writer outA = new MapFile.Writer(conf, fs,
+ dstA.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
+
+ try {
+ MapFile.Writer outB = new MapFile.Writer(conf, fs,
+ dstB.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class);
+
+ try {
+ HStoreKey readkey = new HStoreKey();
+ BytesWritable readval = new BytesWritable();
+
+ while(in.next(readkey, readval)) {
+ Text key = readkey.getRow();
+
+ if(key.compareTo(midKey) < 0) {
+ outA.append(readkey, readval);
+
+ } else {
+ outB.append(readkey, readval);
+ }
+ }
+
+ } finally {
+ outB.close();
+ }
+
+ } finally {
+ outA.close();
+ }
+
+ } finally {
+ in.close();
+ }
+
+ // Build an InfoFile for each output
+
+ long seqid = loadInfo(fs);
+ dstA.writeInfo(fs, seqid);
+ dstB.writeInfo(fs, seqid);
+ }
+
+ /**
+ * Write to this HStoreFile with all the contents of the given source HStoreFiles.
+ * We are merging multiple regions into a single new one.
+ */
+ public void mergeStoreFiles(Vector<HStoreFile> srcFiles, FileSystem fs,
+ Configuration conf) throws IOException {
+
+ // Copy all the source MapFile tuples into this HSF's MapFile
+
+ MapFile.Writer out = new MapFile.Writer(conf, fs, getMapFilePath().toString(),
+ HStoreKey.class, BytesWritable.class);
+
+ try {
+ for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
+ HStoreFile src = it.next();
+ MapFile.Reader in = new MapFile.Reader(fs, src.getMapFilePath().toString(), conf);
+
+ try {
+ HStoreKey readkey = new HStoreKey();
+ BytesWritable readval = new BytesWritable();
+ while(in.next(readkey, readval)) {
+ out.append(readkey, readval);
+ }
+
+ } finally {
+ in.close();
+ }
+ }
+
+ } finally {
+ out.close();
+ }
+
+ // Build a unified InfoFile from the source InfoFiles.
+
+ long unifiedSeqId = -1;
+ for(Iterator<HStoreFile> it = srcFiles.iterator(); it.hasNext(); ) {
+ HStoreFile hsf = it.next();
+ long curSeqId = hsf.loadInfo(fs);
+
+ if(curSeqId > unifiedSeqId) {
+ unifiedSeqId = curSeqId;
+ }
+ }
+ writeInfo(fs, unifiedSeqId);
+ }
+
+ /** Read the log-sequence number recorded in this file's info file. */
+ public long loadInfo(FileSystem fs) throws IOException {
+ Path p = getInfoFilePath();
+ DataInputStream in = new DataInputStream(fs.open(p));
+
+ try {
+ byte flag = in.readByte();
+ if(flag == INFO_SEQ_NUM) {
+ return in.readLong();
+
+ } else {
+ throw new IOException("Cannot process log file: " + p);
+ }
+
+ } finally {
+ in.close();
+ }
+ }
+
+ /** Write the given log-sequence number to this file's info file. */
+ public void writeInfo(FileSystem fs, long infonum) throws IOException {
+ Path p = getInfoFilePath();
+ DataOutputStream out = new DataOutputStream(fs.create(p));
+
+ try {
+ out.writeByte(INFO_SEQ_NUM);
+ out.writeLong(infonum);
+
+ } finally {
+ out.close();
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(dir.toString());
+ regionName.write(out);
+ colFamily.write(out);
+ out.writeLong(fileId);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.dir = new Path(in.readUTF());
+ this.regionName.readFields(in);
+ this.colFamily.readFields(in);
+ this.fileId = in.readLong();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Comparable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public int compareTo(Object o) {
+ HStoreFile other = (HStoreFile) o;
+ int result = this.dir.compareTo(other.dir);
+ if(result == 0) {
+ result = this.regionName.compareTo(other.regionName);
+ }
+ if(result == 0) {
+ result = this.colFamily.compareTo(other.colFamily);
+ }
+ if(result == 0) {
+ if(this.fileId < other.fileId) {
+ result = -1;
+
+ } else if(this.fileId > other.fileId) {
+ result = 1;
+ }
+ }
+ return result;
+ }
+
+
+ public boolean equals(Object o) {
+ return this.compareTo(o) == 0;
+ }
+}
+
+
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,158 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * A Key for a stored row
+ ******************************************************************************/
+public class HStoreKey implements WritableComparable {
+ public static final byte[] DELETE_BYTES = "HSTOREKEY::DELETEVAL".getBytes();
+ public static final byte[] COMPLETE_CACHEFLUSH = "HSTOREKEY::CACHEFLUSH".getBytes();
+
+ public static Text extractFamily(Text col) throws IOException {
+ String column = col.toString();
+ int colpos = column.indexOf(":");
+ if(colpos < 0) {
+ throw new IllegalArgumentException("Illegal column name has no family indicator: " + column);
+ }
+ return new Text(column.substring(0, colpos));
+ }
+
+ Text row;
+ Text column;
+ long timestamp;
+
+ public HStoreKey() {
+ this.row = new Text();
+ this.column = new Text();
+ this.timestamp = Long.MAX_VALUE;
+ }
+
+ public HStoreKey(Text row) {
+ this.row = new Text(row);
+ this.column = new Text();
+ this.timestamp = Long.MAX_VALUE;
+ }
+
+ public HStoreKey(Text row, long timestamp) {
+ this.row = new Text(row);
+ this.column = new Text();
+ this.timestamp = timestamp;
+ }
+
+ public HStoreKey(Text row, Text column) {
+ this.row = new Text(row);
+ this.column = new Text(column);
+ this.timestamp = Long.MAX_VALUE;
+ }
+
+ public HStoreKey(Text row, Text column, long timestamp) {
+ this.row = new Text(row);
+ this.column = new Text(column);
+ this.timestamp = timestamp;
+ }
+
+ public void setRow(Text newrow) {
+ this.row.set(newrow);
+ }
+
+ public void setColumn(Text newcol) {
+ this.column.set(newcol);
+ }
+
+ public void setVersion(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public Text getRow() {
+ return row;
+ }
+
+ public Text getColumn() {
+ return column;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public boolean matchesRowCol(HStoreKey other) {
+ if(this.row.compareTo(other.row) == 0 &&
+ this.column.compareTo(other.column) == 0) {
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ public boolean matchesWithoutColumn(HStoreKey other) {
+ if((this.row.compareTo(other.row) == 0) &&
+ (this.timestamp >= other.getTimestamp())) {
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ public String toString() {
+ return row.toString() + "/" + column.toString() + "/" + timestamp;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Comparable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public int compareTo(Object o) {
+ HStoreKey other = (HStoreKey) o;
+ int result = this.row.compareTo(other.row);
+ if(result == 0) {
+ result = this.column.compareTo(other.column);
+
+ if(result == 0) {
+ if(this.timestamp < other.timestamp) {
+ result = 1;
+
+ } else if(this.timestamp > other.timestamp) {
+ result = -1;
+ }
+ }
+ }
+ return result;
+ }
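
The timestamp comparison is deliberately inverted, so versions of the same cell sort newest-first, and a key built with the default timestamp of Long.MAX_VALUE sorts ahead of every stored version of that cell. A worked example with hypothetical names:

    TreeSet<HStoreKey> set = new TreeSet<HStoreKey>();
    set.add(new HStoreKey(new Text("row1"), new Text("a:b"), 100L));
    set.add(new HStoreKey(new Text("row1"), new Text("a:b"), 300L));
    set.add(new HStoreKey(new Text("row1"), new Text("a:b"), 200L));
    for(HStoreKey k : set) {
      System.out.println(k);    // row1/a:b/300, then .../200, then .../100
    }
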
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public void write(DataOutput out) throws IOException {
+ row.write(out);
+ column.write(out);
+ out.writeLong(timestamp);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ row.readFields(in);
+ column.readFields(in);
+ timestamp = in.readLong();
+ }
+}
+
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * HTableDescriptor contains various facts about an HTable, like its columns,
+ * column families, etc.
+ ******************************************************************************/
+public class HTableDescriptor implements WritableComparable {
+ Text name;
+ int maxVersions;
+ TreeSet<Text> families = new TreeSet<Text>();
+
+ public HTableDescriptor() {
+ this.name = new Text();
+ this.families.clear();
+ }
+
+ public HTableDescriptor(String name, int maxVersions) {
+ this.name = new Text(name);
+ this.maxVersions = maxVersions;
+ }
+
+ public Text getName() {
+ return name;
+ }
+
+ public int getMaxVersions() {
+ return maxVersions;
+ }
+
+ /** Add a column family */
+ public void addFamily(Text family) {
+ families.add(family);
+ }
+
+ /** Do we contain a given column family? */
+ public boolean hasFamily(Text family) {
+ if(families.contains(family)) {
+ return true;
+
+ } else {
+ return false;
+ }
+ }
+
+ /** All the column families in this table. */
+ public TreeSet<Text> families() {
+ return families;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public void write(DataOutput out) throws IOException {
+ name.write(out);
+ out.writeInt(maxVersions);
+ out.writeInt(families.size());
+ for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+ it.next().write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.name.readFields(in);
+ this.maxVersions = in.readInt();
+ int numCols = in.readInt();
+ families.clear();
+ for(int i = 0; i < numCols; i++) {
+ Text t = new Text();
+ t.readFields(in);
+ families.add(t);
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Comparable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public int compareTo(Object o) {
+ HTableDescriptor htd = (HTableDescriptor) o;
+ int result = name.compareTo(htd.name);
+ if(result == 0) {
+ result = maxVersions - htd.maxVersions;
+ }
+
+ if(result == 0) {
+ result = families.size() - htd.families.size();
+ }
+
+ if(result == 0) {
+ Iterator<Text> it2 = htd.families.iterator();
+ for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
+ Text family1 = it.next();
+ Text family2 = it2.next();
+ result = family1.compareTo(family2);
+ if(result != 0) {
+ return result;
+ }
+ }
+ }
+ return result;
+ }
+}
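
A short usage sketch, with hypothetical table and family names:

    HTableDescriptor desc = new HTableDescriptor("webtable", 3);
    desc.addFamily(new Text("anchor"));
    desc.addFamily(new Text("contents"));
    boolean hasAnchor = desc.hasFamily(new Text("anchor"));    // true
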
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*******************************************************************************
+ * LabelledData is just a data pair.
+ * It includes a Text label and some associated data.
+ ******************************************************************************/
+public class LabelledData implements Writable {
+ Text label;
+ BytesWritable data;
+
+ public LabelledData() {
+ this.label = new Text();
+ this.data = new BytesWritable();
+ }
+
+ public LabelledData(Text label, byte[] data) {
+ this.label = new Text(label);
+ this.data = new BytesWritable(data);
+ }
+
+ public Text getLabel() {
+ return label;
+ }
+
+ public BytesWritable getData() {
+ return data;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public void write(DataOutput out) throws IOException {
+ label.write(out);
+ data.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ label.readFields(in);
+ data.readFields(in);
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+
+/*******************************************************************************
+ * LeaseListener is a small class meant to be overridden by users of the Leases
+ * class.
+ *
+ * It receives events from the Leases class about the status of its accompanying
+ * lease. Users of the Leases class can use a LeaseListener subclass to, for
+ * example, clean up resources after a lease has expired.
+ ******************************************************************************/
+public abstract class LeaseListener {
+ public LeaseListener() {
+ }
+
+ public void leaseRenewed() {
+ }
+
+ /** When the user cancels a lease, this method is called. */
+ public void leaseCancelled() {
+ }
+
+ /** When a lease expires, this method is called. */
+ public void leaseExpired() {
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,220 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+import java.util.*;
+
+/*******************************************************************************
+ * Leases
+ *
+ * There are several server classes in HBase that need to track external clients
+ * that occasionally send heartbeats.
+ *
+ * These external clients hold resources in the server class. Those resources
+ * need to be released if the external client fails to send a heartbeat after
+ * some interval of time passes.
+ *
+ * The Leases class is a general reusable class for this kind of pattern.
+ *
+ * An instance of the Leases class will create a thread to do its dirty work.
+ * You should close() the instance if you want to clean up the thread properly.
+ ******************************************************************************/
+public class Leases {
+ long leasePeriod;
+ long leaseCheckFrequency;
+ LeaseMonitor leaseMonitor;
+ Thread leaseMonitorThread;
+ TreeMap<Text, Lease> leases = new TreeMap<Text, Lease>();
+ TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
+ boolean running = true;
+
+ /** Both the lease period and the check frequency are given in milliseconds. */
+ public Leases(long leasePeriod, long leaseCheckFrequency) {
+ this.leasePeriod = leasePeriod;
+ this.leaseCheckFrequency = leaseCheckFrequency;
+
+ this.leaseMonitor = new LeaseMonitor();
+ this.leaseMonitorThread = new Thread(leaseMonitor);
+ leaseMonitorThread.start();
+ }
+
+ /**
+ * Shut down this Leases outfit. All pending leases will be destroyed,
+ * without any cancellation calls.
+ */
+ public void close() {
+ this.running = false;
+ try {
+ this.leaseMonitorThread.join();
+ } catch (InterruptedException iex) {
+ }
+ synchronized(leases) {
+ synchronized(sortedLeases) {
+ leases.clear();
+ sortedLeases.clear();
+ }
+ }
+ }
+
+ /** A client obtains a lease... */
+ public void createLease(Text holderId, Text resourceId, LeaseListener listener) throws IOException {
+ synchronized(leases) {
+ synchronized(sortedLeases) {
+ Lease lease = new Lease(holderId, resourceId, listener);
+ Text leaseId = lease.getLeaseId();
+ if(leases.get(leaseId) != null) {
+ throw new IOException("Impossible state for createLease(): Lease for holderId " + holderId + " and resourceId " + resourceId + " is still held.");
+ }
+ leases.put(leaseId, lease);
+ sortedLeases.add(lease);
+ }
+ }
+ }
+
+ /** A client renews a lease... */
+ public void renewLease(Text holderId, Text resourceId) throws IOException {
+ synchronized(leases) {
+ synchronized(sortedLeases) {
+ Text leaseId = createLeaseId(holderId, resourceId);
+ Lease lease = leases.get(leaseId);
+ if(lease == null) {
+
+ // It's possible that someone tries to renew the lease, but
+ // it just expired a moment ago. So fail.
+
+ throw new IOException("Cannot renew lease is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")");
+ }
+
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
+ }
+ }
+ }
+
+ /** A client explicitly cancels a lease. The lease-cleanup method is not called. */
+ public void cancelLease(Text holderId, Text resourceId) throws IOException {
+ synchronized(leases) {
+ synchronized(sortedLeases) {
+ Text leaseId = createLeaseId(holderId, resourceId);
+ Lease lease = leases.get(leaseId);
+ if(lease == null) {
+
+ // It's possible that someone tries to cancel the lease, but
+ // it just expired a moment ago. So fail.
+
+ throw new IOException("Cannot cancel lease that is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")");
+ }
+
+ sortedLeases.remove(lease);
+ leases.remove(leaseId);
+
+ lease.cancelled();
+ }
+ }
+ }
+
+ /** LeaseMonitor is a thread that expires Leases that go on too long. */
+ class LeaseMonitor implements Runnable {
+ public void run() {
+ while(running) {
+ synchronized(leases) {
+ synchronized(sortedLeases) {
+ Lease top;
+ while((sortedLeases.size() > 0)
+ && ((top = sortedLeases.first()) != null)) {
+
+ if(top.shouldExpire()) {
+ leases.remove(top.getLeaseId());
+ sortedLeases.remove(top);
+
+ top.expired();
+
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(leaseCheckFrequency);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ /** Create a lease id out of the holder and resource ids. */
+ Text createLeaseId(Text holderId, Text resourceId) {
+ return new Text("_" + holderId + "/" + resourceId + "_");
+ }
+
+ /** This class tracks a single Lease. */
+ class Lease implements Comparable {
+ Text holderId;
+ Text resourceId;
+ LeaseListener listener;
+ long lastUpdate;
+
+ public Lease(Text holderId, Text resourceId, LeaseListener listener) {
+ this.holderId = holderId;
+ this.resourceId = resourceId;
+ this.listener = listener;
+ renew();
+ }
+
+ public Text getLeaseId() {
+ return createLeaseId(holderId, resourceId);
+ }
+
+ public boolean shouldExpire() {
+ return (System.currentTimeMillis() - lastUpdate > leasePeriod);
+ }
+
+ public void renew() {
+ this.lastUpdate = System.currentTimeMillis();
+ listener.leaseRenewed();
+ }
+
+ public void cancelled() {
+ listener.leaseCancelled();
+ }
+
+ public void expired() {
+ listener.leaseExpired();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Comparable
+ //////////////////////////////////////////////////////////////////////////////
+
+ public int compareTo(Object o) {
+ Lease other = (Lease) o;
+ if(this.lastUpdate < other.lastUpdate) {
+ return -1;
+
+ } else if(this.lastUpdate > other.lastUpdate) {
+ return 1;
+
+ } else {
+ return this.getLeaseId().compareTo(other.getLeaseId());
+ }
+ }
+ }
+}
+
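A sketch of the intended usage, with hypothetical ids and periods; the lease calls throw IOException, so assume a surrounding method that declares it:

    Leases leases = new Leases(30 * 1000, 5 * 1000);  // 30s leases, 5s checks
    leases.createLease(new Text("client-7"), new Text("scanner-42"),
        new LeaseListener() {
          public void leaseExpired() {
            // release whatever server-side resource the client was holding
          }
        });
    leases.renewLease(new Text("client-7"), new Text("scanner-42"));   // heartbeat
    leases.cancelLease(new Text("client-7"), new Text("scanner-42"));  // clean exit
    leases.close();
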
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java?view=auto&rev=525267
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java Tue Apr 3 13:34:28 2007
@@ -0,0 +1,58 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.log4j.Level;
+/**
+ * Retrieves environment variables that control the debugging and logging configuration.
+ */
+public class Environment {
+ public static boolean debugging = false;
+ public static Level logLevel = Level.INFO;
+
+ private Environment() {} // Not instantiable
+
+ public static void getenv() {
+ String value = null;
+
+ value = System.getenv("DEBUGGING");
+ if(value != null && value.equalsIgnoreCase("TRUE")) {
+ debugging = true;
+ }
+
+ value = System.getenv("LOGGING_LEVEL");
+ if(value != null && value.length() != 0) {
+ if(value.equalsIgnoreCase("ALL")) {
+ logLevel = Level.ALL;
+ } else if(value.equalsIgnoreCase("DEBUG")) {
+ logLevel = Level.DEBUG;
+ } else if(value.equalsIgnoreCase("ERROR")) {
+ logLevel = Level.ERROR;
+ } else if(value.equalsIgnoreCase("FATAL")) {
+ logLevel = Level.FATAL;
+ } else if(value.equalsIgnoreCase("INFO")) {
+ logLevel = Level.INFO;
+ } else if(value.equalsIgnoreCase("OFF")) {
+ logLevel = Level.OFF;
+ } else if(value.equalsIgnoreCase("TRACE")) {
+ logLevel = Level.TRACE;
+ } else if(value.equalsIgnoreCase("WARN")) {
+ logLevel = Level.WARN;
+ }
+ }
+ }
+
+}
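
A sketch of how a test might drive this, assuming DEBUGGING and LOGGING_LEVEL are exported in the shell before the JVM starts (e.g. DEBUGGING=true, LOGGING_LEVEL=DEBUG):

    Environment.getenv();
    if(Environment.debugging) {
      System.out.println("debugging on, log level " + Environment.logLevel);
    }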