You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2011/10/28 23:49:38 UTC

svn commit: r1190606 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/regionserve...

Author: jgray
Date: Fri Oct 28 21:49:38 2011
New Revision: 1190606

URL: http://svn.apache.org/viewvc?rev=1190606&view=rev
Log:
HBASE-4528  The put operation can release the rowlock before sync-ing the Hlog (dhruba via jgray)

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 28 21:49:38 2011
@@ -4,6 +4,8 @@ Release 0.93.0 - Unreleased
    HBASE-4460  Support running an embedded ThriftServer within a RegionServer (jgray)
    HBASE-4536  Allow CF to retain deleted rows (Lars H)
    HBASE-4629  Enable automated patch testing for hbase (Giridharan Kesavan)
+   HBASE-4528  The put operation can release the rowlock before sync-ing the
+               Hlog (dhruba via jgray)
 
   IMPROVEMENT
    HBASE-4132  Extend the WALActionsListener API to accomodate log archival

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Oct 28 21:49:38 2011
@@ -1209,6 +1209,7 @@ public class HRegion implements HeapSize
     // during the flush
     long sequenceId = -1L;
     long completeSequenceId = -1L;
+    ReadWriteConsistencyControl.WriteEntry w = null;
 
     // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
@@ -1219,6 +1220,10 @@ public class HRegion implements HeapSize
     final long currentMemStoreSize = this.memstoreSize.get();
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
+      // Record the rwcc for all transactions in progress.
+      w = rwcc.beginMemstoreInsert();
+      rwcc.advanceMemstore(w);
+
       sequenceId = (wal == null)? myseqid :
         wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
@@ -1234,8 +1239,17 @@ public class HRegion implements HeapSize
     } finally {
       this.updatesLock.writeLock().unlock();
     }
-    status.setStatus("Flushing stores");
+    status.setStatus("Waiting for rwcc");
+    LOG.debug("Finished snapshotting, commencing waiting for rwcc");
 
+    // wait for all in-progress transactions to commit to HLog before
+    // we can start the flush. This prevents
+    // uncommitted transactions from being written into HFiles.
+    // We have to block before we start the flush, otherwise keys that
+    // were removed via a rollbackMemstore could be written to Hfiles.
+    rwcc.waitForRead(w);
+
+    status.setStatus("Flushing stores");
     LOG.debug("Finished snapshotting, commencing flushing stores");
 
     // Any failure from here on out will be catastrophic requiring server
@@ -1246,15 +1260,17 @@ public class HRegion implements HeapSize
     try {
       // A.  Flush memstore to all the HStores.
       // Keep running vector of all store files that includes both old and the
-      // just-made new flush store file.
+      // just-made new flush store file. The new flushed file is still in the
+      // tmp directory.
 
       for (StoreFlusher flusher : storeFlushers) {
         flusher.flushCache(status);
       }
+
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
       for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit();
+        boolean needsCompaction = flusher.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
         }
@@ -1483,11 +1499,12 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * This is used only by unit tests. Not required to be a public API.
    * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
    * @throws IOException
    */
-  public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+  void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
       boolean writeToWAL) throws IOException {
     Delete delete = new Delete();
     delete.setFamilyMap(familyMap);
@@ -1577,7 +1594,7 @@ public class HRegion implements HeapSize
       }
 
       // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap);
+      long addedSize = applyFamilyMapToMemstore(familyMap, null);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
       if (coprocessorHost != null) {
@@ -1745,8 +1762,9 @@ public class HRegion implements HeapSize
       }
     }
 
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] byteNow = Bytes.toBytes(now);
+    ReadWriteConsistencyControl.WriteEntry w = null;
+    long txid = 0;
+    boolean walSyncSuccessful = false;
     boolean locked = false;
 
     /** Keep track of the locks we hold so we can release them in finally clause */
@@ -1805,6 +1823,12 @@ public class HRegion implements HeapSize
         lastIndexExclusive++;
         numReadyToWrite++;
       }
+
+      // we should record the timestamp only after we have acquired the rowLock,
+      // otherwise, newer puts are not guaranteed to have a newer timestamp
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
+
       // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) return 0L;
 
@@ -1823,46 +1847,89 @@ public class HRegion implements HeapSize
             byteNow);
       }
 
-
       this.updatesLock.readLock().lock();
       locked = true;
 
+      //
+      // ------------------------------------
+      // Acquire the latest rwcc number
+      // ----------------------------------
+      w = rwcc.beginMemstoreInsert();
+
       // ------------------------------------
-      // STEP 3. Write to WAL
+      // STEP 3. Write back to memstore
+      // Write to memstore. It is ok to write to memstore
+      // first without updating the HLog because we do not roll
+      // forward the memstore RWCC. The RWCC will be moved up when
+      // the complete operation is done. These changes are not yet
+      // visible to scanners till we update the RWCC. The RWCC is
+      // moved only when the sync is complete.
       // ----------------------------------
+      long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) {
           continue;
         }
-
-        Put p = batchOp.operations[i].getFirst();
-        if (!p.getWriteToWAL()) continue;
-        addFamilyMapToWALEdit(familyMaps[i], walEdit);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
       }
 
-      // Append the edit to WAL
-      Put first = batchOp.operations[firstIndex].getFirst();
-      this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, first.getClusterId(), now, this.htableDescriptor);
-
       // ------------------------------------
-      // STEP 4. Write back to memstore
+      // STEP 4. Build WAL edit
       // ----------------------------------
-      long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) {
           continue;
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
-        batchOp.retCodeDetails[i] = new OperationStatus(
-            OperationStatusCode.SUCCESS);
+        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS);
+
+        Put p = batchOp.operations[i].getFirst();
+        if (!p.getWriteToWAL()) continue;
+        addFamilyMapToWALEdit(familyMaps[i], walEdit);
+      }
+
+      // -------------------------
+      // STEP 5. Append the edit to WAL. Do not sync wal.
+      // -------------------------
+      Put first = batchOp.operations[firstIndex].getFirst();
+      txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
+               walEdit, first.getClusterId(), now, this.htableDescriptor);
+
+      // -------------------------------
+      // STEP 6. Release row locks, etc.
+      // -------------------------------
+      if (locked) {
+        this.updatesLock.readLock().unlock();
+        locked = false;
+      }
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
+        }
+        acquiredLocks = null;
+      }
+      // -------------------------
+      // STEP 7. Sync wal.
+      // -------------------------
+      if (walEdit.size() > 0 &&
+          (this.regionInfo.isMetaRegion() || 
+           !this.htableDescriptor.isDeferredLogFlush())) {
+        this.log.sync(txid);
+      }
+      walSyncSuccessful = true;
+      // ------------------------------------------------------------------
+      // STEP 8. Advance rwcc. This will make this put visible to scanners and getters.
+      // ------------------------------------------------------------------
+      if (w != null) {
+        rwcc.completeMemstoreInsert(w);
+        w = null;
       }
 
       // ------------------------------------
-      // STEP 5. Run coprocessor post hooks
+      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+      // sycned so that the coprocessor contract is adhered to.
       // ------------------------------------
       if (coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -1879,11 +1946,21 @@ public class HRegion implements HeapSize
       success = true;
       return addedSize;
     } finally {
-      if (locked)
+
+      // if the wal sync was unsuccessful, remove keys from memstore
+      if (!walSyncSuccessful) {
+        rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
+      }
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
+      if (locked) {
         this.updatesLock.readLock().unlock();
+      }
 
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
+        }
       }
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2121,7 +2198,7 @@ public class HRegion implements HeapSize
             walEdit, clusterId, now, this.htableDescriptor);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap);
+      long addedSize = applyFamilyMapToMemstore(familyMap, null);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
@@ -2143,14 +2220,22 @@ public class HRegion implements HeapSize
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
+   * @param familyMap Map of kvs per family
+   * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction.
+   *        If null, then this method internally creates a rwcc transaction.
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
-    ReadWriteConsistencyControl.WriteEntry w = null;
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+    ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) {
     long size = 0;
+    boolean freerwcc = false;
+
     try {
-      w = rwcc.beginMemstoreInsert();
+      if (localizedWriteEntry == null) {
+        localizedWriteEntry = rwcc.beginMemstoreInsert();
+        freerwcc = true;
+      }
 
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
@@ -2158,17 +2243,55 @@ public class HRegion implements HeapSize
 
         Store store = getStore(family);
         for (KeyValue kv: edits) {
-          kv.setMemstoreTS(w.getWriteNumber());
+          kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
           size += store.add(kv);
         }
       }
     } finally {
-      rwcc.completeMemstoreInsert(w);
+      if (freerwcc) {
+        rwcc.completeMemstoreInsert(localizedWriteEntry);
+      }
     }
     return size;
   }
 
   /**
+   * Remove all the keys listed in the map from the memstore. This method is
+   * called when a Put has updated memstore but subequently fails to update 
+   * the wal. This method is then invoked to rollback the memstore.
+   */
+  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
+                                Map<byte[], List<KeyValue>>[] familyMaps,
+                                int start, int end) {
+    int kvsRolledback = 0;
+    for (int i = start; i < end; i++) {
+      // skip over request that never succeeded in the first place.
+      if (batchOp.retCodeDetails[i].getOperationStatusCode()
+            != OperationStatusCode.SUCCESS) {
+        continue;
+      }
+
+      // Rollback all the kvs for this row. 
+      Map<byte[], List<KeyValue>> familyMap  = familyMaps[i]; 
+      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey();
+        List<KeyValue> edits = e.getValue();
+
+        // Remove those keys from the memstore that matches our 
+        // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
+        // that even the memstoreTS has to match for keys that will be rolleded-back.
+        Store store = getStore(family);
+        for (KeyValue kv: edits) {
+          store.rollback(kv);
+          kvsRolledback++;
+        }
+      }
+    }
+    LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
+        " keyvalues from start:" + start + " to end:" + end);
+  }
+
+  /**
    * Check the collection of families for validity.
    * @throws NoSuchColumnFamilyException if a family does not exist.
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java Fri Oct 28 21:49:38 2011
@@ -165,6 +165,10 @@ class KeyValueSkipListSet implements Nav
     throw new UnsupportedOperationException("Not implemented");
   }
 
+  public KeyValue get(KeyValue kv) {
+    return this.delegatee.get(kv);
+  }
+
   public int size() {
     return this.delegatee.size();
   }
@@ -176,4 +180,4 @@ class KeyValueSkipListSet implements Nav
   public <T> T[] toArray(T[] a) {
     throw new UnsupportedOperationException("Not implemented");
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Oct 28 21:49:38 2011
@@ -250,6 +250,38 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * Remove n key from the memstore. Only kvs that have the same key and the
+   * same memstoreTS are removed.  It is ok to not update timeRangeTracker 
+   * in this call. It is possible that we can optimize this method by using 
+   * tailMap/iterator, but since this method is called rarely (only for 
+   * error recovery), we can leave those optimization for the future.
+   * @param kv
+   */
+  void rollback(final KeyValue kv) {
+    this.lock.readLock().lock();
+    try {
+      // If the key is in the snapshot, delete it. We should not update
+      // this.size, because that tracks the size of only the memstore and
+      // not the snapshot. The flush of this snapshot to disk has not
+      // yet started because Store.flush() waits for all rwcc transactions to
+      // commit before starting the flush to disk.
+      KeyValue found = this.snapshot.get(kv);
+      if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+        this.snapshot.remove(kv);
+      }
+      // If the key is in the memstore, delete it. Update this.size.
+      found = this.kvset.get(kv);
+      if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+        this.kvset.remove(kv);
+        long s = heapSizeChange(kv, true);
+        this.size.addAndGet(-s);
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
    * Write a delete
    * @param delete
    * @return approximate size of the passed key and value.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Fri Oct 28 21:49:38 2011
@@ -87,6 +87,11 @@ public class ReadWriteConsistencyControl
   }
 
   public void completeMemstoreInsert(WriteEntry e) {
+    advanceMemstore(e);
+    waitForRead(e);
+  }
+
+  boolean advanceMemstore(WriteEntry e) {
     synchronized (writeQueue) {
       e.markCompleted();
 
@@ -120,10 +125,19 @@ public class ReadWriteConsistencyControl
           memstoreRead = nextReadValue;
           readWaiters.notifyAll();
         }
-
       }
+      if (memstoreRead >= e.getWriteNumber()) {
+        return true;
+      }
+      return false;
     }
+  }
 
+  /**
+   * Wait for the global readPoint to advance upto
+   * the specified transaction number.
+   */
+  public void waitForRead(WriteEntry e) {
     boolean interrupted = false;
     synchronized (readWaiters) {
       while (memstoreRead < e.getWriteNumber()) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Oct 28 21:49:38 2011
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -322,6 +323,22 @@ public class Store implements HeapSize {
   }
 
   /**
+   * Removes a kv from the memstore. The KeyValue is removed only
+   * if its key & memstoreTS matches the key & memstoreTS value of the 
+   * kv parameter.
+   *
+   * @param kv
+   */
+  protected void rollback(final KeyValue kv) {
+    lock.readLock().lock();
+    try {
+      this.memstore.rollback(kv);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
    * @return All store files.
    */
   List<StoreFile> getStorefiles() {
@@ -447,34 +464,41 @@ public class Store implements HeapSize {
    * @param logCacheFlushId flush sequence number
    * @param snapshot
    * @param snapshotTimeRangeTracker
-   * @return true if a compaction is needed
+   * @param flushedSize The number of bytes flushed
+   * @param status
+   * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private StoreFile flushCache(final long logCacheFlushId,
+  private Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
+      AtomicLong flushedSize,
       MonitoredTask status) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
     return internalFlushCache(
-        snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
+        snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
   }
 
   /*
    * @param cache
    * @param logCacheFlushId
-   * @return StoreFile created.
+   * @param snapshotTimeRangeTracker
+   * @param flushedSize The number of bytes flushed
+   * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
+  private Path internalFlushCache(final SortedSet<KeyValue> set,
       final long logCacheFlushId,
       TimeRangeTracker snapshotTimeRangeTracker,
+      AtomicLong flushedSize,
       MonitoredTask status)
       throws IOException {
     StoreFile.Writer writer;
     String fileName;
     long flushed = 0;
+    Path pathName;
     // Don't flush if there are no entries.
     if (set.size() == 0) {
       return null;
@@ -496,7 +520,7 @@ public class Store implements HeapSize {
         // A. Write the map out to the disk
         writer = createWriterInTmp(set.size());
         writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-        fileName = writer.getPath().getName();
+        pathName = writer.getPath();
         try {
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
@@ -520,17 +544,39 @@ public class Store implements HeapSize {
         }
       }
     } finally {
+      flushedSize.set(flushed);
       scanner.close();
     }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Flushed " + 
+               ", sequenceid=" + logCacheFlushId +
+               ", memsize=" + StringUtils.humanReadableInt(flushed) +
+               ", into tmp file " + pathName);
+    }
+    return pathName;
+  }
 
+  /*
+   * @param path The pathname of the tmp file into which the store was flushed
+   * @param logCacheFlushId
+   * @return StoreFile created.
+   * @throws IOException
+   */
+  private StoreFile commitFile(final Path path,
+      final long logCacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker,
+      AtomicLong flushedSize,
+      MonitoredTask status)
+      throws IOException {
     // Write-out finished successfully, move into the right spot
+    String fileName = path.getName();
     Path dstPath = new Path(homedir, fileName);
-    validateStoreFile(writer.getPath());
-    String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
+    validateStoreFile(path);
+    String msg = "Renaming flushed file at " + path + " to " + dstPath;
     LOG.info(msg);
     status.setStatus("Flushing " + this + ": " + msg);
-    if (!fs.rename(writer.getPath(), dstPath)) {
-      LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
+    if (!fs.rename(path, dstPath)) {
+      LOG.warn("Unable to rename " + path + " to " + dstPath);
     }
 
     status.setStatus("Flushing " + this + ": reopening flushed file");
@@ -546,11 +592,10 @@ public class Store implements HeapSize {
     // HRegion.internalFlushcache, which indirectly calls this to actually do
     // the flushing through the StoreFlusherImpl class
     HRegion.incrNumericPersistentMetric("cf." + this.toString() + ".flushSize",
-        flushed);
+        flushedSize.longValue());
     if(LOG.isInfoEnabled()) {
       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
-        ", memsize=" + StringUtils.humanReadableInt(flushed) +
         ", filesize=" + StringUtils.humanReadableInt(r.length()));
     }
     return sf;
@@ -1815,10 +1860,13 @@ public class Store implements HeapSize {
     private long cacheFlushId;
     private SortedSet<KeyValue> snapshot;
     private StoreFile storeFile;
+    private Path storeFilePath;
     private TimeRangeTracker snapshotTimeRangeTracker;
+    private AtomicLong flushedSize;
 
     private StoreFlusherImpl(long cacheFlushId) {
       this.cacheFlushId = cacheFlushId;
+      this.flushedSize = new AtomicLong();
     }
 
     @Override
@@ -1830,15 +1878,17 @@ public class Store implements HeapSize {
 
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
-      storeFile = Store.this.flushCache(
-          cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
+      storeFilePath = Store.this.flushCache(
+        cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
     }
 
     @Override
-    public boolean commit() throws IOException {
-      if (storeFile == null) {
+    public boolean commit(MonitoredTask status) throws IOException {
+      if (storeFilePath == null) {
         return false;
       }
+      storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
+                               snapshotTimeRangeTracker, flushedSize, status);
       // Add new file to store files.  Clear snapshot too while we have
       // the Store write lock.
       return Store.this.updateStorefiles(storeFile, snapshot);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Fri Oct 28 21:49:38 2011
@@ -60,5 +60,5 @@ interface StoreFlusher {
    * @return
    * @throws IOException
    */
-  boolean commit() throws IOException;
+  boolean commit(MonitoredTask status) throws IOException;
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Oct 28 21:49:38 2011
@@ -1230,7 +1230,7 @@ public class HLog implements Syncable {
         logSyncerThread.hlogFlush(this.writer);
         this.writer.sync();
         syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
-        this.syncedTillHere = doneUpto;
+        this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
       } catch(IOException io) {
         syncSuccessful = false;
       }

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java?rev=1190606&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java Fri Oct 28 21:49:38 2011
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of multiPut in parallel.
+ *
+ */
+public class TestParallelPut extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestParallelPut.class);
+
+  private static HRegion region = null;
+  private static HBaseTestingUtility hbtu = new HBaseTestingUtility();
+  private static final String DIR = hbtu.getDataTestDir() + "/TestParallelPut/";
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");;
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion. 
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test one put command.
+   */
+  public void testPut() throws IOException {
+    LOG.info("Starting testPut");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    assertGet(row, fam1, qual1, Bytes.toBytes(value));
+  }
+
+  /**
+   * Test multi-threaded Puts.
+   */
+  public void testParallelPuts() throws IOException {
+
+    LOG.info("Starting testParallelPuts");
+    initHRegion(tableName, getName(), fam1);
+    int numOps = 1000; // these many operations per thread
+
+    // create 100 threads, each will do its own puts
+    int numThreads = 100;
+    Putter[] all = new Putter[numThreads];
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Putter(region, i, numOps);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+        LOG.warn("testParallelPuts encountered InterruptedException." +
+                 " Ignoring....", e);
+      }
+    }
+    LOG.info("testParallelPuts successfully verified " + 
+             (numOps * numThreads) + " put operations.");
+  }
+
+
+  static private void assertGet(byte [] row,
+                         byte [] familiy,
+                         byte[] qualifier,
+                         byte[] value) throws IOException {
+    // run a get and see if the value matches
+    Get get = new Get(row);
+    get.addColumn(familiy, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    byte[] r = kv.getValue();
+    assertTrue(Bytes.compareTo(r, value) == 0);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+    byte[] ... families)
+  throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+    Configuration conf, byte [] ... families)
+  throws IOException{
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few put calls
+   */
+  public static class Putter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numOps;
+    private final Random rand = new Random();
+    byte [] rowkey = null;
+
+    public Putter(HRegion region, int threadNumber, int numOps) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numOps = numOps;
+      this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      byte[] value = new byte[100];
+      Put[]  in = new Put[1];
+
+      // iterate for the specified number of operations
+      for (int i=0; i<numOps; i++) {
+        // generate random bytes
+        rand.nextBytes(value);  
+
+        // put the randombytes and verify that we can read it. This is one
+        // way of ensuring that rwcc manipulation in HRegion.put() is fine.
+        Put put = new Put(rowkey);
+        put.add(fam1, qual1, value);
+        in[0] = put;
+        try {
+          OperationStatus[] ret = region.put(in);
+          assertEquals(1, ret.length);
+          assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
+          assertGet(rowkey, fam1, qual1, value);
+        } catch (IOException e) {
+          assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
+                     false);
+        }
+      }
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Fri Oct 28 21:49:38 2011
@@ -618,7 +618,7 @@ public class TestStore extends TestCase 
     StoreFlusher storeFlusher = store.getStoreFlusher(id);
     storeFlusher.prepare();
     storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
-    storeFlusher.commit();
+    storeFlusher.commit(Mockito.mock(MonitoredTask.class));
   }
 
 

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1190606&r1=1190605&r2=1190606&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Oct 28 21:49:38 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -250,6 +251,17 @@ public class TestLogRolling  {
     }
   }
 
+  void validateData(HTable table, int rownum) throws IOException {
+    String row = "row" + String.format("%1$04d", rownum);
+    Get get = new Get(Bytes.toBytes(row));
+    get.addFamily(HConstants.CATALOG_FAMILY);
+    Result result = table.get(get);
+    assertTrue(result.size() == 1);
+    assertTrue(Bytes.equals(value,
+                result.getValue(HConstants.CATALOG_FAMILY, null)));
+    LOG.info("Validated row " + row);
+  }
+
   void batchWriteAndWait(HTable table, int start, boolean expect, int timeout)
       throws IOException {
     for (int i = 0; i < 10; i++) {
@@ -462,6 +474,7 @@ public class TestLogRolling  {
     Thread.sleep(1000);
     dfsCluster.waitActive();
     LOG.info("Data Nodes restarted");
+    validateData(table, 1002);
 
     // this write should succeed, but trigger a log roll
     writeData(table, 1003);
@@ -469,6 +482,7 @@ public class TestLogRolling  {
 
     assertTrue("Missing datanode should've triggered a log roll",
         newFilenum > oldFilenum && newFilenum > curTime);
+    validateData(table, 1003);
 
     writeData(table, 1004);
 
@@ -477,6 +491,7 @@ public class TestLogRolling  {
     Thread.sleep(1000);
     dfsCluster.waitActive();
     LOG.info("Data Nodes restarted");
+    validateData(table, 1004);
 
     // this write should succeed, but trigger a log roll
     writeData(table, 1005);