You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/04/19 21:05:29 UTC
svn commit: r935708 [1/2] - in /hadoop/hbase/branches/0.20: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/
src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop...
Author: rawson
Date: Mon Apr 19 19:05:29 2010
New Revision: 935708
URL: http://svn.apache.org/viewvc?rev=935708&view=rev
Log:
HBASE-2248 Provide new non-copy mechanism to assure atomic reads in get and scan
Added:
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
Removed:
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
Modified:
hadoop/hbase/branches/0.20/CHANGES.txt
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Mon Apr 19 19:05:29 2010
@@ -10,6 +10,8 @@ Release 0.20.4 - Unreleased
HBASE-2392 Upgrade to ZooKeeper 3.3.0
HBASE-2165 Improve fragmentation display and implementation
HBASE-2448 Remove 'indexed' contrib
+ HBASE-2248 Provide new non-copy mechanism to assure atomic reads in
+ get and scan
BUG FIXES
HBASE-2173 New idx javadoc not included with the rest
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java Mon Apr 19 19:05:29 2010
@@ -200,6 +200,23 @@ public class KeyValue implements Writabl
private int offset = 0;
private int length = 0;
+ /** Here be dragons **/
+
+ // used to achieve atomic operations in the memstore.
+ public long getMemstoreTS() {
+ return memstoreTS;
+ }
+
+ public void setMemstoreTS(long memstoreTS) {
+ this.memstoreTS = memstoreTS;
+ }
+
+ // default value is 0, aka DNC
+ private long memstoreTS = 0;
+
+ /** Dragon time over, return to normal business */
+
+
/** Writable Constructor -- DO NOT USE */
public KeyValue() {}
@@ -1503,6 +1520,21 @@ public class KeyValue implements Writabl
}
/**
+ * Creates a KeyValue that is last on the specified row id. That is,
+ * every other possible KeyValue for the given row would compareTo()
+ * less than the result of this call.
+ * @param row row key
+ * @return Last possible KeyValue on passed <code>row</code>
+ */
+ public static KeyValue createLastOnRow(final byte[] row) {
+ return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
+ }
+
+ /**
+ * Create a KeyValue that is smaller than all other possible KeyValues
+ * for the given row. That is any (valid) KeyValue on 'row' would sort
+ * _after_ the result.
+ *
* @param row - row key (arbitrary byte array)
* @return First possible KeyValue on passed <code>row</code>
*/
@@ -1511,6 +1543,8 @@ public class KeyValue implements Writabl
}
/**
+ * Creates a KeyValue that is smaller than all other KeyValues that
+ * are older than the passed timestamp.
* @param row - row key (arbitrary byte array)
* @param ts - timestamp
* @return First possible key on passed <code>row</code> and timestamp.
@@ -1522,8 +1556,11 @@ public class KeyValue implements Writabl
/**
* @param row - row key (arbitrary byte array)
+ * @param c column - {@link #parseColumn(byte[])} is called to split
+ * the column.
* @param ts - timestamp
* @return First possible key on passed <code>row</code>, column and timestamp
+ * @deprecated
*/
public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
final long ts) {
@@ -1532,14 +1569,17 @@ public class KeyValue implements Writabl
}
/**
+ * Create a KeyValue for the specified row, family and qualifier that would be
+ * smaller than all other possible KeyValues that have the same row,family,qualifier.
+ * Used for seeking.
* @param row - row key (arbitrary byte array)
- * @param f - family name
- * @param q - column qualifier
+ * @param family - family name
+ * @param qualifier - column qualifier
* @return First possible key on passed <code>row</code>, and column.
*/
- public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
- final byte [] q) {
- return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum);
+ public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
+ final byte [] qualifier) {
+ return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
}
/**
@@ -1706,9 +1746,6 @@ public class KeyValue implements Writabl
return compare;
}
- // if row matches, and no column in the 'left' AND put type is 'minimum',
- // then return that left is larger than right.
-
// Compare column family. Start compare past row and family length.
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
@@ -1717,17 +1754,25 @@ public class KeyValue implements Writabl
int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
(rcolumnoffset - roffset);
+ // if row matches, and no column in the 'left' AND put type is 'minimum',
+ // then return that left is larger than right.
+
// This supports 'last key on a row' - the magic is if there is no column in the
// left operand, and the left operand has a type of '0' - magical value,
// then we say the left is bigger. This will let us seek to the last key in
// a row.
byte ltype = left[loffset + (llength - 1)];
+ byte rtype = right[roffset + (rlength - 1)];
if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
return 1; // left is bigger.
}
+ if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+ return -1;
+ }
+ // TODO the family and qualifier should be compared separately
compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
rcolumnoffset, rcolumnlength);
if (compare != 0) {
@@ -1749,9 +1794,6 @@ public class KeyValue implements Writabl
if (!this.ignoreType) {
// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers
-
- // ltype is defined above
- byte rtype = right[roffset + (rlength - 1)];
return (0xff & rtype) - (0xff & ltype);
}
return 0;
@@ -1791,7 +1833,8 @@ public class KeyValue implements Writabl
public long heapSize() {
return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
ClassSize.align(ClassSize.ARRAY + length) +
- (2 * Bytes.SIZEOF_INT));
+ (2 * Bytes.SIZEOF_INT) +
+ Bytes.SIZEOF_LONG);
}
// this overload assumes that the length bytes have already been read,
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java Mon Apr 19 19:05:29 2010
@@ -660,6 +660,7 @@ public class HTable {
*/
public void close() throws IOException{
flushCommits();
+ this.pool.shutdownNow();
}
/**
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java Mon Apr 19 19:05:29 2010
@@ -168,10 +168,29 @@ public class Scan implements Writable {
}
/**
+ * Builds a scan object with the same specs as get.
+ * @param get get to model scan after
+ */
+ public Scan(Get get) {
+ this.startRow = get.getRow();
+ this.stopRow = get.getRow();
+ this.filter = get.getFilter();
+ this.maxVersions = get.getMaxVersions();
+ this.tr = get.getTimeRange();
+ this.familyMap = get.getFamilyMap();
+ }
+
+ public boolean isGetScan() {
+ return this.startRow != null && this.startRow.length > 0 &&
+ Bytes.equals(this.startRow, this.stopRow);
+ }
+
+ /**
* Get all columns from the specified family.
* <p>
* Overrides previous calls to addColumn for this family.
* @param family family name
+ * @return this
*/
public Scan addFamily(byte [] family) {
familyMap.remove(family);
@@ -185,6 +204,7 @@ public class Scan implements Writable {
* Overrides previous calls to addFamily for this family.
* @param family family name
* @param qualifier column qualifier
+ * @return this
*/
public Scan addColumn(byte [] family, byte [] qualifier) {
NavigableSet<byte []> set = familyMap.get(family);
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Mon Apr 19 19:05:29 2010
@@ -178,9 +178,6 @@ public class SingleColumnValueFilter imp
// byte array copy?
int compareResult =
this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length));
- if (LOG.isDebugEnabled()) {
- LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length));
- }
switch (this.compareOp) {
case LESS:
return compareResult <= 0;
Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java?rev=935708&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java Mon Apr 19 19:05:29 2010
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DebugPrint {
+
+private static final AtomicBoolean enabled = new AtomicBoolean(false);
+ private static final Object sync = new Object();
+ public static StringBuilder out = new StringBuilder();
+
+ static public void enable() {
+ enabled.set(true);
+ }
+ static public void disable() {
+ enabled.set(false);
+ }
+
+ static public void reset() {
+ synchronized (sync) {
+ enable(); // someone wants us enabled basically.
+
+ out = new StringBuilder();
+ }
+ }
+ static public void dumpToFile(String file) throws IOException {
+ FileWriter f = new FileWriter(file);
+ synchronized (sync) {
+ f.write(out.toString());
+ }
+ f.close();
+ }
+
+ public static void println(String m) {
+ if (!enabled.get()) {
+ System.out.println(m);
+ return;
+ }
+
+ synchronized (sync) {
+ String threadName = Thread.currentThread().getName();
+ out.append("<");
+ out.append(threadName);
+ out.append("> ");
+ out.append(m);
+ out.append("\n");
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Apr 19 19:05:29 2010
@@ -1,5 +1,5 @@
/**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -67,10 +67,11 @@ package org.apache.hadoop.hbase.regionse
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
+ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -176,6 +177,8 @@ public class HRegion implements HConstan
/**
* Set flags that make this region read-only.
+ *
+ * @param onOff flip value for region r/o setting
*/
synchronized void setReadOnly(final boolean onOff) {
this.writesEnabled = !onOff;
@@ -191,7 +194,7 @@ public class HRegion implements HConstan
}
}
- private volatile WriteState writestate = new WriteState();
+ private final WriteState writestate = new WriteState();
final long memstoreFlushSize;
private volatile long lastFlushTime;
@@ -210,7 +213,10 @@ public class HRegion implements HConstan
private final Object splitLock = new Object();
private long minSequenceId;
private boolean splitRequest;
-
+
+ private final ReadWriteConsistencyControl rwcc =
+ new ReadWriteConsistencyControl();
+
/**
* Name of the region info file that resides just under the region directory.
*/
@@ -296,9 +302,9 @@ public class HRegion implements HConstan
* Initialize this region and get it ready to roll.
* Called after construction.
*
- * @param initialFiles
- * @param reporter
- * @throws IOException
+ * @param initialFiles path
+ * @param reporter progressable
+ * @throws IOException e
*/
public void initialize(Path initialFiles, final Progressable reporter)
throws IOException {
@@ -436,6 +442,10 @@ public class HRegion implements HConstan
return this.closing.get();
}
+ public ReadWriteConsistencyControl getRWCC() {
+ return rwcc;
+ }
+
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@@ -447,7 +457,7 @@ public class HRegion implements HConstan
* HStores make use of. It's a list of all HStoreFile objects. Returns empty
* vector if already closed and null if judged that it should not close.
*
- * @throws IOException
+ * @throws IOException e
*/
public List<StoreFile> close() throws IOException {
return close(false);
@@ -465,7 +475,7 @@ public class HRegion implements HConstan
* HStores make use of. It's a list of HStoreFile objects. Can be null if
* we are not to close at this time or we are already closed.
*
- * @throws IOException
+ * @throws IOException e
*/
public List<StoreFile> close(final boolean abort) throws IOException {
if (isClosed()) {
@@ -597,6 +607,7 @@ public class HRegion implements HConstan
}
/** @return the last time the region was flushed */
+ @SuppressWarnings({"UnusedDeclaration"})
public long getLastFlushTime() {
return this.lastFlushTime;
}
@@ -698,8 +709,7 @@ public class HRegion implements HConstan
HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
- HRegion regions[] = new HRegion [] {regionA, regionB};
- return regions;
+ return new HRegion [] {regionA, regionB};
}
}
@@ -773,7 +783,7 @@ public class HRegion implements HConstan
* server does them sequentially and not in parallel.
*
* @return mid key if split is needed
- * @throws IOException
+ * @throws IOException e
*/
public byte [] compactStores() throws IOException {
boolean majorCompaction = this.forceMajorCompaction;
@@ -794,7 +804,7 @@ public class HRegion implements HConstan
*
* @param majorCompaction True to force a major compaction regardless of thresholds
* @return split row if split is needed
- * @throws IOException
+ * @throws IOException e
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
@@ -863,7 +873,7 @@ public class HRegion implements HConstan
*
* @return true if cache was flushed
*
- * @throws IOException
+ * @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@@ -929,7 +939,7 @@ public class HRegion implements HConstan
*
* @return true if the region needs compacting
*
- * @throws IOException
+ * @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@@ -957,18 +967,28 @@ public class HRegion implements HConstan
// during the flush
long sequenceId = -1L;
long completeSequenceId = -1L;
+
+ // we have to take a write lock during snapshot, or else a write could
+ // end up in both snapshot and memstore (makes it difficult to do atomic
+ // rows then)
this.updatesLock.writeLock().lock();
- // Get current size of memstores.
final long currentMemStoreSize = this.memstoreSize.get();
- List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
+ List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
sequenceId = log.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
- // create the store flushers
+
for (Store s : stores.values()) {
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
+ // This thread is going to cause a whole bunch of scanners to reseek.
+ // They are depending
+ // on a thread-local to know where to read from.
+ // The reason why we set it up high is so that each HRegionScanner only
+ // has a single read point for all its sub-StoreScanners.
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
// prepare flush (take a snapshot)
for (StoreFlusher flusher: storeFlushers) {
flusher.prepare();
@@ -977,6 +997,8 @@ public class HRegion implements HConstan
this.updatesLock.writeLock().unlock();
}
+ LOG.debug("Finished snapshotting, commencing flushing stores");
+
// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memstore.
// Otherwise, the snapshot content while backed up in the hlog, it will not
@@ -990,13 +1012,28 @@ public class HRegion implements HConstan
flusher.flushCache();
}
- internalPreFlashcacheCommit();
+ Callable<Void> atomicWork = internalPreFlushcacheCommit();
+
+ LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
/**
- * Switch between memstore and the new store file
+ * Switch between memstore(snapshot) and the new store file
*/
- this.newScannerLock.writeLock().lock();
+ if (atomicWork != null) {
+ LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
+ newScannerLock.writeLock().lock();
+ }
+
try {
+ // update this again to make sure we are 'fresh'
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+ if (atomicWork != null) {
+ atomicWork.call();
+ }
+
+ // Switch snapshot (in memstore) -> new hfile (thus causing
+ // all the store scanners to reset/reseek).
for (StoreFlusher flusher : storeFlushers) {
boolean needsCompaction = flusher.commit();
if (needsCompaction) {
@@ -1004,10 +1041,11 @@ public class HRegion implements HConstan
}
}
} finally {
- this.newScannerLock.writeLock().unlock();
+ if (atomicWork != null) {
+ newScannerLock.writeLock().unlock();
+ }
}
- // clear the stireFlushers list
storeFlushers.clear();
// Set down the memstore size by amount of flush.
this.memstoreSize.addAndGet(-currentMemStoreSize);
@@ -1057,9 +1095,14 @@ public class HRegion implements HConstan
* A hook for sub classed wishing to perform operations prior to the cache
* flush commit stage.
*
+ * If a subclass wishes that an atomic update of their work and the
+ * flush commit stage happens, they should return a callable. The new scanner
+ * lock will be acquired and released.
+
* @throws java.io.IOException allow children to throw exception
*/
- protected void internalPreFlashcacheCommit() throws IOException {
+ protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
+ return null;
}
/**
@@ -1097,9 +1140,9 @@ public class HRegion implements HConstan
* <i>ts</i>.
*
* @param row row key
- * @param family
+ * @param family column family to find on
* @return map of values
- * @throws IOException
+ * @throws IOException read exceptions
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
@@ -1116,11 +1159,9 @@ public class HRegion implements HConstan
if (key == null) {
return null;
}
- // This will get all results for this store. TODO: Do we need to do this?
Get get = new Get(key.getRow());
- List<KeyValue> results = new ArrayList<KeyValue>();
- store.get(get, null, results);
- return new Result(results);
+ get.addFamily(family);
+ return get(get, null);
} finally {
splitsAndClosesLock.readLock().unlock();
}
@@ -1134,7 +1175,7 @@ public class HRegion implements HConstan
*
* @param scan configured {@link Scan}
* @return InternalScanner
- * @throws IOException
+ * @throws IOException read exceptions
*/
public InternalScanner getScanner(Scan scan)
throws IOException {
@@ -1172,24 +1213,23 @@ public class HRegion implements HConstan
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
/**
- * @param delete
- * @param lockid
- * @param writeToWAL
- * @throws IOException
+ * @param delete delete object
+ * @param lockid existing lock id, or null for grab a lock
+ * @param writeToWAL append to the write ahead lock or not
+ * @throws IOException read exceptions
*/
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
checkResources();
Integer lid = null;
- newScannerLock.writeLock().lock();
splitsAndClosesLock.readLock().lock();
try {
byte [] row = delete.getRow();
// If we did not pass an existing row lock, obtain a new one
lid = getLock(lockid, row);
- //Check to see if this is a deleteRow insert
+ // Check to see if this is a deleteRow insert
if(delete.getFamilyMap().isEmpty()){
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
// Don't eat the timestamp
@@ -1210,7 +1250,6 @@ public class HRegion implements HConstan
} finally {
if(lockid == null) releaseRowLock(lid);
splitsAndClosesLock.readLock().unlock();
- newScannerLock.writeLock().unlock();
}
}
@@ -1225,7 +1264,9 @@ public class HRegion implements HConstan
long now = System.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
- this.updatesLock.readLock().lock();
+
+ updatesLock.readLock().lock();
+ ReadWriteConsistencyControl.WriteEntry w = null;
try {
@@ -1242,21 +1283,21 @@ public class HRegion implements HConstan
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
Integer count = kvCount.get(qual);
if (count == null) {
- kvCount.put(qual, new Integer(1));
+ kvCount.put(qual, 1);
} else {
- kvCount.put(qual, new Integer(count+1));
+ kvCount.put(qual, count + 1);
}
count = kvCount.get(qual);
- List<KeyValue> result = new ArrayList<KeyValue>(1);
- Get g = new Get(kv.getRow());
- g.setMaxVersions(count);
- NavigableSet<byte []> qualifiers =
- new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
- qualifiers.add(qual);
- get(store, g, qualifiers, result);
+ Get get = new Get(kv.getRow());
+ get.setMaxVersions(count);
+ get.addColumn(family, qual);
+
+ List<KeyValue> result = get(get);
+
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
@@ -1301,11 +1342,11 @@ public class HRegion implements HConstan
}
}
+ // Now make changes to the memstore.
+
long size = 0;
+ w = rwcc.beginMemstoreInsert();
- //
- // Now make changes to the memstore.
- //
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -1313,13 +1354,17 @@ public class HRegion implements HConstan
Store store = getStore(family);
for (KeyValue kv: kvs) {
+ kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.delete(kv));
}
}
flush = isFlushSize(size);
} finally {
+ if (w != null) rwcc.completeMemstoreInsert(w);
+
this.updatesLock.readLock().unlock();
}
+
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@@ -1367,8 +1412,8 @@ public class HRegion implements HConstan
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
- newScannerLock.writeLock().lock();
splitsAndClosesLock.readLock().lock();
+
try {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
@@ -1378,6 +1423,7 @@ public class HRegion implements HConstan
byte [] row = put.getRow();
// If we did not pass an existing row lock, obtain a new one
Integer lid = getLock(lockid, row);
+
byte [] now = Bytes.toBytes(System.currentTimeMillis());
try {
// All edits for the given row (across all column families) must happen atomically.
@@ -1387,7 +1433,6 @@ public class HRegion implements HConstan
}
} finally {
splitsAndClosesLock.readLock().unlock();
- newScannerLock.writeLock().unlock();
}
}
@@ -1427,15 +1472,12 @@ public class HRegion implements HConstan
Integer lid = getLock(lockId, get.getRow());
List<KeyValue> result = new ArrayList<KeyValue>();
try {
- //Getting data
- for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
- get.getFamilyMap().entrySet()) {
- get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
- }
+ result = get(get);
+
boolean matches = false;
if (result.size() == 0 && expectedValue.length == 0) {
matches = true;
- } else if(result.size() == 1) {
+ } else if (result.size() == 1) {
//Compare the expected value with the actual value
byte [] actualValue = result.get(0).getValue();
matches = Bytes.equals(expectedValue, actualValue);
@@ -1551,6 +1593,7 @@ public class HRegion implements HConstan
/**
* Add updates first to the hlog and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
+ * @param family
* @param edits Cell updates by column
* @praram now
* @throws IOException
@@ -1575,6 +1618,7 @@ public class HRegion implements HConstan
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
+ ReadWriteConsistencyControl.WriteEntry w = null;
try {
WALEdit walEdit = new WALEdit();
@@ -1618,6 +1662,8 @@ public class HRegion implements HConstan
long size = 0;
+ w = rwcc.beginMemstoreInsert();
+
// now make changes to the memstore
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -1625,11 +1671,14 @@ public class HRegion implements HConstan
Store store = getStore(family);
for (KeyValue kv: edits) {
+ kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.add(kv));
}
}
flush = isFlushSize(size);
} finally {
+ if (w != null) rwcc.completeMemstoreInsert(w);
+
this.updatesLock.readLock().unlock();
}
if (flush) {
@@ -1871,9 +1920,13 @@ public class HRegion implements HConstan
private Filter filter;
private RowFilterInterface oldFilter;
private List<KeyValue> results = new ArrayList<KeyValue>();
+ private int isScan;
private int batch;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+ //DebugPrint.println("HRegionScanner.<init>, threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
this.filter = scan.getFilter();
this.batch = scan.getBatch();
this.oldFilter = scan.getOldFilter();
@@ -1882,12 +1935,13 @@ public class HRegion implements HConstan
} else {
this.stopRow = scan.getStopRow();
}
+ this.isScan = scan.isGetScan() ? -1 : 0;
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
+ for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
@@ -1910,6 +1964,9 @@ public class HRegion implements HConstan
if (oldFilter != null) {
oldFilter.reset();
}
+
+ // Start the next row read and reset the thread point
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
}
public boolean next(List<KeyValue> outResults, int limit) throws IOException {
@@ -1918,6 +1975,9 @@ public class HRegion implements HConstan
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
+
+ // This could be a new thread from the last time we called next().
+ ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
results.clear();
boolean returnResult = nextInternal(limit);
if (!returnResult && filterRow()) {
@@ -1999,7 +2059,7 @@ public class HRegion implements HConstan
return currentRow == null ||
(this.stopRow != null &&
comparator.compareRows(this.stopRow, 0, this.stopRow.length,
- currentRow, 0, currentRow.length) <= 0);
+ currentRow, 0, currentRow.length) <= isScan);
}
private boolean filterRow() {
@@ -2519,10 +2579,10 @@ public class HRegion implements HConstan
// HBASE-880
//
/**
- * @param get
- * @param lockid
+ * @param get get object
+ * @param lockid existing lock id, or null for no previous lock
* @return result
- * @throws IOException
+ * @throws IOException read exceptions
*/
public Result get(final Get get, final Integer lockid) throws IOException {
// Verify families are all valid
@@ -2535,24 +2595,28 @@ public class HRegion implements HConstan
get.addFamily(family);
}
}
- // Lock row
- Integer lid = getLock(lockid, get.getRow());
- List<KeyValue> result = new ArrayList<KeyValue>();
- try {
- for (Map.Entry<byte[],NavigableSet<byte[]>> entry:
- get.getFamilyMap().entrySet()) {
- get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
- }
- } finally {
- if(lockid == null) releaseRowLock(lid);
- }
+ List<KeyValue> result = get(get);
+
return new Result(result);
}
- private void get(final Store store, final Get get,
- final NavigableSet<byte []> qualifiers, List<KeyValue> result)
- throws IOException {
- store.get(get, qualifiers, result);
+  /*
+   * Executes a Get by converting it into a Scan (new Scan(get)) and calling
+   * next() once on the resulting region scanner to collect the KeyValues for
+   * the requested row. Once the scanner has been opened it is always closed,
+   * even if reading from it throws.
+   */
+  private List<KeyValue> get(final Get get) throws IOException {
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    InternalScanner rowScanner = getScanner(new Scan(get));
+    try {
+      rowScanner.next(kvs);
+    } finally {
+      rowScanner.close();
+    }
+    return kvs;
+  }
/**
@@ -2561,6 +2625,7 @@ public class HRegion implements HConstan
* @param family
* @param qualifier
* @param amount
+ * @param writeToWAL
* @return The new value.
* @throws IOException
*/
@@ -2575,6 +2640,7 @@ public class HRegion implements HConstan
try {
Store store = stores.get(family);
+ // TODO call the proper GET API
// Get the old value:
Get get = new Get(row);
get.addColumn(family, qualifier);
@@ -2640,7 +2706,7 @@ public class HRegion implements HConstan
public static final long FIXED_OVERHEAD = ClassSize.align(
(5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
- (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+ (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java Mon Apr 19 19:05:29 2010
@@ -1,5 +1,5 @@
/**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.KeyValue;
+
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.SortedSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.hadoop.hbase.KeyValue;
-
/**
* A {@link java.util.Set} of {@link KeyValue}s implemented on top of a
* {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
* has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
* get and set and won't throw ConcurrentModificationException when iterating.
*/
-class KeyValueSkipListSet implements NavigableSet<KeyValue>, Cloneable {
+class KeyValueSkipListSet implements NavigableSet<KeyValue> {
private ConcurrentNavigableMap<KeyValue, KeyValue> delegatee;
KeyValueSkipListSet(final KeyValue.KVComparator c) {
@@ -167,6 +167,7 @@ class KeyValueSkipListSet implements Nav
}
public boolean contains(Object o) {
+ //noinspection SuspiciousMethodCalls
return this.delegatee.containsKey(o);
}
@@ -201,17 +202,4 @@ class KeyValueSkipListSet implements Nav
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not implemented");
}
-
- @Override
- public KeyValueSkipListSet clone() {
- assert this.delegatee.getClass() == ConcurrentSkipListMap.class;
- KeyValueSkipListSet clonedSet = null;
- try {
- clonedSet = (KeyValueSkipListSet) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new InternalError(e.getMessage());
- }
- clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone();
- return clonedSet;
- }
}
\ No newline at end of file
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Apr 19 19:05:29 2010
@@ -28,7 +28,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -110,7 +112,7 @@ public class MemStore implements HeapSiz
/**
* Creates a snapshot of the current memstore.
- * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
+ * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)}
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
@@ -140,7 +142,7 @@ public class MemStore implements HeapSiz
* call to {@link #snapshot()}
* @return Return snapshot.
* @see {@link #snapshot()}
- * @see {@link #clearSnapshot(java.util.Map)}
+ * @see {@link #clearSnapshot(java.util.SortedSet)}
*/
KeyValueSkipListSet getSnapshot() {
return this.snapshot;
@@ -187,7 +189,7 @@ public class MemStore implements HeapSiz
return s;
}
- /**
+ /**
* Write a delete
* @param delete
* @return approximate size of the passed key and value.
@@ -195,69 +197,8 @@ public class MemStore implements HeapSiz
long delete(final KeyValue delete) {
long s = 0;
this.lock.readLock().lock();
- //Have to find out what we want to do here, to find the fastest way of
- //removing things that are under a delete.
- //Actions that will take place here are:
- //1. Insert a delete and remove all the affected entries already in memstore
- //2. In the case of a Delete and the matching put is found then don't insert
- // the delete
- //TODO Would be nice with if we had an iterator for this, so we could remove
- //things that needs to be removed while iterating and don't have to go
- //back and do it afterwards
try {
- boolean notpresent = false;
- List<KeyValue> deletes = new ArrayList<KeyValue>();
- SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
-
- //Parse the delete, so that it is only done once
- byte [] deleteBuffer = delete.getBuffer();
- int deleteOffset = delete.getOffset();
-
- int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
- deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-
- short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
- deleteOffset += Bytes.SIZEOF_SHORT;
- int deleteRowOffset = deleteOffset;
-
- deleteOffset += deleteRowLen;
-
- byte deleteFamLen = deleteBuffer[deleteOffset];
- deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-
- int deleteQualifierOffset = deleteOffset;
- int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
- Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
- Bytes.SIZEOF_BYTE;
-
- deleteOffset += deleteQualifierLen;
-
- int deleteTimestampOffset = deleteOffset;
- deleteOffset += Bytes.SIZEOF_LONG;
- byte deleteType = deleteBuffer[deleteOffset];
-
- //Comparing with tail from memstore
- for (KeyValue kv : tail) {
- DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
- deleteRowOffset, deleteRowLen, deleteQualifierOffset,
- deleteQualifierLen, deleteTimestampOffset, deleteType,
- comparator.getRawComparator());
- if (res == DeleteCode.DONE) {
- break;
- } else if (res == DeleteCode.DELETE) {
- deletes.add(kv);
- } // SKIP
- }
-
- //Delete all the entries effected by the last added delete
- for (KeyValue kv : deletes) {
- notpresent = this.kvset.remove(kv);
- s -= heapSizeChange(kv, notpresent);
- }
-
- // Adding the delete to memstore. Add any value, as long as
- // same instance each time.
s += heapSizeChange(delete, this.kvset.add(delete));
} finally {
this.lock.readLock().unlock();
@@ -265,7 +206,7 @@ public class MemStore implements HeapSiz
this.size.addAndGet(s);
return s;
}
-
+
/**
* @param kv Find the row that comes after this one. If null, we return the
* first.
@@ -318,7 +259,7 @@ public class MemStore implements HeapSiz
}
/**
- * @param state
+ * @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
this.lock.readLock().lock();
@@ -442,8 +383,7 @@ public class MemStore implements HeapSiz
this.lock.readLock().lock();
try {
KeyValueScanner [] scanners = new KeyValueScanner[1];
- scanners[0] = new MemStoreScanner(this.kvset.clone(),
- this.snapshot.clone(), this.comparator);
+ scanners[0] = new MemStoreScanner();
return scanners;
} finally {
this.lock.readLock().unlock();
@@ -465,10 +405,8 @@ public class MemStore implements HeapSiz
* @param matcher Column matcher
* @param result List to add results to
* @return true if done with store (early-out), false if not
- * @throws IOException
*/
- public boolean get(QueryMatcher matcher, List<KeyValue> result)
- throws IOException {
+ public boolean get(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
@@ -485,11 +423,11 @@ public class MemStore implements HeapSiz
* Gets from either the memstore or the snapshop, and returns a code
* to let you know which is which.
*
- * @param matcher
- * @param result
+ * @param matcher query matcher
+ * @param result puts results here
* @return 1 == memstore, 2 == snapshot, 0 == none
*/
- int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+ int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
boolean fromMemstore = internalGet(this.kvset, matcher, result);
@@ -517,18 +455,16 @@ public class MemStore implements HeapSiz
void readLockUnlock() {
this.lock.readLock().unlock();
}
-
+
/**
*
* @param set memstore or snapshot
* @param matcher query matcher
* @param result list to add results to
* @return true if done with store (early-out), false if not
- * @throws IOException
*/
boolean internalGet(final NavigableSet<KeyValue> set,
- final QueryMatcher matcher, final List<KeyValue> result)
- throws IOException {
+ final QueryMatcher matcher, final List<KeyValue> result) {
if(set.isEmpty()) return false;
// Seek to startKey
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
@@ -550,11 +486,152 @@ public class MemStore implements HeapSiz
}
return false;
}
+
+ /*
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both the current
+ * kvset and the snapshot -- as one merged, sorted stream.
+ * Position is kept in one iterator per set, established by seek().
+ * Entries whose memstoreTS is greater than the calling thread's read point
+ * (ReadWriteConsistencyControl.getThreadReadPoint()) are skipped.
+ */
+ protected class MemStoreScanner implements KeyValueScanner {
+ // Next visible entry from kvset / snapshot respectively. Null before the
+ // first seek(), after close(), or once that side is exhausted.
+ private KeyValue kvsetNextRow = null;
+ private KeyValue snapshotNextRow = null;
+
+ // Iterators over the tail sets chosen by the last seek(); null before the
+ // first seek() and after close().
+ Iterator<KeyValue> kvsetIt;
+ Iterator<KeyValue> snapshotIt;
+
+ /*
+ Some notes...
+
+ So MemStoreScanner is fixed at creation time. This includes pointers/iterators
+ into the existing kvset/snapshot. During a snapshot creation, the kvset is null,
+ and the snapshot is moved. Since kvset is null there is no point in reseeking
+ both; we can save ourselves the trouble. During the snapshot->hfile transition,
+ the memstore scanner is re-created by StoreScanner#updateReaders(). StoreScanner
+ should potentially do something smarter by adjusting the existing memstore
+ scanner.
+
+ But there is a greater problem here, that being once a scanner has progressed
+ during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
+ If a scan lasts a little while, there is a chance for new entries in kvset to
+ become available but we will never see them. This needs to be handled at the
+ StoreScanner level with coordination with MemStoreScanner.
+
+ NOTE(review): in this implementation the kvset/snapshot tail sets are taken
+ in seek(), not in the constructor, so "fixed at creation time" effectively
+ means "fixed at the last seek()".
+ */
+
+ // Creates an unpositioned scanner: peek()/next() return null until seek().
+ MemStoreScanner() {
+ super();
+
+ //DebugPrint.println(" MS new@" + hashCode());
+ }
+
+ /*
+ * Returns the next entry from it whose memstoreTS is <= the calling
+ * thread's read point, discarding any newer entries it steps over.
+ * Returns null when the iterator is exhausted.
+ */
+ protected KeyValue getNext(Iterator<KeyValue> it) {
+ KeyValue ret = null;
+ long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
+
+ while (ret == null && it.hasNext()) {
+ KeyValue v = it.next();
+ if (v.getMemstoreTS() <= readPoint) {
+ // keep it.
+ ret = v;
+ }
+ }
+ return ret;
+ }
+
+ /*
+ * Positions both iterators at the first entry >= key and caches the first
+ * visible entry from each set. A null key closes the scanner.
+ * Returns true if either set has a visible entry at or after key.
+ */
+ public synchronized boolean seek(KeyValue key) {
+ if (key == null) {
+ close();
+ return false;
+ }
+
+ // tailSet() returns an empty set (not null) when nothing is >= key, so
+ // the iterators below are always non-null.
+ // NOTE(review): assumes the kvset and snapshot fields are non-null here.
+ SortedSet<KeyValue> kvTail = kvset.tailSet(key);
+ SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
+
+ kvsetIt = kvTail.iterator();
+ snapshotIt = snapshotTail.iterator();
+
+ kvsetNextRow = getNext(kvsetIt);
+ snapshotNextRow = getNext(snapshotIt);
+
+
+ //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
+ // kvset.size() + " threadread = " + readPoint);
+ //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
+ // snapshot.size() + " threadread = " + readPoint);
+
+
+ KeyValue lowest = getLowest();
+
+ // has data := (lowest != null)
+ return lowest != null;
+ }
+
+ // Returns the lowest pending entry without advancing; null if none.
+ public synchronized KeyValue peek() {
+ //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+ return getLowest();
+ }
+
+
+ /*
+ * Returns the lowest pending entry and advances the iterator it came from.
+ * getLower() prefers the kvset entry on ties, and the identity check below
+ * then advances kvset, so an equal snapshot entry is returned on a later call.
+ */
+ public synchronized KeyValue next() {
+ KeyValue theNext = getLowest();
+
+ if (theNext == null) {
+ return null;
+ }
+
+ // Advance one of the iterators
+ if (theNext == kvsetNextRow) {
+ kvsetNextRow = getNext(kvsetIt);
+ } else {
+ snapshotNextRow = getNext(snapshotIt);
+ }
+
+ //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+ //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+ // getLowest() + " threadpoint=" + readpoint);
+ return theNext;
+ }
+
+ // Lower of the two cached entries (kvset first on ties); null if both are null.
+ protected KeyValue getLowest() {
+ return getLower(kvsetNextRow,
+ snapshotNextRow);
+ }
+
+ /*
+ * Returns the lower of the two key values according to the memstore
+ * comparator, returning first when they compare equal. If exactly one is
+ * null the other is returned; null only if both are null.
+ */
+ protected KeyValue getLower(KeyValue first, KeyValue second) {
+ if (first == null && second == null) {
+ return null;
+ }
+ if (first != null && second != null) {
+ int compare = comparator.compare(first, second);
+ return (compare <= 0 ? first : second);
+ }
+ return (first != null ? first : second);
+ }
+
+ // Drops cached entries and iterators; peek()/next() return null until the
+ // next seek().
+ public synchronized void close() {
+ this.kvsetNextRow = null;
+ this.snapshotNextRow = null;
+
+ this.kvsetIt = null;
+ this.snapshotIt = null;
+ }
+ }
public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-
+
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -568,11 +645,11 @@ public class MemStore implements HeapSiz
* @return Size
*/
long heapSizeChange(final KeyValue kv, final boolean notpresent) {
- return notpresent ?
+ return notpresent ?
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
0;
}
-
+
/**
* Get the entire heap usage for this MemStore not including keys in the
* snapshot.
@@ -581,7 +658,7 @@ public class MemStore implements HeapSiz
public long heapSize() {
return size.get();
}
-
+
/**
* Get the heap usage of KVs in this MemStore.
*/
@@ -603,7 +680,7 @@ public class MemStore implements HeapSiz
* enough. See hbase-900. Fills memstores then waits so user can heap
* dump and bring up resultant hprof in something like jprofiler which
* allows you get 'deep size' on objects.
- * @param args
+ * @param args main args
*/
public static void main(String [] args) {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@@ -638,5 +715,4 @@ public class MemStore implements HeapSiz
}
LOG.info("Exiting.");
}
-
}
Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=935708&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Mon Apr 19 19:05:29 2010
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ *
+ * <p>A writer calls {@link #beginMemstoreInsert()} to obtain a write number,
+ * applies its edits, then calls {@link #completeMemstoreInsert(WriteEntry)}.
+ * The read point only advances past a write number once every earlier write
+ * number has completed too, so readers filtering on the read point see
+ * whole writes, in the order the numbers were issued.
+ */
+public class ReadWriteConsistencyControl {
+  // Highest write number visible to readers; every write number at or below
+  // it has completed.
+  private final AtomicLong memstoreRead = new AtomicLong();
+  // Last write number handed out by beginMemstoreInsert().
+  private final AtomicLong memstoreWrite = new AtomicLong();
+  // Pending writes, oldest first. Guarded by synchronizing on the queue.
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  // Per-thread read point. A thread that never called resetThreadReadPoint()
+  // (e.g. a compaction thread that ends up seeking a memstore scanner) gets
+  // Long.MAX_VALUE, i.e. no filtering, instead of a null Long that would make
+  // getThreadReadPoint() throw NullPointerException when unboxed.
+  private static final ThreadLocal<Long> perThreadReadPoint =
+      new ThreadLocal<Long>() {
+        @Override
+        protected Long initialValue() {
+          return Long.MAX_VALUE;
+        }
+      };
+
+  /**
+   * @return the read point recorded for the calling thread, or
+   *         {@link Long#MAX_VALUE} if this thread has never set one
+   */
+  public static long getThreadReadPoint() {
+    return perThreadReadPoint.get();
+  }
+
+  /**
+   * Sets the calling thread's read point to the current read point of
+   * {@code rwcc}.
+   *
+   * @param rwcc consistency control whose read point the thread adopts
+   * @return the read point now recorded for the calling thread
+   */
+  public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+    long readPoint = rwcc.memstoreReadPoint();
+    perThreadReadPoint.set(readPoint);
+    return readPoint;
+  }
+
+  /**
+   * Starts a write: allocates the next write number and queues it as pending.
+   * Every call must be matched by {@link #completeMemstoreInsert(WriteEntry)};
+   * until then the read point cannot advance past this write, and later
+   * writers block in completeMemstoreInsert waiting for it.
+   *
+   * @return handle to pass to completeMemstoreInsert
+   */
+  public WriteEntry beginMemstoreInsert() {
+    synchronized (writeQueue) {
+      long nextWriteNumber = memstoreWrite.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      return e;
+    }
+  }
+
+  /**
+   * Marks a write as complete, moves the read point over every leading
+   * completed write in the pending queue, then spins until the read point
+   * covers {@code e}, i.e. until all earlier writes have completed as well.
+   *
+   * @param e handle returned by beginMemstoreInsert
+   * @throws IllegalStateException if the pending queue is empty or its write
+   *         numbers are not consecutive
+   */
+  public void completeMemstoreInsert(WriteEntry e) {
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      long nextReadValue = -1;
+      boolean ranOnce = false;
+      while (!writeQueue.isEmpty()) {
+        ranOnce = true;
+        WriteEntry queueFirst = writeQueue.getFirst();
+
+        // Write numbers are issued consecutively, so each entry popped must
+        // directly follow the previously popped one.
+        if (nextReadValue > 0
+            && nextReadValue + 1 != queueFirst.getWriteNumber()) {
+          throw new IllegalStateException(
+              "invariant in completeMemstoreInsert violated, prev: "
+              + nextReadValue + " next: " + queueFirst.getWriteNumber());
+        }
+
+        if (!queueFirst.isCompleted()) {
+          break;
+        }
+        nextReadValue = queueFirst.getWriteNumber();
+        writeQueue.removeFirst();
+      }
+
+      if (!ranOnce) {
+        throw new IllegalStateException("never was a first");
+      }
+
+      if (nextReadValue > 0) {
+        memstoreRead.set(nextReadValue);
+      }
+    }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable. This never returns if an earlier write is
+    // begun but never completed.
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      // busy-wait
+    }
+  }
+
+  /** @return the highest write number currently visible to readers */
+  public long memstoreReadPoint() {
+    return memstoreRead.get();
+  }
+
+
+  /**
+   * Handle for one in-flight write. The completed flag is only read and
+   * written while holding the writeQueue lock.
+   */
+  public static class WriteEntry {
+    private final long writeNumber;
+    private boolean completed = false;
+
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+
+    void markCompleted() {
+      this.completed = true;
+    }
+
+    boolean isCompleted() {
+      return this.completed;
+    }
+
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+  }
+}
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Mon Apr 19 19:05:29 2010
@@ -55,7 +55,11 @@ public class ScanQueryMatcher extends Qu
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
- this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+ if (scan.isGetScan()) {
+ this.stopKey = KeyValue.createLastOnRow(scan.getStopRow());
+ } else {
+ this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+ }
this.filter = scan.getFilter();
this.oldFilter = scan.getOldFilter();
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Apr 19 19:05:29 2010
@@ -509,7 +509,7 @@ public class Store implements HConstants
/**
* Snapshot this stores memstore. Call before running
- * {@link #flushCache(long)} so it has some work to do.
+ * {@link #flushCache(long, java.util.SortedSet)} so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@@ -614,9 +614,12 @@ public class Store implements HConstants
this.lock.writeLock().lock();
try {
this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
+
+ this.memstore.clearSnapshot(set);
+
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
- this.memstore.clearSnapshot(set);
+
return this.storefiles.size() >= this.compactionThreshold;
} finally {
this.lock.writeLock().unlock();
@@ -644,10 +647,8 @@ public class Store implements HConstants
* @param o Observer no longer interested in changes in set of Readers.
*/
void deleteChangedReaderObserver(ChangedReadersObserver o) {
- if(this.changedReaderObservers.size() > 0) {
- if (!this.changedReaderObservers.remove(o)) {
- LOG.warn("Not in set" + o);
- }
+ if (!this.changedReaderObservers.remove(o)) {
+ LOG.warn("Not in set" + o);
}
}
@@ -873,7 +874,6 @@ public class Store implements HConstants
/**
* Do a minor/major compaction. Uses the scan infrastructure to make it easy.
*
- * @param writer output writer
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id.
@@ -1006,6 +1006,10 @@ public class Store implements HConstants
Long orderVal = Long.valueOf(result.getMaxSequenceId());
this.storefiles.put(orderVal, result);
}
+
+ // WARN ugly hack here, but necessary sadly.
+ ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
@@ -1496,7 +1500,12 @@ public class Store implements HConstants
}
/**
- * Increments the value for the given row/family/qualifier
+ * Increments the value for the given row/family/qualifier.
+ *
+ * This function will always be seen as atomic by other readers
+ * because it only puts a single KV to memstore. Thus no
+ * read/write control necessary.
+ *
* @param row
* @param f
* @param qualifier
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Mon Apr 19 19:05:29 2010
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,10 +45,15 @@ class StoreScanner implements KeyValueSc
private boolean cacheBlocks;
// Used to indicate that the scanner has closed (see HBASE-1107)
- private final AtomicBoolean closing = new AtomicBoolean(false);
+ private boolean closing = false;
+ private final boolean isGet;
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles.
+ *
+ * @param store who we scan
+ * @param scan the spec
+ * @param columns which columns we are scanning
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
this.store = store;
@@ -58,9 +62,11 @@ class StoreScanner implements KeyValueSc
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
+ this.isGet = scan.isGetScan();
List<KeyValueScanner> scanners = getScanners();
// Seek all scanners to the initial key
+ // TODO if scan.isGetScan, use bloomfilters to skip seeking
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
@@ -76,10 +82,14 @@ class StoreScanner implements KeyValueSc
* Used for major compactions.<p>
*
* Opens a scanner across specified StoreFiles.
+ * @param store who we scan
+ * @param scan the spec
+ * @param scanners ancillary scanners
*/
StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) {
this.store = store;
this.cacheBlocks = false;
+ this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
@@ -99,6 +109,7 @@ class StoreScanner implements KeyValueSc
final NavigableSet<byte[]> columns,
final KeyValueScanner [] scanners) {
this.store = null;
+ this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions());
@@ -132,8 +143,8 @@ class StoreScanner implements KeyValueSc
}
public synchronized void close() {
- boolean state = this.closing.getAndSet(true);
- if (state) return;
+ if (this.closing) return;
+ this.closing = true;
// under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@@ -146,11 +157,12 @@ class StoreScanner implements KeyValueSc
/**
* Get the next row of values from this Store.
- * @param result
+ * @param outResult
* @param limit
* @return true if there are more rows, false if scanner is done
*/
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
+ //DebugPrint.println("SS.next");
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -161,6 +173,7 @@ class StoreScanner implements KeyValueSc
List<KeyValue> results = new ArrayList<KeyValue>();
LOOP: while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
+ //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {
case INCLUDE:
KeyValue next = this.heap.next();
@@ -228,8 +241,8 @@ class StoreScanner implements KeyValueSc
LOG.warn("StoreFile " + sf + " has null Reader");
continue;
}
- // Get a scanner that does not use pread.
- s.add(r.getScanner(this.cacheBlocks, false));
+ // If isGet, use pread, else false, dont use pread
+ s.add(r.getScanner(this.cacheBlocks, isGet));
}
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(s.size()+1);
@@ -241,16 +254,20 @@ class StoreScanner implements KeyValueSc
// Implementation of ChangedReadersObserver
public synchronized void updateReaders() throws IOException {
- if (this.closing.get()) return;
+ if (this.closing) return;
KeyValue topKey = this.peek();
if (topKey == null) return;
+
List<KeyValueScanner> scanners = getScanners();
- // Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
scanner.seek(topKey);
}
+ // close the previous scanners:
+ this.heap.close(); // bubble thru and close all scanners.
+ this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
+
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(
scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java Mon Apr 19 19:05:29 2010
@@ -367,6 +367,7 @@ public class FSUtils {
return true;
}
+ // TODO move this method OUT of FSUtils. No dependencies to HMaster
/**
* Expects to find -ROOT- directory.
* @param fs
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java Mon Apr 19 19:05:29 2010
@@ -276,4 +276,49 @@ public class TestKeyValue extends TestCa
// TODO actually write this test!
}
+
+ private final byte[] rowA = Bytes.toBytes("rowA");
+ private final byte[] rowB = Bytes.toBytes("rowB");
+
+ private final byte[] family = Bytes.toBytes("family");
+ private final byte[] qualA = Bytes.toBytes("qfA");
+
+  /*
+   * Asserts that comparator c orders less strictly before greater, checking
+   * the comparison in both directions so an asymmetric comparator fails.
+   */
+  private void assertKVLess(KeyValue.KVComparator c,
+                            KeyValue less,
+                            KeyValue greater) {
+    assertTrue(c.compare(less, greater) < 0);
+    assertTrue(c.compare(greater, less) > 0);
+  }
+
+  /*
+   * Checks that createFirstOnRow/createLastOnRow bracket every real KeyValue
+   * of their row, and that the last key of rowA sorts before anything on rowB.
+   */
+  public void testFirstLastOnRow() {
+    final KVComparator cmp = KeyValue.COMPARATOR;
+    final long stamp = 1;
+
+    // rowA keys, smallest to largest.
+    final KeyValue aFirst = KeyValue.createFirstOnRow(rowA);
+    final KeyValue aNoColumn = new KeyValue(rowA, null, null, stamp, Type.Put);
+    final KeyValue aWithColumn =
+        new KeyValue(rowA, family, qualA, stamp, Type.Put);
+    final KeyValue aLast = KeyValue.createLastOnRow(rowA);
+
+    // rowB keys, all of which sort after every rowA key.
+    final KeyValue bFirst = KeyValue.createFirstOnRow(rowB);
+    final KeyValue bWithColumn =
+        new KeyValue(rowB, family, qualA, stamp, Type.Put);
+
+    // firstOnRow(rowA) precedes everything else on rowA and rowB.
+    assertKVLess(cmp, aFirst, bFirst);
+    assertKVLess(cmp, aFirst, aNoColumn);
+    assertKVLess(cmp, aFirst, aWithColumn);
+    assertKVLess(cmp, aNoColumn, aWithColumn);
+    assertKVLess(cmp, aWithColumn, bFirst);
+    assertKVLess(cmp, aNoColumn, bFirst);
+
+    // lastOnRow(rowA) still precedes every rowB key.
+    assertKVLess(cmp, aLast, bFirst);
+    assertKVLess(cmp, bFirst, bWithColumn);
+    assertKVLess(cmp, aLast, bWithColumn);
+
+    // ...and follows every real rowA key.
+    assertKVLess(cmp, aWithColumn, aLast);
+    assertKVLess(cmp, aNoColumn, aLast);
+    assertKVLess(cmp, aFirst, aLast);
+  }
}
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java Mon Apr 19 19:05:29 2010
@@ -67,7 +67,7 @@ public class TestClient extends HBaseClu
super();
}
- /**
+ /*
* Test from client side of an involved filter against a multi family that
* involves deletes.
*
@@ -196,7 +196,7 @@ public class TestClient extends HBaseClu
}
}
- /**
+ /*
* Test filters when multiple regions. It does counts. Needs eye-balling of
* logs to ensure that we're not scanning more regions that we're supposed to.
* Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
@@ -253,7 +253,7 @@ public class TestClient extends HBaseClu
assertEquals(rowCount - endKeyCount, countGreater);
}
- /**
+ /*
* Load table with rows from 'aaa' to 'zzz'.
* @param t
* @return Count of rows loaded.
@@ -418,7 +418,7 @@ public class TestClient extends HBaseClu
scanner.close();
}
- /**
+ /*
* Test simple table and non-existent row cases.
*/
public void testSimpleMissing() throws Exception {
@@ -531,7 +531,7 @@ public class TestClient extends HBaseClu
assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
}
- /**
+ /*
* Test basic puts, gets, scans, and deletes for a single row
* in a multiple family table.
*/
@@ -1438,7 +1438,7 @@ public class TestClient extends HBaseClu
ht.put(put);
delete = new Delete(ROW);
- delete.deleteColumn(FAMILIES[0], QUALIFIER);
+ delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
ht.delete(delete);
get = new Get(ROW);
@@ -1473,23 +1473,24 @@ public class TestClient extends HBaseClu
// But alas, this is not to be. We can't put them back in either case.
put = new Put(ROW);
- put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
- put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+ put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
+ put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
ht.put(put);
- // The Get returns the latest value but then does not return the
- // oldest, which was never deleted, ts[1].
-
+
+ // Because of how Get used to be implemented internally, the Get() call
+ // used to return ts[4], UNLIKE the Scan below. Now that Get is
+ // implemented on top of Scan, this is no longer the case.
get = new Get(ROW);
get.addFamily(FAMILIES[0]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
- new long [] {ts[2], ts[3], ts[4]},
- new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
+ new long [] {ts[1], ts[2], ts[3]},
+ new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
0, 2);
- // The Scanner returns the previous values, the expected-unexpected behavior
+ // The Scanner returns the previous values, the expected-naive-unexpected behavior
scan = new Scan(ROW);
scan.addFamily(FAMILIES[0]);
@@ -1553,7 +1554,7 @@ public class TestClient extends HBaseClu
result = ht.get(get);
assertTrue("Expected 2 keys but received " + result.size(),
result.size() == 2);
- assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
+ assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
new long [] {ts[0], ts[1]},
new byte[][] {VALUES[0], VALUES[1]},
0, 1);
@@ -1591,9 +1592,8 @@ public class TestClient extends HBaseClu
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
- assertTrue("Expected 1 key but received " + result.size() + ": " + result,
- result.size() == 1);
- assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
+ assertEquals(1, result.size());
+ assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
0, 0);
@@ -1603,9 +1603,8 @@ public class TestClient extends HBaseClu
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
- assertTrue("Expected 1 key but received " + result.size(),
- result.size() == 1);
- assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
+ assertEquals(1, result.size());
+ assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
0, 0);
@@ -1691,7 +1690,7 @@ public class TestClient extends HBaseClu
}
}
- /**
+ /*
* Baseline "scalability" test.
*
* Tests one hundred families, one million columns, one million versions
@@ -1738,7 +1737,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* Explicitly test JIRAs related to HBASE-880 / Client API
*/
public void testJIRAs() throws Exception {
@@ -1754,7 +1753,7 @@ public class TestClient extends HBaseClu
// JIRA Testers
//
- /**
+ /*
* HBASE-867
* If millions of columns in a column family, hbase scanner won't come up
*
@@ -1844,7 +1843,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* HBASE-861
* get with timestamp will return a value if there is a version with an
* earlier timestamp
@@ -1907,7 +1906,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* HBASE-33
* Add a HTable get/obtainScanner method that retrieves all versions of a
* particular column and row between two timestamps
@@ -1956,7 +1955,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* HBASE-1014
* commit(BatchUpdate) method should return timestamp
*/
@@ -1980,7 +1979,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* HBASE-1182
* Scan for columns > some timestamp
*/
@@ -2025,7 +2024,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* HBASE-52
* Add a means of scanning over all versions
*/
@@ -2423,7 +2422,7 @@ public class TestClient extends HBaseClu
- /**
+ /*
* Verify a single column using gets.
* Expects family and qualifier arrays to be valid for at least
* the range: idx-2 < idx < idx+2
@@ -2480,7 +2479,7 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* Verify a single column using scanners.
* Expects family and qualifier arrays to be valid for at least
* the range: idx-2 to idx+2
@@ -2542,11 +2541,11 @@ public class TestClient extends HBaseClu
}
- /**
+ /*
* Verify we do not read any values by accident around a single column
* Same requirements as getVerifySingleColumn
*/
- private void getVerifySingleEmpty(HTable ht,
+ private void getVerifySingleEmpty(HTable ht,
byte [][] ROWS, int ROWIDX,
byte [][] FAMILIES, int FAMILYIDX,
byte [][] QUALIFIERS, int QUALIFIERIDX)
@@ -2668,12 +2667,11 @@ public class TestClient extends HBaseClu
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
int expectedResults = end - start + 1;
- assertTrue("Expected " + expectedResults + " keys but result contains "
- + result.size(), result.size() == expectedResults);
-
+ assertEquals(expectedResults, result.size());
+
KeyValue [] keys = result.sorted();
- for(int i=0;i<keys.length;i++) {
+ for (int i=0; i<keys.length; i++) {
byte [] value = values[end-i];
long ts = stamps[end-i];
KeyValue key = keys[i];
@@ -2692,7 +2690,7 @@ public class TestClient extends HBaseClu
}
}
- /**
+ /*
* Validate that result contains two specified keys, exactly.
* It is assumed key A sorts before key B.
*/
@@ -2728,10 +2726,7 @@ public class TestClient extends HBaseClu
equals(valueB, kvB.getValue()));
}
- /**
- *
- */
- private void assertSingleResult(Result result, byte [] row, byte [] family,
+ private void assertSingleResult(Result result, byte [] row, byte [] family,
byte [] qualifier, byte [] value)
throws Exception {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
@@ -2808,7 +2803,7 @@ public class TestClient extends HBaseClu
}
byte [][] ret = new byte[n][];
for(int i=0;i<n;i++) {
- byte [] tail = Bytes.toBytes(new Integer(i).toString());
+ byte [] tail = Bytes.toBytes(Integer.toString(i));
ret[i] = Bytes.add(base, tail);
}
return ret;
@@ -2898,22 +2893,14 @@ public class TestClient extends HBaseClu
return new HTable(conf, tableName);
}
+ @SuppressWarnings({"SimplifiableIfStatement"}) // explicit early-return ifs kept on purpose; treats null and empty arrays as equal
private boolean equals(byte [] left, byte [] right) {
- if(left == null && right == null) return true;
- if(left == null && right.length == 0) return true;
- if(right == null && left.length == 0) return true;
+ if (left == null && right == null) return true; // both absent
+ if (left == null && right.length == 0) return true; // null vs empty counts as equal
+ if (right == null && left.length == 0) return true; // empty vs null counts as equal
return Bytes.equals(left, right);
}
-
-
-
-
-
-
-
-
-
public void XtestDuplicateVersions() throws Exception {
byte [] TABLE = Bytes.toBytes("testDuplicateVersions");
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java Mon Apr 19 19:05:29 2010
@@ -254,8 +254,8 @@ public class TestGetDeleteTracker extend
}
//update()
dt.update();
- assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts3));
- assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts1));
+ assertFalse(dt.isDeleted(col2, 0, col2Len, ts3));
+ assertFalse(dt.isDeleted(col2, 0, col2Len, ts1));
}
public void testIsDeleted_Delete(){
//Building lists
Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Apr 19 19:05:29 2010
@@ -94,7 +94,7 @@ public class TestHRegion extends HBaseTe
//////////////////////////////////////////////////////////////////////////////
- /**
+ /*
* An involved filter test. Has multiple column families and deletes in mix.
*/
public void testWeirdCacheBehaviour() throws Exception {
@@ -366,6 +366,34 @@ public class TestHRegion extends HBaseTe
//////////////////////////////////////////////////////////////////////////////
// Delete tests
//////////////////////////////////////////////////////////////////////////////
+ public void testDelete_multiDeleteColumn() throws IOException { // Stores ts=1 and ts=2, deletes the column twice, expects an empty Get.
+ byte [] tableName = Bytes.toBytes("testtable");
+ byte [] row1 = Bytes.toBytes("row1");
+ byte [] fam1 = Bytes.toBytes("fam1");
+ byte [] qual = Bytes.toBytes("qualifier");
+ byte [] value = Bytes.toBytes("value");
+
+ Put put = new Put(row1);
+ put.add(fam1, qual, 1, value);
+ put.add(fam1, qual, 2, value);
+
+ String method = this.getName();
+ initHRegion(tableName, method, fam1);
+
+ region.put(put);
+
+ // Each timestamp-less deleteColumn() targets the 'latest' version; two in one Delete should remove both versions.
+ Delete delete = new Delete(row1);
+ delete.deleteColumn(fam1, qual);
+ delete.deleteColumn(fam1, qual);
+ region.delete(delete, null, false);
+
+ Get get = new Get(row1);
+ get.addFamily(fam1);
+ Result r = region.get(get, null);
+ assertEquals(0, r.size());
+ }
+
public void testDelete_CheckFamily() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [] row1 = Bytes.toBytes("row1");
@@ -374,11 +402,9 @@ public class TestHRegion extends HBaseTe
byte [] fam3 = Bytes.toBytes("fam3");
byte [] fam4 = Bytes.toBytes("fam4");
- byte[][] families = {fam1, fam2, fam3};
-
//Setting up region
String method = this.getName();
- initHRegion(tableName, method, families);
+ initHRegion(tableName, method, fam1, fam2, fam3);
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(row1, fam4, null, null));
@@ -1439,6 +1465,41 @@ public class TestHRegion extends HBaseTe
assertICV(row, fam1, qual1, value+amount);
}
+ public void testIncrementColumnValue_BumpSnapshot() throws IOException { // An ICV after a snapshot must add a version with a different timestamp.
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 42L;
+ long incr = 44L;
+
+ // first put something in kvset, then snapshot it.
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+
+ // get the store in question:
+ Store s = region.getStore(fam1);
+ s.snapshot(); // snapshot the kvset holding the put above
+
+ // now increment:
+ long newVal = region.incrementColumnValue(row, fam1, qual1,
+ incr, false);
+
+ assertEquals(value+incr, newVal);
+
+ // get both versions: the snapshotted put and the incremented value
+ Get get = new Get(row);
+ get.setMaxVersions();
+ get.addColumn(fam1,qual1);
+
+ Result r = region.get(get, null);
+ assertEquals(2, r.size());
+ KeyValue first = r.raw()[0];
+ KeyValue second = r.raw()[1];
+
+ assertTrue("ICV failed to upgrade timestamp",
+ first.getTimestamp() != second.getTimestamp());
+ }
+
public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
initHRegion(tableName, getName(), fam1);
@@ -1652,7 +1713,7 @@ public class TestHRegion extends HBaseTe
assertEquals(expected.get(i), actual.get(i));
}
}
-
+
//////////////////////////////////////////////////////////////////////////////
// Split test
//////////////////////////////////////////////////////////////////////////////
@@ -1935,9 +1996,9 @@ public class TestHRegion extends HBaseTe
FlushThread flushThread = new FlushThread();
flushThread.start();
- Scan scan = new Scan();
- scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
- new BinaryComparator(Bytes.toBytes("row0"))));
+ Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
+// scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+// new BinaryComparator(Bytes.toBytes("row0"))));
int expectedCount = numFamilies * numQualifiers;
List<KeyValue> res = new ArrayList<KeyValue>();
@@ -1950,7 +2011,7 @@ public class TestHRegion extends HBaseTe
}
if (i != 0 && i % flushInterval == 0) {
- //System.out.println("scan iteration = " + i);
+ //System.out.println("flush scan iteration = " + i);
flushThread.flush();
}
@@ -1959,14 +2020,18 @@ public class TestHRegion extends HBaseTe
InternalScanner scanner = region.getScanner(scan);
while (scanner.next(res)) ;
if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
- Assert.assertEquals("i=" + i, expectedCount, res.size());
+ assertEquals("i=" + i, expectedCount, res.size());
long timestamp = res.get(0).getTimestamp();
- Assert.assertTrue(timestamp >= prevTimestamp);
+ assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
+ timestamp >= prevTimestamp);
prevTimestamp = timestamp;
}
}
putThread.done();
+
+ region.flushcache();
+
putThread.join();
putThread.checkNoError();
@@ -2011,15 +2076,16 @@ public class TestHRegion extends HBaseTe
for (int r = 0; r < numRows; r++) {
byte[] row = Bytes.toBytes("row" + r);
Put put = new Put(row);
- for (int f = 0; f < families.length; f++) {
- for (int q = 0; q < qualifiers.length; q++) {
- put.add(families[f], qualifiers[q], (long) val,
- Bytes.toBytes(val));
+ for (byte[] family : families) {
+ for (byte[] qualifier : qualifiers) {
+ put.add(family, qualifier, (long) val,
+ Bytes.toBytes(val));
}
}
+// System.out.println("Putting of kvsetsize=" + put.size());
region.put(put);
- if (val > 0 && val % 47 == 0){
- //System.out.println("put iteration = " + val);
+ if (val > 0 && val % 47 == 0) {
+ System.out.println("put iteration = " + val);
Delete delete = new Delete(row, (long)val-30, null);
region.delete(delete, null, true);
}
@@ -2099,6 +2165,9 @@ public class TestHRegion extends HBaseTe
}
putThread.done();
+
+ region.flushcache();
+
putThread.join();
putThread.checkNoError();