Posted to commits@hbase.apache.org by jg...@apache.org on 2011/10/28 23:49:38 UTC
svn commit: r1190606 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/test/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/regionserve...
Author: jgray
Date: Fri Oct 28 21:49:38 2011
New Revision: 1190606
URL: http://svn.apache.org/viewvc?rev=1190606&view=rev
Log:
HBASE-4528 The put operation can release the rowlock before sync-ing the Hlog (dhruba via jgray)
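In outline, the patch reorders the put path so that the WAL sync happens after the row locks are released: the edit goes into the memstore first, tagged with an RWCC write number so it stays invisible; the WAL entry is appended without syncing; the row locks are dropped; the WAL is synced; and only then is the RWCC read point advanced to publish the edit. If the sync fails, the memstore is rolled back. A minimal sketch of that ordering, assuming the hypothetical stand-in types below (none of these are HBase classes):

import java.io.IOException;

/** Illustrative stand-ins sketching the write ordering after HBASE-4528. */
public class PutOrderingSketch {
  interface RowLock { void release(); }
  interface Rwcc {
    long begin();               // like rwcc.beginMemstoreInsert()
    void complete(long entry);  // like rwcc.completeMemstoreInsert()
  }
  interface Memstore {
    void add(byte[] kv, long memstoreTS);       // invisible until rwcc advances
    void rollback(byte[] kv, long memstoreTS);  // undo an unsynced edit
  }
  interface Wal {
    long appendNoSync(byte[] edit) throws IOException;  // returns a txid
    void sync(long txid) throws IOException;
  }

  /** Assumes the caller already holds the row lock. */
  static void put(RowLock lock, Rwcc rwcc, Memstore memstore, Wal wal, byte[] kv)
      throws IOException {
    long w = rwcc.begin();                 // acquire a write number
    boolean locked = true;
    boolean walSyncSuccessful = false;
    try {
      memstore.add(kv, w);                 // STEP: memstore first, not yet visible
      long txid = wal.appendNoSync(kv);    // STEP: queue the WAL edit, no sync
      lock.release();                      // STEP: row lock released before sync
      locked = false;
      wal.sync(txid);                      // STEP: the durability point
      walSyncSuccessful = true;
    } finally {
      if (!walSyncSuccessful) {
        memstore.rollback(kv, w);          // drop edits that never became durable
      }
      rwcc.complete(w);                    // advance the read point: now visible
      if (locked) {
        lock.release();
      }
    }
  }
}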
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 28 21:49:38 2011
@@ -4,6 +4,8 @@ Release 0.93.0 - Unreleased
HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray)
HBASE-4536 Allow CF to retain deleted rows (Lars H)
HBASE-4629 Enable automated patch testing for hbase (Giridharan Kesavan)
+ HBASE-4528 The put operation can release the rowlock before sync-ing the
+ Hlog (dhruba via jgray)
IMPROVEMENTS
HBASE-4132 Extend the WALActionsListener API to accommodate log archival
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Oct 28 21:49:38 2011
@@ -1209,6 +1209,7 @@ public class HRegion implements HeapSize
// during the flush
long sequenceId = -1L;
long completeSequenceId = -1L;
+ ReadWriteConsistencyControl.WriteEntry w = null;
// We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
@@ -1219,6 +1220,10 @@ public class HRegion implements HeapSize
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
+ // Record the rwcc for all transactions in progress.
+ w = rwcc.beginMemstoreInsert();
+ rwcc.advanceMemstore(w);
+
sequenceId = (wal == null)? myseqid :
wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
@@ -1234,8 +1239,17 @@ public class HRegion implements HeapSize
} finally {
this.updatesLock.writeLock().unlock();
}
- status.setStatus("Flushing stores");
+ status.setStatus("Waiting for rwcc");
+ LOG.debug("Finished snapshotting, commencing waiting for rwcc");
+ // Wait for all in-progress transactions to commit to the HLog before
+ // we start the flush. This prevents uncommitted transactions from
+ // being written into HFiles. We have to block before starting the
+ // flush, otherwise keys that were removed via rollbackMemstore could
+ // be written to HFiles.
+ rwcc.waitForRead(w);
+
+ status.setStatus("Flushing stores");
LOG.debug("Finished snapshotting, commencing flushing stores");
// Any failure from here on out will be catastrophic requiring server
@@ -1246,15 +1260,17 @@ public class HRegion implements HeapSize
try {
// A. Flush memstore to all the HStores.
// Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
+ // just-made new flush store file. The new flushed file is still in the
+ // tmp directory.
for (StoreFlusher flusher : storeFlushers) {
flusher.flushCache(status);
}
+
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
for (StoreFlusher flusher : storeFlushers) {
- boolean needsCompaction = flusher.commit();
+ boolean needsCompaction = flusher.commit(status);
if (needsCompaction) {
compactionRequested = true;
}
@@ -1483,11 +1499,12 @@ public class HRegion implements HeapSize
}
/**
+ * This is used only by unit tests. Not required to be a public API.
* @param familyMap map of family to edits for the given family.
* @param writeToWAL
* @throws IOException
*/
- public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+ void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
boolean writeToWAL) throws IOException {
Delete delete = new Delete();
delete.setFamilyMap(familyMap);
@@ -1577,7 +1594,7 @@ public class HRegion implements HeapSize
}
// Now make changes to the memstore.
- long addedSize = applyFamilyMapToMemstore(familyMap);
+ long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
if (coprocessorHost != null) {
@@ -1745,8 +1762,9 @@ public class HRegion implements HeapSize
}
}
- long now = EnvironmentEdgeManager.currentTimeMillis();
- byte[] byteNow = Bytes.toBytes(now);
+ ReadWriteConsistencyControl.WriteEntry w = null;
+ long txid = 0;
+ boolean walSyncSuccessful = false;
boolean locked = false;
/** Keep track of the locks we hold so we can release them in finally clause */
@@ -1805,6 +1823,12 @@ public class HRegion implements HeapSize
lastIndexExclusive++;
numReadyToWrite++;
}
+
+ // we should record the timestamp only after we have acquired the rowLock,
+ // otherwise, newer puts are not guaranteed to have a newer timestamp
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ byte[] byteNow = Bytes.toBytes(now);
+
// Nothing to put -- an exception in the above such as NoSuchColumnFamily?
if (numReadyToWrite <= 0) return 0L;
@@ -1823,46 +1847,89 @@ public class HRegion implements HeapSize
byteNow);
}
-
this.updatesLock.readLock().lock();
locked = true;
+ //
+ // ------------------------------------
+ // Acquire the latest rwcc number
+ // ----------------------------------
+ w = rwcc.beginMemstoreInsert();
+
// ------------------------------------
- // STEP 3. Write to WAL
+ // STEP 3. Write back to memstore
+ // Write to memstore. It is ok to write to the memstore
+ // before updating the HLog because we do not yet roll the
+ // memstore RWCC forward. These changes are not visible to
+ // scanners until the RWCC is advanced, and the RWCC is
+ // advanced only once the sync is complete.
// ----------------------------------
+ long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
- // Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
continue;
}
-
- Put p = batchOp.operations[i].getFirst();
- if (!p.getWriteToWAL()) continue;
- addFamilyMapToWALEdit(familyMaps[i], walEdit);
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
}
- // Append the edit to WAL
- Put first = batchOp.operations[firstIndex].getFirst();
- this.log.append(regionInfo, this.htableDescriptor.getName(),
- walEdit, first.getClusterId(), now, this.htableDescriptor);
-
// ------------------------------------
- // STEP 4. Write back to memstore
+ // STEP 4. Build WAL edit
// ----------------------------------
- long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ // Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) {
continue;
}
- addedSize += applyFamilyMapToMemstore(familyMaps[i]);
- batchOp.retCodeDetails[i] = new OperationStatus(
- OperationStatusCode.SUCCESS);
+ batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS);
+
+ Put p = batchOp.operations[i].getFirst();
+ if (!p.getWriteToWAL()) continue;
+ addFamilyMapToWALEdit(familyMaps[i], walEdit);
+ }
+
+ // -------------------------
+ // STEP 5. Append the edit to WAL. Do not sync wal.
+ // -------------------------
+ Put first = batchOp.operations[firstIndex].getFirst();
+ txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
+ walEdit, first.getClusterId(), now, this.htableDescriptor);
+
+ // -------------------------------
+ // STEP 6. Release row locks, etc.
+ // -------------------------------
+ if (locked) {
+ this.updatesLock.readLock().unlock();
+ locked = false;
+ }
+ if (acquiredLocks != null) {
+ for (Integer toRelease : acquiredLocks) {
+ releaseRowLock(toRelease);
+ }
+ acquiredLocks = null;
+ }
+ // -------------------------
+ // STEP 7. Sync wal.
+ // -------------------------
+ if (walEdit.size() > 0 &&
+ (this.regionInfo.isMetaRegion() ||
+ !this.htableDescriptor.isDeferredLogFlush())) {
+ this.log.sync(txid);
+ }
+ walSyncSuccessful = true;
+ // ------------------------------------------------------------------
+ // STEP 8. Advance rwcc. This will make this put visible to scanners and getters.
+ // ------------------------------------------------------------------
+ if (w != null) {
+ rwcc.completeMemstoreInsert(w);
+ w = null;
}
// ------------------------------------
- // STEP 5. Run coprocessor post hooks
+ // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+ // synced so that the coprocessor contract is adhered to.
// ------------------------------------
if (coprocessorHost != null) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -1879,11 +1946,21 @@ public class HRegion implements HeapSize
success = true;
return addedSize;
} finally {
- if (locked)
+
+ // if the wal sync was unsuccessful, remove keys from memstore
+ if (!walSyncSuccessful) {
+ rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
+ }
+ if (w != null) rwcc.completeMemstoreInsert(w);
+
+ if (locked) {
this.updatesLock.readLock().unlock();
+ }
- for (Integer toRelease : acquiredLocks) {
- releaseRowLock(toRelease);
+ if (acquiredLocks != null) {
+ for (Integer toRelease : acquiredLocks) {
+ releaseRowLock(toRelease);
+ }
}
if (!success) {
for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2121,7 +2198,7 @@ public class HRegion implements HeapSize
walEdit, clusterId, now, this.htableDescriptor);
}
- long addedSize = applyFamilyMapToMemstore(familyMap);
+ long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
this.updatesLock.readLock().unlock();
@@ -2143,14 +2220,22 @@ public class HRegion implements HeapSize
* should already have locked updatesLock.readLock(). This also does
* <b>not</b> check the families for validity.
*
+ * @param familyMap Map of kvs per family
+ * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction.
+ * If null, then this method internally creates an rwcc transaction.
* @return the additional memory usage of the memstore caused by the
* new entries.
*/
- private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
- ReadWriteConsistencyControl.WriteEntry w = null;
+ private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+ ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) {
long size = 0;
+ boolean freerwcc = false;
+
try {
- w = rwcc.beginMemstoreInsert();
+ if (localizedWriteEntry == null) {
+ localizedWriteEntry = rwcc.beginMemstoreInsert();
+ freerwcc = true;
+ }
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@@ -2158,17 +2243,55 @@ public class HRegion implements HeapSize
Store store = getStore(family);
for (KeyValue kv: edits) {
- kv.setMemstoreTS(w.getWriteNumber());
+ kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
size += store.add(kv);
}
}
} finally {
- rwcc.completeMemstoreInsert(w);
+ if (freerwcc) {
+ rwcc.completeMemstoreInsert(localizedWriteEntry);
+ }
}
return size;
}
/**
+ * Remove all the keys listed in the map from the memstore. This method is
+ * called when a Put has updated memstore but subequently fails to update
+ * the wal. This method is then invoked to rollback the memstore.
+ */
+ private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
+ Map<byte[], List<KeyValue>>[] familyMaps,
+ int start, int end) {
+ int kvsRolledback = 0;
+ for (int i = start; i < end; i++) {
+ // skip over request that never succeeded in the first place.
+ if (batchOp.retCodeDetails[i].getOperationStatusCode()
+ != OperationStatusCode.SUCCESS) {
+ continue;
+ }
+
+ // Rollback all the kvs for this row.
+ Map<byte[], List<KeyValue>> familyMap = familyMaps[i];
+ for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+ byte[] family = e.getKey();
+ List<KeyValue> edits = e.getValue();
+
+ // Remove from the memstore those keys that match our
+ // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
+ // that even the memstoreTS has to match for keys that will be rolled back.
+ Store store = getStore(family);
+ for (KeyValue kv: edits) {
+ store.rollback(kv);
+ kvsRolledback++;
+ }
+ }
+ }
+ LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
+ " keyvalues from start:" + start + " to end:" + end);
+ }
+
+ /**
* Check the collection of families for validity.
* @throws NoSuchColumnFamilyException if a family does not exist.
*/
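A subtle point in the HRegion hunks above is the comment that the timestamp must be recorded only after the row lock is acquired; otherwise a put that wins the lock later could carry an older timestamp than one that completed earlier. A minimal sketch of the invariant this buys, using a plain JDK lock in place of HBase's row locks (run with -ea to enable the assertion):

import java.util.concurrent.locks.ReentrantLock;

/** Sampling the clock under the lock makes timestamps non-decreasing
 *  in lock-acquisition order, which is what the patch relies on. */
public class TimestampUnderLock {
  private final ReentrantLock rowLock = new ReentrantLock();
  private long lastTs = 0;

  void putUnderLock() {
    rowLock.lock();
    try {
      long now = System.currentTimeMillis();  // sampled AFTER locking
      assert now >= lastTs;                   // a later put never goes backwards
      lastTs = now;
    } finally {
      rowLock.unlock();
    }
  }
}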
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java Fri Oct 28 21:49:38 2011
@@ -165,6 +165,10 @@ class KeyValueSkipListSet implements Nav
throw new UnsupportedOperationException("Not implemented");
}
+ public KeyValue get(KeyValue kv) {
+ return this.delegatee.get(kv);
+ }
+
public int size() {
return this.delegatee.size();
}
@@ -176,4 +180,4 @@ class KeyValueSkipListSet implements Nav
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not implemented");
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Oct 28 21:49:38 2011
@@ -250,6 +250,38 @@ public class MemStore implements HeapSiz
}
/**
+ * Remove a key from the memstore. Only kvs that have the same key and the
+ * same memstoreTS are removed. It is ok to not update timeRangeTracker
+ * in this call. It is possible that we can optimize this method by using
+ * tailMap/iterator, but since this method is called rarely (only for
+ * error recovery), we can leave those optimizations for the future.
+ * @param kv
+ */
+ void rollback(final KeyValue kv) {
+ this.lock.readLock().lock();
+ try {
+ // If the key is in the snapshot, delete it. We should not update
+ // this.size, because that tracks the size of only the memstore and
+ // not the snapshot. The flush of this snapshot to disk has not
+ // yet started because Store.flush() waits for all rwcc transactions to
+ // commit before starting the flush to disk.
+ KeyValue found = this.snapshot.get(kv);
+ if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+ this.snapshot.remove(kv);
+ }
+ // If the key is in the memstore, delete it. Update this.size.
+ found = this.kvset.get(kv);
+ if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+ this.kvset.remove(kv);
+ long s = heapSizeChange(kv, true);
+ this.size.addAndGet(-s);
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /**
* Write a delete
* @param delete
* @return approximate size of the passed key and value.
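The rollback above removes a KeyValue only when both the key and the memstoreTS match, so it can never discard a concurrent re-put of the same cell, which would carry a different memstoreTS. A reduced sketch of that exact-match removal, with a plain map standing in for the kvset:

import java.util.concurrent.ConcurrentSkipListMap;

/** Simplified memstore: key -> memstoreTS of the resident entry. */
public class RollbackSketch {
  private final ConcurrentSkipListMap<String, Long> kvset =
      new ConcurrentSkipListMap<String, Long>();

  void add(String key, long memstoreTS) {
    kvset.put(key, memstoreTS);
  }

  /** Remove key only if the resident memstoreTS equals the caller's;
   *  this mirrors the get-then-remove check in MemStore.rollback(). */
  void rollback(String key, long memstoreTS) {
    kvset.remove(key, Long.valueOf(memstoreTS));
  }
}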
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Fri Oct 28 21:49:38 2011
@@ -87,6 +87,11 @@ public class ReadWriteConsistencyControl
}
public void completeMemstoreInsert(WriteEntry e) {
+ advanceMemstore(e);
+ waitForRead(e);
+ }
+
+ boolean advanceMemstore(WriteEntry e) {
synchronized (writeQueue) {
e.markCompleted();
@@ -120,10 +125,19 @@ public class ReadWriteConsistencyControl
memstoreRead = nextReadValue;
readWaiters.notifyAll();
}
-
}
+ if (memstoreRead >= e.getWriteNumber()) {
+ return true;
+ }
+ return false;
}
+ }
+ /**
+ * Wait for the global readPoint to advance up to
+ * the specified transaction number.
+ */
+ public void waitForRead(WriteEntry e) {
boolean interrupted = false;
synchronized (readWaiters) {
while (memstoreRead < e.getWriteNumber()) {
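The change above splits completeMemstoreInsert into advanceMemstore (mark the entry complete and move the read point over any completed prefix) and waitForRead (block until the read point reaches the entry). This is what lets the flush drain in-flight transactions: it takes a write entry, advances it immediately, and then waits for the read point, which cannot pass an earlier entry that has not yet completed. A stripped-down model of those semantics, for illustration only:

import java.util.ArrayDeque;

/** A stripped-down ReadWriteConsistencyControl, not the HBase class. */
public class RwccSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long n) { writeNumber = n; }
  }

  private final ArrayDeque<WriteEntry> writeQueue = new ArrayDeque<WriteEntry>();
  private long memstoreWrite = 0;  // last write number handed out
  private long memstoreRead = 0;   // global read point

  public synchronized WriteEntry beginMemstoreInsert() {
    WriteEntry e = new WriteEntry(++memstoreWrite);
    writeQueue.add(e);
    return e;
  }

  /** Mark e complete; advance the read point over the completed prefix. */
  public synchronized boolean advanceMemstore(WriteEntry e) {
    e.completed = true;
    while (!writeQueue.isEmpty() && writeQueue.peek().completed) {
      memstoreRead = writeQueue.poll().writeNumber;
    }
    notifyAll();
    return memstoreRead >= e.writeNumber;
  }

  /** Block until the read point has reached e's write number. */
  public synchronized void waitForRead(WriteEntry e) throws InterruptedException {
    while (memstoreRead < e.writeNumber) {
      wait();
    }
  }

  public synchronized void completeMemstoreInsert(WriteEntry e)
      throws InterruptedException {
    advanceMemstore(e);
    waitForRead(e);
  }
}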
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Oct 28 21:49:38 2011
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -322,6 +323,22 @@ public class Store implements HeapSize {
}
/**
+ * Removes a kv from the memstore. The KeyValue is removed only
+ * if its key & memstoreTS match the key & memstoreTS value of the
+ * kv parameter.
+ *
+ * @param kv
+ */
+ protected void rollback(final KeyValue kv) {
+ lock.readLock().lock();
+ try {
+ this.memstore.rollback(kv);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
* @return All store files.
*/
List<StoreFile> getStorefiles() {
@@ -447,34 +464,41 @@ public class Store implements HeapSize {
* @param logCacheFlushId flush sequence number
* @param snapshot
* @param snapshotTimeRangeTracker
- * @return true if a compaction is needed
+ * @param flushedSize The number of bytes flushed
+ * @param status
+ * @return Path The path name of the tmp file to which the store was flushed
* @throws IOException
*/
- private StoreFile flushCache(final long logCacheFlushId,
+ private Path flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot,
TimeRangeTracker snapshotTimeRangeTracker,
+ AtomicLong flushedSize,
MonitoredTask status) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
return internalFlushCache(
- snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
+ snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
}
/*
* @param cache
* @param logCacheFlushId
- * @return StoreFile created.
+ * @param snapshotTimeRangeTracker
+ * @param flushedSize The number of bytes flushed
+ * @return Path The path name of the tmp file to which the store was flushed
* @throws IOException
*/
- private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
+ private Path internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker,
+ AtomicLong flushedSize,
MonitoredTask status)
throws IOException {
StoreFile.Writer writer;
String fileName;
long flushed = 0;
+ Path pathName;
// Don't flush if there are no entries.
if (set.size() == 0) {
return null;
@@ -496,7 +520,7 @@ public class Store implements HeapSize {
// A. Write the map out to the disk
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
- fileName = writer.getPath().getName();
+ pathName = writer.getPath();
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
@@ -520,17 +544,39 @@ public class Store implements HeapSize {
}
}
} finally {
+ flushedSize.set(flushed);
scanner.close();
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Flushed " +
+ ", sequenceid=" + logCacheFlushId +
+ ", memsize=" + StringUtils.humanReadableInt(flushed) +
+ ", into tmp file " + pathName);
+ }
+ return pathName;
+ }
+ /*
+ * @param path The pathname of the tmp file into which the store was flushed
+ * @param logCacheFlushId
+ * @return StoreFile created.
+ * @throws IOException
+ */
+ private StoreFile commitFile(final Path path,
+ final long logCacheFlushId,
+ TimeRangeTracker snapshotTimeRangeTracker,
+ AtomicLong flushedSize,
+ MonitoredTask status)
+ throws IOException {
// Write-out finished successfully, move into the right spot
+ String fileName = path.getName();
Path dstPath = new Path(homedir, fileName);
- validateStoreFile(writer.getPath());
- String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
+ validateStoreFile(path);
+ String msg = "Renaming flushed file at " + path + " to " + dstPath;
LOG.info(msg);
status.setStatus("Flushing " + this + ": " + msg);
- if (!fs.rename(writer.getPath(), dstPath)) {
- LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
+ if (!fs.rename(path, dstPath)) {
+ LOG.warn("Unable to rename " + path + " to " + dstPath);
}
status.setStatus("Flushing " + this + ": reopening flushed file");
@@ -546,11 +592,10 @@ public class Store implements HeapSize {
// HRegion.internalFlushcache, which indirectly calls this to actually do
// the flushing through the StoreFlusherImpl class
HRegion.incrNumericPersistentMetric("cf." + this.toString() + ".flushSize",
- flushed);
+ flushedSize.longValue());
if(LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +
", sequenceid=" + logCacheFlushId +
- ", memsize=" + StringUtils.humanReadableInt(flushed) +
", filesize=" + StringUtils.humanReadableInt(r.length()));
}
return sf;
@@ -1815,10 +1860,13 @@ public class Store implements HeapSize {
private long cacheFlushId;
private SortedSet<KeyValue> snapshot;
private StoreFile storeFile;
+ private Path storeFilePath;
private TimeRangeTracker snapshotTimeRangeTracker;
+ private AtomicLong flushedSize;
private StoreFlusherImpl(long cacheFlushId) {
this.cacheFlushId = cacheFlushId;
+ this.flushedSize = new AtomicLong();
}
@Override
@@ -1830,15 +1878,17 @@ public class Store implements HeapSize {
@Override
public void flushCache(MonitoredTask status) throws IOException {
- storeFile = Store.this.flushCache(
- cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
+ storeFilePath = Store.this.flushCache(
+ cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
}
@Override
- public boolean commit() throws IOException {
- if (storeFile == null) {
+ public boolean commit(MonitoredTask status) throws IOException {
+ if (storeFilePath == null) {
return false;
}
+ storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
+ snapshotTimeRangeTracker, flushedSize, status);
// Add new file to store files. Clear snapshot too while we have
// the Store write lock.
return Store.this.updateStorefiles(storeFile, snapshot);
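The Store changes split the flush into two phases: flushCache/internalFlushCache now write the snapshot to a tmp file and return its Path, and the new commitFile validates and renames that file into the store directory at commit time. The gap between the two phases is where HRegion can wait on the RWCC. A reduced sketch of the split, using java.nio paths in place of HDFS:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TwoPhaseFlushSketch {
  /** Phase 1: persist the snapshot into a tmp file; nothing is live yet. */
  static Path flushCache(byte[] snapshot, Path tmpDir) throws IOException {
    Path tmp = Files.createTempFile(tmpDir, "flush", ".tmp");
    Files.write(tmp, snapshot);
    return tmp;  // the Path is returned, not a finished store file
  }

  /** Phase 2: validate, then move the tmp file into the store directory. */
  static Path commitFile(Path tmp, Path storeDir) throws IOException {
    Path dst = storeDir.resolve(tmp.getFileName());
    // validateStoreFile(...) would run here in the real code
    return Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);
  }
}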
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Fri Oct 28 21:49:38 2011
@@ -60,5 +60,5 @@ interface StoreFlusher {
* @return
* @throws IOException
*/
- boolean commit() throws IOException;
+ boolean commit(MonitoredTask status) throws IOException;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Oct 28 21:49:38 2011
@@ -1230,7 +1230,7 @@ public class HLog implements Syncable {
logSyncerThread.hlogFlush(this.writer);
this.writer.sync();
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
- this.syncedTillHere = doneUpto;
+ this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
} catch(IOException io) {
syncSuccessful = false;
}
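The one-line HLog change matters because, with syncing moved outside the row lock, two sync batches can complete out of order; a plain assignment by a batch that finishes later but covers earlier transactions would move the highwater mark backwards. A minimal sketch of the monotonic update:

/** Highwater mark of synced WAL transactions; it must only move forward. */
public class SyncHighWaterMark {
  private long syncedTillHere = 0;

  synchronized void onSyncDone(long doneUpto) {
    // max() keeps the mark monotonic even if completions arrive out of order
    syncedTillHere = Math.max(syncedTillHere, doneUpto);
  }

  synchronized long get() {
    return syncedTillHere;
  }
}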
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java?rev=1190606&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java Fri Oct 28 21:49:38 2011
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of multiPut in parallel.
+ *
+ */
+public class TestParallelPut extends HBaseTestCase {
+ static final Log LOG = LogFactory.getLog(TestParallelPut.class);
+
+ private static HRegion region = null;
+ private static HBaseTestingUtility hbtu = new HBaseTestingUtility();
+ private static final String DIR = hbtu.getDataTestDir() + "/TestParallelPut/";
+
+ // Test names
static final byte[] tableName = Bytes.toBytes("testtable");
+ static final byte[] qual1 = Bytes.toBytes("qual1");
+ static final byte[] qual2 = Bytes.toBytes("qual2");
+ static final byte[] qual3 = Bytes.toBytes("qual3");
+ static final byte[] value1 = Bytes.toBytes("value1");
+ static final byte[] value2 = Bytes.toBytes("value2");
+ static final byte [] row = Bytes.toBytes("rowA");
+ static final byte [] row2 = Bytes.toBytes("rowB");
+
+ /**
+ * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ EnvironmentEdgeManagerTestHelper.reset();
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // New tests that don't spin up a mini cluster but rather just test the
+ // individual code pieces in the HRegion.
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Test one put command.
+ */
+ public void testPut() throws IOException {
+ LOG.info("Starting testPut");
+ initHRegion(tableName, getName(), fam1);
+
+ long value = 1L;
+
+ Put put = new Put(row);
+ put.add(fam1, qual1, Bytes.toBytes(value));
+ region.put(put);
+
+ assertGet(row, fam1, qual1, Bytes.toBytes(value));
+ }
+
+ /**
+ * Test multi-threaded Puts.
+ */
+ public void testParallelPuts() throws IOException {
+
+ LOG.info("Starting testParallelPuts");
+ initHRegion(tableName, getName(), fam1);
+ int numOps = 1000; // these many operations per thread
+
+ // create 100 threads, each will do its own puts
+ int numThreads = 100;
+ Putter[] all = new Putter[numThreads];
+
+ // create all threads
+ for (int i = 0; i < numThreads; i++) {
+ all[i] = new Putter(region, i, numOps);
+ }
+
+ // run all threads
+ for (int i = 0; i < numThreads; i++) {
+ all[i].start();
+ }
+
+ // wait for all threads to finish
+ for (int i = 0; i < numThreads; i++) {
+ try {
+ all[i].join();
+ } catch (InterruptedException e) {
+ LOG.warn("testParallelPuts encountered InterruptedException." +
+ " Ignoring....", e);
+ }
+ }
+ LOG.info("testParallelPuts successfully verified " +
+ (numOps * numThreads) + " put operations.");
+ }
+
+
+ static private void assertGet(byte [] row,
+ byte [] family,
+ byte[] qualifier,
+ byte[] value) throws IOException {
+ // run a get and see if the value matches
+ Get get = new Get(row);
+ get.addColumn(family, qualifier);
+ Result result = region.get(get, null);
+ assertEquals(1, result.size());
+
+ KeyValue kv = result.raw()[0];
+ byte[] r = kv.getValue();
+ assertTrue(Bytes.compareTo(r, value) == 0);
+ }
+
+ private void initHRegion(byte [] tableName, String callingMethod,
+ byte[] ... families)
+ throws IOException {
+ initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+ }
+
+ private void initHRegion(byte [] tableName, String callingMethod,
+ Configuration conf, byte [] ... families)
+ throws IOException{
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ for(byte [] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+ Path path = new Path(DIR + callingMethod);
+ if (fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ throw new IOException("Failed delete of " + path);
+ }
+ }
+ region = HRegion.createHRegion(info, path, conf, htd);
+ }
+
+ /**
+ * A thread that makes a few put calls
+ */
+ public static class Putter extends Thread {
+
+ private final HRegion region;
+ private final int threadNumber;
+ private final int numOps;
+ private final Random rand = new Random();
+ byte [] rowkey = null;
+
+ public Putter(HRegion region, int threadNumber, int numOps) {
+ this.region = region;
+ this.threadNumber = threadNumber;
+ this.numOps = numOps;
+ this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ byte[] value = new byte[100];
+ Put[] in = new Put[1];
+
+ // iterate for the specified number of operations
+ for (int i=0; i<numOps; i++) {
+ // generate random bytes
+ rand.nextBytes(value);
+
+ // put the random bytes and verify that we can read it. This is one
+ // way of ensuring that rwcc manipulation in HRegion.put() is fine.
+ Put put = new Put(rowkey);
+ put.add(fam1, qual1, value);
+ in[0] = put;
+ try {
+ OperationStatus[] ret = region.put(in);
+ assertEquals(1, ret.length);
+ assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
+ assertGet(rowkey, fam1, qual1, value);
+ } catch (IOException e) {
+ assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
+ false);
+ }
+ }
+ }
+ }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Fri Oct 28 21:49:38 2011
@@ -618,7 +618,7 @@ public class TestStore extends TestCase
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();
storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
- storeFlusher.commit();
+ storeFlusher.commit(Mockito.mock(MonitoredTask.class));
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Oct 28 21:49:38 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.MiniHBase
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -250,6 +251,17 @@ public class TestLogRolling {
}
}
+ void validateData(HTable table, int rownum) throws IOException {
+ String row = "row" + String.format("%1$04d", rownum);
+ Get get = new Get(Bytes.toBytes(row));
+ get.addFamily(HConstants.CATALOG_FAMILY);
+ Result result = table.get(get);
+ assertTrue(result.size() == 1);
+ assertTrue(Bytes.equals(value,
+ result.getValue(HConstants.CATALOG_FAMILY, null)));
+ LOG.info("Validated row " + row);
+ }
+
void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
throws IOException {
for (int i = 0; i < 10; i++) {
@@ -462,6 +474,7 @@ public class TestLogRolling {
Thread.sleep(1000);
dfsCluster.waitActive();
LOG.info("Data Nodes restarted");
+ validateData(table, 1002);
// this write should succeed, but trigger a log roll
writeData(table, 1003);
@@ -469,6 +482,7 @@ public class TestLogRolling {
assertTrue("Missing datanode should've triggered a log roll",
newFilenum > oldFilenum && newFilenum > curTime);
+ validateData(table, 1003);
writeData(table, 1004);
@@ -477,6 +491,7 @@ public class TestLogRolling {
Thread.sleep(1000);
dfsCluster.waitActive();
LOG.info("Data Nodes restarted");
+ validateData(table, 1004);
// this write should succeed, but trigger a log roll
writeData(table, 1005);