You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/04/19 21:05:29 UTC

svn commit: r935708 [1/2] - in /hadoop/hbase/branches/0.20: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop...

Author: rawson
Date: Mon Apr 19 19:05:29 2010
New Revision: 935708

URL: http://svn.apache.org/viewvc?rev=935708&view=rev
Log:
HBASE-2248  Provide new non-copy mechanism to assure atomic reads in get and scan


Added:
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
Removed:
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Mon Apr 19 19:05:29 2010
@@ -10,6 +10,8 @@ Release 0.20.4 - Unreleased
    HBASE-2392  Upgrade to ZooKeeper 3.3.0
    HBASE-2165  Improve fragmentation display and implementation
    HBASE-2448  Remove 'indexed' contrib
+   HBASE-2248  Provide new non-copy mechanism to assure atomic reads in 
+   	       get and scan
 
   BUG FIXES
    HBASE-2173  New idx javadoc not included with the rest

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/KeyValue.java Mon Apr 19 19:05:29 2010
@@ -200,6 +200,23 @@ public class KeyValue implements Writabl
   private int offset = 0;
   private int length = 0;
 
+  /** Here be dragons **/
+
+  // used to achieve atomic operations in the memstore.
+  public long getMemstoreTS() {
+    return memstoreTS;
+  }
+
+  public void setMemstoreTS(long memstoreTS) {
+    this.memstoreTS = memstoreTS;
+  }
+
+  // default value is 0, aka DNC
+  private long memstoreTS = 0;
+
+  /** Dragon time over, return to normal business */
+
+  
   /** Writable Constructor -- DO NOT USE */
   public KeyValue() {}
 
@@ -1503,6 +1520,21 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Creates a KeyValue that is last on the specified row id. That is,
+   * every other possible KeyValue for the given row would compareTo()
+   * less than the result of this call.
+   * @param row row key
+   * @return Last possible KeyValue on passed <code>row</code>
+   */
+  public static KeyValue createLastOnRow(final byte[] row) {
+    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
+  }
+
+  /**
+   * Create a KeyValue that is smaller than all other possible KeyValues
+   * for the given row. That is any (valid) KeyValue on 'row' would sort
+   * _after_ the result.
+   * 
    * @param row - row key (arbitrary byte array)
    * @return First possible KeyValue on passed <code>row</code>
    */
@@ -1511,6 +1543,8 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Creates a KeyValue that is smaller than all other KeyValues that
+   * are older than the passed timestamp.
    * @param row - row key (arbitrary byte array)
    * @param ts - timestamp
    * @return First possible key on passed <code>row</code> and timestamp.
@@ -1522,8 +1556,11 @@ public class KeyValue implements Writabl
 
   /**
    * @param row - row key (arbitrary byte array)
+   * @param c column - {@link #parseColumn(byte[])} is called to split
+   * the column.
    * @param ts - timestamp
    * @return First possible key on passed <code>row</code>, column and timestamp
+   * @deprecated
    */
   public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
       final long ts) {
@@ -1532,14 +1569,17 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Create a KeyValue for the specified row, family and qualifier that would be
+   * smaller than all other possible KeyValues that have the same row,family,qualifier.
+   * Used for seeking.
    * @param row - row key (arbitrary byte array)
-   * @param f - family name
-   * @param q - column qualifier
+   * @param family - family name
+   * @param qualifier - column qualifier
    * @return First possible key on passed <code>row</code>, and column.
    */
-  public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
-      final byte [] q) {
-    return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum);
+  public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
+      final byte [] qualifier) {
+    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
   }
 
   /**
@@ -1706,9 +1746,6 @@ public class KeyValue implements Writabl
         return compare;
       }
 
-      // if row matches, and no column in the 'left' AND put type is 'minimum',
-      // then return that left is larger than right.
-
       // Compare column family.  Start compare past row and family length.
       int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
       int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
@@ -1717,17 +1754,25 @@ public class KeyValue implements Writabl
       int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
         (rcolumnoffset - roffset);
 
+      // if row matches, and no column in the 'left' AND put type is 'minimum',
+      // then return that left is larger than right.
+      
       // This supports 'last key on a row' - the magic is if there is no column in the
       // left operand, and the left operand has a type of '0' - magical value,
       // then we say the left is bigger.  This will let us seek to the last key in
       // a row.
 
       byte ltype = left[loffset + (llength - 1)];
+      byte rtype = right[roffset + (rlength - 1)];
 
       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
         return 1; // left is bigger.
       }
+      if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+        return -1;
+      }
 
+      // TODO the family and qualifier should be compared separately
       compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
           rcolumnoffset, rcolumnlength);
       if (compare != 0) {
@@ -1749,9 +1794,6 @@ public class KeyValue implements Writabl
       if (!this.ignoreType) {
         // Compare types. Let the delete types sort ahead of puts; i.e. types
         // of higher numbers sort before those of lesser numbers
-
-        // ltype is defined above
-        byte rtype = right[roffset + (rlength - 1)];
         return (0xff & rtype) - (0xff & ltype);
       }
       return 0;
@@ -1791,7 +1833,8 @@ public class KeyValue implements Writabl
   public long heapSize() {
     return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + 
         ClassSize.align(ClassSize.ARRAY + length) + 
-        (2 * Bytes.SIZEOF_INT));
+        (2 * Bytes.SIZEOF_INT) +
+        Bytes.SIZEOF_LONG);
   }
   
   // this overload assumes that the length bytes have already been read,

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/HTable.java Mon Apr 19 19:05:29 2010
@@ -660,6 +660,7 @@ public class HTable {
   */
   public void close() throws IOException{
     flushCommits();
+    this.pool.shutdownNow();
   }
   
   /**

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/client/Scan.java Mon Apr 19 19:05:29 2010
@@ -168,10 +168,29 @@ public class Scan implements Writable {
   }
 
   /**
+   * Builds a scan object with the same specs as get.
+   * @param get get to model scan after
+   */
+  public Scan(Get get) {
+    this.startRow = get.getRow();
+    this.stopRow = get.getRow();
+    this.filter = get.getFilter();
+    this.maxVersions = get.getMaxVersions();
+    this.tr = get.getTimeRange();
+    this.familyMap = get.getFamilyMap();
+  }
+
+  public boolean isGetScan() {
+    return this.startRow != null && this.startRow.length > 0 &&
+      Bytes.equals(this.startRow, this.stopRow);
+  }
+
+  /**
    * Get all columns from the specified family.
    * <p>
    * Overrides previous calls to addColumn for this family.
    * @param family family name
+   * @return this
    */
   public Scan addFamily(byte [] family) {
     familyMap.remove(family);
@@ -185,6 +204,7 @@ public class Scan implements Writable {
    * Overrides previous calls to addFamily for this family.
    * @param family family name
    * @param qualifier column qualifier
+   * @return this
    */
   public Scan addColumn(byte [] family, byte [] qualifier) {
     NavigableSet<byte []> set = familyMap.get(family);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Mon Apr 19 19:05:29 2010
@@ -178,9 +178,6 @@ public class SingleColumnValueFilter imp
     // byte array copy?
     int compareResult =
       this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length));
-    }
     switch (this.compareOp) {
     case LESS:
       return compareResult <= 0;

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java?rev=935708&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java Mon Apr 19 19:05:29 2010
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DebugPrint {
+
+private static final AtomicBoolean enabled = new AtomicBoolean(false);
+  private static final Object sync = new Object();
+  public static StringBuilder out = new StringBuilder();
+
+  static public void enable() {
+    enabled.set(true);
+  }
+  static public void disable() {
+    enabled.set(false);
+  }
+
+  static public void reset() {
+    synchronized (sync) {
+      enable(); // someone wants us enabled basically.
+
+      out = new StringBuilder();
+    }
+  }
+  static public void dumpToFile(String file) throws IOException {
+    FileWriter f = new FileWriter(file);
+    synchronized (sync) {
+      f.write(out.toString());
+    }
+    f.close();
+  }
+
+  public static void println(String m) {
+    if (!enabled.get()) {
+      System.out.println(m);
+      return;
+    }
+
+    synchronized (sync) {
+      String threadName = Thread.currentThread().getName();
+      out.append("<");
+      out.append(threadName);
+      out.append("> ");
+      out.append(m);
+      out.append("\n");
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Apr 19 19:05:29 2010
@@ -1,5 +1,5 @@
  /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -67,10 +67,11 @@ package org.apache.hadoop.hbase.regionse
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Random;
+ import java.util.concurrent.Callable;
  import java.util.concurrent.ConcurrentSkipListMap;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
  /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -176,6 +177,8 @@ public class HRegion implements HConstan
 
     /**
      * Set flags that make this region read-only.
+     *
+     * @param onOff flip value for region r/o setting
      */
     synchronized void setReadOnly(final boolean onOff) {
       this.writesEnabled = !onOff;
@@ -191,7 +194,7 @@ public class HRegion implements HConstan
     }
   }
 
-  private volatile WriteState writestate = new WriteState();
+  private final WriteState writestate = new WriteState();
 
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
@@ -210,7 +213,10 @@ public class HRegion implements HConstan
   private final Object splitLock = new Object();
   private long minSequenceId;
   private boolean splitRequest;
-  
+
+  private final ReadWriteConsistencyControl rwcc =
+      new ReadWriteConsistencyControl();
+
   /**
    * Name of the region info file that resides just under the region directory.
    */
@@ -296,9 +302,9 @@ public class HRegion implements HConstan
    * Initialize this region and get it ready to roll.
    * Called after construction.
    * 
-   * @param initialFiles
-   * @param reporter
-   * @throws IOException
+   * @param initialFiles path
+   * @param reporter progressable
+   * @throws IOException e
    */
   public void initialize(Path initialFiles, final Progressable reporter)
   throws IOException {
@@ -436,6 +442,10 @@ public class HRegion implements HConstan
     return this.closing.get();
   }
 
+   public ReadWriteConsistencyControl getRWCC() {
+     return rwcc;
+   }
+   
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't 
    * service any more calls.
@@ -447,7 +457,7 @@ public class HRegion implements HConstan
    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
    * vector if already closed and null if judged that it should not close.
    * 
-   * @throws IOException
+   * @throws IOException e
    */
   public List<StoreFile> close() throws IOException {
     return close(false);
@@ -465,7 +475,7 @@ public class HRegion implements HConstan
    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
    * we are not to close at this time or we are already closed.
    * 
-   * @throws IOException
+   * @throws IOException e
    */
   public List<StoreFile> close(final boolean abort) throws IOException {
     if (isClosed()) {
@@ -597,6 +607,7 @@ public class HRegion implements HConstan
   }
 
   /** @return the last time the region was flushed */
+  @SuppressWarnings({"UnusedDeclaration"})
   public long getLastFlushTime() {
     return this.lastFlushTime;
   }
@@ -698,8 +709,7 @@ public class HRegion implements HConstan
         HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
 
-      HRegion regions[] = new HRegion [] {regionA, regionB};
-      return regions;
+      return new HRegion [] {regionA, regionB};
     }
   }
 
@@ -773,7 +783,7 @@ public class HRegion implements HConstan
    * server does them sequentially and not in parallel.
    * 
    * @return mid key if split is needed
-   * @throws IOException
+   * @throws IOException e
    */
   public byte [] compactStores() throws IOException {
     boolean majorCompaction = this.forceMajorCompaction;
@@ -794,7 +804,7 @@ public class HRegion implements HConstan
    * 
    * @param majorCompaction True to force a major compaction regardless of thresholds
    * @return split row if split is needed
-   * @throws IOException
+   * @throws IOException e
    */
   byte [] compactStores(final boolean majorCompaction)
   throws IOException {
@@ -863,7 +873,7 @@ public class HRegion implements HConstan
    * 
    * @return true if cache was flushed
    * 
-   * @throws IOException
+   * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
@@ -929,7 +939,7 @@ public class HRegion implements HConstan
    * 
    * @return true if the region needs compacting
    * 
-   * @throws IOException
+   * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
@@ -957,18 +967,28 @@ public class HRegion implements HConstan
     // during the flush
     long sequenceId = -1L;
     long completeSequenceId = -1L;
+
+    // we have to take a write lock during snapshot, or else a write could
+    // end up in both snapshot and memstore (makes it difficult to do atomic
+    // rows then)
     this.updatesLock.writeLock().lock();
-    // Get current size of memstores.
     final long currentMemStoreSize = this.memstoreSize.get();
-    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
+    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
       sequenceId = log.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
-      // create the store flushers
+
       for (Store s : stores.values()) {
         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
       }
 
+      // This thread is going to cause a whole bunch of scanners to reseek.
+      // They are depending
+      // on a thread-local to know where to read from.
+      // The reason why we set it up high is so that each HRegionScanner only
+      // has a single read point for all its sub-StoreScanners.
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
       // prepare flush (take a snapshot)
       for (StoreFlusher flusher: storeFlushers) {
          flusher.prepare();
@@ -977,6 +997,8 @@ public class HRegion implements HConstan
       this.updatesLock.writeLock().unlock();
     }
 
+    LOG.debug("Finished snapshotting, commencing flushing stores");
+
     // Any failure from here on out will be catastrophic requiring server
     // restart so hlog content can be replayed and put back into the memstore.
     // Otherwise, the snapshot content while backed up in the hlog, it will not
@@ -990,13 +1012,28 @@ public class HRegion implements HConstan
         flusher.flushCache();
       }
 
-      internalPreFlashcacheCommit();
+      Callable<Void> atomicWork = internalPreFlushcacheCommit();
+
+      LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
 
       /**
-       * Switch between memstore and the new store file
+       * Switch between memstore(snapshot) and the new store file
        */
-      this.newScannerLock.writeLock().lock();
+      if (atomicWork != null) {
+        LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
+        newScannerLock.writeLock().lock();
+      }
+
       try {
+        // update this again to make sure we are 'fresh'
+        ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+        if (atomicWork != null) {
+          atomicWork.call();
+        }
+
+        // Switch snapshot (in memstore) -> new hfile (thus causing
+        // all the store scanners to reset/reseek).
         for (StoreFlusher flusher : storeFlushers) {
           boolean needsCompaction = flusher.commit();
           if (needsCompaction) {
@@ -1004,10 +1041,11 @@ public class HRegion implements HConstan
           }
         }
       } finally {
-        this.newScannerLock.writeLock().unlock();
+        if (atomicWork != null) {
+          newScannerLock.writeLock().unlock();
+        }
       }
 
-      // clear the stireFlushers list
       storeFlushers.clear();
       // Set down the memstore size by amount of flush.
       this.memstoreSize.addAndGet(-currentMemStoreSize);
@@ -1057,9 +1095,14 @@ public class HRegion implements HConstan
     * A hook for sub classed wishing to perform operations prior to the cache
     * flush commit stage.
     *
+    * If a subclass wishes that an atomic update of their work and the
+    * flush commit stage happens, they should return a callable. The new scanner
+    * lock will be acquired and released.
+
     * @throws java.io.IOException allow children to throw exception
     */
-   protected void internalPreFlashcacheCommit() throws IOException {
+   protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
+     return null;
    }
 
    /**
@@ -1097,9 +1140,9 @@ public class HRegion implements HConstan
    * <i>ts</i>.
    * 
    * @param row row key
-   * @param family
+   * @param family column family to find on
    * @return map of values
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public Result getClosestRowBefore(final byte [] row, final byte [] family)
   throws IOException {
@@ -1116,11 +1159,9 @@ public class HRegion implements HConstan
       if (key == null) {
         return null;
       }
-      // This will get all results for this store.  TODO: Do we need to do this?
       Get get = new Get(key.getRow());
-      List<KeyValue> results = new ArrayList<KeyValue>();
-      store.get(get, null, results);
-      return new Result(results);
+      get.addFamily(family);
+      return get(get, null);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
@@ -1134,7 +1175,7 @@ public class HRegion implements HConstan
    *
    * @param scan configured {@link Scan}
    * @return InternalScanner
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public InternalScanner getScanner(Scan scan)
   throws IOException {
@@ -1172,24 +1213,23 @@ public class HRegion implements HConstan
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
   /**
-   * @param delete
-   * @param lockid
-   * @param writeToWAL
-   * @throws IOException
+   * @param delete delete object
+   * @param lockid existing lock id, or null for grab a lock
+   * @param writeToWAL append to the write ahead lock or not
+   * @throws IOException read exceptions
    */
   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
     checkResources();
     Integer lid = null;
-    newScannerLock.writeLock().lock();
     splitsAndClosesLock.readLock().lock();
     try {
       byte [] row = delete.getRow();
       // If we did not pass an existing row lock, obtain a new one
       lid = getLock(lockid, row);
 
-      //Check to see if this is a deleteRow insert
+      // Check to see if this is a deleteRow insert
       if(delete.getFamilyMap().isEmpty()){
         for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
           // Don't eat the timestamp
@@ -1210,7 +1250,6 @@ public class HRegion implements HConstan
     } finally {
       if(lockid == null) releaseRowLock(lid);
       splitsAndClosesLock.readLock().unlock();
-      newScannerLock.writeLock().unlock();
     }
   }
   
@@ -1225,7 +1264,9 @@ public class HRegion implements HConstan
     long now = System.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
-    this.updatesLock.readLock().lock();
+
+    updatesLock.readLock().lock();
+    ReadWriteConsistencyControl.WriteEntry w = null;
 
     try {
 
@@ -1242,21 +1283,21 @@ public class HRegion implements HConstan
           if (kv.isLatestTimestamp() && kv.isDeleteType()) {
             byte[] qual = kv.getQualifier();
             if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
             Integer count = kvCount.get(qual);
             if (count == null) {
-              kvCount.put(qual, new Integer(1));
+              kvCount.put(qual, 1);
             } else {
-              kvCount.put(qual, new Integer(count+1));
+              kvCount.put(qual, count + 1);
             }
             count = kvCount.get(qual);
 
-            List<KeyValue> result = new ArrayList<KeyValue>(1);
-            Get g = new Get(kv.getRow());
-            g.setMaxVersions(count);
-            NavigableSet<byte []> qualifiers =
-              new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-            qualifiers.add(qual);
-            get(store, g, qualifiers, result);
+            Get get = new Get(kv.getRow());
+            get.setMaxVersions(count);
+            get.addColumn(family, qual);
+
+            List<KeyValue> result = get(get);
+
             if (result.size() < count) {
               // Nothing to delete
               kv.updateLatestStamp(byteNow);
@@ -1301,11 +1342,11 @@ public class HRegion implements HConstan
         }
       }
 
+      // Now make changes to the memstore.
+
       long size = 0;
+      w = rwcc.beginMemstoreInsert();
 
-      //
-      // Now make changes to the memstore.
-      //
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
 
         byte[] family = e.getKey(); 
@@ -1313,13 +1354,17 @@ public class HRegion implements HConstan
         
         Store store = getStore(family);
         for (KeyValue kv: kvs) {
+          kv.setMemstoreTS(w.getWriteNumber());
           size = this.memstoreSize.addAndGet(store.delete(kv));
         }
       }
       flush = isFlushSize(size);
     } finally {
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
       this.updatesLock.readLock().unlock();
     }
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -1367,8 +1412,8 @@ public class HRegion implements HConstan
     // read lock, resources may run out.  For now, the thought is that this
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
-    newScannerLock.writeLock().lock();
     splitsAndClosesLock.readLock().lock();
+
     try {
       // We obtain a per-row lock, so other clients will block while one client
       // performs an update. The read lock is released by the client calling
@@ -1378,6 +1423,7 @@ public class HRegion implements HConstan
       byte [] row = put.getRow();
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
+
       byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
         // All edits for the given row (across all column families) must happen atomically.
@@ -1387,7 +1433,6 @@ public class HRegion implements HConstan
       }
     } finally {
       splitsAndClosesLock.readLock().unlock();
-      newScannerLock.writeLock().unlock();
     }
   }
 
@@ -1427,15 +1472,12 @@ public class HRegion implements HConstan
       Integer lid = getLock(lockId, get.getRow()); 
       List<KeyValue> result = new ArrayList<KeyValue>();
       try {
-        //Getting data
-        for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-          get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-        }
+        result = get(get);
+
         boolean matches = false;
         if (result.size() == 0 && expectedValue.length == 0) {
           matches = true;
-        } else if(result.size() == 1) {
+        } else if (result.size() == 1) {
           //Compare the expected value with the actual value
           byte [] actualValue = result.get(0).getValue();
           matches = Bytes.equals(expectedValue, actualValue);
@@ -1551,6 +1593,7 @@ public class HRegion implements HConstan
   /** 
    * Add updates first to the hlog and then add values to memstore.
    * Warning: Assumption is caller has lock on passed in row.
+   * @param family
    * @param edits Cell updates by column
    * @praram now
    * @throws IOException
@@ -1575,6 +1618,7 @@ public class HRegion implements HConstan
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
+    ReadWriteConsistencyControl.WriteEntry w = null;
     try {
 
       WALEdit walEdit = new WALEdit();
@@ -1618,6 +1662,8 @@ public class HRegion implements HConstan
       
       long size = 0;
 
+      w = rwcc.beginMemstoreInsert();
+
       // now make changes to the memstore
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey(); 
@@ -1625,11 +1671,14 @@ public class HRegion implements HConstan
 
         Store store = getStore(family);
         for (KeyValue kv: edits) {
+          kv.setMemstoreTS(w.getWriteNumber());
           size = this.memstoreSize.addAndGet(store.add(kv));
         }
       }
       flush = isFlushSize(size);
     } finally {
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
       this.updatesLock.readLock().unlock();
     }
     if (flush) {
@@ -1871,9 +1920,13 @@ public class HRegion implements HConstan
     private Filter filter;
     private RowFilterInterface oldFilter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
+    private int isScan;
     private int batch;
 
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+      //DebugPrint.println("HRegionScanner.<init>, threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
       this.oldFilter = scan.getOldFilter();
@@ -1882,12 +1935,13 @@ public class HRegion implements HConstan
       } else {
         this.stopRow = scan.getStopRow();
       }
+      this.isScan = scan.isGetScan() ? -1 : 0;
       
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : 
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
           scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
         scanners.add(store.getScanner(scan, entry.getValue()));
@@ -1910,6 +1964,9 @@ public class HRegion implements HConstan
       if (oldFilter != null) {
         oldFilter.reset();
       }
+
+      // Start the next row read and reset the thread point
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     }
 
     public boolean next(List<KeyValue> outResults, int limit) throws IOException {
@@ -1918,6 +1975,9 @@ public class HRegion implements HConstan
         throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing=" + closing.get() + " or closed=" + closed.get());
       }
+
+      // This could be a new thread from the last time we called next().
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
       results.clear();
       boolean returnResult = nextInternal(limit);
       if (!returnResult && filterRow()) {
@@ -1999,7 +2059,7 @@ public class HRegion implements HConstan
       return currentRow == null ||
         (this.stopRow != null &&
           comparator.compareRows(this.stopRow, 0, this.stopRow.length,
-            currentRow, 0, currentRow.length) <= 0);
+            currentRow, 0, currentRow.length) <= isScan);
     }
 
     private boolean filterRow() {
@@ -2519,10 +2579,10 @@ public class HRegion implements HConstan
   // HBASE-880
   //
   /**
-   * @param get
-   * @param lockid
+   * @param get get object
+   * @param lockid existing lock id, or null for no previous lock
    * @return result
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public Result get(final Get get, final Integer lockid) throws IOException {
     // Verify families are all valid
@@ -2535,24 +2595,28 @@ public class HRegion implements HConstan
         get.addFamily(family);
       }
     }
-    // Lock row
-    Integer lid = getLock(lockid, get.getRow()); 
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    try {
-      for (Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-        get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
+    List<KeyValue> result = get(get);
+
     return new Result(result);
   }
 
-  private void get(final Store store, final Get get,
-    final NavigableSet<byte []> qualifiers, List<KeyValue> result)
-  throws IOException {
-    store.get(get, qualifiers, result);
+  /*
+   * Do a get based on the get parameter.
+   */
+  private List<KeyValue> get(final Get get) throws IOException {
+    Scan scan = new Scan(get);
+
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    InternalScanner scanner = null;
+    try {
+      scanner = getScanner(scan);
+      scanner.next(results);
+    } finally {
+      if (scanner != null)
+        scanner.close();
+    }
+    return results;
   }
 
   /**
@@ -2561,6 +2625,7 @@ public class HRegion implements HConstan
    * @param family
    * @param qualifier
    * @param amount
+   * @param writeToWAL
    * @return The new value.
    * @throws IOException
    */
@@ -2575,6 +2640,7 @@ public class HRegion implements HConstan
     try {
       Store store = stores.get(family);
 
+      // TODO call the proper GET API
       // Get the old value:
       Get get = new Get(row);
       get.addColumn(family, qualifier);
@@ -2640,7 +2706,7 @@ public class HRegion implements HConstan
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
   
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + 

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java Mon Apr 19 19:05:29 2010
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.KeyValue;
+
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -28,8 +30,6 @@ import java.util.SortedSet;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
-import org.apache.hadoop.hbase.KeyValue;
-
 /**
  * A {@link java.util.Set} of {@link KeyValue}s implemented on top of a
  * {@link java.util.concurrent.ConcurrentSkipListMap}.  Works like a
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.KeyValue;
  * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent
  * get and set and won't throw ConcurrentModificationException when iterating.
  */
-class KeyValueSkipListSet implements NavigableSet<KeyValue>, Cloneable {
+class KeyValueSkipListSet implements NavigableSet<KeyValue> {
   private ConcurrentNavigableMap<KeyValue, KeyValue> delegatee;
 
   KeyValueSkipListSet(final KeyValue.KVComparator c) {
@@ -167,6 +167,7 @@ class KeyValueSkipListSet implements Nav
   }
 
   public boolean contains(Object o) {
+    //noinspection SuspiciousMethodCalls
     return this.delegatee.containsKey(o);
   }
 
@@ -201,17 +202,4 @@ class KeyValueSkipListSet implements Nav
   public <T> T[] toArray(T[] a) {
     throw new UnsupportedOperationException("Not implemented");
   }
-
-  @Override
-  public KeyValueSkipListSet clone()  {
-    assert this.delegatee.getClass() == ConcurrentSkipListMap.class;
-    KeyValueSkipListSet clonedSet = null;
-    try {
-      clonedSet = (KeyValueSkipListSet) super.clone();
-    } catch (CloneNotSupportedException e) {
-      throw new InternalError(e.getMessage());
-    }
-    clonedSet.delegatee = ((ConcurrentSkipListMap) this.delegatee).clone();
-    return clonedSet;
-  }
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Apr 19 19:05:29 2010
@@ -28,7 +28,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -110,7 +112,7 @@ public class MemStore implements HeapSiz
 
   /**
    * Creates a snapshot of the current memstore.
-   * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
+   * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.SortedSet)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
    */
   void snapshot() {
@@ -140,7 +142,7 @@ public class MemStore implements HeapSiz
    * call to {@link #snapshot()}
    * @return Return snapshot.
    * @see {@link #snapshot()}
-   * @see {@link #clearSnapshot(java.util.Map)}
+   * @see {@link #clearSnapshot(java.util.SortedSet)}
    */
   KeyValueSkipListSet getSnapshot() {
     return this.snapshot;
@@ -187,7 +189,7 @@ public class MemStore implements HeapSiz
     return s;
   }
 
-  /**
+  /** 
    * Write a delete
    * @param delete
    * @return approximate size of the passed key and value.
@@ -195,69 +197,8 @@ public class MemStore implements HeapSiz
   long delete(final KeyValue delete) {
     long s = 0;
     this.lock.readLock().lock();
-    //Have to find out what we want to do here, to find the fastest way of
-    //removing things that are under a delete.
-    //Actions that will take place here are:
-    //1. Insert a delete and remove all the affected entries already in memstore
-    //2. In the case of a Delete and the matching put is found then don't insert
-    //   the delete
-    //TODO Would be nice with if we had an iterator for this, so we could remove
-    //things that needs to be removed while iterating and don't have to go
-    //back and do it afterwards
 
     try {
-      boolean notpresent = false;
-      List<KeyValue> deletes = new ArrayList<KeyValue>();
-      SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
-
-      //Parse the delete, so that it is only done once
-      byte [] deleteBuffer = delete.getBuffer();
-      int deleteOffset = delete.getOffset();
-
-      int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-
-      short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_SHORT;
-      int deleteRowOffset = deleteOffset;
-
-      deleteOffset += deleteRowLen;
-
-      byte deleteFamLen = deleteBuffer[deleteOffset];
-      deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-
-      int deleteQualifierOffset = deleteOffset;
-      int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
-        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
-        Bytes.SIZEOF_BYTE;
-
-      deleteOffset += deleteQualifierLen;
-
-      int deleteTimestampOffset = deleteOffset;
-      deleteOffset += Bytes.SIZEOF_LONG;
-      byte deleteType = deleteBuffer[deleteOffset];
-
-      //Comparing with tail from memstore
-      for (KeyValue kv : tail) {
-        DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
-            deleteRowOffset, deleteRowLen, deleteQualifierOffset,
-            deleteQualifierLen, deleteTimestampOffset, deleteType,
-            comparator.getRawComparator());
-        if (res == DeleteCode.DONE) {
-          break;
-        } else if (res == DeleteCode.DELETE) {
-          deletes.add(kv);
-        } // SKIP
-      }
-
-      //Delete all the entries effected by the last added delete
-      for (KeyValue kv : deletes) {
-        notpresent = this.kvset.remove(kv);
-        s -= heapSizeChange(kv, notpresent);
-      }
-
-      // Adding the delete to memstore. Add any value, as long as
-      // same instance each time.
       s += heapSizeChange(delete, this.kvset.add(delete));
     } finally {
       this.lock.readLock().unlock();
@@ -265,7 +206,7 @@ public class MemStore implements HeapSiz
     this.size.addAndGet(s);
     return s;
   }
-
+  
   /**
    * @param kv Find the row that comes after this one.  If null, we return the
    * first.
@@ -318,7 +259,7 @@ public class MemStore implements HeapSiz
   }
 
   /**
-   * @param state
+   * @param state column/delete tracking state
    */
   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
     this.lock.readLock().lock();
@@ -442,8 +383,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       KeyValueScanner [] scanners = new KeyValueScanner[1];
-      scanners[0] = new MemStoreScanner(this.kvset.clone(),
-        this.snapshot.clone(), this.comparator);
+      scanners[0] = new MemStoreScanner();
       return scanners;
     } finally {
       this.lock.readLock().unlock();
@@ -465,10 +405,8 @@ public class MemStore implements HeapSiz
    * @param matcher Column matcher
    * @param result List to add results to
    * @return true if done with store (early-out), false if not
-   * @throws IOException
    */
-  public boolean get(QueryMatcher matcher, List<KeyValue> result)
-  throws IOException {
+  public boolean get(QueryMatcher matcher, List<KeyValue> result) {
     this.lock.readLock().lock();
     try {
       if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
@@ -485,11 +423,11 @@ public class MemStore implements HeapSiz
    * Gets from either the memstore or the snapshop, and returns a code
    * to let you know which is which.
    *
-   * @param matcher
-   * @param result
+   * @param matcher query matcher
+   * @param result puts results here
    * @return 1 == memstore, 2 == snapshot, 0 == none
    */
-  int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+  int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
     this.lock.readLock().lock();
     try {
       boolean fromMemstore = internalGet(this.kvset, matcher, result);
@@ -517,18 +455,16 @@ public class MemStore implements HeapSiz
   void readLockUnlock() {
     this.lock.readLock().unlock();
   }
-
+  
   /**
    *
    * @param set memstore or snapshot
    * @param matcher query matcher
    * @param result list to add results to
    * @return true if done with store (early-out), false if not
-   * @throws IOException
    */
   boolean internalGet(final NavigableSet<KeyValue> set,
-      final QueryMatcher matcher, final List<KeyValue> result)
-  throws IOException {
+      final QueryMatcher matcher, final List<KeyValue> result) {
     if(set.isEmpty()) return false;
     // Seek to startKey
     SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
@@ -550,11 +486,152 @@ public class MemStore implements HeapSiz
     }
     return false;
   }
+  
 
+  /*
+   * MemStoreScanner implements the KeyValueScanner.
+   * It lets the caller scan the contents of a memstore -- both current
+   * map and snapshot.
+   * This behaves as if it were a real scanner but does not maintain position.
+   */
+  protected class MemStoreScanner implements KeyValueScanner {
+    // Next row information for either kvset or snapshot
+    private KeyValue kvsetNextRow = null;
+    private KeyValue snapshotNextRow = null;
+
+    // iterator based scanning.
+    Iterator<KeyValue> kvsetIt;
+    Iterator<KeyValue> snapshotIt;
+
+    /*
+    Some notes...
+
+     So memstorescanner is fixed at creation time. this includes pointers/iterators into
+    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
+    snapshot is moved.  since kvset is null there is no point on reseeking on both,
+      we can save us the trouble. During the snapshot->hfile transition, the memstore
+      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
+      potentially do something smarter by adjusting the existing memstore scanner.
+
+      But there is a greater problem here, that being once a scanner has progressed
+      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
+      if a scan lasts a little while, there is a chance for new entries in kvset to
+      become available but we will never see them.  This needs to be handled at the
+      StoreScanner level with coordination with MemStoreScanner.
+
+    */
+    
+    MemStoreScanner() {
+      super();
+
+      //DebugPrint.println(" MS new@" + hashCode());
+    }
+
+    protected KeyValue getNext(Iterator<KeyValue> it) {
+      KeyValue ret = null;
+      long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
+      
+      while (ret == null && it.hasNext()) {
+        KeyValue v = it.next();
+        if (v.getMemstoreTS() <= readPoint) {
+          // keep it.
+          ret = v;
+        }
+      }
+      return ret;
+    }
+
+    public synchronized boolean seek(KeyValue key) {
+      if (key == null) {
+        close();
+        return false;
+      }
+
+      // kvset and snapshot will never be empty.
+      // if tailSet cant find anything, SS is empty (not null).
+      SortedSet<KeyValue> kvTail = kvset.tailSet(key);
+      SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
+
+      kvsetIt = kvTail.iterator();
+      snapshotIt = snapshotTail.iterator();
+
+      kvsetNextRow = getNext(kvsetIt);
+      snapshotNextRow = getNext(snapshotIt);
+
+
+      //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
+      //    kvset.size() + " threadread = " + readPoint);
+      //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
+      //    snapshot.size() + " threadread = " + readPoint);
+
+      
+      KeyValue lowest = getLowest();
+
+      // has data := (lowest != null)
+      return lowest != null;
+    }
+
+    public synchronized KeyValue peek() {
+      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+      return getLowest();
+    }
+
+
+    public synchronized KeyValue next() {
+      KeyValue theNext = getLowest();
+
+      if (theNext == null) {
+          return null;
+      }
+
+      // Advance one of the iterators
+      if (theNext == kvsetNextRow) {
+        kvsetNextRow = getNext(kvsetIt);
+      } else {
+        snapshotNextRow = getNext(snapshotIt);
+      }
+
+      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+      //    getLowest() + " threadpoint=" + readpoint);
+      return theNext;
+    }
+
+    protected KeyValue getLowest() {
+      return getLower(kvsetNextRow,
+          snapshotNextRow);
+    }
+
+    /*
+     * Returns the lower of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
+     */
+    protected KeyValue getLower(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare <= 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
+    public synchronized void close() {
+      this.kvsetNextRow = null;
+      this.snapshotNextRow = null;
+
+      this.kvsetIt = null;
+      this.snapshotIt = null;
+    }
+  }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-
+  
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -568,11 +645,11 @@ public class MemStore implements HeapSiz
    * @return Size
    */
   long heapSizeChange(final KeyValue kv, final boolean notpresent) {
-    return notpresent ?
+    return notpresent ? 
         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
         0;
   }
-
+  
   /**
    * Get the entire heap usage for this MemStore not including keys in the
    * snapshot.
@@ -581,7 +658,7 @@ public class MemStore implements HeapSiz
   public long heapSize() {
     return size.get();
   }
-
+  
   /**
    * Get the heap usage of KVs in this MemStore.
    */
@@ -603,7 +680,7 @@ public class MemStore implements HeapSiz
    * enough.  See hbase-900.  Fills memstores then waits so user can heap
    * dump and bring up resultant hprof in something like jprofiler which
    * allows you get 'deep size' on objects.
-   * @param args
+   * @param args main args
    */
   public static void main(String [] args) {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@@ -638,5 +715,4 @@ public class MemStore implements HeapSiz
     }
     LOG.info("Exiting.");
   }
-
 }

Added: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=935708&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Mon Apr 19 19:05:29 2010
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+public class ReadWriteConsistencyControl {
+  private final AtomicLong memstoreRead = new AtomicLong();
+  private final AtomicLong memstoreWrite = new AtomicLong();
+  // This is the pending queue of writes.
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  private static final ThreadLocal<Long> perThreadReadPoint =
+      new ThreadLocal<Long>();
+
+  public static long getThreadReadPoint() {
+    return perThreadReadPoint.get();
+  }
+  
+  public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+    perThreadReadPoint.set(rwcc.memstoreReadPoint());
+    return getThreadReadPoint();
+  }
+
+  public WriteEntry beginMemstoreInsert() {
+    synchronized (writeQueue) {
+      long nextWriteNumber = memstoreWrite.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      return e;
+    }
+  }
+  public void completeMemstoreInsert(WriteEntry e) {
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      long nextReadValue = -1;
+      boolean ranOnce=false;
+      while (!writeQueue.isEmpty()) {
+        ranOnce=true;
+        WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+                + nextReadValue + " next: " + queueFirst.getWriteNumber());
+          }
+        }
+
+        if (queueFirst.isCompleted()) {
+          nextReadValue = queueFirst.getWriteNumber();
+          writeQueue.removeFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (!ranOnce) {
+        throw new RuntimeException("never was a first");
+      }
+
+      if (nextReadValue > 0) {
+        memstoreRead.set(nextReadValue);
+      }
+    }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable.
+    int spun = 0;
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      spun++;
+    }
+    // Could potentially expose spun as a metric
+  }
+
+  public long memstoreReadPoint() {
+    return memstoreRead.get();
+  }
+
+
+  public static class WriteEntry {
+    private long writeNumber;
+    private boolean completed = false;
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+    void markCompleted() {
+      this.completed = true;
+    }
+    boolean isCompleted() {
+      return this.completed;
+    }
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+  }
+}

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Mon Apr 19 19:05:29 2010
@@ -55,7 +55,11 @@ public class ScanQueryMatcher extends Qu
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
-    this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+    if (scan.isGetScan()) {
+      this.stopKey = KeyValue.createLastOnRow(scan.getStopRow());
+    } else {
+      this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+    }
     this.filter = scan.getFilter();
     this.oldFilter = scan.getOldFilter();
     

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Apr 19 19:05:29 2010
@@ -509,7 +509,7 @@ public class Store implements HConstants
 
   /**
    * Snapshot this stores memstore.  Call before running
-   * {@link #flushCache(long)} so it has some work to do.
+   * {@link #flushCache(long, java.util.SortedSet)} so it has some work to do.
    */
   void snapshot() {
     this.memstore.snapshot();
@@ -614,9 +614,12 @@ public class Store implements HConstants
     this.lock.writeLock().lock();
     try {
       this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
+
+      this.memstore.clearSnapshot(set);
+
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
-      this.memstore.clearSnapshot(set);
+
       return this.storefiles.size() >= this.compactionThreshold;
     } finally {
       this.lock.writeLock().unlock();
@@ -644,10 +647,8 @@ public class Store implements HConstants
    * @param o Observer no longer interested in changes in set of Readers.
    */
   void deleteChangedReaderObserver(ChangedReadersObserver o) {
-    if(this.changedReaderObservers.size() > 0) {
-      if (!this.changedReaderObservers.remove(o)) {
-        LOG.warn("Not in set" + o);
-      }
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set" + o);
     }
   }
 
@@ -873,7 +874,6 @@ public class Store implements HConstants
   /**
    * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
    * 
-   * @param writer output writer
    * @param filesToCompact which files to compact
    * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
    * @param maxId Readers maximum sequence id.
@@ -1006,6 +1006,10 @@ public class Store implements HConstants
           Long orderVal = Long.valueOf(result.getMaxSequenceId());
           this.storefiles.put(orderVal, result);
         }
+
+        // WARN ugly hack here, but necessary sadly.
+        ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+        
         // Tell observers that list of StoreFiles has changed.
         notifyChangedReadersObservers();
         // Finally, delete old store files.
@@ -1496,7 +1500,12 @@ public class Store implements HConstants
   }
 
   /**
-   * Increments the value for the given row/family/qualifier
+   * Increments the value for the given row/family/qualifier.
+   *
+   * This function will always be seen as atomic by other readers
+   * because it only puts a single KV to memstore. Thus no
+   * read/write control necessary.
+   * 
    * @param row
    * @param f
    * @param qualifier

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Mon Apr 19 19:05:29 2010
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,10 +45,15 @@ class StoreScanner implements KeyValueSc
   private boolean cacheBlocks;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
-  private final AtomicBoolean closing = new AtomicBoolean(false);
+  private boolean closing = false;
+  private final boolean isGet;
 
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles.
+   *
+   * @param store who we scan
+   * @param scan the spec
+   * @param columns which columns we are scanning
    */
   StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
     this.store = store;
@@ -58,9 +62,11 @@ class StoreScanner implements KeyValueSc
         columns, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()));
 
+    this.isGet = scan.isGetScan();
     List<KeyValueScanner> scanners = getScanners();
 
     // Seek all scanners to the initial key
+    // TODO if scan.isGetScan, use bloomfilters to skip seeking
     for(KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
@@ -76,10 +82,14 @@ class StoreScanner implements KeyValueSc
    * Used for major compactions.<p>
    * 
    * Opens a scanner across specified StoreFiles.
+   * @param store who we scan
+   * @param scan the spec
+   * @param scanners ancilliary scanners
    */
   StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) {
     this.store = store;
     this.cacheBlocks = false;
+    this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         null, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()));
@@ -99,6 +109,7 @@ class StoreScanner implements KeyValueSc
       final NavigableSet<byte[]> columns,
       final KeyValueScanner [] scanners) {
     this.store = null;
+    this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, 
         comparator.getRawComparator(), scan.getMaxVersions());
@@ -132,8 +143,8 @@ class StoreScanner implements KeyValueSc
   }
 
   public synchronized void close() {
-    boolean state = this.closing.getAndSet(true);
-    if (state) return;
+    if (this.closing) return;
+    this.closing = true;
     // under test, we dont have a this.store
     if (this.store != null)
       this.store.deleteChangedReaderObserver(this);
@@ -146,11 +157,12 @@ class StoreScanner implements KeyValueSc
 
   /**
    * Get the next row of values from this Store.
-   * @param result
+   * @param outResult
    * @param limit
    * @return true if there are more rows, false if scanner is done
    */
   public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
+    //DebugPrint.println("SS.next");
     KeyValue peeked = this.heap.peek();
     if (peeked == null) {
       close();
@@ -161,6 +173,7 @@ class StoreScanner implements KeyValueSc
     List<KeyValue> results = new ArrayList<KeyValue>();
     LOOP: while((kv = this.heap.peek()) != null) {
       QueryMatcher.MatchCode qcode = matcher.match(kv);
+      //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
       switch(qcode) {
         case INCLUDE:
           KeyValue next = this.heap.next();
@@ -228,8 +241,8 @@ class StoreScanner implements KeyValueSc
         LOG.warn("StoreFile " + sf + " has null Reader");
         continue;
       }
-      // Get a scanner that does not use pread.
-      s.add(r.getScanner(this.cacheBlocks, false));
+      // If isGet, use pread, else false, dont use pread
+      s.add(r.getScanner(this.cacheBlocks, isGet));
     }
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(s.size()+1);
@@ -241,16 +254,20 @@ class StoreScanner implements KeyValueSc
 
   // Implementation of ChangedReadersObserver
   public synchronized void updateReaders() throws IOException {
-    if (this.closing.get()) return;
+    if (this.closing) return;
     KeyValue topKey = this.peek();
     if (topKey == null) return;
+
     List<KeyValueScanner> scanners = getScanners();
 
-    // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
       scanner.seek(topKey);
     }
 
+    // close the previous scanners:
+    this.heap.close(); // bubble thru and close all scanners.
+    this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
+
     // Combine all seeked scanners with a heap
     heap = new KeyValueHeap(
         scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/util/FSUtils.java Mon Apr 19 19:05:29 2010
@@ -367,6 +367,7 @@ public class FSUtils {
     return true;
   }
 
+  // TODO move this method OUT of FSUtils. No dependencies to HMaster
   /**
    * Expects to find -ROOT- directory.
    * @param fs

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestKeyValue.java Mon Apr 19 19:05:29 2010
@@ -276,4 +276,49 @@ public class TestKeyValue extends TestCa
     // TODO actually write this test!
     
   }
+
+  private final byte[] rowA = Bytes.toBytes("rowA");
+  private final byte[] rowB = Bytes.toBytes("rowB");
+
+  private final byte[] family = Bytes.toBytes("family");
+  private final byte[] qualA = Bytes.toBytes("qfA");
+
+  private void assertKVLess(KeyValue.KVComparator c,
+                            KeyValue less,
+                            KeyValue greater) {
+    int cmp = c.compare(less,greater);
+    assertTrue(cmp < 0);
+    cmp = c.compare(greater,less);
+    assertTrue(cmp > 0);
+  }
+
+  public void testFirstLastOnRow() {
+    final KVComparator c = KeyValue.COMPARATOR;
+    long ts = 1;
+
+    // These are listed in sort order (ie: every one should be less
+    // than the one on the next line).
+    final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA);
+    final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put);
+    final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put);
+        
+    final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA);
+    final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB);
+    final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put);
+
+    assertKVLess(c, firstOnRowA, firstOnRowB);
+    assertKVLess(c, firstOnRowA, kvA_1);
+    assertKVLess(c, firstOnRowA, kvA_2);
+    assertKVLess(c, kvA_1, kvA_2);
+    assertKVLess(c, kvA_2, firstOnRowB);
+    assertKVLess(c, kvA_1, firstOnRowB);
+
+    assertKVLess(c, lastOnRowA, firstOnRowB);
+    assertKVLess(c, firstOnRowB, kvB);
+    assertKVLess(c, lastOnRowA, kvB);
+
+    assertKVLess(c, kvA_2, lastOnRowA);
+    assertKVLess(c, kvA_1, lastOnRowA);
+    assertKVLess(c, firstOnRowA, lastOnRowA);
+  }
 }

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/client/TestClient.java Mon Apr 19 19:05:29 2010
@@ -67,7 +67,7 @@ public class TestClient extends HBaseClu
     super();
   }
 
-  /**
+  /*
    * Test from client side of an involved filter against a multi family that
    * involves deletes.
    * 
@@ -196,7 +196,7 @@ public class TestClient extends HBaseClu
     }
   }
 
-  /**
+  /*
    * Test filters when multiple regions.  It does counts.  Needs eye-balling of
    * logs to ensure that we're not scanning more regions that we're supposed to.
    * Related to the TestFilterAcrossRegions over in the o.a.h.h.filter package.
@@ -253,7 +253,7 @@ public class TestClient extends HBaseClu
     assertEquals(rowCount - endKeyCount, countGreater);
   }
   
-  /**
+  /*
    * Load table with rows from 'aaa' to 'zzz'.
    * @param t
    * @return Count of rows loaded.
@@ -418,7 +418,7 @@ public class TestClient extends HBaseClu
     scanner.close();
   }
 
-  /**
+  /*
    * Test simple table and non-existent row cases.
    */
   public void testSimpleMissing() throws Exception {
@@ -531,7 +531,7 @@ public class TestClient extends HBaseClu
     assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
   }
   
-  /**
+  /*
    * Test basic puts, gets, scans, and deletes for a single row
    * in a multiple family table.
    */
@@ -1438,7 +1438,7 @@ public class TestClient extends HBaseClu
     ht.put(put);
     
     delete = new Delete(ROW);
-    delete.deleteColumn(FAMILIES[0], QUALIFIER);
+    delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
     ht.delete(delete);
     
     get = new Get(ROW);
@@ -1473,23 +1473,24 @@ public class TestClient extends HBaseClu
     // But alas, this is not to be.  We can't put them back in either case.
     
     put = new Put(ROW);
-    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
-    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
+    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
     ht.put(put);
     
-    // The Get returns the latest value but then does not return the
-    // oldest, which was never deleted, ts[1]. 
-    
+
+    // It used to be due to the internal implementation of Get, that
+    // the Get() call would return ts[4] UNLIKE the Scan below. With
+    // the switch to using Scan for Get this is no longer the case.
     get = new Get(ROW);
     get.addFamily(FAMILIES[0]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
     assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
-        new long [] {ts[2], ts[3], ts[4]},
-        new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
+        new long [] {ts[1], ts[2], ts[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
         0, 2);
     
-    // The Scanner returns the previous values, the expected-unexpected behavior
+    // The Scanner returns the previous values, the expected-naive-unexpected  behavior
     
     scan = new Scan(ROW);
     scan.addFamily(FAMILIES[0]);
@@ -1553,7 +1554,7 @@ public class TestClient extends HBaseClu
     result = ht.get(get);
     assertTrue("Expected 2 keys but received " + result.size(),
         result.size() == 2);
-    assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, 
+    assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER,
         new long [] {ts[0], ts[1]},
         new byte[][] {VALUES[0], VALUES[1]},
         0, 1);
@@ -1591,9 +1592,8 @@ public class TestClient extends HBaseClu
     get.addFamily(FAMILIES[2]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
-    assertTrue("Expected 1 key but received " + result.size() + ": " + result,
-        result.size() == 1);
-    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, 
+    assertEquals(1, result.size());
+    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
         new long [] {ts[2]},
         new byte[][] {VALUES[2]},
         0, 0);
@@ -1603,9 +1603,8 @@ public class TestClient extends HBaseClu
     scan.addFamily(FAMILIES[2]);
     scan.setMaxVersions(Integer.MAX_VALUE);
     result = getSingleScanResult(ht, scan);
-    assertTrue("Expected 1 key but received " + result.size(),
-        result.size() == 1);
-    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER, 
+    assertEquals(1, result.size());
+    assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
         new long [] {ts[2]},
         new byte[][] {VALUES[2]},
         0, 0);
@@ -1691,7 +1690,7 @@ public class TestClient extends HBaseClu
     }
   }
 
-  /**
+  /*
    * Baseline "scalability" test.
    * 
    * Tests one hundred families, one million columns, one million versions
@@ -1738,7 +1737,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * Explicitly test JIRAs related to HBASE-880 / Client API
    */
   public void testJIRAs() throws Exception {
@@ -1754,7 +1753,7 @@ public class TestClient extends HBaseClu
   // JIRA Testers
   //
   
-  /**
+  /*
    * HBASE-867
    *    If millions of columns in a column family, hbase scanner won't come up
    *    
@@ -1844,7 +1843,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * HBASE-861
    *    get with timestamp will return a value if there is a version with an 
    *    earlier timestamp
@@ -1907,7 +1906,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * HBASE-33
    *    Add a HTable get/obtainScanner method that retrieves all versions of a 
    *    particular column and row between two timestamps
@@ -1956,7 +1955,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * HBASE-1014
    *    commit(BatchUpdate) method should return timestamp
    */
@@ -1980,7 +1979,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * HBASE-1182
    *    Scan for columns > some timestamp 
    */
@@ -2025,7 +2024,7 @@ public class TestClient extends HBaseClu
     
   }
   
-  /**
+  /*
    * HBASE-52
    *    Add a means of scanning over all versions
    */
@@ -2423,7 +2422,7 @@ public class TestClient extends HBaseClu
   
   
   
-  /**
+  /*
    * Verify a single column using gets.
    * Expects family and qualifier arrays to be valid for at least 
    * the range:  idx-2 < idx < idx+2
@@ -2480,7 +2479,7 @@ public class TestClient extends HBaseClu
   }
   
   
-  /**
+  /*
    * Verify a single column using scanners.
    * Expects family and qualifier arrays to be valid for at least 
    * the range:  idx-2 to idx+2
@@ -2542,11 +2541,11 @@ public class TestClient extends HBaseClu
 
   }
   
-  /**
+  /*
    * Verify we do not read any values by accident around a single column
    * Same requirements as getVerifySingleColumn
    */
-  private void getVerifySingleEmpty(HTable ht, 
+  private void getVerifySingleEmpty(HTable ht,
       byte [][] ROWS, int ROWIDX, 
       byte [][] FAMILIES, int FAMILYIDX, 
       byte [][] QUALIFIERS, int QUALIFIERIDX)
@@ -2668,12 +2667,11 @@ public class TestClient extends HBaseClu
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
     int expectedResults = end - start + 1;
-    assertTrue("Expected " + expectedResults + " keys but result contains " 
-        + result.size(), result.size() == expectedResults);
-    
+    assertEquals(expectedResults, result.size());
+
     KeyValue [] keys = result.sorted();
     
-    for(int i=0;i<keys.length;i++) {
+    for (int i=0; i<keys.length; i++) {
       byte [] value = values[end-i];
       long ts = stamps[end-i];
       KeyValue key = keys[i];
@@ -2692,7 +2690,7 @@ public class TestClient extends HBaseClu
     }
   }
   
-  /**
+  /*
    * Validate that result contains two specified keys, exactly.
    * It is assumed key A sorts before key B.
    */
@@ -2728,10 +2726,7 @@ public class TestClient extends HBaseClu
         equals(valueB, kvB.getValue()));
   }
   
-  /**
-   * 
-   */
-  private void assertSingleResult(Result result, byte [] row, byte [] family, 
+  private void assertSingleResult(Result result, byte [] row, byte [] family,
       byte [] qualifier, byte [] value)
   throws Exception {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
@@ -2808,7 +2803,7 @@ public class TestClient extends HBaseClu
     }
     byte [][] ret = new byte[n][];
     for(int i=0;i<n;i++) {
-      byte [] tail = Bytes.toBytes(new Integer(i).toString());
+      byte [] tail = Bytes.toBytes(Integer.toString(i));
       ret[i] = Bytes.add(base, tail);
     }
     return ret;
@@ -2898,22 +2893,14 @@ public class TestClient extends HBaseClu
     return new HTable(conf, tableName);
   }
   
+  @SuppressWarnings({"SimplifiableIfStatement"})
   private boolean equals(byte [] left, byte [] right) {
-    if(left == null && right == null) return true;
-    if(left == null && right.length == 0) return true;
-    if(right == null && left.length == 0) return true;
+    if (left == null && right == null) return true;
+    if (left == null && right.length == 0) return true;
+    if (right == null && left.length == 0) return true;
     return Bytes.equals(left, right);
   }
-  
-  
-  
-  
-  
-  
-  
-  
 
-  
   public void XtestDuplicateVersions() throws Exception {
     
     byte [] TABLE = Bytes.toBytes("testDuplicateVersions");

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java Mon Apr 19 19:05:29 2010
@@ -254,8 +254,8 @@ public class TestGetDeleteTracker extend
     }
     //update()
     dt.update();
-    assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts3));
-    assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts1));
+    assertFalse(dt.isDeleted(col2, 0, col2Len, ts3));
+    assertFalse(dt.isDeleted(col2, 0, col2Len, ts1));
   }
   public void testIsDeleted_Delete(){
     //Building lists

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=935708&r1=935707&r2=935708&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Apr 19 19:05:29 2010
@@ -94,7 +94,7 @@ public class TestHRegion extends HBaseTe
   //////////////////////////////////////////////////////////////////////////////
   
 
-  /**
+  /*
    * An involved filter test.  Has multiple column families and deletes in mix.
    */
   public void testWeirdCacheBehaviour() throws Exception {
@@ -366,6 +366,34 @@ public class TestHRegion extends HBaseTe
   //////////////////////////////////////////////////////////////////////////////
   // Delete tests
   //////////////////////////////////////////////////////////////////////////////
+  public void testDelete_multiDeleteColumn() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] qual = Bytes.toBytes("qualifier");
+    byte [] value = Bytes.toBytes("value");
+
+    Put put = new Put(row1);
+    put.add(fam1, qual, 1, value);
+    put.add(fam1, qual, 2, value);
+
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+
+    region.put(put);
+
+    // We do support deleting more than 1 'latest' version
+    Delete delete = new Delete(row1);
+    delete.deleteColumn(fam1, qual);
+    delete.deleteColumn(fam1, qual);
+    region.delete(delete, null, false);
+
+    Get get = new Get(row1);
+    get.addFamily(fam1);
+    Result r = region.get(get, null);
+    assertEquals(0, r.size());
+  }
+
   public void testDelete_CheckFamily() throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");
@@ -374,11 +402,9 @@ public class TestHRegion extends HBaseTe
     byte [] fam3 = Bytes.toBytes("fam3");
     byte [] fam4 = Bytes.toBytes("fam4");
 
-    byte[][] families  = {fam1, fam2, fam3};
-
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1, fam2, fam3);
 
     List<KeyValue> kvs  = new ArrayList<KeyValue>();
     kvs.add(new KeyValue(row1, fam4, null, null));
@@ -1439,6 +1465,41 @@ public class TestHRegion extends HBaseTe
     assertICV(row, fam1, qual1, value+amount);
   }
 
+  public void testIncrementColumnValue_BumpSnapshot() throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 42L;
+    long incr = 44L;
+
+    // first put something in kvset, then snapshot it.
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    // get the store in question:
+    Store s = region.getStore(fam1);
+    s.snapshot(); //bam
+
+    // now increment:
+    long newVal = region.incrementColumnValue(row, fam1, qual1,
+        incr, false);
+
+    assertEquals(value+incr, newVal);
+
+    // get both versions:
+    Get get = new Get(row);
+    get.setMaxVersions();
+    get.addColumn(fam1,qual1);
+
+    Result r = region.get(get, null);
+    assertEquals(2, r.size());
+    KeyValue first = r.raw()[0];
+    KeyValue second = r.raw()[1];
+
+    assertTrue("ICV failed to upgrade timestamp",
+        first.getTimestamp() != second.getTimestamp());
+  }
+  
   public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
     initHRegion(tableName, getName(), fam1);
 
@@ -1652,7 +1713,7 @@ public class TestHRegion extends HBaseTe
       assertEquals(expected.get(i), actual.get(i));
     }
   }
-  
+
   //////////////////////////////////////////////////////////////////////////////
   // Split test
   //////////////////////////////////////////////////////////////////////////////
@@ -1935,9 +1996,9 @@ public class TestHRegion extends HBaseTe
     FlushThread flushThread = new FlushThread();
     flushThread.start();
 
-    Scan scan = new Scan();
-    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
-      new BinaryComparator(Bytes.toBytes("row0"))));
+    Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
+//    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+//      new BinaryComparator(Bytes.toBytes("row0"))));
 
     int expectedCount = numFamilies * numQualifiers;
     List<KeyValue> res = new ArrayList<KeyValue>();
@@ -1950,7 +2011,7 @@ public class TestHRegion extends HBaseTe
       }
 
       if (i != 0 && i % flushInterval == 0) {
-        //System.out.println("scan iteration = " + i);
+        //System.out.println("flush scan iteration = " + i);
         flushThread.flush();
       }
 
@@ -1959,14 +2020,18 @@ public class TestHRegion extends HBaseTe
       InternalScanner scanner = region.getScanner(scan);
       while (scanner.next(res)) ;
       if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
-        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        assertEquals("i=" + i, expectedCount, res.size());
         long timestamp = res.get(0).getTimestamp();
-        Assert.assertTrue(timestamp >= prevTimestamp);
+        assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
+            timestamp >= prevTimestamp);
         prevTimestamp = timestamp;
       }
     }
 
     putThread.done();
+
+    region.flushcache();
+
     putThread.join();
     putThread.checkNoError();
 
@@ -2011,15 +2076,16 @@ public class TestHRegion extends HBaseTe
           for (int r = 0; r < numRows; r++) {
             byte[] row = Bytes.toBytes("row" + r);
             Put put = new Put(row);
-            for (int f = 0; f < families.length; f++) {
-              for (int q = 0; q < qualifiers.length; q++) {
-                put.add(families[f], qualifiers[q], (long) val,
-                  Bytes.toBytes(val));
+            for (byte[] family : families) {
+              for (byte[] qualifier : qualifiers) {
+                put.add(family, qualifier, (long) val,
+                    Bytes.toBytes(val));
               }
             }
+//            System.out.println("Putting of kvsetsize=" + put.size());
             region.put(put);
-            if (val > 0 && val % 47 == 0){
-              //System.out.println("put iteration = " + val);
+            if (val > 0 && val % 47 == 0) {
+              System.out.println("put iteration = " + val);
               Delete delete = new Delete(row, (long)val-30, null);
               region.delete(delete, null, true);
             }
@@ -2099,6 +2165,9 @@ public class TestHRegion extends HBaseTe
     }
 
     putThread.done();
+    
+    region.flushcache();
+
     putThread.join();
     putThread.checkNoError();