Posted to commits@hbase.apache.org by ap...@apache.org on 2009/07/30 07:35:08 UTC
svn commit: r799173 [2/4] - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/
src/contrib/transactional/
src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/
src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionse...
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Jul 30 05:35:05 2009
@@ -95,7 +95,7 @@
protected final MemStore memstore;
// This stores directory in the filesystem.
private final Path homedir;
- private final HRegionInfo regioninfo;
+ private final HRegion region;
private final HColumnDescriptor family;
final FileSystem fs;
private final HBaseConfiguration conf;
@@ -144,7 +144,7 @@
* Constructor
* @param basedir qualified path under which the region directory lives;
* generally the table subdirectory
- * @param info HRegionInfo for this region
+ * @param region HRegion this store belongs to
* @param family HColumnDescriptor for this column
* @param fs file system object
* @param reconstructionLog existing log file to apply if any
@@ -154,13 +154,14 @@
* failed. Can be null.
* @throws IOException
*/
- protected Store(Path basedir, HRegionInfo info, HColumnDescriptor family,
+ protected Store(Path basedir, HRegion region, HColumnDescriptor family,
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
final Progressable reporter)
- throws IOException {
+ throws IOException {
+ HRegionInfo info = region.regionInfo;
this.homedir = getStoreHomedir(basedir, info.getEncodedName(),
family.getName());
- this.regioninfo = info;
+ this.region = region;
this.family = family;
this.fs = fs;
this.conf = conf;
@@ -294,6 +295,7 @@
// TODO: This could grow large and blow heap out. Need to get it into
// general memory usage accounting.
long maxSeqIdInLog = -1;
+ long firstSeqIdInLog = -1;
// TODO: Move this memstoring over into MemStore.
KeyValueSkipListSet reconstructedCache =
new KeyValueSkipListSet(this.comparator);
@@ -308,6 +310,9 @@
int reportInterval =
this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while (logReader.next(key, val)) {
+ if (firstSeqIdInLog == -1) {
+ firstSeqIdInLog = key.getLogSeqNum();
+ }
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
if (key.getLogSeqNum() <= maxSeqID) {
skippedEdits++;
@@ -317,7 +322,7 @@
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
val.matchingFamily(HLog.METAFAMILY) ||
- !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
+ !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
!val.matchingFamily(family.getName())) {
continue;
}
@@ -329,10 +334,14 @@
if (reporter != null && (editsCount % reportInterval) == 0) {
reporter.progress();
}
+ // Instantiate a new KeyValue to perform Writable on
+ val = new KeyValue();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
- " because sequence id <= " + maxSeqID);
+ "; store maxSeqID=" + maxSeqID +
+ ", firstSeqIdInLog=" + firstSeqIdInLog +
+ ", maxSeqIdInLog=" + maxSeqIdInLog);
}
} finally {
logReader.close();
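
The fresh KeyValue allocated at the bottom of the loop is the substantive fix here: SequenceFile readers deserialize into the Writable instance they are handed, so once a value has been added to reconstructedCache, reusing the same object on the next call would overwrite the cached entry in place. A minimal sketch of the hazard and the fix (names from the patch; the cache insertion itself is elided in the hunks above):

    KeyValue val = new KeyValue();
    while (logReader.next(key, val)) {
      // ... filtering and bookkeeping as above ...
      reconstructedCache.add(val);  // the cache now references 'val'
      // readFields() fills the target in place, so hand the reader a fresh
      // instance rather than clobbering what the cache already holds.
      val = new KeyValue();
    }
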
@@ -534,7 +543,7 @@
", sequenceid=" + logCacheFlushId +
", memsize=" + StringUtils.humanReadableInt(flushed) +
", filesize=" + StringUtils.humanReadableInt(r.length()) +
- " to " + this.regioninfo.getRegionNameAsString());
+ " to " + this.region.regionInfo.getRegionNameAsString());
}
return sf;
}
@@ -619,49 +628,51 @@
* thread must be able to block for long periods.
*
* <p>During this time, the Store can work as usual, getting values from
- * MapFiles and writing new MapFiles from the memstore.
+ * StoreFiles and writing new StoreFiles from the memstore.
*
- * Existing MapFiles are not destroyed until the new compacted TreeMap is
+ * Existing StoreFiles are not destroyed until the new compacted StoreFile is
* completely written-out to disk.
*
- * The compactLock prevents multiple simultaneous compactions.
+ * <p>The compactLock prevents multiple simultaneous compactions.
* The structureLock prevents us from interfering with other write operations.
*
- * We don't want to hold the structureLock for the whole time, as a compact()
+ * <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
- * @param mc True to force a major compaction regardless of
- * thresholds
+ * @param mc True to force a major compaction regardless of thresholds
* @return row to split around if a split is needed, null otherwise
* @throws IOException
*/
StoreSize compact(final boolean mc) throws IOException {
- boolean forceSplit = this.regioninfo.shouldSplit(false);
+ boolean forceSplit = this.region.shouldSplit(false);
boolean majorcompaction = mc;
synchronized (compactLock) {
- long maxId = -1;
// filesToCompact are sorted oldest to newest.
- List<StoreFile> filesToCompact = null;
- filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
- if (filesToCompact.size() <= 0) {
+ List<StoreFile> filesToCompact =
+ new ArrayList<StoreFile>(this.storefiles.values());
+ if (filesToCompact.isEmpty()) {
LOG.debug(this.storeNameStr + ": no store files to compact");
return null;
}
- // The max-sequenceID in any of the to-be-compacted TreeMaps is the
- // last key of storefiles.
- maxId = this.storefiles.lastKey().longValue();
+
+ // Max-sequenceID is the last key of the storefiles TreeMap
+ long maxId = this.storefiles.lastKey().longValue();
+
// Check to see if we need to do a major compaction on this region.
// If so, change doMajorCompaction to true to skip the incremental
// compacting below. Only check if doMajorCompaction is not true.
if (!majorcompaction) {
majorcompaction = isMajorCompaction(filesToCompact);
}
+
boolean references = hasReferences(filesToCompact);
if (!majorcompaction && !references &&
(forceSplit || (filesToCompact.size() < compactionThreshold))) {
return checkSplit(forceSplit);
}
- if (!fs.exists(this.regionCompactionDir) && !fs.mkdirs(this.regionCompactionDir)) {
+
+ if (!fs.exists(this.regionCompactionDir) &&
+ !fs.mkdirs(this.regionCompactionDir)) {
LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed");
return checkSplit(forceSplit);
}
@@ -670,7 +681,7 @@
// selection.
int countOfFiles = filesToCompact.size();
long totalSize = 0;
- long[] fileSizes = new long[countOfFiles];
+ long [] fileSizes = new long[countOfFiles];
long skipped = 0;
int point = 0;
for (int i = 0; i < countOfFiles; i++) {
@@ -689,6 +700,7 @@
fileSizes[i] = len;
totalSize += len;
}
+
if (!majorcompaction && !references) {
// Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the
@@ -719,26 +731,17 @@
}
}
- // Step through them, writing to the brand-new file
- HFile.Writer writer = getWriter(this.regionCompactionDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
- (references? ", hasReferences=true,": " ") + " into " +
- FSUtils.getPath(writer.getPath()));
- }
- try {
- compact(writer, filesToCompact, majorcompaction);
- } finally {
- // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
- StoreFile.appendMetadata(writer, maxId, majorcompaction);
- writer.close();
- }
-
+ // Ready to go. Have list of files to compact.
+ LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+ (references? ", hasReferences=true,": " ") + " into " +
+ FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
+ HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
// Move the compaction into place.
- completeCompaction(filesToCompact, writer);
+ StoreFile sf = completeCompaction(filesToCompact, writer);
if (LOG.isDebugEnabled()) {
LOG.debug("Completed" + (majorcompaction? " major ": " ") +
"compaction of " + this.storeNameStr +
+ "; new storefile is " + (sf == null? "none": sf.toString()) +
"; store size is " + StringUtils.humanReadableInt(storeSize));
}
}
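
Taken together, these hunks reduce compact() to a clearer pipeline: snapshot the store-file list, derive the max sequence id, write the compaction product (which may not exist), then swap it into place. A condensed sketch of the reworked flow, with file selection and error handling elided:

    synchronized (compactLock) {
      List<StoreFile> filesToCompact =
        new ArrayList<StoreFile>(this.storefiles.values());
      long maxId = this.storefiles.lastKey().longValue();
      // The worker below returns null if no cell survived compaction.
      HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
      // Move the product (if any) into place; sf may be null too.
      StoreFile sf = completeCompaction(filesToCompact, writer);
    }
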
@@ -834,16 +837,17 @@
* @param writer output writer
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+ * @param maxId Readers' maximum sequence id.
+ * @return Product of compaction or null if all cells expired or were deleted
+ * and nothing made it through the compaction.
* @throws IOException
*/
- private void compact(HFile.Writer writer,
- List<StoreFile> filesToCompact,
- boolean majorCompaction) throws IOException {
- // for each file, obtain a scanner:
+ private HFile.Writer compact(final List<StoreFile> filesToCompact,
+ final boolean majorCompaction, final long maxId)
+ throws IOException {
+ // For each file, obtain a scanner:
KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
- // init:
for (int i = 0; i < filesToCompact.size(); ++i) {
- // TODO open a new HFile.Reader w/o block cache.
Reader r = filesToCompact.get(i).getReader();
if (r == null) {
LOG.warn("StoreFile " + filesToCompact.get(i) + " has a null Reader");
@@ -853,82 +857,101 @@
scanners[i] = new StoreFileScanner(r.getScanner(false));
}
+ // Make the instantiation lazy in case compaction produces no product; i.e.
+ // where all source cells are expired or deleted.
+ HFile.Writer writer = null;
+ try {
if (majorCompaction) {
InternalScanner scanner = null;
-
try {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
scanner = new StoreScanner(this, scan, scanners);
-
// since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
- ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+ ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
boolean more = true;
- while ( more ) {
- more = scanner.next(row);
+ while (more) {
+ more = scanner.next(kvs);
// output to writer:
- for (KeyValue kv : row) {
+ for (KeyValue kv : kvs) {
+ if (writer == null) {
+ writer = getWriter(this.regionCompactionDir);
+ }
writer.append(kv);
}
- row.clear();
+ kvs.clear();
}
} finally {
- if (scanner != null)
+ if (scanner != null) {
scanner.close();
+ }
}
} else {
MinorCompactingStoreScanner scanner = null;
try {
scanner = new MinorCompactingStoreScanner(this, scanners);
-
- while ( scanner.next(writer) ) { }
+ writer = getWriter(this.regionCompactionDir);
+ while (scanner.next(writer)) {
+ // Nothing to do
+ }
} finally {
if (scanner != null)
scanner.close();
}
}
-
+ } finally {
+ if (writer != null) {
+ StoreFile.appendMetadata(writer, maxId, majorCompaction);
+ writer.close();
+ }
+ }
+ return writer;
}
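
The substantive change in this worker is the lazily created writer: for a major compaction the HFile.Writer is only instantiated once the first surviving cell appears, so inputs that are entirely expired or deleted yield no output file and the method returns null. Reduced to its essentials (survivingCells is a hypothetical stand-in for the scanner loop above):

    HFile.Writer writer = null;
    for (KeyValue kv : survivingCells) {
      if (writer == null) {
        // Created on the first survivor only; never created if none survive.
        writer = getWriter(this.regionCompactionDir);
      }
      writer.append(kv);
    }
    return writer; // null signals "no compaction product"
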
/*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
*
- * It works by processing a compaction that's been written to disk.
+ * <p>It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
* invoked at HStore startup, if the prior execution died midway through.
*
* <p>Moving the compacted TreeMap into place means:
* <pre>
- * 1) Moving the new compacted MapFile into place
- * 2) Unload all replaced MapFiles, close and collect list to delete.
+ * 1) Moving the new compacted StoreFile into place
+ * 2) Unload all replaced StoreFiles, close and collect list to delete.
* 3) Loading the new TreeMap.
* 4) Compute new store size
* </pre>
*
* @param compactedFiles list of files that were compacted
- * @param compactedFile HStoreFile that is the result of the compaction
+ * @param compactedFile StoreFile that is the result of the compaction
+ * @return StoreFile created. May be null.
* @throws IOException
*/
- private void completeCompaction(final List<StoreFile> compactedFiles,
+ private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
final HFile.Writer compactedFile)
throws IOException {
- // 1. Moving the new files into place.
- Path p = null;
- try {
- p = StoreFile.rename(this.fs, compactedFile.getPath(),
- StoreFile.getRandomFilename(fs, this.homedir));
- } catch (IOException e) {
- LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
- return;
+ // 1. Moving the new files into place -- if there is a new file (may not
+ // be if all cells were expired or deleted).
+ StoreFile result = null;
+ if (compactedFile != null) {
+ Path p = null;
+ try {
+ p = StoreFile.rename(this.fs, compactedFile.getPath(),
+ StoreFile.getRandomFilename(fs, this.homedir));
+ } catch (IOException e) {
+ LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
+ return null;
+ }
+ result = new StoreFile(this.fs, p, blockcache, this.conf, this.inMemory);
}
- StoreFile finalCompactedFile = new StoreFile(this.fs, p, blockcache,
- this.conf, this.inMemory);
this.lock.writeLock().lock();
try {
try {
+ // 2. Unloading
// 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
@@ -939,10 +962,12 @@
this.storefiles.remove(e.getKey());
}
}
- // Add new compacted Reader and store file.
- Long orderVal = Long.valueOf(finalCompactedFile.getMaxSequenceId());
- this.storefiles.put(orderVal, finalCompactedFile);
- // Tell observers that list of Readers has changed.
+ // If a StoreFile result, move it into place. May be null.
+ if (result != null) {
+ Long orderVal = Long.valueOf(result.getMaxSequenceId());
+ this.storefiles.put(orderVal, result);
+ }
+ // Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
for (StoreFile hsf: compactedFiles) {
@@ -950,11 +975,10 @@
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed replacing compacted files for " +
- this.storeNameStr +
- ". Compacted file is " + finalCompactedFile.toString() +
- ". Files replaced are " + compactedFiles.toString() +
- " some of which may have been already removed", e);
+ LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+ ". Compacted file is " + (result == null? "none": result.toString()) +
+ ". Files replaced " + compactedFiles.toString() +
+ " some of which may have been already removed", e);
}
// 4. Compute new store size
this.storeSize = 0L;
@@ -969,6 +993,7 @@
} finally {
this.lock.writeLock().unlock();
}
+ return result;
}
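
completeCompaction now tolerates a missing product: the rename and StoreFile instantiation are skipped when the writer is null, and the update of this.storefiles still happens under the write lock so concurrent readers never observe a half-swapped map. The shape of that critical section, simplified from the hunks above:

    this.lock.writeLock().lock();
    try {
      // Drop the replaced entries from this.storefiles, then publish the
      // product (if any) keyed by its max sequence id.
      if (result != null) {
        this.storefiles.put(Long.valueOf(result.getMaxSequenceId()), result);
      }
      notifyChangedReadersObservers();
    } finally {
      this.lock.writeLock().unlock();
    }
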
// ////////////////////////////////////////////////////////////////////////////
@@ -1459,8 +1484,12 @@
}
}
+ HRegion getHRegion() {
+ return this.region;
+ }
+
HRegionInfo getHRegionInfo() {
- return this.regioninfo;
+ return this.region.regionInfo;
}
/**
@@ -1595,22 +1624,46 @@
// Setting up the QueryMatcher
Get get = new Get(row);
- NavigableSet<byte[]> qualifiers =
+ NavigableSet<byte[]> qualifiers =
new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qualifier);
QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
keyComparator, 1);
-
- // Read from memstore
- if (this.memstore.get(matcher, result)) {
+
+ boolean newTs = true;
+ KeyValue kv = null;
+ // Read from memstore first:
+ this.memstore.internalGet(this.memstore.kvset,
+ matcher, result);
+ if (!result.isEmpty()) {
+ kv = result.get(0).clone();
+ newTs = false;
+ } else {
+ // try the snapshot.
+ this.memstore.internalGet(this.memstore.snapshot,
+ matcher, result);
+ if (!result.isEmpty()) {
+ kv = result.get(0).clone();
+ }
+ }
+
+ if (kv != null) {
// Received early-out from memstore
// Make a copy of the KV and increment it
- KeyValue kv = result.get(0).clone();
byte [] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset();
value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
Bytes.SIZEOF_LONG);
+ if (newTs) {
+ long currTs = System.currentTimeMillis();
+ if (currTs == kv.getTimestamp()) {
+ currTs++; // just in case something wacky happens.
+ }
+ byte [] stampBytes = Bytes.toBytes(currTs);
+ Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
+ Bytes.SIZEOF_LONG);
+ }
return new ICVResult(value, 0, kv);
}
// Check if we even have storefiles
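
The newTs flag records where the hit came from: a value found in the live kvset keeps its timestamp, while one found only in the snapshot is re-stamped with the current time, presumably so the incremented copy is strictly newer than the snapshot entry about to be flushed to a store file. The probe order in miniature:

    // Try the live kvset first, then fall back to the snapshot.
    this.memstore.internalGet(this.memstore.kvset, matcher, result);
    boolean newTs = result.isEmpty();
    if (newTs) {
      this.memstore.internalGet(this.memstore.snapshot, matcher, result);
    }
    // On a snapshot hit (newTs == true) the cloned KeyValue's timestamp is
    // overwritten in its backing array, bumped by one millisecond if it
    // would collide with the current time.
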
@@ -1663,4 +1716,4 @@
public long heapSize() {
return DEEP_OVERHEAD + this.memstore.heapSize();
}
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java Thu Jul 30 05:35:05 2009
@@ -62,7 +62,7 @@
public void get(List<KeyValue> result) throws IOException {
for(HFileScanner scanner : this.scanners) {
this.matcher.update();
- if(getStoreFile(scanner, result) || matcher.isDone()) {
+ if (getStoreFile(scanner, result) || matcher.isDone()) {
return;
}
}
@@ -77,11 +77,13 @@
*/
public boolean getStoreFile(HFileScanner scanner, List<KeyValue> result)
throws IOException {
- if(scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
+ if (scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
startKey.getKeyLength()) == -1) {
// No keys in StoreFile at or after specified startKey
// First row may be = our row, so we have to check anyways.
byte [] firstKey = scanner.getReader().getFirstKey();
+ // Key may be null if storefile is empty.
+ if (firstKey == null) return false;
short rowLen = Bytes.toShort(firstKey, 0, Bytes.SIZEOF_SHORT);
int rowOffset = Bytes.SIZEOF_SHORT;
if (this.matcher.rowComparator.compareRows(firstKey, rowOffset, rowLen,
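
The null check covers the empty-store-file case: when seekTo() finds no key at or after startKey, the code falls back to comparing the file's first key against the wanted row, but an HFile with no entries has no first key and getFirstKey() returns null, which would otherwise NPE in the row comparison. The guard in isolation:

    byte [] firstKey = scanner.getReader().getFirstKey();
    if (firstKey == null) {
      return false; // empty store file: nothing can match
    }
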
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Jul 30 05:35:05 2009
@@ -162,20 +162,12 @@
continue;
case DONE:
- if (matcher.filterEntireRow()) {
- // nuke all results, and then return.
- results.clear();
- }
// copy jazz
outResult.addAll(results);
return true;
case DONE_SCAN:
- if (matcher.filterEntireRow()) {
- // nuke all results, and then return.
- results.clear();
- }
close();
// copy jazz
@@ -202,11 +194,6 @@
throw new RuntimeException("UNEXPECTED");
}
}
-
- if (matcher.filterEntireRow()) {
- // nuke all results, and then return.
- results.clear();
- }
if (!results.isEmpty()) {
// copy jazz
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift Thu Jul 30 05:35:05 2009
@@ -74,12 +74,11 @@
2:i32 maxVersions = 3,
3:string compression = "NONE",
4:bool inMemory = 0,
- 5:i32 maxValueLength = 2147483647,
- 6:string bloomFilterType = "NONE",
- 7:i32 bloomFilterVectorSize = 0,
- 8:i32 bloomFilterNbHashes = 0,
- 9:bool blockCacheEnabled = 0,
- 10:i32 timeToLive = -1
+ 5:string bloomFilterType = "NONE",
+ 6:i32 bloomFilterVectorSize = 0,
+ 7:i32 bloomFilterNbHashes = 0,
+ 8:bool blockCacheEnabled = 0,
+ 9:i32 timeToLive = -1
}
/**
@@ -463,6 +462,20 @@
4:list<Text> columns)
throws (1:IOError io)
+ /**
+ * Open a scanner for a given prefix. That is, all rows will have the specified
+ * prefix. No other rows will be returned.
+ *
+ * @param tableName name of table
+ * @param startAndPrefix the prefix (and thus start row) of the keys you want
+ * @param columns the columns you want returned
+ * @return scanner id to use with other scanner calls
+ */
+ ScannerID scannerOpenWithPrefix(1:Text tableName,
+ 2:Text startAndPrefix,
+ 3:list<Text> columns)
+ throws (1:IOError io)
+
/**
* Get a scanner on the current table starting at the specified row and
* ending at the last row in the table. Return the specified columns.
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Thu Jul 30 05:35:05 2009
@@ -35,6 +35,9 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -346,7 +349,11 @@
Get get = new Get(row);
for(byte [] column : columnArr) {
byte [][] famAndQf = KeyValue.parseColumn(column);
- get.addColumn(famAndQf[0], famAndQf[1]);
+ if (famAndQf[1] == null || famAndQf[1].length == 0) {
+ get.addFamily(famAndQf[0]);
+ } else {
+ get.addColumn(famAndQf[0], famAndQf[1]);
+ }
}
get.setTimeRange(Long.MIN_VALUE, timestamp);
Result result = table.get(get);
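
The branch added here extends the Thrift get path to whole-family reads. KeyValue.parseColumn splits a "family:qualifier" name into its two parts; a bare "family" or "family:" leaves the qualifier slot null or empty, which now maps to addFamily rather than a single-cell addColumn. A sketch of the dispatch, assuming that naming convention:

    byte [][] famAndQf = KeyValue.parseColumn(column);
    if (famAndQf[1] == null || famAndQf[1].length == 0) {
      get.addFamily(famAndQf[0]);               // "family" or "family:"
    } else {
      get.addColumn(famAndQf[0], famAndQf[1]);  // "family:qualifier"
    }
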
@@ -433,18 +440,31 @@
mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
}
- public void mutateRowTs(byte[] tableName, byte[] row,
+ public void mutateRowTs(byte[] tableName, byte[] row,
List<Mutation> mutations, long timestamp) throws IOError, IllegalArgument {
HTable table = null;
try {
table = getTable(tableName);
Put put = new Put(row);
put.setTimeStamp(timestamp);
+
+ Delete delete = new Delete(row);
+
for (Mutation m : mutations) {
- byte [][] famAndQf = KeyValue.parseColumn(m.column);
- put.add(famAndQf[0], famAndQf[1], m.value);
+ byte[][] famAndQf = KeyValue.parseColumn(m.column);
+ if (m.isDelete) {
+ if (famAndQf[1].length == 0)
+ delete.deleteFamily(famAndQf[0], timestamp);
+ else
+ delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+ } else {
+ put.add(famAndQf[0], famAndQf[1], m.value);
+ }
}
- table.put(put);
+ if (!delete.isEmpty())
+ table.delete(delete);
+ if (!put.isEmpty())
+ table.put(put);
} catch (IOException e) {
throw new IOError(e.getMessage());
} catch (IllegalArgumentException e) {
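
mutateRowTs now honors deletes in a mutation batch instead of treating every entry as a put: one Delete and one Put accumulate per row, the empty-qualifier convention above maps "family:" to a family-wide delete, and each batch is sent only if non-empty, so a pure-delete batch no longer issues an empty put. Condensed:

    for (Mutation m : mutations) {
      byte [][] famAndQf = KeyValue.parseColumn(m.column);
      if (m.isDelete) {
        if (famAndQf[1].length == 0) {
          delete.deleteFamily(famAndQf[0], timestamp);        // "family:"
        } else {
          delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
        }
      } else {
        put.add(famAndQf[0], famAndQf[1], m.value);
      }
    }
    if (!delete.isEmpty()) table.delete(delete);
    if (!put.isEmpty()) table.put(put);
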
@@ -456,7 +476,7 @@
throws IOError, IllegalArgument, TException {
mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
}
-
+
public void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp)
throws IOError, IllegalArgument, TException {
List<Put> puts = new ArrayList<Put>();
@@ -469,9 +489,13 @@
Put put = new Put(row);
put.setTimeStamp(timestamp);
for (Mutation m : mutations) {
- byte [][] famAndQf = KeyValue.parseColumn(m.column);
+ byte[][] famAndQf = KeyValue.parseColumn(m.column);
if (m.isDelete) {
- delete.deleteColumns(famAndQf[0], famAndQf[1]);
+ // no qualifier, family only.
+ if (famAndQf[1].length == 0)
+ delete.deleteFamily(famAndQf[0], timestamp);
+ else
+ delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
} else {
put.add(famAndQf[0], famAndQf[1], m.value);
}
@@ -582,7 +606,24 @@
throw new IOError(e.getMessage());
}
}
-
+
+ @Override
+ public int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns) throws IOError, TException {
+ try {
+ HTable table = getTable(tableName);
+ byte [][] columnsArray = null;
+ columnsArray = columns.toArray(new byte[0][]);
+ Scan scan = new Scan(startAndPrefix);
+ scan.addColumns(columnsArray);
+ Filter f = new WhileMatchFilter(
+ new PrefixFilter(startAndPrefix));
+ scan.setFilter(f);
+ return addScanner(table.getScanner(scan));
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+
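
The filter composition is the interesting part of this implementation: a bare PrefixFilter would let the scan walk every row after the prefix range and merely drop them from the results, whereas WhileMatchFilter terminates the scan at the first row that fails the wrapped prefix check. Combined with using startAndPrefix as the start row, the scanner touches exactly the prefix range:

    Scan scan = new Scan(startAndPrefix);   // start at the first prefixed row...
    scan.setFilter(new WhileMatchFilter(
      new PrefixFilter(startAndPrefix)));   // ...and stop once rows leave it
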
public int scannerOpenTs(byte[] tableName, byte[] startRow,
List<byte[]> columns, long timestamp) throws IOError, TException {
try {
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java Thu Jul 30 05:35:05 2009
@@ -1,4 +1,6 @@
-/**
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,11 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Autogenerated by Thrift
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- */
+
package org.apache.hadoop.hbase.thrift.generated;
import java.util.List;
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java Thu Jul 30 05:35:05 2009
@@ -1,4 +1,6 @@
-/**
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,11 +17,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Autogenerated by Thrift
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- */
package org.apache.hadoop.hbase.thrift.generated;
import java.util.List;
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java?rev=799173&r1=799172&r2=799173&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java Thu Jul 30 05:35:05 2009
@@ -1,4 +1,6 @@
-/**
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,11 +17,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Autogenerated by Thrift
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- */
package org.apache.hadoop.hbase.thrift.generated;
import java.util.List;
@@ -129,7 +126,6 @@
int maxVersions,
String compression,
boolean inMemory,
- int maxValueLength,
String bloomFilterType,
int bloomFilterVectorSize,
int bloomFilterNbHashes,