Posted to commits@hbase.apache.org by ap...@apache.org on 2009/09/09 19:14:24 UTC
svn commit: r813052 [2/4] - in /hadoop/hbase/branches/0.20_on_hadoop-0.18.3:
./ bin/ conf/ src/java/org/apache/hadoop/hbase/
src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/
src/java/org/apache/hadoop/hbase/io/ src/java...
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Sep 9 17:14:22 2009
@@ -143,7 +143,14 @@
addToMap(PageFilter.class, code++);
addToMap(InclusiveStopFilter.class, code++);
addToMap(ColumnCountGetFilter.class, code++);
+ addToMap(SingleColumnValueFilter.class, code++);
+ addToMap(BinaryComparator.class, code++);
+ addToMap(CompareFilter.class, code++);
+ addToMap(RowFilter.class, code++);
addToMap(ValueFilter.class, code++);
+ addToMap(QualifierFilter.class, code++);
+ addToMap(SkipFilter.class, code++);
+ addToMap(WritableByteArrayComparable.class, code++);
}
private Class<?> declaredClass;
@@ -400,7 +407,7 @@
if (b.byteValue() == NOT_ENCODED) {
String className = Text.readString(in);
try {
- instanceClass = conf.getClassByName(className);
+ instanceClass = getClassByName(conf, className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Can't find class " + className);
}
@@ -422,6 +429,19 @@
return instance;
}
+ @SuppressWarnings("unchecked")
+ private static Class getClassByName(Configuration conf, String className)
+ throws ClassNotFoundException {
+ if(conf != null) {
+ return conf.getClassByName(className);
+ }
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if(cl == null) {
+ cl = HbaseObjectWritable.class.getClassLoader();
+ }
+ return Class.forName(className, true, cl);
+ }
+
private static void addToMap(final Class<?> clazz, final byte code) {
CLASS_TO_CODE.put(clazz, code);
CODE_TO_CLASS.put(code, clazz);
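
When deserializing without a Hadoop Configuration (conf == null), the new getClassByName falls back to the thread context classloader and then to this class's own loader. A minimal standalone sketch of the same resolution order (the filter class name is just an example):

  ClassLoader cl = Thread.currentThread().getContextClassLoader();
  if (cl == null) {
    cl = HbaseObjectWritable.class.getClassLoader();
  }
  // "true" requests static initialization, as Class.forName(String) would.
  Class<?> c = Class.forName("org.apache.hadoop.hbase.filter.PageFilter", true, cl);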
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Sep 9 17:14:22 2009
@@ -31,18 +31,27 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.Compressor;
@@ -958,18 +967,25 @@
private ByteBuffer decompress(final long offset, final int compressedSize,
final int decompressedSize)
throws IOException {
- Decompressor decompressor = this.compressAlgo.getDecompressor();
- // My guess is that the bounded range fis is needed to stop the
- // decompressor reading into next block -- IIRC, it just grabs a
- // bunch of data w/o regard to whether decompressor is coming to end of a
- // decompression.
- InputStream is = this.compressAlgo.createDecompressionStream(
- new BoundedRangeFileInputStream(this.istream, offset, compressedSize),
- decompressor, 0);
- ByteBuffer buf = ByteBuffer.allocate(decompressedSize);
- IOUtils.readFully(is, buf.array(), 0, buf.capacity());
- is.close();
- this.compressAlgo.returnDecompressor(decompressor);
+ Decompressor decompressor = null;
+ ByteBuffer buf = null;
+ try {
+ decompressor = this.compressAlgo.getDecompressor();
+ // My guess is that the bounded range fis is needed to stop the
+ // decompressor reading into next block -- IIRC, it just grabs a
+ // bunch of data w/o regard to whether decompressor is coming to end of a
+ // decompression.
+ InputStream is = this.compressAlgo.createDecompressionStream(
+ new BoundedRangeFileInputStream(this.istream, offset, compressedSize),
+ decompressor, 0);
+ buf = ByteBuffer.allocate(decompressedSize);
+ IOUtils.readFully(is, buf.array(), 0, buf.capacity());
+ is.close();
+ } finally {
+ if (null != decompressor) {
+ this.compressAlgo.returnDecompressor(decompressor);
+ }
+ }
return buf;
}
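
The rework above puts the pooled Decompressor checkout under try/finally so an exception during the read no longer leaks it out of the pool. The general shape, as a sketch (names abbreviated from the hunk):

  Decompressor decompressor = null;
  try {
    decompressor = compressAlgo.getDecompressor();
    // ... create stream, readFully into buf; exceptions fall through ...
  } finally {
    if (decompressor != null) {
      compressAlgo.returnDecompressor(decompressor); // always back to the pool
    }
  }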
@@ -1594,65 +1610,156 @@
return (int)(l & 0x00000000ffffffffL);
}
-
+ /**
+ * Returns all files belonging to the given region directory. Could return an
+ * empty list.
+ *
+ * @param fs The file system reference.
+ * @param regionDir The region directory to scan.
+ * @return The list of files found.
+ * @throws IOException When scanning the files fails.
+ */
+ static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
+ throws IOException {
+ List<Path> res = new ArrayList<Path>();
+ PathFilter dirFilter = new FSUtils.DirFilter(fs);
+ FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
+ for(FileStatus dir : familyDirs) {
+ FileStatus[] files = fs.listStatus(dir.getPath());
+ for (FileStatus file : files) {
+ if (!file.isDir()) {
+ res.add(file.getPath());
+ }
+ }
+ }
+ return res;
+ }
+
public static void main(String []args) throws IOException {
- if (args.length < 1) {
- System.out.println("usage: <filename> -- dumps hfile stats");
- return;
- }
-
- HBaseConfiguration conf = new HBaseConfiguration();
-
- FileSystem fs = FileSystem.get(conf);
-
- Path path = new Path(args[0]);
-
- if (!fs.exists(path)) {
- System.out.println("File doesnt exist: " + path);
- return;
- }
-
- HFile.Reader reader = new HFile.Reader(fs, path, null, false);
- Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
-
- // scan thru and count the # of unique rows.
-// HashSet<Integer> rows = new HashSet<Integer>(reader.getEntries()/4);
-// long start = System.currentTimeMillis();
-// HFileScanner scanner = reader.getScanner();
-// HStoreKey hsk;
-// scanner.seekTo();
-// do {
-// hsk = new HStoreKey(scanner.getKey());
-// rows.add(Bytes.hashCode(hsk.getRow()));
-// } while (scanner.next());
-// long end = System.currentTimeMillis();
-
-
- HFileScanner scanner = reader.getScanner();
- scanner.seekTo();
- KeyValue kv;
- do {
- kv = scanner.getKeyValue();
- System.out.println("K: " + Bytes.toStringBinary(kv.getKey()) +
- " V: " + Bytes.toStringBinary(kv.getValue()));
- } while (scanner.next());
-
- System.out.println("Block index size as per heapsize: " + reader.indexSize());
- System.out.println(reader.toString());
- System.out.println(reader.getTrailerInfo());
- System.out.println("Fileinfo:");
- for ( Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
- System.out.print(Bytes.toString(e.getKey()) + " = " );
-
- if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))==0) {
- long seqid = Bytes.toLong(e.getValue());
- System.out.println(seqid);
- } else {
- System.out.println(Bytes.toStringBinary(e.getValue()));
+ try {
+ // create options
+ Options options = new Options();
+ options.addOption("v", "verbose", false, "verbose output");
+ options.addOption("p", "printkv", false, "print key/value pairs");
+ options.addOption("m", "printmeta", false, "print meta data of file");
+ options.addOption("k", "checkrow", false, "enable row order check");
+ options.addOption("a", "checkfamily", false, "enable family check");
+ options.addOption("f", "file", true, "file to scan");
+ options.addOption("r", "region", true, "region to scan");
+ if (args.length == 0) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("HFile ", options, true);
+ System.exit(-1);
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ boolean verbose = cmd.hasOption("v");
+ boolean printKeyValue = cmd.hasOption("p");
+ boolean printMeta = cmd.hasOption("m");
+ boolean checkRow = cmd.hasOption("k");
+ boolean checkFamily = cmd.hasOption("a");
+ // get configuration, file system and get list of files
+ HBaseConfiguration conf = new HBaseConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ ArrayList<Path> files = new ArrayList<Path>();
+ if (cmd.hasOption("f")) {
+ files.add(new Path(cmd.getOptionValue("f")));
+ }
+ if (cmd.hasOption("r")) {
+ String regionName = cmd.getOptionValue("r");
+ byte[] rn = Bytes.toBytes(regionName);
+ byte[][] hri = HRegionInfo.parseRegionName(rn);
+ Path rootDir = FSUtils.getRootDir(conf);
+ Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
+ int enc = HRegionInfo.encodeRegionName(rn);
+ Path regionDir = new Path(tableDir, Integer.toString(enc));
+ if (verbose) System.out.println("region dir -> " + regionDir);
+ List<Path> regionFiles = getStoreFiles(fs, regionDir);
+ System.out.println("Number of region files found -> " +
+ regionFiles.size());
+ if (verbose) {
+ int i = 1;
+ for (Path p : regionFiles) {
+ System.out.println("Found file[" + i++ + "] -> " + p);
+ }
+ }
+ files.addAll(regionFiles);
}
-
+ // iterate over all files found
+ System.out.println("\nStart scan of files...\n");
+ for (Path file : files) {
+ if (verbose) System.out.println("Scanning -> " + file);
+ if (!fs.exists(file)) {
+ System.err.println("ERROR, file doesnt exist: " + file);
+ continue;
+ }
+ // create reader and load file info
+ HFile.Reader reader = new HFile.Reader(fs, file, null, false);
+ Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
+ // scan over file and read key/value's and check if requested
+ HFileScanner scanner = reader.getScanner();
+ scanner.seekTo();
+ KeyValue pkv = null;
+ int count = 0;
+ do {
+ KeyValue kv = scanner.getKeyValue();
+ // dump key value
+ if (printKeyValue) {
+ System.out.println("K: " + Bytes.toStringBinary(kv.getKey()) +
+ " V: " + Bytes.toStringBinary(kv.getValue()));
+ }
+ // check if rows are in order
+ if (checkRow && pkv != null) {
+ if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
+ System.err.println("WARNING, previous row is greater then" +
+ " current row\n\tfilename -> " + file +
+ "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
+ "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ // check if families are consistent
+ if (checkFamily) {
+ String fam = Bytes.toString(kv.getFamily());
+ if (!file.toString().contains(fam)) {
+ System.err.println("WARNING, filename does not match kv family," +
+ "\n\tfilename -> " + file +
+ "\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey()));
+ }
+ if (pkv != null && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
+ System.err.println("WARNING, previous kv has different family" +
+ " compared to current key\n\tfilename -> " + file +
+ "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
+ "\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
+ }
+ }
+ pkv = kv;
+ count++;
+ } while (scanner.next());
+ if (verbose || printKeyValue) {
+ System.out.println("Scanned kv count -> " + count);
+ }
+ // print meta data
+ if (printMeta) {
+ System.out.println("Block index size as per heapsize: " + reader.indexSize());
+ System.out.println(reader.toString());
+ System.out.println(reader.getTrailerInfo());
+ System.out.println("Fileinfo:");
+ for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
+ System.out.print(Bytes.toString(e.getKey()) + " = " );
+ if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))==0) {
+ long seqid = Bytes.toLong(e.getValue());
+ System.out.println(seqid);
+ } else {
+ System.out.println(Bytes.toStringBinary(e.getValue()));
+ }
+ }
+ }
+ reader.close();
+ }
+ System.out.println("\nDone.");
+ } catch (Exception e) {
+ e.printStackTrace();
}
-
- reader.close();
}
+
}
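
The rewritten main() is a small commons-cli tool. A hypothetical invocation via the hbase launcher script (file path and region name below are invented for illustration):

  bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -m -f /hbase/MyTable/1028785192/info/4567
  bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -k -a -r MyTable,,1252521345123

With -r, the region name is decomposed via HRegionInfo.parseRegionName/encodeRegionName to locate the region directory under the HBase root, and every store file returned by getStoreFiles is scanned.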
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Wed Sep 9 17:14:22 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -153,8 +154,10 @@
int rows = 0;
try {
regionServer = master.connection.getHRegionConnection(region.getServer());
- scannerId = regionServer.openScanner(region.getRegionName(),
- new Scan().addFamily(HConstants.CATALOG_FAMILY));
+ Scan s = new Scan().addFamily(HConstants.CATALOG_FAMILY);
+ // Make this scan do a row at a time; otherwise, data can be stale.
+ s.setCaching(1);
+ scannerId = regionServer.openScanner(region.getRegionName(), s);
while (true) {
Result values = regionServer.next(scannerId);
if (values == null || values.size() == 0) {
@@ -165,19 +168,11 @@
emptyRows.add(values.getRow());
continue;
}
- String serverName = "";
- byte [] val = values.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
- if( val != null) {
- serverName = Bytes.toString(val);
- }
- long startCode = 0L;
- val = values.getValue(CATALOG_FAMILY, STARTCODE_QUALIFIER);
- if(val != null) {
- startCode = Bytes.toLong(val);
- }
+ String serverAddress = getServerAddress(values);
+ long startCode = getStartCode(values);
// Note Region has been assigned.
- checkAssigned(info, serverName, startCode);
+ checkAssigned(regionServer, region, info, serverAddress, startCode);
if (isSplitParent(info)) {
splitParents.put(info, values);
}
@@ -231,6 +226,24 @@
}
/*
+ * @param r
+ * @return Empty String or server address found in <code>r</code>
+ */
+ private String getServerAddress(final Result r) {
+ byte [] val = r.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
+ return val == null || val.length <= 0? "": Bytes.toString(val);
+ }
+
+ /*
+ * @param r
+ * @return 0L or the server startcode found in <code>r</code>
+ */
+ private long getStartCode(final Result r) {
+ byte [] val = r.getValue(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+ return val == null || val.length <= 0? 0L: Bytes.toLong(val);
+ }
+
+ /*
* @param info Region to check.
* @return True if this is a split parent.
*/
@@ -326,8 +339,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug(split.getRegionNameAsString() + "/" + split.getEncodedName()
- + " no longer has references to " + Bytes.toStringBinary(parent)
- );
+ + " no longer has references to " + Bytes.toStringBinary(parent));
}
Delete delete = new Delete(parent);
@@ -337,12 +349,43 @@
return result;
}
- protected void checkAssigned(final HRegionInfo info,
+ /*
+ * Check the passed region is assigned. If not, add to unassigned.
+ * @param regionServer
+ * @param meta
+ * @param info
+ * @param serverAddress
+ * @param startCode
+ * @throws IOException
+ */
+ protected void checkAssigned(final HRegionInterface regionServer,
+ final MetaRegion meta, final HRegionInfo info,
final String serverAddress, final long startCode)
throws IOException {
String serverName = null;
- if (serverAddress != null && serverAddress.length() > 0) {
- serverName = HServerInfo.getServerName(serverAddress, startCode);
+ String sa = serverAddress;
+ long sc = startCode;
+ if (sa == null || sa.length() <= 0) {
+ // Scans are sloppy. They don't respect row locks and they get and
+ // cache a row internally so may have data that is a little stale. Make
+ // sure that for sure this serverAddress is null. We are trying to
+ // avoid double-assignments. See hbase-1784. Will have to wait till
+ // 0.21 hbase where we use zk to mediate state transitions to do better.
+ Get g = new Get(info.getRegionName());
+ g.addFamily(HConstants.CATALOG_FAMILY);
+ Result r = regionServer.get(meta.getRegionName(), g);
+ if (r != null && !r.isEmpty()) {
+ sa = getServerAddress(r);
+ if (sa != null && sa.length() > 0) {
+ // Re-get startcode in case it's changed in the meantime too.
+ sc = getStartCode(r);
+ LOG.debug("GET got values when meta found none: serverAddress=" + sa
+ + ", startCode=" + sc);
+ }
+ }
+ }
+ if (sa != null && sa.length() > 0) {
+ serverName = HServerInfo.getServerName(sa, sc);
}
HServerInfo storedInfo = null;
synchronized (this.master.regionManager) {
@@ -365,8 +408,8 @@
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
LOG.debug("Current assignment of " + info.getRegionNameAsString() +
- " is not valid; " + " serverAddress=" + serverAddress +
- ", startCode=" + startCode + " unknown.");
+ " is not valid; " + " serverAddress=" + sa +
+ ", startCode=" + sc + " unknown.");
}
// Now get the region assigned
this.master.regionManager.setUnassigned(info, true);
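
The checkAssigned change guards against acting on a stale scan row: when the scanned row shows no server, it re-reads the row with a point Get before concluding the region is unassigned (HBASE-1784). Condensed from the hunk above:

  if (sa == null || sa.length() <= 0) {
    Get g = new Get(info.getRegionName());
    g.addFamily(HConstants.CATALOG_FAMILY);
    Result r = regionServer.get(meta.getRegionName(), g);
    if (r != null && !r.isEmpty()) {
      sa = getServerAddress(r);  // trust the fresh read over the cached scan
      sc = getStartCode(r);
    }
  }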
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/RegionManager.java Wed Sep 9 17:14:22 2009
@@ -1349,19 +1349,24 @@
double avg = master.serverManager.getAverageLoad();
// nothing to balance if server load not more than average load
- if (servLoad.getLoad() <= Math.ceil(avg) || avg <= 2.0) return;
+ if(servLoad.getLoad() <= Math.ceil(avg) || avg <= 2.0) {
+ return;
+ }
- // check if server is overloaded
+ // check if current server is overloaded
int numRegionsToClose = balanceFromOverloaded(servLoad, avg);
// check if we can unload server by low loaded servers
- if (numRegionsToClose <= 0)
- balanceToLowloaded(info.getServerName(), servLoad, avg);
+ if(numRegionsToClose <= 0) {
+ numRegionsToClose = balanceToLowloaded(info.getServerName(), servLoad,
+ avg);
+ }
- if (maxRegToClose > 0)
+ if(maxRegToClose > 0) {
numRegionsToClose = Math.min(numRegionsToClose, maxRegToClose);
-
- if (numRegionsToClose > 0){
+ }
+
+ if(numRegionsToClose > 0) {
unassignSomeRegions(info, numRegionsToClose, mostLoadedRegions,
returnMsgs);
}
@@ -1416,7 +1421,8 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + srvName + " will be unloaded for " +
"balance. Server load: " + numSrvRegs + " avg: " +
- avgLoad + ", regions can be moved: " + numMoveToLowLoaded);
+ avgLoad + ", regions can be moved: " + numMoveToLowLoaded +
+ ". Regions to close: " + numRegionsToClose);
}
return numRegionsToClose;
}
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Sep 9 17:14:22 2009
@@ -44,6 +44,9 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.HMsg.Type;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
@@ -499,10 +502,8 @@
// This prevents the master from sending a SPLIT message if the table
// has already split by the region server.
master.regionManager.endActions(region.getRegionName());
- HRegionInfo newRegionA = splitA.getRegionInfo();
- master.regionManager.setUnassigned(newRegionA, false);
- HRegionInfo newRegionB = splitB.getRegionInfo();
- master.regionManager.setUnassigned(newRegionB, false);
+ assignSplitDaughter(splitA.getRegionInfo());
+ assignSplitDaughter(splitB.getRegionInfo());
if (region.isMetaTable()) {
// A meta region has split.
master.regionManager.offlineMetaRegion(region.getStartKey());
@@ -512,6 +513,32 @@
}
/*
+ * Assign new daughter-of-a-split UNLESS it's already been assigned.
+ * It could have been assigned already in rare case where there was a large
+ * gap between insertion of the daughter region into .META. by the
+ * splitting regionserver and receipt of the split message in master (See
+ * HBASE-1784).
+ * @param hri Region to assign.
+ */
+ private void assignSplitDaughter(final HRegionInfo hri) {
+ MetaRegion mr = this.master.regionManager.getFirstMetaRegionForRegion(hri);
+ Get g = new Get(hri.getRegionName());
+ g.addFamily(HConstants.CATALOG_FAMILY);
+ try {
+ HRegionInterface server =
+ master.connection.getHRegionConnection(mr.getServer());
+ Result r = server.get(mr.getRegionName(), g);
+ // If size > 3 -- presume regioninfo, startcode and server -- then presume
+ // that this daughter already assigned and return.
+ if (r.size() >= 3) return;
+ } catch (IOException e) {
+ LOG.warn("Failed get on " + HConstants.CATALOG_FAMILY_STR +
+ "; possible double-assignment?", e);
+ }
+ this.master.regionManager.setUnassigned(hri, false);
+ }
+
+ /*
* Region server is reporting that a region is now opened
* @param serverInfo
* @param region
Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java?rev=813052&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java Wed Sep 9 17:14:22 2009
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
+ * Like {@link GetDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * implement the {@link DeleteTracker} interface since state spans rows (There
+ * is no update nor reset method).
+ */
+class GetClosestRowBeforeTracker {
+ private final KeyValue targetkey;
+ // Any cell w/ a ts older than this is expired.
+ private final long oldestts;
+ private KeyValue candidate = null;
+ private final KVComparator kvcomparator;
+ // Flag for whether we're doing getclosest on a metaregion.
+ private final boolean metaregion;
+ // Offset and length into targetkey demarking table name (if in a metaregion).
+ private final int rowoffset;
+ private final int tablenamePlusDelimiterLength;
+
+ // Deletes keyed by row. Comparator compares on row portion of KeyValue only.
+ private final NavigableMap<KeyValue, NavigableSet<KeyValue>> deletes;
+
+ /**
+ * @param c
+ * @param kv Presume first on row: i.e. empty column, maximum timestamp and
+ * a type of Type.Maximum
+ * @param ttl Time to live in ms for this Store
+ * @param metaregion True if this is .META. or -ROOT- region.
+ */
+ GetClosestRowBeforeTracker(final KVComparator c, final KeyValue kv,
+ final long ttl, final boolean metaregion) {
+ super();
+ this.metaregion = metaregion;
+ this.targetkey = kv;
+ // If we are in a metaregion, then our table name is the prefix on the
+ // targetkey.
+ this.rowoffset = kv.getRowOffset();
+ int l = -1;
+ if (metaregion) {
+ l = KeyValue.getDelimiter(kv.getBuffer(), rowoffset, kv.getRowLength(),
+ HRegionInfo.DELIMITER) - this.rowoffset;
+ }
+ this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
+ this.oldestts = System.currentTimeMillis() - ttl;
+ this.kvcomparator = c;
+ KeyValue.RowComparator rc = new KeyValue.RowComparator(this.kvcomparator);
+ this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
+ }
+
+ /**
+ * @param kv
+ * @return True if this <code>kv</code> is expired.
+ */
+ boolean isExpired(final KeyValue kv) {
+ return Store.isExpired(kv, this.oldestts);
+ }
+
+ /*
+ * Add the specified KeyValue to the list of deletes.
+ * @param kv
+ */
+ private void addDelete(final KeyValue kv) {
+ NavigableSet<KeyValue> rowdeletes = this.deletes.get(kv);
+ if (rowdeletes == null) {
+ rowdeletes = new TreeSet<KeyValue>(this.kvcomparator);
+ this.deletes.put(kv, rowdeletes);
+ }
+ rowdeletes.add(kv);
+ }
+
+ /*
+ * Add candidate if nearer the target than previous candidate.
+ * @param kv
+ * @return True if updated candidate.
+ */
+ private boolean addCandidate(final KeyValue kv) {
+ if (!isDeleted(kv) && isBetterCandidate(kv)) {
+ this.candidate = kv;
+ return true;
+ }
+ return false;
+ }
+
+ boolean isBetterCandidate(final KeyValue contender) {
+ return this.candidate == null ||
+ (this.kvcomparator.compareRows(this.candidate, contender) < 0 &&
+ this.kvcomparator.compareRows(contender, this.targetkey) <= 0);
+ }
+
+ /*
+ * Check if specified KeyValue buffer has been deleted by a previously
+ * seen delete.
+ * @param kv
+ * @return true if the specified KeyValue is deleted, false if not
+ */
+ private boolean isDeleted(final KeyValue kv) {
+ if (this.deletes.isEmpty()) return false;
+ NavigableSet<KeyValue> rowdeletes = this.deletes.get(kv);
+ if (rowdeletes == null || rowdeletes.isEmpty()) return false;
+ return isDeleted(kv, rowdeletes);
+ }
+
+ /**
+ * Check if the specified KeyValue buffer has been deleted by a previously
+ * seen delete.
+ * @param kv
+ * @param ds
+ * @return True if the specified KeyValue is deleted, false if not
+ */
+ public boolean isDeleted(final KeyValue kv, final NavigableSet<KeyValue> ds) {
+ if (ds == null || ds.isEmpty()) return false;
+ for (KeyValue d: ds) {
+ long kvts = kv.getTimestamp();
+ long dts = d.getTimestamp();
+ if (d.isDeleteFamily()) {
+ if (kvts <= dts) return true;
+ continue;
+ }
+ // Check column
+ int ret = Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(),
+ kv.getQualifierLength(),
+ d.getBuffer(), d.getQualifierOffset(), d.getQualifierLength());
+ if (ret <= -1) {
+ // This delete is for an earlier column.
+ continue;
+ } else if (ret >= 1) {
+ // Beyond this kv.
+ break;
+ }
+ // Check Timestamp
+ if (kvts > dts) return false;
+
+ // Check Type
+ switch (KeyValue.Type.codeToType(d.getType())) {
+ case Delete: return kvts == dts;
+ case DeleteColumn: return true;
+ default: continue;
+ }
+ }
+ return false;
+ }
+
+ /*
+ * Handle keys whose values hold deletes.
+ * Add to the set of deletes; then, if the candidate keys contain any that
+ * might match, check for a match and remove it. Implies candidates
+ * is made with a Comparator that ignores key type.
+ * @param kv
+ * @return True if we removed <code>k</code> from <code>candidates</code>.
+ */
+ boolean handleDeletes(final KeyValue kv) {
+ addDelete(kv);
+ boolean deleted = false;
+ if (!hasCandidate()) return deleted;
+ if (isDeleted(this.candidate)) {
+ this.candidate = null;
+ deleted = true;
+ }
+ return deleted;
+ }
+
+ /**
+ * Do right thing with passed key, add to deletes or add to candidates.
+ * @param kv
+ * @return True if we added a candidate
+ */
+ boolean handle(final KeyValue kv) {
+ if (kv.isDelete()) {
+ handleDeletes(kv);
+ return false;
+ }
+ return addCandidate(kv);
+ }
+
+ /**
+ * @return True if has candidate
+ */
+ public boolean hasCandidate() {
+ return this.candidate != null;
+ }
+
+ /**
+ * @return Best candidate or null.
+ */
+ public KeyValue getCandidate() {
+ return this.candidate;
+ }
+
+ public KeyValue getTargetKey() {
+ return this.targetkey;
+ }
+
+ /**
+ * @param kv Current kv
+ * @param firstOnRow First KeyValue on the row.
+ * @return True if we went too far, past the target key.
+ */
+ boolean isTooFar(final KeyValue kv, final KeyValue firstOnRow) {
+ return this.kvcomparator.compareRows(kv, firstOnRow) > 0;
+ }
+
+ boolean isTargetTable(final KeyValue kv) {
+ if (!metaregion) return true;
+ // Compare start of keys row. Compare including delimiter. Saves having
+ // to calculate where tablename ends in the candidate kv.
+ return Bytes.compareTo(this.targetkey.getBuffer(), this.rowoffset,
+ this.tablenamePlusDelimiterLength,
+ kv.getBuffer(), kv.getRowOffset(), this.tablenamePlusDelimiterLength) == 0;
+ }
+}
\ No newline at end of file
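
Callers drive the tracker with row-ordered KeyValues (see the MemStore changes below). A sketch of the expected loop; firstOnTargetRow, ttlMillis and cellsInRowOrder are invented for illustration:

  GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
      KeyValue.COMPARATOR, firstOnTargetRow, ttlMillis, false /* not meta */);
  for (KeyValue kv : cellsInRowOrder) {
    if (state.isExpired(kv)) continue;  // drop cells past their TTL
    if (state.handle(kv)) break;        // delete bookkeeping, or new candidate
  }
  KeyValue closest = state.getCandidate();  // null if nothing survived deletes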
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java Wed Sep 9 17:14:22 2009
@@ -38,8 +38,8 @@
* This class is NOT thread-safe as queries are never multi-threaded
*/
public class GetDeleteTracker implements DeleteTracker {
-
- private long familyStamp = -1L;
+ private static long UNSET = -1L;
+ private long familyStamp = UNSET;
protected List<Delete> deletes = null;
private List<Delete> newDeletes = new ArrayList<Delete>();
private Iterator<Delete> iterator;
@@ -64,7 +64,7 @@
@Override
public void add(byte [] buffer, int qualifierOffset, int qualifierLength,
long timestamp, byte type) {
- if(type == KeyValue.Type.DeleteFamily.getCode()) {
+ if (type == KeyValue.Type.DeleteFamily.getCode()) {
if(timestamp > familyStamp) {
familyStamp = timestamp;
}
@@ -88,14 +88,13 @@
@Override
public boolean isDeleted(byte [] buffer, int qualifierOffset,
int qualifierLength, long timestamp) {
-
// Check against DeleteFamily
if (timestamp <= familyStamp) {
return true;
}
// Check if there are other deletes
- if(this.delete == null) {
+ if (this.delete == null) {
return false;
}
@@ -103,7 +102,7 @@
int ret = Bytes.compareTo(buffer, qualifierOffset, qualifierLength,
this.delete.buffer, this.delete.qualifierOffset,
this.delete.qualifierLength);
- if(ret <= -1) {
+ if (ret <= -1) {
// Have not reached the next delete yet
return false;
} else if(ret >= 1) {
@@ -149,10 +148,8 @@
@Override
public boolean isEmpty() {
- if(this.familyStamp == 0L && this.delete == null) {
- return true;
- }
- return false;
+ return this.familyStamp == UNSET && this.delete == null &&
+ this.newDeletes.isEmpty();
}
@Override
@@ -160,7 +157,7 @@
this.deletes = null;
this.delete = null;
this.newDeletes = new ArrayList<Delete>();
- this.familyStamp = 0L;
+ this.familyStamp = UNSET;
this.iterator = null;
}
@@ -173,7 +170,7 @@
@Override
public void update() {
// If no previous deletes, use new deletes and return
- if(this.deletes == null || this.deletes.size() == 0) {
+ if (this.deletes == null || this.deletes.size() == 0) {
finalize(this.newDeletes);
return;
}
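
The UNSET sentinel makes construction, reset() and isEmpty() agree: previously the field was initialized to -1L while isEmpty() compared against 0L (the old reset value), so a fresh tracker never reported empty. A sketch of the now-consistent lifecycle (buffer/offset arguments invented):

  GetDeleteTracker tracker = new GetDeleteTracker();
  assert tracker.isEmpty();  // familyStamp == UNSET, no deletes buffered
  tracker.add(buf, qOff, qLen, ts, KeyValue.Type.DeleteFamily.getCode());
  assert !tracker.isEmpty();
  tracker.reset();           // familyStamp back to UNSET, not 0L
  assert tracker.isEmpty();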
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Sep 9 17:14:22 2009
@@ -122,7 +122,7 @@
private final Map<Integer, byte []> locksToRows =
new ConcurrentHashMap<Integer, byte []>();
protected final Map<byte [], Store> stores =
- new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
+ new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
//These variable are just used for getting data out of the region, to test on
//client side
@@ -1022,9 +1022,8 @@
* @return map of values
* @throws IOException
*/
- public Result getClosestRowBefore(final byte [] row,
- final byte [] family)
- throws IOException{
+ public Result getClosestRowBefore(final byte [] row, final byte [] family)
+ throws IOException {
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
KeyValue key = null;
@@ -1038,22 +1037,16 @@
if (key == null) {
return null;
}
- List<KeyValue> results = new ArrayList<KeyValue>();
- // This will get all results for this store. TODO: Do I have to make a
- // new key?
- if (!this.comparator.matchingRows(kv, key)) {
- kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
- }
+ // This will get all results for this store. TODO: Do we need to do this?
Get get = new Get(key.getRow());
+ List<KeyValue> results = new ArrayList<KeyValue>();
store.get(get, null, results);
-
return new Result(results);
} finally {
splitsAndClosesLock.readLock().unlock();
}
}
- //TODO
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns and rows specified by the {@link Scan}.
@@ -1115,7 +1108,8 @@
//Check to see if this is a deleteRow insert
if(delete.getFamilyMap().isEmpty()){
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
- delete.deleteFamily(family);
+ // Don't eat the timestamp
+ delete.deleteFamily(family, delete.getTimeStamp());
}
} else {
for(byte [] family : delete.getFamilyMap().keySet()) {
@@ -1166,7 +1160,8 @@
Get g = new Get(kv.getRow());
NavigableSet<byte []> qualifiers =
new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(kv.getQualifier());
+ byte [] q = kv.getQualifier();
+ if (q != null && q.length > 0) qualifiers.add(kv.getQualifier());
get(store, g, qualifiers, result);
if (result.isEmpty()) {
// Nothing to delete
@@ -1199,7 +1194,7 @@
* @throws IOException
*/
public void put(Put put) throws IOException {
- this.put(put, null, put.writeToWAL());
+ this.put(put, null, put.getWriteToWAL());
}
/**
@@ -1217,7 +1212,7 @@
* @throws IOException
*/
public void put(Put put, Integer lockid) throws IOException {
- this.put(put, lockid, put.writeToWAL());
+ this.put(put, lockid, put.getWriteToWAL());
}
/**
@@ -1337,10 +1332,9 @@
/**
- * Checks if any stamps are > now. If so, sets them to now.
+ * Checks if any stamps is Long.MAX_VALUE. If so, sets them to now.
* <p>
- * This acts to be prevent users from inserting future stamps as well as
- * to replace LATEST_TIMESTAMP with now.
+ * This acts to replace LATEST_TIMESTAMP with now.
* @param keys
* @param now
* @return <code>true</code> when updating the time stamp completed.
@@ -1350,7 +1344,9 @@
return false;
}
for(KeyValue key : keys) {
- key.updateLatestStamp(now);
+ if(key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
+ key.updateLatestStamp(now);
+ }
}
return true;
}
@@ -1740,6 +1736,9 @@
}
outResults.addAll(results);
resetFilters();
+ if(filter != null && filter.filterAllRemaining()) {
+ return false;
+ }
return returnResult;
}
@@ -1763,6 +1762,9 @@
// see if current row should be filtered based on row key
if ((filter != null && filter.filterRowKey(row, 0, row.length)) ||
(oldFilter != null && oldFilter.filterRowKey(row, 0, row.length))) {
+ if(!results.isEmpty() && !Bytes.equals(currentRow, row)) {
+ return true;
+ }
this.storeHeap.next(results);
results.clear();
resetFilters();
@@ -2432,6 +2434,7 @@
} else {
// Default behavior
Scan scan = new Scan();
+ // scan.addFamily(HConstants.CATALOG_FAMILY);
InternalScanner scanner = region.getScanner(scan);
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -2444,6 +2447,7 @@
} finally {
scanner.close();
}
+ // System.out.println(region.getClosestRowBefore(Bytes.toBytes("GeneratedCSVContent2,E3652782193BC8D66A0BA1629D0FAAAB,9993372036854775807")));
}
} finally {
region.close();
@@ -2481,7 +2485,6 @@
printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
}
majorCompact = true;
-
}
Path tableDir = new Path(args[0]);
HBaseConfiguration c = new HBaseConfiguration();
@@ -2497,4 +2500,4 @@
if (bc != null) bc.shutdown();
}
}
-}
+}
\ No newline at end of file
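
Two behavioral fixes in HRegion are easy to miss: a row-wide Delete expanded to per-family deletes now keeps the caller's timestamp, and updateLatestStamp only rewrites cells stamped LATEST_TIMESTAMP, so explicit stamps pass through untouched. A sketch (row and family invented):

  Delete d = new Delete(row, 1234L, null);  // delete the row as of ts=1234
  // Region-side expansion now preserves that stamp per family:
  //   d.deleteFamily(family, d.getTimeStamp());
  // Before the fix, deleteFamily(family) stamped LATEST_TIMESTAMP instead.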
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed Sep 9 17:14:22 2009
@@ -28,8 +28,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,8 +57,6 @@
public class MemStore implements HeapSize {
private static final Log LOG = LogFactory.getLog(MemStore.class);
- private final long ttl;
-
// MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
// better semantics. The Map will overwrite if passed a key it already had
// whereas the Set will not add new KV if key is same though value might be
@@ -68,7 +67,7 @@
// Snapshot of memstore. Made for flusher.
volatile KeyValueSkipListSet snapshot;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final KeyValue.KVComparator comparator;
@@ -81,20 +80,22 @@
// Used to track own heapSize
final AtomicLong size;
+ // All access must be synchronized.
+ final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
+ new CopyOnWriteArraySet<ChangedMemStoreObserver>();
+
/**
* Default constructor. Used for tests.
*/
public MemStore() {
- this(HConstants.FOREVER, KeyValue.COMPARATOR);
+ this(KeyValue.COMPARATOR);
}
/**
* Constructor.
- * @param ttl The TTL for cache entries, in milliseconds.
- * @param c
+ * @param c Comparator
*/
- public MemStore(final long ttl, final KeyValue.KVComparator c) {
- this.ttl = ttl;
+ public MemStore(final KeyValue.KVComparator c) {
this.comparator = c;
this.comparatorIgnoreTimestamp =
this.comparator.getComparatorIgnoringTimestamps();
@@ -127,12 +128,10 @@
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
- // We used to synchronize on the memstore here but we're inside a
- // write lock so removed it. Comment is left in case removal was a
- // mistake. St.Ack
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
+ tellChangedMemStoreObservers();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
}
@@ -142,6 +141,15 @@
}
}
+ /*
+ * Tell outstanding scanners that memstore has changed.
+ */
+ private void tellChangedMemStoreObservers() {
+ for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
+ o.changedMemStore();
+ }
+ }
+
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
@@ -172,6 +180,7 @@
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
+ tellChangedMemStoreObservers();
}
} finally {
this.lock.writeLock().unlock();
@@ -184,15 +193,15 @@
* @return approximate size of the passed key and value.
*/
long add(final KeyValue kv) {
- long size = -1;
+ long s = -1;
this.lock.readLock().lock();
try {
- size = heapSizeChange(kv, this.kvset.add(kv));
- this.size.addAndGet(size);
+ s = heapSizeChange(kv, this.kvset.add(kv));
+ this.size.addAndGet(s);
} finally {
this.lock.readLock().unlock();
}
- return size;
+ return s;
}
/**
@@ -201,7 +210,7 @@
* @return approximate size of the passed key and value.
*/
long delete(final KeyValue delete) {
- long size = 0;
+ long s = 0;
this.lock.readLock().lock();
//Have to find out what we want to do here, to find the fastest way of
//removing things that are under a delete.
@@ -261,17 +270,17 @@
//Delete all the entries effected by the last added delete
for (KeyValue kv : deletes) {
notpresent = this.kvset.remove(kv);
- size -= heapSizeChange(kv, notpresent);
+ s -= heapSizeChange(kv, notpresent);
}
// Adding the delete to memstore. Add any value, as long as
// same instance each time.
- size += heapSizeChange(delete, this.kvset.add(delete));
+ s += heapSizeChange(delete, this.kvset.add(delete));
} finally {
this.lock.readLock().unlock();
}
- this.size.addAndGet(size);
- return size;
+ this.size.addAndGet(s);
+ return s;
}
/**
@@ -325,200 +334,122 @@
return result;
}
-
/**
- * @param row Row to look for.
- * @param candidateKeys Map of candidate keys (Accumulation over lots of
- * lookup over stores and memstores)
- */
- void getRowKeyAtOrBefore(final KeyValue row,
- final NavigableSet<KeyValue> candidateKeys) {
- getRowKeyAtOrBefore(row, candidateKeys,
- new TreeSet<KeyValue>(this.comparator), System.currentTimeMillis());
- }
-
- /**
- * @param kv Row to look for.
- * @param candidates Map of candidate keys (Accumulation over lots of
- * lookup over stores and memstores). Pass a Set with a Comparator that
- * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
- * with a different Type to the candidate key.
- * @param deletes Pass a Set that has a Comparator that ignores key type.
- * @param now
- */
- void getRowKeyAtOrBefore(final KeyValue kv,
- final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now) {
+ * @param state
+ */
+ void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
this.lock.readLock().lock();
try {
- getRowKeyAtOrBefore(kvset, kv, candidates, deletes, now);
- getRowKeyAtOrBefore(snapshot, kv, candidates, deletes, now);
+ getRowKeyAtOrBefore(kvset, state);
+ getRowKeyAtOrBefore(snapshot, state);
} finally {
this.lock.readLock().unlock();
}
}
+ /*
+ * @param set
+ * @param state Accumulates deletes and candidates.
+ */
private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
- final KeyValue kv, final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now) {
+ final GetClosestRowBeforeTracker state) {
if (set.isEmpty()) {
return;
}
- // We want the earliest possible to start searching from. Start before
- // the candidate key in case it turns out a delete came in later.
- KeyValue search = candidates.isEmpty()? kv: candidates.first();
-
- // Get all the entries that come equal or after our search key
- SortedSet<KeyValue> tail = set.tailSet(search);
-
- // if there are items in the tail map, there's either a direct match to
- // the search key, or a range of values between the first candidate key
- // and the ultimate search key (or the end of the cache)
- if (!tail.isEmpty() &&
- this.comparator.compareRows(tail.first(), search) <= 0) {
- // Keep looking at cells as long as they are no greater than the
- // ultimate search key and there's still records left in the map.
- KeyValue deleted = null;
- KeyValue found = null;
- for (Iterator<KeyValue> iterator = tail.iterator();
- iterator.hasNext() && (found == null ||
- this.comparator.compareRows(found, kv) <= 0);) {
- found = iterator.next();
- if (this.comparator.compareRows(found, kv) <= 0) {
- if (found.isDeleteType()) {
- Store.handleDeletes(found, candidates, deletes);
- if (deleted == null) {
- deleted = found;
- }
- } else {
- if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
- candidates.add(found);
- } else {
- if (deleted == null) {
- deleted = found;
- }
- // TODO: Check this removes the right key.
- // Its expired. Remove it.
- iterator.remove();
- }
- }
- }
+ if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
+ // Found nothing in row. Try backing up.
+ getRowKeyBefore(set, state);
+ }
+ }
+
+ /*
+ * Walk forward in a row from <code>firstOnRow</code>. Presumption is that
+ * we have been passed the first possible key on a row. As we walk forward
+ * we accumulate deletes until we hit a candidate on the row at which point
+ * we return.
+ * @param set
+ * @param firstOnRow First possible key on this row.
+ * @param state
+ * @return True if we found a candidate walking this row.
+ */
+ private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
+ final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
+ boolean foundCandidate = false;
+ SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
+ if (tail.isEmpty()) return foundCandidate;
+ for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
+ KeyValue kv = i.next();
+ // Did we go beyond the target row? If so break.
+ if (state.isTooFar(kv, firstOnRow)) break;
+ if (state.isExpired(kv)) {
+ i.remove();
+ continue;
}
- if (candidates.isEmpty() && deleted != null) {
- getRowKeyBefore(set, deleted, candidates, deletes, now);
+ // If we added something, this row is a contender. break.
+ if (state.handle(kv)) {
+ foundCandidate = true;
+ break;
}
- } else {
- // The tail didn't contain any keys that matched our criteria, or was
- // empty. Examine all the keys that proceed our splitting point.
- getRowKeyBefore(set, search, candidates, deletes, now);
}
+ return foundCandidate;
}
/*
- * Get row key that comes before passed <code>search_key</code>
- * Use when we know search_key is not in the map and we need to search
- * earlier in the cache.
+ * Walk backwards through the passed set a row at a time until we run out of
+ * set or until we get a candidate.
* @param set
- * @param search
- * @param candidates
- * @param deletes Pass a Set that has a Comparator that ignores key type.
- * @param now
+ * @param state
*/
private void getRowKeyBefore(NavigableSet<KeyValue> set,
- KeyValue search, NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now) {
- NavigableSet<KeyValue> head = set.headSet(search, false);
- // If we tried to create a headMap and got an empty map, then there are
- // no keys at or before the search key, so we're done.
- if (head.isEmpty()) {
- return;
+ final GetClosestRowBeforeTracker state) {
+ KeyValue firstOnRow = state.getTargetKey();
+ for (Member p = memberOfPreviousRow(set, state, firstOnRow);
+ p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
+ // Make sure we don't fall out of our table.
+ if (!state.isTargetTable(p.kv)) break;
+ // Stop looking if we've exited the better candidate range.
+ if (!state.isBetterCandidate(p.kv)) break;
+ // Make into firstOnRow
+ firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
+ // If we find something, break;
+ if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
}
+ }
- // If there aren't any candidate keys at this point, we need to search
- // backwards until we find at least one candidate or run out of headMap.
- if (candidates.isEmpty()) {
- KeyValue lastFound = null;
- // TODO: Confirm we're iterating in the right order
- for (Iterator<KeyValue> i = head.descendingIterator();
- i.hasNext();) {
- KeyValue found = i.next();
- // if the last row we found a candidate key for is different than
- // the row of the current candidate, we can stop looking -- if its
- // not a delete record.
- boolean deleted = found.isDeleteType();
- if (lastFound != null &&
- this.comparator.matchingRows(lastFound, found) && !deleted) {
- break;
- }
- // If this isn't a delete, record it as a candidate key. Also
- // take note of this candidate so that we'll know when
- // we cross the row boundary into the previous row.
- if (!deleted) {
- if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
- lastFound = found;
- candidates.add(found);
- } else {
- // Its expired.
- Store.expiredOrDeleted(set, found);
- }
- } else {
- // We are encountering items in reverse. We may have just added
- // an item to candidates that this later item deletes. Check. If we
- // found something in candidates, remove it from the set.
- if (Store.handleDeletes(found, candidates, deletes)) {
- remove(set, found);
- }
- }
- }
- } else {
- // If there are already some candidate keys, we only need to consider
- // the very last row's worth of keys in the headMap, because any
- // smaller acceptable candidate keys would have caused us to start
- // our search earlier in the list, and we wouldn't be searching here.
- SortedSet<KeyValue> rowTail =
- head.tailSet(head.last().cloneRow(HConstants.LATEST_TIMESTAMP));
- Iterator<KeyValue> i = rowTail.iterator();
- do {
- KeyValue found = i.next();
- if (found.isDeleteType()) {
- Store.handleDeletes(found, candidates, deletes);
- } else {
- if (ttl == HConstants.FOREVER ||
- now < found.getTimestamp() + ttl ||
- !deletes.contains(found)) {
- candidates.add(found);
- } else {
- Store.expiredOrDeleted(set, found);
- }
- }
- } while (i.hasNext());
+ /*
+ * Immutable data structure to hold member found in set and the set it was
+ * found in. Include set because it is carrying context.
+ */
+ private class Member {
+ final KeyValue kv;
+ final NavigableSet<KeyValue> set;
+ Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
+ this.kv = kv;
+ this.set = s;
}
}
-
/*
- * @param set
- * @param delete This is a delete record. Remove anything behind this of same
- * r/c/ts.
- * @return True if we removed anything.
- */
- private boolean remove(final NavigableSet<KeyValue> set,
- final KeyValue delete) {
- SortedSet<KeyValue> s = set.tailSet(delete);
- if (s.isEmpty()) {
- return false;
- }
- boolean removed = false;
- for (KeyValue kv: s) {
- if (this.comparatorIgnoreType.compare(kv, delete) == 0) {
- // Same r/c/ts. Remove it.
- s.remove(kv);
- removed = true;
+ * @param set Set to walk back in. Pass a first in row or we'll return
+ * same row (loop).
+ * @param state Utility and context.
+ * @param firstOnRow First item on the row after the one we want to find a
+ * member in.
+ * @return Null or member of row previous to <code>firstOnRow</code>
+ */
+ private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
+ final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
+ NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
+ if (head.isEmpty()) return null;
+ for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
+ KeyValue found = i.next();
+ if (state.isExpired(found)) {
+ i.remove();
continue;
}
- break;
+ return new Member(head, found);
}
- return removed;
+ return null;
}
/**
@@ -527,9 +458,8 @@
KeyValueScanner [] getScanners() {
this.lock.readLock().lock();
try {
- KeyValueScanner [] scanners = new KeyValueScanner[2];
- scanners[0] = new MemStoreScanner(this.kvset);
- scanners[1] = new MemStoreScanner(this.snapshot);
+ KeyValueScanner [] scanners = new KeyValueScanner[1];
+ scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
return scanners;
} finally {
this.lock.readLock().unlock();
@@ -603,18 +533,22 @@
/*
* MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore.
- * This behaves as if it were a real scanner but does not maintain position
- * in the passed memstore tree.
- */
- protected class MemStoreScanner implements KeyValueScanner {
- private final NavigableSet<KeyValue> kvs;
- private KeyValue current = null;
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * This behaves as if it were a real scanner but does not maintain position.
+ */
+ protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0;
-
- MemStoreScanner(final NavigableSet<KeyValue> s) {
- this.kvs = s;
+ // Make access atomic.
+ private FirstOnRow firstOnNextRow = new FirstOnRow();
+ // Keep reference to Set so can remove myself when closed.
+ private final Set<ChangedMemStoreObserver> observers;
+
+ MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
+ super();
+ this.observers = observers;
+ this.observers.add(this);
}
public boolean seek(KeyValue key) {
@@ -623,7 +557,7 @@
close();
return false;
}
- this.current = key;
+ this.firstOnNextRow.set(key);
return cacheNextRow();
} catch(Exception e) {
close();
@@ -652,47 +586,117 @@
}
/**
- * @return True if we successfully cached a NavigableSet aligned on
- * next row.
+ * @return True if successfully cached a next row.
*/
boolean cacheNextRow() {
- SortedSet<KeyValue> keys;
+ // Prevent snapshot being cleared while caching a row.
+ lock.readLock().lock();
+ this.result.clear();
+ this.idx = 0;
try {
- keys = this.kvs.tailSet(this.current);
- } catch (Exception e) {
- close();
- return false;
- }
- if (keys == null || keys.isEmpty()) {
- close();
- return false;
+ // Look at each set, kvset and snapshot.
+ // Both look for matching entries for the current row, returning what they
+ // have as next row after it (or null if the set is empty or nothing
+ // follows).
+ KeyValue kvsetNextRow = cacheNextRow(kvset);
+ KeyValue snapshotNextRow = cacheNextRow(snapshot);
+ if (kvsetNextRow == null && snapshotNextRow == null) {
+ // Nothing more in memstore but we might have gotten current row
+ // results. Indicate end of store by setting next row to null.
+ this.firstOnNextRow.set(null);
+ return !this.result.isEmpty();
+ } else if (kvsetNextRow != null && snapshotNextRow != null) {
+ // Set current at the lowest of the two values.
+ int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
+ this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
+ } else {
+ this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
+ }
+ return true;
+ } finally {
+ lock.readLock().unlock();
}
- this.current = null;
- byte [] row = keys.first().getRow();
- for (KeyValue kv: keys) {
- if (comparator.compareRows(kv, row) != 0) {
- this.current = kv;
+ }
+
+ /*
+ * See if the set has entries for the <code>firstOnNextRow</code> row.
+ * If so, add them to <code>this.result</code>.
+ * @param set Set to examine
+ * @return First KeyValue on the row following <code>firstOnNextRow</code>
+ * in the passed <code>set</code>, or null if nothing follows in it.
+ */
+ private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
+ if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
+ SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
+ if (tail == null || tail.isEmpty()) return null;
+ KeyValue first = tail.first();
+ KeyValue nextRow = null;
+ for (KeyValue kv: tail) {
+ if (comparator.compareRows(first, kv) != 0) {
+ nextRow = kv;
break;
}
- result.add(kv);
+ this.result.add(kv);
}
- return true;
+ return nextRow;
}
public void close() {
- current = null;
+ this.firstOnNextRow.set(null);
idx = 0;
if (!result.isEmpty()) {
result.clear();
}
+ this.observers.remove(this);
+ }
+
+ public void changedMemStore() {
+ this.firstOnNextRow.reset();
}
}
-
+
+ /*
+ * Private class that holds firstOnRow and utilities to update it.
+ * Usually firstOnRow is the first KeyValue we find on the next row rather
+ * than the absolute minimal first key (empty column, Type.Maximum, maximum
+ * ts). Being sloppy this way -- letting firstOnRow be whatever we first
+ * find on the next row -- works, and saves allocating a new KeyValue each
+ * time firstOnRow is updated. But if the memstore changes under us, we
+ * reset firstOnRow to the ultimate, absolute firstOnRow for its row.
+ */
+ private static class FirstOnRow {
+ private KeyValue firstOnRow = null;
+
+ FirstOnRow() {
+ super();
+ }
+
+ synchronized void set(final KeyValue kv) {
+ this.firstOnRow = kv;
+ }
+
+ /* Reset firstOnRow to a 'clean', absolute firstOnRow.
+ */
+ synchronized void reset() {
+ if (this.firstOnRow == null) return;
+ this.firstOnRow =
+ new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
+ }
+
+ synchronized KeyValue get() {
+ return this.firstOnRow;
+ }
+ }
+
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + Bytes.SIZEOF_LONG + (7 * ClassSize.REFERENCE));
+ ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
+ ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
(2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/*
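
The two cacheNextRow(set) calls above amount to a small row-at-a-time merge:
each set is asked for its entries on the current row, each hands back the
first key of the row that follows, and the scanner advances to the lower of
the two candidates. Below is a minimal, self-contained sketch of that merge;
NextRowMergeSketch, cacheRow and the String "row/column" entries are
illustrative stand-ins for the patch's KeyValue sets, not HBase API.

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class NextRowMergeSketch {

  public static void main(String[] args) {
    NavigableSet<String> kvset =
        new TreeSet<>(List.of("r1/a", "r1/b", "r3/a"));
    NavigableSet<String> snapshot =
        new TreeSet<>(List.of("r1/c", "r2/a"));
    // Position at the lowest entry across both sets.
    String position = min(kvset.first(), snapshot.first());
    while (position != null) {
      List<String> row = new ArrayList<>();
      // Ask each set for entries on the current row; each returns the
      // first entry of the row that follows, or null if exhausted.
      String nextFromKvset = cacheRow(kvset, position, row);
      String nextFromSnapshot = cacheRow(snapshot, position, row);
      System.out.println("row " + rowOf(position) + " -> " + row);
      // Advance to the lower of the two next-row candidates.
      if (nextFromKvset == null) position = nextFromSnapshot;
      else if (nextFromSnapshot == null) position = nextFromKvset;
      else position = min(nextFromKvset, nextFromSnapshot);
    }
  }

  // Copy entries on position's row into result; return the first entry on
  // the following row, or null if this set has nothing more.
  static String cacheRow(NavigableSet<String> set, String position,
      List<String> result) {
    String row = rowOf(position);
    for (String entry : set.tailSet(position)) {
      if (!rowOf(entry).equals(row)) return entry;
      result.add(entry);
    }
    return null;
  }

  static String min(String a, String b) {
    return a.compareTo(b) <= 0 ? a : b;
  }

  static String rowOf(String entry) {
    return entry.substring(0, entry.indexOf('/'));
  }
}
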
@@ -764,4 +768,16 @@
}
LOG.info("Exiting.");
}
+
+ /**
+ * Observers want to know about MemStore changes.
+ * Called when snapshot is cleared and when we make one.
+ */
+ interface ChangedMemStoreObserver {
+ /**
+ * Called to notify an observer that the MemStore has changed.
+ */
+ void changedMemStore();
+ }
}
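
The new ChangedMemStoreObserver interface, together with the
COPYONWRITE_ARRAYSET now counted in DEEP_OVERHEAD, suggests a
register/notify/deregister life cycle for scanners. The following is a
hypothetical sketch of that wiring, assuming a snapshot swap is what
triggers notification; Store, Scanner and swapSnapshot are invented names,
not classes from the patch.

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class ObserverSketch {

  interface ChangedMemStoreObserver {
    void changedMemStore();
  }

  static class Store {
    private final Set<ChangedMemStoreObserver> observers =
        new CopyOnWriteArraySet<>();

    Scanner openScanner() {
      return new Scanner(observers);
    }

    void swapSnapshot() {
      // ... replace the snapshot, then tell every open scanner that its
      // cached position may be stale.
      for (ChangedMemStoreObserver o : observers) {
        o.changedMemStore();
      }
    }
  }

  static class Scanner implements ChangedMemStoreObserver {
    private final Set<ChangedMemStoreObserver> observers;
    private boolean positionDirty = false;

    Scanner(Set<ChangedMemStoreObserver> observers) {
      this.observers = observers;
      this.observers.add(this);  // register on construction
    }

    public void changedMemStore() {
      positionDirty = true;      // re-derive position before the next read
    }

    void close() {
      observers.remove(this);    // deregister so the set cannot leak us
    }
  }

  public static void main(String[] args) {
    Store store = new Store();
    Scanner s = store.openScanner();
    store.swapSnapshot();
    System.out.println("scanner dirty after swap: " + s.positionDirty);
    s.close();
  }
}

A copy-on-write set fits this pattern because notification iterates the set
while scanners may concurrently close and remove themselves.
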
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Wed Sep 9 17:14:22 2009
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -27,6 +29,8 @@
* Keeps track of the columns for a scan if they are not explicitly specified
*/
public class ScanWildcardColumnTracker implements ColumnTracker {
+ private static final Log LOG =
+ LogFactory.getLog(ScanWildcardColumnTracker.class);
private byte [] columnBuffer = null;
private int columnOffset = 0;
private int columnLength = 0;
@@ -79,15 +83,27 @@
columnOffset = offset;
columnLength = length;
currentCount = 0;
-
if (++currentCount > maxVersions)
return MatchCode.SKIP;
return MatchCode.INCLUDE;
}
+
// new col < oldcol
// if (cmp < 0) {
- throw new RuntimeException("ScanWildcardColumnTracker.checkColumn ran " +
- "into a column actually smaller than the previous column!");
+ // WARNING: This means that very likely an edit for some other family
+ // was incorrectly stored into the store for this one. Continue, but
+ // complain.
+ LOG.error("ScanWildcardColumnTracker.checkColumn ran " +
+ "into a column actually smaller than the previous column: " +
+ Bytes.toStringBinary(bytes, offset, length));
+ // switched columns
+ columnBuffer = bytes;
+ columnOffset = offset;
+ columnLength = length;
+ currentCount = 0;
+ if (++currentCount > maxVersions)
+ return MatchCode.SKIP;
+ return MatchCode.INCLUDE;
}
@Override
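
The new branch downgrades an out-of-order column from a RuntimeException to
an ERROR log followed by an ordinary column switch, so a stray edit from
another family no longer kills the scan. Below is a compact sketch of that
version-counting behaviour; WildcardTrackerSketch and its members are
illustrative names, and the real tracker compares (buffer, offset, length)
triples rather than whole arrays.

import java.util.Arrays;

public class WildcardTrackerSketch {

  enum MatchCode { INCLUDE, SKIP }

  private byte[] column = null;
  private int versions = 0;
  private final int maxVersions;

  WildcardTrackerSketch(int maxVersions) {
    this.maxVersions = maxVersions;
  }

  MatchCode checkColumn(byte[] qualifier) {
    if (column != null) {
      int cmp = Arrays.compare(qualifier, column);
      if (cmp == 0) {
        // Same column: include until the version budget is spent.
        return ++versions > maxVersions ? MatchCode.SKIP : MatchCode.INCLUDE;
      }
      if (cmp < 0) {
        // Columns should arrive sorted; complain but keep scanning.
        System.err.println("column smaller than previous: "
            + new String(qualifier));
      }
    }
    // New (or out-of-order) column: reset and include its first version.
    column = qualifier;
    versions = 1;
    return MatchCode.INCLUDE;
  }

  public static void main(String[] args) {
    WildcardTrackerSketch t = new WildcardTrackerSketch(1);
    System.out.println(t.checkColumn("a".getBytes()));  // INCLUDE
    System.out.println(t.checkColumn("a".getBytes()));  // SKIP: budget spent
    System.out.println(t.checkColumn("b".getBytes()));  // INCLUDE
    System.out.println(t.checkColumn("a".getBytes()));  // logs, then INCLUDE
  }
}
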
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Sep 9 17:14:22 2009
@@ -50,7 +50,6 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -58,6 +57,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@@ -181,7 +181,7 @@
// second -> ms adjust for user data
this.ttl *= 1000;
}
- this.memstore = new MemStore(this.ttl, this.comparator);
+ this.memstore = new MemStore(this.comparator);
this.regionCompactionDir = new Path(HRegion.getCompactionDir(basedir),
Integer.toString(info.getEncodedName()));
this.storeName = this.family.getName();
@@ -1028,293 +1028,144 @@
}
}
+ static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
+ return key.getTimestamp() < oldestTimestamp;
+ }
+
/**
* Find the key that matches <i>row</i> exactly, or the one that immediately
* precedes it. WARNING: Only use this method on a table where writes occur
- * with stricly increasing timestamps. This method assumes this pattern of
- * writes in order to make it reasonably performant.
- * @param targetkey
- * @return Found keyvalue
+ * with strictly increasing timestamps. This method assumes this pattern of
+ * writes in order to make it reasonably performant. Also, our search is
+ * dependent on the axiom that deletes are for cells in the container that
+ * follows, whether a memstore snapshot or a storefile, not for the current
+ * container: i.e. we'll see deletes before we come across the cells we are
+ * to delete. The presumption is that memstore#kvset is processed before
+ * memstore#snapshot, and so on.
+ * @param kv First possible item on targeted row; i.e. empty columns, latest
+ * timestamp and maximum type.
+ * @return Found keyvalue or null if none found.
* @throws IOException
*/
- KeyValue getRowKeyAtOrBefore(final KeyValue targetkey)
- throws IOException{
- // Map of keys that are candidates for holding the row key that
- // most closely matches what we're looking for. We'll have to update it as
- // deletes are found all over the place as we go along before finally
- // reading the best key out of it at the end. Use a comparator that
- // ignores key types. Otherwise, we can't remove deleted items doing
- // set.remove because of the differing type between insert and delete.
- NavigableSet<KeyValue> candidates =
- new TreeSet<KeyValue>(this.comparator.getComparatorIgnoringType());
-
- // Keep a list of deleted cell keys. We need this because as we go through
- // the store files, the cell with the delete marker may be in one file and
- // the old non-delete cell value in a later store file. If we don't keep
- // around the fact that the cell was deleted in a newer record, we end up
- // returning the old value if user is asking for more than one version.
- // This List of deletes should not be large since we are only keeping rows
- // and columns that match those set on the scanner and which have delete
- // values. If memory usage becomes an issue, could redo as bloom filter.
- NavigableSet<KeyValue> deletes =
- new TreeSet<KeyValue>(this.comparatorIgnoringType);
- long now = System.currentTimeMillis();
+ KeyValue getRowKeyAtOrBefore(final KeyValue kv)
+ throws IOException {
+ GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
+ this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
this.lock.readLock().lock();
try {
// First go to the memstore. Pick up deletes and candidates.
- this.memstore.getRowKeyAtOrBefore(targetkey, candidates, deletes, now);
- // Process each store file. Run through from newest to oldest.
+ this.memstore.getRowKeyAtOrBefore(state);
+ // Check whether we already got a candidate on the asked-for 'kv' row.
+ // Process each store file. Run through from newest to oldest.
Map<Long, StoreFile> m = this.storefiles.descendingMap();
- for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+ for (Map.Entry<Long, StoreFile> e : m.entrySet()) {
// Update the candidate keys from the current map file
- rowAtOrBeforeFromStoreFile(e.getValue(), targetkey, candidates,
- deletes, now);
+ rowAtOrBeforeFromStoreFile(e.getValue(), state);
}
- // Return the best key from candidateKeys
- return candidates.isEmpty()? null: candidates.last();
+ return state.getCandidate();
} finally {
this.lock.readLock().unlock();
}
}
/*
- * Check an individual MapFile for the row at or before a given key
- * and timestamp
+ * Check an individual MapFile for the row at or before a given row.
* @param f
- * @param targetkey
- * @param candidates Pass a Set with a Comparator that
- * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
- * with a different Type to the candidate key.
+ * @param state
* @throws IOException
*/
private void rowAtOrBeforeFromStoreFile(final StoreFile f,
- final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now)
- throws IOException {
- // if there aren't any candidate keys yet, we'll do some things different
- if (candidates.isEmpty()) {
- rowAtOrBeforeCandidate(f, targetkey, candidates, deletes, now);
- } else {
- rowAtOrBeforeWithCandidates(f, targetkey, candidates, deletes, now);
- }
- }
-
- /*
- * @param ttlSetting
- * @param hsk
- * @param now
- * @param deletes A Set whose Comparator ignores Type.
- * @return True if key has not expired and is not in passed set of deletes.
- */
- static boolean notExpiredAndNotInDeletes(final long ttl,
- final KeyValue key, final long now, final Set<KeyValue> deletes) {
- return !isExpired(key, now-ttl) && (deletes == null || deletes.isEmpty() ||
- !deletes.contains(key));
- }
-
- static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
- }
-
- /* Find a candidate for row that is at or before passed key, searchkey, in hfile.
- * @param f
- * @param targetkey Key to go search the hfile with.
- * @param candidates
- * @param now
- * @throws IOException
- * @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
- */
- private void rowAtOrBeforeCandidate(final StoreFile f,
- final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now)
+ final GetClosestRowBeforeTracker state)
throws IOException {
- KeyValue search = targetkey;
- // If the row we're looking for is past the end of this mapfile, set the
- // search key to be the last key. If its a deleted key, then we'll back
- // up to the row before and return that.
- // TODO: Cache last key as KV over in the file.
Reader r = f.getReader();
if (r == null) {
LOG.warn("StoreFile " + f + " has a null Reader");
return;
}
- byte [] lastkey = r.getLastKey();
- KeyValue lastKeyValue =
- KeyValue.createKeyValueFromKey(lastkey, 0, lastkey.length);
- if (this.comparator.compareRows(lastKeyValue, targetkey) < 0) {
- search = lastKeyValue;
+ // TODO: Cache these keys rather than make each time?
+ byte [] fk = r.getFirstKey();
+ KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+ byte [] lk = r.getLastKey();
+ KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
+ KeyValue firstOnRow = state.getTargetKey();
+ if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
+ // If last key in file is not of the target table, no candidates in this
+ // file. Return.
+ if (!state.isTargetTable(lastKV)) return;
+ // If the row we're looking for is past the end of file, set search key to
+ // last key. TODO: Cache last and first key rather than make each time.
+ firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
}
- KeyValue knownNoGoodKey = null;
HFileScanner scanner = r.getScanner();
- for (boolean foundCandidate = false; !foundCandidate;) {
- // Seek to the exact row, or the one that would be immediately before it
- int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
- search.getKeyLength());
- if (result < 0) {
- // Not in file.
- break;
- }
- KeyValue deletedOrExpiredRow = null;
- KeyValue kv = null;
- do {
- kv = scanner.getKeyValue();
- if (this.comparator.compareRows(kv, search) <= 0) {
- if (!kv.isDeleteType()) {
- if (handleNonDelete(kv, now, deletes, candidates)) {
- foundCandidate = true;
- // NOTE! Continue.
- continue;
- }
- }
- deletes.add(kv);
- if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = kv;
- }
- } else if (this.comparator.compareRows(kv, search) > 0) {
- // if the row key we just read is beyond the key we're searching for,
- // then we're done.
- break;
- } else {
- // So, the row key doesn't match, but we haven't gone past the row
- // we're seeking yet, so this row is a candidate for closest
- // (assuming that it isn't a delete).
- if (!kv.isDeleteType()) {
- if (handleNonDelete(kv, now, deletes, candidates)) {
- foundCandidate = true;
- // NOTE: Continue
- continue;
- }
- }
- deletes.add(kv);
- if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = kv;
- }
- }
- } while(scanner.next() && (knownNoGoodKey == null ||
- this.comparator.compare(kv, knownNoGoodKey) < 0));
-
- // If we get here and have no candidates but we did find a deleted or
- // expired candidate, we need to look at the key before that
- if (!foundCandidate && deletedOrExpiredRow != null) {
- knownNoGoodKey = deletedOrExpiredRow;
- if (!scanner.seekBefore(deletedOrExpiredRow.getBuffer(),
- deletedOrExpiredRow.getKeyOffset(),
- deletedOrExpiredRow.getKeyLength())) {
- // Not in file -- what can I do now but break?
- break;
- }
- search = scanner.getKeyValue();
- } else {
- // No candidates and no deleted or expired candidates. Give up.
- break;
- }
+ // Seek scanner. If can't seek it, return.
+ if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
+ // If we found a candidate walking forward from firstOnRow, just return.
+ // In practice this is unlikely: an actual instance of the synthetic
+ // absolute first key on a row will rarely exist in the table.
+ if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
+ // If here, need to start backing up.
+ while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
+ firstOnRow.getKeyLength())) {
+ KeyValue kv = scanner.getKeyValue();
+ if (!state.isTargetTable(kv)) break;
+ if (!state.isBetterCandidate(kv)) break;
+ // Make new first on row.
+ firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
+ // Seek scanner. If can't seek it, break.
+ if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
+ // If we find something, break;
+ if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
}
-
- // Arriving here just means that we consumed the whole rest of the map
- // without going "past" the key we're searching for. we can just fall
- // through here.
}
- private void rowAtOrBeforeWithCandidates(final StoreFile f,
- final KeyValue targetkey,
- final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes, final long now)
+ /*
+ * Seek the file scanner to firstOnRow or first entry in file.
+ * @param scanner
+ * @param firstOnRow
+ * @param firstKV
+ * @return True if we successfully seeked scanner.
+ * @throws IOException
+ */
+ private boolean seekToScanner(final HFileScanner scanner,
+ final KeyValue firstOnRow, final KeyValue firstKV)
throws IOException {
- // if there are already candidate keys, we need to start our search
- // at the earliest possible key so that we can discover any possible
- // deletes for keys between the start and the search key. Back up to start
- // of the row in case there are deletes for this candidate in this mapfile
- // BUT do not backup before the first key in the store file.
- KeyValue firstCandidateKey = candidates.first();
- KeyValue search = null;
- if (this.comparator.compareRows(firstCandidateKey, targetkey) < 0) {
- search = targetkey;
- } else {
- search = firstCandidateKey;
- }
+ KeyValue kv = firstOnRow;
+ // If firstOnRow is on the same row as the file's first key, seek to
+ // that actual firstKV instead.
+ if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
+ int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
+ kv.getKeyLength());
+ return result >= 0;
+ }
- // Seek to the exact row, or the one that would be immediately before it
- Reader r = f.getReader();
- if (r == null) {
- LOG.warn("StoreFile " + f + " has a null Reader");
- return;
- }
- HFileScanner scanner = r.getScanner();
- int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
- search.getKeyLength());
- if (result < 0) {
- // Key is before start of this file. Return.
- return;
- }
+ /*
+ * When we come in here, we are usually at the kv just before we cross into
+ * the row that firstOnRow is on. We typically need to advance once to get
+ * onto the row we are interested in.
+ * @param scanner
+ * @param firstOnRow
+ * @param state
+ * @return True if we found a candidate.
+ * @throws IOException
+ */
+ private boolean walkForwardInSingleRow(final HFileScanner scanner,
+ final KeyValue firstOnRow, final GetClosestRowBeforeTracker state)
+ throws IOException {
+ boolean foundCandidate = false;
do {
KeyValue kv = scanner.getKeyValue();
- // if we have an exact match on row, and it's not a delete, save this
- // as a candidate key
- if (this.comparator.matchingRows(kv, targetkey)) {
- handleKey(kv, now, deletes, candidates);
- } else if (this.comparator.compareRows(kv, targetkey) > 0 ) {
- // if the row key we just read is beyond the key we're searching for,
- // then we're done.
+ // If we are not in the row, skip.
+ if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
+ // Did we go beyond the target row? If so break.
+ if (state.isTooFar(kv, firstOnRow)) break;
+ if (state.isExpired(kv)) {
+ continue;
+ }
+ // If we added something, this row is a contender. break.
+ if (state.handle(kv)) {
+ foundCandidate = true;
break;
- } else {
- // So, the row key doesn't match, but we haven't gone past the row
- // we're seeking yet, so this row is a candidate for closest
- // (assuming that it isn't a delete).
- handleKey(kv, now, deletes, candidates);
}
} while(scanner.next());
- }
-
- /*
- * Used calculating keys at or just before a passed key.
- * @param readkey
- * @param now
- * @param deletes Set with Comparator that ignores key type.
- * @param candidate Set with Comprator that ignores key type.
- */
- private void handleKey(final KeyValue readkey, final long now,
- final NavigableSet<KeyValue> deletes,
- final NavigableSet<KeyValue> candidates) {
- if (!readkey.isDeleteType()) {
- handleNonDelete(readkey, now, deletes, candidates);
- } else {
- handleDeletes(readkey, candidates, deletes);
- }
- }
-
- /*
- * Used calculating keys at or just before a passed key.
- * @param readkey
- * @param now
- * @param deletes Set with Comparator that ignores key type.
- * @param candidates Set with Comparator that ignores key type.
- * @return True if we added a candidate.
- */
- private boolean handleNonDelete(final KeyValue readkey, final long now,
- final NavigableSet<KeyValue> deletes,
- final NavigableSet<KeyValue> candidates) {
- if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
- candidates.add(readkey);
- return true;
- }
- return false;
- }
-
- /**
- * Handle keys whose values hold deletes.
- * Add to the set of deletes and then if the candidate keys contain any that
- * might match, then check for a match and remove it. Implies candidates
- * is made with a Comparator that ignores key type.
- * @param k
- * @param candidates
- * @param deletes
- * @return True if we removed <code>k</code> from <code>candidates</code>.
- */
- static boolean handleDeletes(final KeyValue k,
- final NavigableSet<KeyValue> candidates,
- final NavigableSet<KeyValue> deletes) {
- deletes.add(k);
- return candidates.remove(k);
+ return foundCandidate;
}
/**
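
The rewritten getRowKeyAtOrBefore path reduces to: seek at or before the
target row, walk forward within that row looking for a usable candidate,
and if the whole row is deleted or expired, back up one row and try again.
Here is a toy sketch of that shape over a sorted set of row keys; floor()
and lower() stand in for seekTo()/seekBefore(), and the deleted set stands
in for the bookkeeping GetClosestRowBeforeTracker does.

import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

public class AtOrBeforeSketch {

  public static void main(String[] args) {
    NavigableSet<String> rows =
        new TreeSet<>(List.of("aaa", "bbb", "ccc", "ddd"));
    Set<String> deleted = Set.of("ccc");

    System.out.println(rowAtOrBefore(rows, deleted, "ccc"));  // bbb
    System.out.println(rowAtOrBefore(rows, deleted, "bbb"));  // bbb
    System.out.println(rowAtOrBefore(rows, deleted, "a"));    // null
  }

  static String rowAtOrBefore(NavigableSet<String> rows, Set<String> deleted,
      String target) {
    // floor() plays seekTo: greatest row at or before the target.
    String candidate = rows.floor(target);
    // If the row is unusable, lower() plays seekBefore: back up and retry.
    while (candidate != null && deleted.contains(candidate)) {
      candidate = rows.lower(candidate);
    }
    return candidate;
  }
}
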
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Sep 9 17:14:22 2009
@@ -145,7 +145,6 @@
* @return true if there are more rows, false if scanner is done
*/
public synchronized boolean next(List<KeyValue> outResult) throws IOException {
- List<KeyValue> results = new ArrayList<KeyValue>();
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -153,6 +152,7 @@
}
matcher.setRow(peeked.getRow());
KeyValue kv;
+ List<KeyValue> results = new ArrayList<KeyValue>();
while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) {
@@ -162,7 +162,6 @@
continue;
case DONE:
-
// copy jazz
outResult.addAll(results);
return true;
@@ -198,7 +197,6 @@
if (!results.isEmpty()) {
// copy jazz
outResult.addAll(results);
-
return true;
}
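
These StoreScanner hunks chiefly move the per-call results list below the
initial heap.peek() null check, so a call against an exhausted scanner
returns without allocating. A minimal sketch of that pattern, with invented
names:

import java.util.ArrayList;
import java.util.List;

public class DeferAllocationSketch {

  private final List<String> source = new ArrayList<>(List.of("kv1", "kv2"));

  public boolean next(List<String> out) {
    if (source.isEmpty()) {
      return false;                            // exhausted: nothing allocated
    }
    List<String> results = new ArrayList<>();  // allocated only when needed
    results.add(source.remove(0));
    out.addAll(results);
    return true;
  }

  public static void main(String[] args) {
    DeferAllocationSketch s = new DeferAllocationSketch();
    List<String> out = new ArrayList<>();
    while (s.next(out)) {
      // drain
    }
    System.out.println(out);                   // [kv1, kv2]
  }
}
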
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/rest/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/rest/package.html?rev=813052&r1=813051&r2=813052&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/rest/package.html (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/rest/package.html Wed Sep 9 17:14:22 2009
@@ -28,6 +28,8 @@
This directory contains a REST service implementation for an Hbase RPC
service.
+DEPRECATED since 0.20.0; use the <a href="../stargate/package-summary.html">stargate contrib</a> instead.
+
<h2><a name="description">Description</a></h2>
<p>
By default, an instance of the REST servlet runs in the master UI; just browse