Posted to commits@hbase.apache.org by ap...@apache.org on 2010/01/09 22:10:05 UTC

svn commit: r897547 [10/10] - in /hadoop/hbase/branches/0.20_on_hadoop-0.18.3: ./ bin/ conf/ lib/ src/contrib/ src/contrib/ec2/ src/contrib/ec2/bin/ src/contrib/ec2/bin/image/ src/contrib/indexed/ src/contrib/indexed/lib/ src/contrib/indexed/lib/fmpp-0...

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat Jan  9 21:09:59 2010
@@ -28,9 +28,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.SortedSet;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -80,10 +78,6 @@
   // Used to track own heapSize
   final AtomicLong size;
 
-  // All access must be synchronized.
-  final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
-    new CopyOnWriteArraySet<ChangedMemStoreObserver>();
-
   /**
    * Default constructor. Used for tests.
    */
@@ -131,7 +125,6 @@
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
-          tellChangedMemStoreObservers();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -141,15 +134,6 @@
     }
   }
 
-  /*
-   * Tell outstanding scanners that memstore has changed.
-   */
-  private void tellChangedMemStoreObservers() {
-    for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
-      o.changedMemStore();
-    }
-  }
-
   /**
    * Return the current snapshot.
    * Called by flusher to get current snapshot made by a previous
@@ -168,7 +152,7 @@
    * @throws UnexpectedException
    * @see {@link #snapshot()}
    */
-  void clearSnapshot(final KeyValueSkipListSet ss)
+  void clearSnapshot(final SortedSet<KeyValue> ss)
   throws UnexpectedException {
     this.lock.writeLock().lock();
     try {
@@ -180,7 +164,6 @@
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
-        tellChangedMemStoreObservers();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -204,7 +187,7 @@
     return s;
   }
 
-  /** 
+  /**
    * Write a delete
    * @param delete
    * @return approximate size of the passed key and value.
@@ -221,7 +204,7 @@
     //TODO Would be nice with if we had an iterator for this, so we could remove
     //things that needs to be removed while iterating and don't have to go
     //back and do it afterwards
-    
+
     try {
       boolean notpresent = false;
       List<KeyValue> deletes = new ArrayList<KeyValue>();
@@ -230,34 +213,34 @@
       //Parse the delete, so that it is only done once
       byte [] deleteBuffer = delete.getBuffer();
       int deleteOffset = delete.getOffset();
-  
+
       int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
       deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-  
+
       short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
       deleteOffset += Bytes.SIZEOF_SHORT;
       int deleteRowOffset = deleteOffset;
-  
+
       deleteOffset += deleteRowLen;
-  
+
       byte deleteFamLen = deleteBuffer[deleteOffset];
       deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-  
+
       int deleteQualifierOffset = deleteOffset;
       int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
-        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - 
+        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
         Bytes.SIZEOF_BYTE;
-      
+
       deleteOffset += deleteQualifierLen;
-  
+
       int deleteTimestampOffset = deleteOffset;
       deleteOffset += Bytes.SIZEOF_LONG;
       byte deleteType = deleteBuffer[deleteOffset];
-      
+
       //Comparing with tail from memstore
       for (KeyValue kv : tail) {
-        DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer, 
-            deleteRowOffset, deleteRowLen, deleteQualifierOffset, 
+        DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
+            deleteRowOffset, deleteRowLen, deleteQualifierOffset,
             deleteQualifierLen, deleteTimestampOffset, deleteType,
             comparator.getRawComparator());
         if (res == DeleteCode.DONE) {
@@ -272,7 +255,7 @@
         notpresent = this.kvset.remove(kv);
         s -= heapSizeChange(kv, notpresent);
       }
-      
+
       // Adding the delete to memstore. Add any value, as long as
       // same instance each time.
       s += heapSizeChange(delete, this.kvset.add(delete));
@@ -282,7 +265,7 @@
     this.size.addAndGet(s);
     return s;
   }
-  
+
   /**
    * @param kv Find the row that comes after this one.  If null, we return the
    * first.
@@ -459,7 +442,8 @@
     this.lock.readLock().lock();
     try {
       KeyValueScanner [] scanners = new KeyValueScanner[1];
-      scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
+      scanners[0] = new MemStoreScanner(this.kvset.clone(),
+        this.snapshot.clone(), this.comparator);
       return scanners;
     } finally {
       this.lock.readLock().unlock();
@@ -533,7 +517,7 @@
   void readLockUnlock() {
     this.lock.readLock().unlock();
   }
-  
+
   /**
    *
    * @param set memstore or snapshot
@@ -566,171 +550,11 @@
     }
     return false;
   }
-  
-
-  /*
-   * MemStoreScanner implements the KeyValueScanner.
-   * It lets the caller scan the contents of a memstore -- both current
-   * map and snapshot.
-   * This behaves as if it were a real scanner but does not maintain position.
-   */
-  protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
-    private List<KeyValue> result = new ArrayList<KeyValue>();
-    private int idx = 0;
-    // Make access atomic.
-    private FirstOnRow firstOnNextRow = new FirstOnRow();
-    // Keep reference to Set so can remove myself when closed.
-    private final Set<ChangedMemStoreObserver> observers;
-
-    MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
-      super();
-      this.observers = observers;
-      this.observers.add(this);
-    }
-
-    public boolean seek(KeyValue key) {
-      try {
-        if (key == null) {
-          close();
-          return false;
-        }
-        this.firstOnNextRow.set(key);
-        return cacheNextRow();
-      } catch(Exception e) {
-        close();
-        return false;
-      }
-    }
-
-    public KeyValue peek() {
-      if (idx >= this.result.size()) {
-        if (!cacheNextRow()) {
-          return null;
-        }
-        return peek();
-      }
-      return result.get(idx);
-    }
-
-    public KeyValue next() {
-      if (idx >= result.size()) {
-        if (!cacheNextRow()) {
-          return null;
-        }
-        return next();
-      }
-      return this.result.get(idx++);
-    }
-
-    /**
-     * @return True if successfully cached a next row.
-     */
-    boolean cacheNextRow() {
-      // Prevent snapshot being cleared while caching a row.
-      lock.readLock().lock();
-      try {
-        this.result.clear();
-        this.idx = 0;
-        // Look at each set, kvset and snapshot.
-        // Both look for matching entries for this.current row returning what
-        // they
-        // have as next row after this.current (or null if nothing in set or if
-        // nothing follows.
-        KeyValue kvsetNextRow = cacheNextRow(kvset);
-        KeyValue snapshotNextRow = cacheNextRow(snapshot);
-        if (kvsetNextRow == null && snapshotNextRow == null) {
-          // Nothing more in memstore but we might have gotten current row
-          // results
-          // Indicate at end of store by setting next row to null.
-          this.firstOnNextRow.set(null);
-          return !this.result.isEmpty();
-        } else if (kvsetNextRow != null && snapshotNextRow != null) {
-          // Set current at the lowest of the two values.
-          int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
-          this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
-        } else {
-          this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
-        }
-        return true;
-      } finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    /*
-     * See if set has entries for the <code>this.current</code> row.  If so,
-     * add them to <code>this.result</code>.
-     * @param set Set to examine
-     * @return Next row in passed <code>set</code> or null if nothing in this
-     * passed <code>set</code>
-     */
-    private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
-      if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
-      SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
-      if (tail == null || tail.isEmpty()) return null;
-      KeyValue first = tail.first();
-      KeyValue nextRow = null;
-      for (KeyValue kv: tail) {
-        if (comparator.compareRows(first, kv) != 0) {
-          nextRow = kv;
-          break;
-        }
-        this.result.add(kv);
-      }
-      return nextRow;
-    }
-
-    public void close() {
-      this.firstOnNextRow.set(null);
-      idx = 0;
-      if (!result.isEmpty()) {
-        result.clear();
-      }
-      this.observers.remove(this);
-    }
-
-    public void changedMemStore() {
-      this.firstOnNextRow.reset();
-    }
-  }
 
-  /*
-   * Private class that holds firstOnRow and utility.
-   * Usually firstOnRow is the first KeyValue we find on next row rather than
-   * the absolute minimal first key (empty column, Type.Maximum, maximum ts).
-   * Usually its ok being sloppy with firstOnRow letting it be the first thing
-   * found on next row -- this works -- but if the memstore changes on us, reset
-   * firstOnRow to be the ultimate firstOnRow.  We play sloppy with firstOnRow
-   * usually so we don't have to  allocate a new KeyValue each time firstOnRow
-   * is updated.
-   */
-  private static class FirstOnRow {
-    private KeyValue firstOnRow = null;
-
-    FirstOnRow() {
-      super();
-    }
-
-    synchronized void set(final KeyValue kv) {
-      this.firstOnRow = kv;
-    }
-
-    /* Reset firstOnRow to a 'clean', absolute firstOnRow.
-     */
-    synchronized void reset() {
-      if (this.firstOnRow == null) return;
-      this.firstOnRow =
-         new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
-    }
-
-    synchronized KeyValue get() {
-      return this.firstOnRow;
-    }
-  }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
-  
+      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
+
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -744,11 +568,11 @@
    * @return Size
    */
   long heapSizeChange(final KeyValue kv, final boolean notpresent) {
-    return notpresent ? 
+    return notpresent ?
         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
         0;
   }
-  
+
   /**
    * Get the entire heap usage for this MemStore not including keys in the
    * snapshot.
@@ -757,7 +581,7 @@
   public long heapSize() {
     return size.get();
   }
-  
+
   /**
    * Get the heap usage of KVs in this MemStore.
    */
@@ -806,15 +630,4 @@
     LOG.info("Exiting.");
   }
 
-  /**
-   * Observers want to know about MemStore changes.
-   * Called when snapshot is cleared and when we make one.
-   */
-  interface ChangedMemStoreObserver {
-    /**
-     * Notify observers.
-     * @throws IOException
-     */
-    void changedMemStore();
-  }
 }

Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java Sat Jan  9 21:09:59 2010
@@ -0,0 +1,164 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * MemStoreScanner implements the KeyValueScanner.
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * <p/>
+ * The memstore scanner keeps its own reference to the main and snapshot
+ * key/value sets. Keeping those references allows the scanner to be indifferent
+ * to memstore flushes. Calling the {@link #close()} method ensures that the
+ * references to those sets are nulled, allowing the GC to reclaim them.
+ */
+class MemStoreScanner implements KeyValueScanner {
+  private static final Log LOG = LogFactory.getLog(MemStoreScanner.class);
+
+  private static final
+  SortedSet<KeyValue> EMPTY_SET = new TreeSet<KeyValue>();
+  private static final Iterator<KeyValue> EMPTY_ITERATOR =
+    new Iterator<KeyValue>() {
+
+      @Override
+      public boolean hasNext() {
+        return false;
+      }
+      @Override
+      public KeyValue next() {
+        return null;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+
+  private SortedSet<KeyValue> kvsetRef;
+  private SortedSet<KeyValue> snapshotRef;
+  private KeyValue.KVComparator comparatorRef;
+  private Iterator<KeyValue> kvsetIterator;
+  private Iterator<KeyValue> snapshotIterator;
+
+  private KeyValue currentKvsetKV;
+  private KeyValue currentSnapshotKV;
+  private KeyValue nextKV;
+
+  /**
+   * Create a new memstore scanner.
+   *
+   * @param kvset      the main key value set
+   * @param snapshot   the snapshot set
+   * @param comparator the comparator to use
+   */
+  MemStoreScanner(KeyValueSkipListSet kvset,
+    KeyValueSkipListSet snapshot, KeyValue.KVComparator comparator) {
+    super();
+    this.kvsetRef = kvset;
+    this.snapshotRef = snapshot != null ? snapshot : EMPTY_SET;
+    this.comparatorRef = comparator;
+    this.kvsetIterator = kvsetRef.iterator();
+    this.snapshotIterator = snapshotRef.iterator();
+    this.nextKV = currentKvsetKV = currentSnapshotKV = null;
+  }
+
+  private void fill() {
+    if (nextKV == null) {
+      if (currentSnapshotKV == null && snapshotIterator.hasNext()) {
+        currentSnapshotKV = snapshotIterator.next();
+      }
+
+      if (currentKvsetKV == null && kvsetIterator.hasNext()) {
+        currentKvsetKV = kvsetIterator.next();
+      }
+
+      if (currentSnapshotKV != null && currentKvsetKV != null) {
+        int cmp = comparatorRef.compare(currentSnapshotKV, currentKvsetKV);
+        if (cmp <= 0) {
+          nextKV = currentSnapshotKV;
+          currentSnapshotKV = null;
+        } else {
+          nextKV = currentKvsetKV;
+          currentKvsetKV = null;
+        }
+      } else if (currentSnapshotKV != null) {
+        nextKV = currentSnapshotKV;
+        currentSnapshotKV = null;
+      } else {
+        nextKV = currentKvsetKV;
+        currentKvsetKV = null;
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean seek(KeyValue key) {
+    if (key == null) {
+      close();
+      return false;
+    }
+    SortedSet<KeyValue> kvsetTail = kvsetRef.tailSet(key);
+    SortedSet<KeyValue> snapshotTail = snapshotRef != null ?
+      snapshotRef.tailSet(key) : EMPTY_SET;
+
+    kvsetIterator = kvsetTail.iterator();
+    snapshotIterator = snapshotTail.iterator();
+
+    currentKvsetKV = null;
+    currentSnapshotKV = null;
+    nextKV = null;
+
+    return kvsetIterator.hasNext() || snapshotIterator.hasNext();
+  }
+
+  @Override
+  public synchronized KeyValue peek() {
+    fill();
+    return nextKV;
+  }
+
+  @Override
+  public synchronized KeyValue next() {
+    fill();
+    KeyValue next = nextKV;
+    nextKV = null;
+    return next;
+  }
+
+  public synchronized void close() {
+    this.kvsetRef = EMPTY_SET;
+    this.snapshotRef = EMPTY_SET;
+    this.kvsetIterator = EMPTY_ITERATOR;
+    this.snapshotIterator = EMPTY_ITERATOR;
+    this.currentKvsetKV = null;
+    this.currentSnapshotKV = null;
+    this.nextKV = null;
+  }
+}
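
The fill() method above is a two-way merge: it buffers at most one "current" KeyValue per set and always emits the smaller of the two heads, with ties going to the snapshot. The following is a minimal, hypothetical sketch of that merge step outside HBase, using plain Strings in place of KeyValues; the class and method names below are invented for illustration only.

// Minimal sketch (not HBase code): a two-way merge over sorted sets that
// mirrors fill() above -- buffer one head per set, emit the smaller one.
import java.util.Arrays;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

class TwoWayMergeSketch {
  static Iterator<String> merge(final SortedSet<String> a, final SortedSet<String> b) {
    final Iterator<String> ia = a.iterator();
    final Iterator<String> ib = b.iterator();
    return new Iterator<String>() {
      private String headA, headB;      // the buffered "current" entries
      public boolean hasNext() {
        return headA != null || headB != null || ia.hasNext() || ib.hasNext();
      }
      public String next() {
        if (headA == null && ia.hasNext()) headA = ia.next();
        if (headB == null && ib.hasNext()) headB = ib.next();
        String out;
        if (headA != null && (headB == null || headA.compareTo(headB) <= 0)) {
          out = headA; headA = null;    // emit from the first set (ties go here)
        } else {
          out = headB; headB = null;    // emit from the second set
        }
        return out;
      }
      public void remove() { throw new UnsupportedOperationException(); }
    };
  }

  public static void main(String[] args) {
    SortedSet<String> snapshot = new TreeSet<String>(Arrays.asList("b", "c", "d"));
    SortedSet<String> kvset = new TreeSet<String>(Arrays.asList("a", "c", "e"));
    for (Iterator<String> it = merge(snapshot, kvset); it.hasNext();) {
      System.out.print(it.next() + " ");   // a b c c d e
    }
  }
}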

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Sat Jan  9 21:09:59 2010
@@ -104,7 +104,7 @@
   @Override
   public boolean isDeleted(byte [] buffer, int qualifierOffset,
       int qualifierLength, long timestamp) {
-    if (timestamp < familyStamp) {
+    if (timestamp <= familyStamp) {
       return true;
     }
     

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Jan  9 21:09:59 2010
@@ -31,6 +31,7 @@
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.SortedSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -507,21 +508,12 @@
    * @return true if a compaction is needed
    * @throws IOException
    */
-  boolean flushCache(final long logCacheFlushId) throws IOException {
-    // Get the snapshot to flush.  Presumes that a call to
-    // this.memstore.snapshot() has happened earlier up in the chain.
-    KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
+  private StoreFile flushCache(final long logCacheFlushId,
+    SortedSet<KeyValue> snapshot) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
-    if (sf == null) {
-      return false;
-    }
-    // Add new file to store files.  Clear snapshot too while we have the
-    // Store write lock.
-    int size = updateStorefiles(logCacheFlushId, sf, snapshot);
-    return size >= this.compactionThreshold;
+    return internalFlushCache(snapshot, logCacheFlushId);
   }
 
   /*
@@ -530,7 +522,7 @@
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final KeyValueSkipListSet set,
+  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
     final long logCacheFlushId)
   throws IOException {
     HFile.Writer writer = null;
@@ -600,20 +592,18 @@
    * @param sf
    * @param set That was used to make the passed file <code>p</code>.
    * @throws IOException
-   * @return Count of store files.
+   * @return Whether compaction is required.
    */
-  private int updateStorefiles(final long logCacheFlushId,
-    final StoreFile sf, final KeyValueSkipListSet set)
+  private boolean updateStorefiles(final long logCacheFlushId,
+    final StoreFile sf, final SortedSet<KeyValue> set)
   throws IOException {
-    int count = 0;
     this.lock.writeLock().lock();
     try {
       this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
-      count = this.storefiles.size();
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
       this.memstore.clearSnapshot(set);
-      return count;
+      return this.storefiles.size() >= this.compactionThreshold;
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -1556,4 +1546,41 @@
   public long heapSize() {
     return DEEP_OVERHEAD + this.memstore.heapSize();
   }
+
+  public StoreFlusher getStoreFlusher(long cacheFlushId) {
+    return new StoreFlusherImpl(cacheFlushId);
+  }
+
+  private class StoreFlusherImpl implements StoreFlusher {
+
+    private long cacheFlushId;
+    private SortedSet<KeyValue> snapshot;
+    private StoreFile storeFile;
+
+    private StoreFlusherImpl(long cacheFlushId) {
+      this.cacheFlushId = cacheFlushId;
+    }
+
+
+    @Override
+    public void prepare() {
+      memstore.snapshot();
+      this.snapshot = memstore.getSnapshot();
+    }
+
+    @Override
+    public void flushCache() throws IOException {
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+    }
+
+    @Override
+    public boolean commit() throws IOException {
+      if (storeFile == null) {
+        return false;
+      }
+      // Add new file to store files.  Clear snapshot too while we have the
+      // Store write lock.
+      return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot);
+    }
+  }
 }

Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Sat Jan  9 21:09:59 2010
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+/**
+ * A package protected interface for store flushing.
+ * A store flusher carries the state required to prepare/flush/commit the
+ * store's cache.
+ */
+interface StoreFlusher {
+
+  /**
+   * Prepare for store flush (create snapshot).
+   * <p/>
+   * Requires pausing writes.
+   * <p/>
+   * A very short operation.
+   */
+  void prepare();
+
+  /**
+   * Flush the cache (create the new store file).
+   * <p/>
+   * A lengthy operation which doesn't require locking out any function of
+   * the store.
+   *
+   * @throws IOException in case flush fails
+   */
+  void flushCache() throws IOException;
+
+  /**
+   * Commit the flush - add the store file to the store and clear the memstore
+   * snapshot.
+   * <p/>
+   * Requires pausing scans.
+   * <p/>
+   * A very short operation.
+   *
+   * @return whether the store needs compaction after adding the new file
+   * @throws IOException in case the commit fails
+   */
+  boolean commit() throws IOException;
+
+}
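
The prepare/flushCache/commit split above separates the short locking phases from the long I/O phase. A caller would drive the three phases in order, roughly as in this sketch; this is not HRegion's actual flush code, and the class name and the compaction hand-off are assumptions made for illustration.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

// Hypothetical driver, for illustration only: shows the intended call order
// and which phases are short (locked) versus long (unlocked).
class StoreFlushDriverSketch {
  void flushOneStore(StoreFlusher flusher) throws IOException {
    flusher.prepare();       // short: snapshot the memstore while writes pause
    flusher.flushCache();    // long: write the snapshot to a new store file
    boolean needsCompaction = flusher.commit();  // short: swap in the file, clear snapshot
    if (needsCompaction) {
      // the real caller would queue a compaction request here (elided)
    }
  }
}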

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Jan  9 21:09:59 2010
@@ -247,6 +247,7 @@
 
     // Reset the state of the Query Matcher and set to top row
     matcher.reset();
-    matcher.setRow(heap.peek().getRow());
+    KeyValue kv = heap.peek();
+    matcher.setRow((kv == null ? topKey : kv).getRow());
   }
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Sat Jan  9 21:09:59 2010
@@ -102,6 +102,31 @@
   public final MetricsIntValue memstoreSizeMB =
     new MetricsIntValue("memstoreSizeMB");
 
+  /**
+   * Size of the compaction queue.
+   */
+  public final MetricsIntValue compactionQueueSize = 
+    new MetricsIntValue("compactionQueueSize");
+  
+  /**
+   * filesystem read latency
+   */
+  public final MetricsTimeVaryingRate fsReadLatency = 
+    new MetricsTimeVaryingRate("fsReadLatency");
+
+  /**
+   * filesystem write latency
+   */
+  public final MetricsTimeVaryingRate fsWriteLatency = 
+    new MetricsTimeVaryingRate("fsWriteLatency");
+
+  /**
+   * filesystem sync latency
+   */
+  public final MetricsTimeVaryingRate fsSyncLatency = 
+    new MetricsTimeVaryingRate("fsSyncLatency");
+
+
   public RegionServerMetrics() {
     MetricsContext context = MetricsUtil.getContext("hbase");
     metricsRecord = MetricsUtil.createRecord(context, "regionserver");

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Bytes.java Sat Jan  9 21:09:59 2010
@@ -26,6 +26,7 @@
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.math.BigInteger;
+import java.math.BigDecimal;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -39,27 +40,27 @@
  * HashSets, etc.
  */
 public class Bytes {
-  
+
   /**
    * Size of boolean in bytes
    */
   public static final int SIZEOF_BOOLEAN = Byte.SIZE/Byte.SIZE;
-  
+
   /**
    * Size of byte in bytes
    */
   public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
-  
+
   /**
    * Size of char in bytes
    */
   public static final int SIZEOF_CHAR = Character.SIZE/Byte.SIZE;
-  
+
   /**
    * Size of double in bytes
    */
   public static final int SIZEOF_DOUBLE = Double.SIZE/Byte.SIZE;
-  
+
   /**
    * Size of float in bytes
    */
@@ -69,7 +70,7 @@
    * Size of int in bytes
    */
   public static final int SIZEOF_INT = Integer.SIZE/Byte.SIZE;
-  
+
   /**
    * Size of long in bytes
    */
@@ -80,7 +81,7 @@
    */
   public static final int SIZEOF_SHORT = Short.SIZE/Byte.SIZE;
 
-  
+
   /**
    * Estimate of size cost to pay beyond payload in jvm for instance of byte [].
    * Estimate based on study of jhat and jprofiler numbers.
@@ -118,16 +119,27 @@
    */
   public static RawComparator<byte []> BYTES_RAWCOMPARATOR =
     new ByteArrayComparator();
-  
+
   /**
    * Read byte-array written with a WritableableUtils.vint prefix.
    * @param in Input to read from.
    * @return byte array read off <code>in</code>
-   * @throws IOException 
+   * @throws IOException
    */
   public static byte [] readByteArray(final DataInput in)
   throws IOException {
     int len = WritableUtils.readVInt(in);
+    return readByteArray(in, len);
+  }
+
+  /**
+   * Read a byte-array of known length from data input.
+   * @param in Input to read from.
+   * @param len Length of the byte array to read.
+   * @return byte array read off <code>in</code>
+   * @throws IOException io error
+   */
+  public static byte[] readByteArray(DataInput in, int len) throws IOException {
     if (len < 0) {
       throw new NegativeArraySizeException(Integer.toString(len));
     }
@@ -135,7 +147,7 @@
     in.readFully(result, 0, len);
     return result;
   }
-  
+
   /**
    * Read byte-array written with a WritableableUtils.vint prefix.
    * IOException is converted to a RuntimeException.
@@ -380,7 +392,7 @@
     }
     return result;
   }
-  
+
   /**
    * Convert a boolean to a byte array.
    * @param b
@@ -456,7 +468,7 @@
     }
     return l;
   }
-  
+
   /**
    * Put a long value out to the specified byte array position.
    * @param bytes the byte array
@@ -570,7 +582,7 @@
     b[0] = (byte)(val);
     return b;
   }
-  
+
   /**
    * Converts a byte array to an int value
    * @param bytes
@@ -609,7 +621,7 @@
     }
     return n;
   }
-  
+
   /**
    * Put an int value out to the specified byte array position.
    * @param bytes the byte array
@@ -628,7 +640,7 @@
     bytes[offset] = (byte)(val);
     return offset + SIZEOF_INT;
   }
-  
+
   /**
    * Convert a short value to a byte array
    * @param val
@@ -679,7 +691,7 @@
     n ^= bytes[offset+1] & 0xFF;
     return n;
   }
-  
+
   /**
    * Put a short value out to the specified byte array position.
    * @param bytes the byte array
@@ -696,7 +708,239 @@
     bytes[offset] = (byte)(val);
     return offset + SIZEOF_SHORT;
   }
-  
+
+  /**
+   * Convert a char value to a byte array
+   *
+   * @param val
+   * @return the byte array
+   */
+  public static byte[] toBytes(char val) {
+    byte[] b = new byte[SIZEOF_CHAR];
+    b[1] = (byte) (val);
+    val >>= 8;
+    b[0] = (byte) (val);
+    return b;
+  }
+
+  /**
+   * Converts a byte array to a char value
+   *
+   * @param bytes
+   * @return the char value
+   */
+  public static char toChar(byte[] bytes) {
+    return toChar(bytes, 0);
+  }
+
+
+  /**
+   * Converts a byte array to a char value
+   *
+   * @param bytes
+   * @param offset
+   * @return the char value
+   */
+  public static char toChar(byte[] bytes, int offset) {
+    return toChar(bytes, offset, SIZEOF_CHAR);
+  }
+
+  /**
+   * Converts a byte array to a char value
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return the char value
+   */
+  public static char toChar(byte[] bytes, int offset, final int length) {
+    if (bytes == null || length != SIZEOF_CHAR ||
+      (offset + length > bytes.length)) {
+      return (char)-1;
+    }
+    char n = 0;
+    n ^= bytes[offset] & 0xFF;
+    n <<= 8;
+    n ^= bytes[offset + 1] & 0xFF;
+    return n;
+  }
+
+  /**
+   * Put a char value out to the specified byte array position.
+   *
+   * @param bytes  the byte array
+   * @param offset position in the array
+   * @param val    char to write out
+   * @return incremented offset
+   */
+  public static int putChar(byte[] bytes, int offset, char val) {
+    if (bytes == null || (bytes.length - offset < SIZEOF_CHAR)) {
+      return offset;
+    }
+    bytes[offset + 1] = (byte) (val);
+    val >>= 8;
+    bytes[offset] = (byte) (val);
+    return offset + SIZEOF_CHAR;
+  }
+
+  /**
+   * Convert a char array value to a byte array
+   *
+   * @param val
+   * @return the byte array
+   */
+  public static byte[] toBytes(char[] val) {
+    byte[] bytes = new byte[val.length * 2];
+    putChars(bytes,0,val);
+    return bytes;
+  }
+
+  /**
+   * Converts a byte array to a char array value
+   *
+   * @param bytes
+   * @return the char array value
+   */
+  public static char[] toChars(byte[] bytes) {
+    return toChars(bytes, 0, bytes.length);
+  }
+
+
+  /**
+   * Converts a byte array to a char array value
+   *
+   * @param bytes
+   * @param offset
+   * @return the char array value
+   */
+  public static char[] toChars(byte[] bytes, int offset) {
+    return toChars(bytes, offset, bytes.length-offset);
+  }
+
+  /**
+   * Converts a byte array to a char array value
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return the char array value
+   */
+  public static char[] toChars(byte[] bytes, int offset, final int length) {
+    int max = offset + length;
+    if (bytes == null || (max > bytes.length) || length %2 ==1) {
+      return null;
+    }
+
+    char[] chars = new char[length / 2];
+    for (int i = 0, j = offset; i < chars.length && j < max; i++, j += 2) {
+      char c = 0;
+      c ^= bytes[j] & 0xFF;
+      c <<= 8;
+      c ^= bytes[j + 1] & 0xFF;
+      chars[i] = c;
+    }
+    return chars;
+  }
+
+  /**
+   * Put a char array value out to the specified byte array position.
+   *
+   * @param bytes  the byte array
+   * @param offset position in the array
+   * @param val    char array to write out
+   * @return incremented offset
+   */
+  public static int putChars(byte[] bytes, int offset, char[] val) {
+    int max = val.length * 2 + offset;
+    if (bytes == null || (bytes.length < max)) {
+      return offset;
+    }
+    for (int i=0,j=offset; i<val.length && j<max;i++, j+=2){
+      char c = val[i];
+      bytes[j + 1] = (byte) (c);
+      bytes[j] = (byte) (c >>>8);
+    }
+
+    return max;
+  }
+
+
+  /**
+   * Convert a BigDecimal value to a byte array
+   *
+   * @param val
+   * @return the byte array
+   */
+  public static byte[] toBytes(BigDecimal val) {
+    byte[] valueBytes = val.unscaledValue().toByteArray();
+    byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+    int offset = putInt(result, 0, val.scale());
+    putBytes(result, offset, valueBytes, 0, valueBytes.length);
+    return result;
+  }
+
+  /**
+   * Converts a byte array to a BigDecimal
+   *
+   * @param bytes
+   * @return the BigDecimal value
+   */
+  public static BigDecimal toBigDecimal(byte[] bytes) {
+    return toBigDecimal(bytes, 0, bytes.length);
+  }
+
+
+  /**
+   * Converts a byte array to a BigDecimal value
+   *
+   * @param bytes
+   * @param offset
+   * @return the BigDecimal value
+   */
+  public static BigDecimal toBigDecimal(byte[] bytes, int offset) {
+    return toBigDecimal(bytes, offset, bytes.length - offset);
+  }
+
+  /**
+   * Converts a byte array to a BigDecimal value
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return the BigDecimal value
+   */
+  public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
+    if (bytes == null || length < SIZEOF_INT + 1 ||
+      (offset + length > bytes.length)) {
+      return null;
+    }
+
+    int scale = toInt(bytes, offset);
+    byte[] tcBytes = new byte[length - SIZEOF_INT];
+    System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
+    return new BigDecimal(new BigInteger(tcBytes), scale);
+  }
+
+  /**
+   * Put a BigDecimal value out to the specified byte array position.
+   *
+   * @param bytes  the byte array
+   * @param offset position in the array
+   * @param val    BigDecimal to write out
+   * @return incremented offset
+   */
+  public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) {
+    if (bytes == null) {
+      return offset;
+    }
+
+    byte[] valueBytes = val.unscaledValue().toByteArray();
+    if (bytes.length - offset < valueBytes.length + SIZEOF_INT) {
+      return offset;
+    }
+    offset = putInt(bytes, offset, val.scale());
+    return putBytes(bytes, offset, valueBytes, 0, valueBytes.length);
+  }
+
+
   /**
    * @param vint Integer to make a vint of.
    * @return Vint as bytes array.
@@ -759,7 +1003,7 @@
    * Reads a zero-compressed encoded long from input stream and returns it.
    * @param buffer Binary array
    * @param offset Offset into array at which vint begins.
-   * @throws java.io.IOException 
+   * @throws java.io.IOException
    * @return deserialized long from stream.
    */
   public static long readVLong(final byte [] buffer, final int offset)
@@ -822,7 +1066,7 @@
       (left == null || right == null || (left.length != right.length))? false:
         compareTo(left, right) == 0;
   }
-  
+
   /**
    * @param b
    * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
@@ -885,7 +1129,7 @@
     System.arraycopy(c, 0, result, a.length + b.length, c.length);
     return result;
   }
-  
+
   /**
    * @param a
    * @param length
@@ -920,7 +1164,7 @@
     for(int i=0;i<length;i++) padding[i] = 0;
     return add(padding,a);
   }
-  
+
   /**
    * @param a
    * @param length
@@ -931,7 +1175,7 @@
     for(int i=0;i<length;i++) padding[i] = 0;
     return add(a,padding);
   }
-  
+
   /**
    * Split passed range.  Expensive operation relatively.  Uses BigInteger math.
    * Useful splitting ranges for MapReduce jobs.
@@ -986,7 +1230,7 @@
     result[num+1] = b;
     return result;
   }
-  
+
   /**
    * @param t
    * @return Array of byte arrays made from passed array of Text
@@ -1007,7 +1251,7 @@
   public static byte [][] toByteArrays(final String column) {
     return toByteArrays(toBytes(column));
   }
-  
+
   /**
    * @param column
    * @return A byte array of a byte array where first and only entry is
@@ -1018,7 +1262,7 @@
     result[0] = column;
     return result;
   }
-  
+
   /**
    * Binary search for keys in indexes.
    * @param arr array of byte arrays to search for
@@ -1032,7 +1276,7 @@
       int length, RawComparator<byte []> comparator) {
     int low = 0;
     int high = arr.length - 1;
-    
+
     while (low <= high) {
       int mid = (low+high) >>> 1;
       // we have to compare in this order, because the comparator order
@@ -1046,7 +1290,7 @@
       else if (cmp < 0)
         high = mid - 1;
       // BAM. how often does this really happen?
-      else 
+      else
         return mid;
     }
     return - (low+1);
@@ -1055,13 +1299,13 @@
   /**
    * Bytewise binary increment/deincrement of long contained in byte array
    * on given amount.
-   * 
+   *
    * @param value - array of bytes containing long (length <= SIZEOF_LONG)
    * @param amount value will be incremented on (deincremented if negative)
    * @return array of bytes containing incremented long (length == SIZEOF_LONG)
    * @throws IOException - if value.length > SIZEOF_LONG
    */
-  public static byte [] incrementBytes(byte[] value, long amount) 
+  public static byte [] incrementBytes(byte[] value, long amount)
   throws IOException {
     byte[] val = value;
     if (val.length < SIZEOF_LONG) {
@@ -1094,7 +1338,7 @@
     if (amount < 0) {
       amo = -amount;
       sign = -1;
-    } 
+    }
     for(int i=0;i<value.length;i++) {
       int cur = ((int)amo % 256) * sign;
       amo = (amo >> 8);
@@ -1136,5 +1380,5 @@
     }
     return value;
   }
-  
+
 }
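
The added char and BigDecimal helpers use fixed big-endian layouts: a char is two bytes, and a BigDecimal is a four-byte scale followed by the unscaled value's two's-complement bytes. A small round-trip sketch using only the methods added above; it assumes this branch's Bytes class is on the classpath, and the expected outputs shown in comments follow from those encodings.

import java.math.BigDecimal;
import org.apache.hadoop.hbase.util.Bytes;

// Round-trip sketch for the helpers added above.
public class BytesRoundTripSketch {
  public static void main(String[] args) {
    byte[] c = Bytes.toBytes('A');                       // two big-endian bytes
    System.out.println(Bytes.toChar(c));                 // A

    byte[] cs = Bytes.toBytes(new char[] {'h', 'i'});
    System.out.println(new String(Bytes.toChars(cs)));   // hi

    BigDecimal d = new BigDecimal("12.345");             // scale 3, unscaled 12345
    byte[] b = Bytes.toBytes(d);                         // 4-byte scale + unscaled bytes
    System.out.println(Bytes.toBigDecimal(b));           // 12.345
  }
}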

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/FSUtils.java Sat Jan  9 21:09:59 2010
@@ -24,6 +24,8 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +41,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 /**
@@ -143,13 +146,18 @@
     // Are there any data nodes up yet?
     // Currently the safe mode check falls through if the namenode is up but no
     // datanodes have reported in yet.
-    while (dfs.getDataNodeStats().length == 0) {
-      LOG.info("Waiting for dfs to come up...");
-      try {
-        Thread.sleep(wait);
-      } catch (InterruptedException e) {
-        //continue
+    try {
+      while (dfs.getDataNodeStats().length == 0) {
+        LOG.info("Waiting for dfs to come up...");
+        try {
+          Thread.sleep(wait);
+        } catch (InterruptedException e) {
+          //continue
+        }
       }
+    } catch (IOException e) {
+      // getDataNodeStats can fail if superuser privilege is required to run
+      // the datanode report, just ignore it
     }
     // Make sure dfs is not in safe mode
     while (dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET)) {
@@ -363,6 +371,98 @@
   }
 
   /**
+   * Returns the total overall fragmentation percentage. Includes .META. and 
+   * -ROOT- as well.
+   *  
+   * @param master  The master defining the HBase root and file system.
+   * @return The total fragmentation percentage across all tables, or -1 if
+   *         it cannot be computed.
+   * @throws IOException When scanning the directory fails.
+   */
+  public static int getTotalTableFragmentation(final HMaster master) 
+  throws IOException {
+    Map<String, Integer> map = getTableFragmentation(master);
+    return map != null && map.size() > 0 ? map.get("-TOTAL-").intValue() : -1;
+  }
+    
+  /**
+   * Runs through the HBase rootdir and checks how many stores for each table
+   * have more than one file in them. Checks -ROOT- and .META. too. The total 
+   * percentage across all tables is stored under the special key "-TOTAL-". 
+   * 
+   * @param master  The master defining the HBase root and file system.
+   * @return A map for each table and its percentage.
+   * @throws IOException When scanning the directory fails.
+   */
+  public static Map<String, Integer> getTableFragmentation(
+    final HMaster master) 
+  throws IOException {
+    Path path = master.getRootDir();
+    // since HMaster.getFileSystem() is package private
+    FileSystem fs = path.getFileSystem(master.getConfiguration());
+    return getTableFragmentation(fs, path);
+  }
+    
+  /**
+   * Runs through the HBase rootdir and checks how many stores for each table
+   * have more than one file in them. Checks -ROOT- and .META. too. The total 
+   * percentage across all tables is stored under the special key "-TOTAL-". 
+   * 
+   * @param fs  The file system to use.
+   * @param hbaseRootDir  The root directory to scan.
+   * @return A map for each table and its percentage.
+   * @throws IOException When scanning the directory fails.
+   */
+  public static Map<String, Integer> getTableFragmentation(
+    final FileSystem fs, final Path hbaseRootDir)
+  throws IOException {
+    Map<String, Integer> frags = new HashMap<String, Integer>();
+    int cfCountTotal = 0;
+    int cfFragTotal = 0;
+    DirFilter df = new DirFilter(fs);
+    // presumes any directory under hbase.rootdir is a table
+    FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
+    for (int i = 0; i < tableDirs.length; i++) {
+      // Skip the .log directory.  All others should be tables.  Inside a table,
+      // there are compaction.dir directories to skip.  Otherwise, all else
+      // should be regions.  Then in each region, should only be family
+      // directories.  Under each of these, should be one file only.
+      Path d = tableDirs[i].getPath();
+      if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
+        continue;
+      }
+      int cfCount = 0;
+      int cfFrag = 0;
+      FileStatus [] regionDirs = fs.listStatus(d, df);
+      for (int j = 0; j < regionDirs.length; j++) {
+        Path dd = regionDirs[j].getPath();
+        if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
+          continue;
+        }
+        // else its a region name, now look in region for families
+        FileStatus [] familyDirs = fs.listStatus(dd, df);
+        for (int k = 0; k < familyDirs.length; k++) {
+          cfCount++;
+          cfCountTotal++;
+          Path family = familyDirs[k].getPath();
+          // now in family make sure only one file
+          FileStatus [] familyStatus = fs.listStatus(family);
+          if (familyStatus.length > 1) {
+            cfFrag++;
+            cfFragTotal++;
+          }
+        }
+      }
+      // compute percentage per table and store in result list
+      frags.put(d.getName(), new Integer(
+        Math.round((float) cfFrag / cfCount * 100)));
+    }
+    // set overall percentage for all tables
+    frags.put("-TOTAL-", new Integer(
+      Math.round((float) cfFragTotal / cfCountTotal * 100)));
+    return frags;
+  }
+
+  /**
    * Expects to find -ROOT- directory.
    * @param fs
    * @param hbaseRootDir
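
Per table, the fragmentation value is the share of store (column family) directories that hold more than one file, rounded to a whole percentage; the same arithmetic produces the "-TOTAL-" entry across all tables. A trivial, hypothetical illustration of that arithmetic (not FSUtils code, and the counts are made up):

// Illustration of the rounding only; numbers are hypothetical.
public class FragmentationMathSketch {
  public static void main(String[] args) {
    int cfCount = 10;  // family directories seen under one table
    int cfFrag = 3;    // of those, directories with more than one store file
    System.out.println(Math.round((float) cfFrag / cfCount * 100) + "%");  // 30%
  }
}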

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Pair.java Sat Jan  9 21:09:59 2010
@@ -1,24 +1,20 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.Serializable;
+import java.util.Arrays;
 
 /**
- * A generic class for pairs.
- * @param <T1> 
- * @param <T2> 
+ * A generic class for immutable pairs.
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
  */
-public class Pair<T1, T2> implements Serializable
+public final class Pair<T1, T2> implements Serializable
 {
   private static final long serialVersionUID = -3986244606585552569L;
   protected T1 first = null;
   protected T2 second = null;
+  private int hashcode;
 
-  /**
-   * Default constructor.
-   */
-  public Pair()
-  {
-  }
 
   /**
    * Constructor
@@ -29,24 +25,38 @@
   {
     this.first = a;
     this.second = b;
-  }
 
-  /**
-   * Replace the first element of the pair.
-   * @param a
-   */
-  public void setFirst(T1 a)
-  {
-    this.first = a;
-  }
-
-  /**
-   * Replace the second element of the pair.
-   * @param b 
-   */
-  public void setSecond(T2 b)
-  {
-    this.second = b;
+    // generate a hash code
+    hashcode = first != null ? generateHashCode(first) : 0;
+    hashcode = 31 * hashcode + (second != null ? generateHashCode(second) : 0);
+  }
+
+  private static int generateHashCode(Object o) {
+    if (o.getClass().isArray()) {
+      if (o instanceof long[]) {
+          return Arrays.hashCode((long[]) o);
+      } else if (o instanceof int[]) {
+          return Arrays.hashCode((int[]) o);
+      } else if (o instanceof short[]) {
+          return Arrays.hashCode((short[]) o);
+      } else if (o instanceof char[]) {
+          return Arrays.hashCode((char[]) o);
+      } else if (o instanceof byte[]) {
+          return Arrays.hashCode((byte[]) o);
+      } else if (o instanceof double[]) {
+          return Arrays.hashCode((double[]) o);
+      } else if (o instanceof float[]) {
+          return Arrays.hashCode((float[]) o);
+      } else if (o instanceof boolean[]) {
+          return Arrays.hashCode((boolean[]) o);
+      } else {
+          // Not an array of primitives
+          return Arrays.hashCode((Object[]) o);
+      }
+    } else {
+      // Standard comparison
+      return o.hashCode();
+    }
   }
 
   /**
@@ -67,9 +77,55 @@
     return second;
   }
 
+  /**
+   * Creates a new instance of the pair encapsulating the supplied values.
+   *
+   * @param one  the first value
+   * @param two  the second value
+   * @param <T1> the type of the first element.
+   * @param <T2> the type of the second element.
+   * @return the new instance
+   */
+  public static <T1, T2> Pair<T1, T2> of(T1 one, T2 two)
+  {
+    return new Pair<T1, T2>(one, two);
+  }
+
   private static boolean equals(Object x, Object y)
   {
-     return (x == null && y == null) || (x != null && x.equals(y));
+    // Null safe compare first
+    if (x == null || y == null) {
+      return x == y;
+    }
+
+    Class clazz = x.getClass();
+    // If they are both the same type of array
+    if (clazz.isArray() && clazz == y.getClass()) {
+      // NOTE: this section is borrowed from EqualsBuilder in commons-lang
+      if (x instanceof long[]) {
+          return Arrays.equals((long[]) x, (long[]) y);
+      } else if (x instanceof int[]) {
+          return Arrays.equals((int[]) x, (int[]) y);
+      } else if (x instanceof short[]) {
+          return Arrays.equals((short[]) x, (short[]) y);
+      } else if (x instanceof char[]) {
+          return Arrays.equals((char[]) x, (char[]) y);
+      } else if (x instanceof byte[]) {
+          return Arrays.equals((byte[]) x, (byte[]) y);
+      } else if (x instanceof double[]) {
+          return Arrays.equals((double[]) x, (double[]) y);
+      } else if (x instanceof float[]) {
+          return Arrays.equals((float[]) x, (float[]) y);
+      } else if (x instanceof boolean[]) {
+          return Arrays.equals((boolean[]) x, (boolean[]) y);
+      } else {
+          // Not an array of primitives
+          return Arrays.deepEquals((Object[]) x, (Object[]) y);
+      }
+    } else {
+      // Standard comparison
+      return x.equals(y);
+    }
   }
 
   @Override
@@ -83,12 +139,7 @@
   @Override
   public int hashCode()
   {
-    if (first == null)
-      return (second == null) ? 0 : second.hashCode() + 1;
-    else if (second == null)
-      return first.hashCode() + 2;
-    else
-      return first.hashCode() * 17 + second.hashCode();
+    return hashcode;
   }
 
   @Override
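
Because the rewritten Pair falls back to Arrays.equals and Arrays.hashCode for array components, two pairs built from distinct but equal byte[] instances now compare equal and hash identically. A short sketch, assuming this branch's Pair and Bytes classes are on the classpath; the expected outputs in comments follow from that array-aware behavior.

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

// Illustration only: array contents, not array identity, drive equality.
public class PairSketch {
  public static void main(String[] args) {
    Pair<byte[], Long> a = Pair.of(Bytes.toBytes("row1"), 1L);
    Pair<byte[], Long> b = Pair.of(Bytes.toBytes("row1"), 1L);
    System.out.println(a.equals(b));                    // expected: true
    System.out.println(a.hashCode() == b.hashCode());   // expected: true
  }
}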

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/util/Writables.java Sat Jan  9 21:09:59 2010
@@ -22,11 +22,13 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.DataInput;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Utility class with methods for manipulating Writable objects
@@ -193,4 +195,31 @@
     }
     return Bytes.toLong(c.getValue());
   }
+
+  /**
+   * Reads a zero-compressed encoded long from input stream and returns it.
+   * This method is a copy of {@link WritableUtils#readVLong(java.io.DataInput)}
+   * changed to allow the first byte to be provided as an argument.
+   * todo add this method to hadoop WritableUtils and refactor the base method
+   * to use it.
+   *
+   * @param stream    Binary input stream
+   * @param firstByte the first byte of the vlong
+   * @return deserialized long from stream.
+   * @throws java.io.IOException    io error
+   */
+  public static long readVLong(DataInput stream, byte firstByte)
+    throws IOException {
+    int len = WritableUtils.decodeVIntSize(firstByte);
+    if (len == 1) {
+      return firstByte;
+    }
+    long i = 0;
+    for (int idx = 0; idx < len - 1; idx++) {
+      byte b = stream.readByte();
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+  }
 }
\ No newline at end of file
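
Writables.readVLong above mirrors WritableUtils.readVLong but takes the already-consumed first byte as a parameter, so a caller that has peeked at a record can continue decoding without pushing the byte back. A hedged usage sketch follows; the surrounding stream setup is invented purely for illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

// Illustration only: write a vlong, consume its first byte by hand, then let
// Writables.readVLong finish the decode from that first byte.
public class ReadVLongSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    WritableUtils.writeVLong(new DataOutputStream(bos), 300L);

    DataInputStream in =
      new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    byte first = in.readByte();                         // caller already peeked this
    System.out.println(Writables.readVLong(in, first)); // 300
  }
}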

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Sat Jan  9 21:09:59 2010
@@ -379,7 +379,7 @@
     try {
       return readAddressOrThrow(znode, watcher);
     } catch (IOException e) {
-      e.printStackTrace();
+      LOG.debug("readAddress " + znode, e);
       return null;
     }
   }

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java Sat Jan  9 21:09:59 2010
@@ -21,7 +21,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +61,31 @@
     shutdownDfs(cluster);
     super.tearDown();
   }
+
+  /**
+   * Test the findMemstoresWithEditsOlderThan method.
+   * @throws IOException
+   */
+  public void testFindMemstoresWithEditsOlderThan() throws IOException {
+    Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
+    for (int i = 0; i < 10; i++) {
+      Long l = new Long(i);
+      regionsToSeqids.put(l.toString().getBytes(), l);
+    }
+    byte [][] regions =
+      HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids);
+    assertEquals(1, regions.length);
+    assertTrue(Bytes.equals(regions[0], "0".getBytes()));
+    regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids);
+    int count = 3;
+    assertEquals(count, regions.length);
+    // Regions returned are not ordered.
+    for (int i = 0; i < count; i++) {
+      assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
+        Bytes.equals(regions[i], "1".getBytes()) ||
+        Bytes.equals(regions[i], "2".getBytes()));
+    }
+  }
  
   /**
    * Just write multiple logs then split.  Before fix for HADOOP-2283, this
@@ -184,5 +211,4 @@
       }
     }
   }
-
 }
\ No newline at end of file

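The new TestHLog test above pins down the contract of
HLog.findMemstoresWithEditsOlderThan: given a map of region name to the oldest
outstanding sequence id for that region's memstore, it returns the names of the
regions whose oldest edit is older (strictly lower) than the passed sequence id,
in no particular order. A minimal sketch written against that contract only; the
actual HLog implementation may differ:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FindMemstoresSketch {
  // Region names whose oldest outstanding sequence id is strictly lower than
  // oldestWALseqid; the order of the returned array is unspecified.
  static byte[][] findMemstoresWithEditsOlderThan(long oldestWALseqid,
      Map<byte[], Long> regionsToSeqids) {
    List<byte[]> regions = new ArrayList<byte[]>();
    for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
      if (e.getValue().longValue() < oldestWALseqid) {
        regions.add(e.getKey());
      }
    }
    return regions.toArray(new byte[regions.size()][]);
  }
}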
Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Jan  9 21:09:59 2010
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,9 +44,15 @@
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import junit.framework.Assert;
 
 /**
  * Basic stand-alone testing of HRegion.
@@ -62,13 +69,13 @@
   private final int MAX_VERSIONS = 2;
 
   // Test names
-  private final byte[] tableName = Bytes.toBytes("testtable");;
-  private final byte[] qual1 = Bytes.toBytes("qual1");
-  private final byte[] qual2 = Bytes.toBytes("qual2");
-  private final byte[] qual3 = Bytes.toBytes("qual3");
-  private final byte[] value1 = Bytes.toBytes("value1");
-  private final byte[] value2 = Bytes.toBytes("value2");
-  private final byte [] row = Bytes.toBytes("rowA");
+  protected final byte[] tableName = Bytes.toBytes("testtable");
+  protected final byte[] qual1 = Bytes.toBytes("qual1");
+  protected final byte[] qual2 = Bytes.toBytes("qual2");
+  protected final byte[] qual3 = Bytes.toBytes("qual3");
+  protected final byte[] value1 = Bytes.toBytes("value1");
+  protected final byte[] value2 = Bytes.toBytes("value2");
+  protected final byte [] row = Bytes.toBytes("rowA");
 
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1773,7 +1780,370 @@
       }
     }
   }
-  
+
+  /**
+   * Flushes the cache in a separate thread while scanning. The test verifies
+   * that the scan is coherent, i.e. the returned results always reflect the
+   * same or a later update than the previous results.
+   * @throws IOException scan / compact
+   * @throws InterruptedException thread join
+   */
+  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testFlushCacheWhileScanning");
+    byte[] family = Bytes.toBytes("family");
+    int numRows = 1000;
+    int flushAndScanInterval = 10;
+    int compactInterval = 10 * flushAndScanInterval;
+
+    String method = "testFlushCacheWhileScanning";
+    initHRegion(tableName, method, family);
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    scan.setFilter(new SingleColumnValueFilter(family, qual1,
+      CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
+
+    int expectedCount = 0;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    boolean toggle=true;
+    for (long i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(family, qual1, Bytes.toBytes(i % 10));
+      region.put(put);
+
+      if (i != 0 && i % compactInterval == 0) {
+        //System.out.println("iteration = " + i);
+        region.compactStores(true);
+      }
+
+      if (i % 10 == 5L) {
+        expectedCount++;
+      }
+
+      if (i != 0 && i % flushAndScanInterval == 0) {
+        res.clear();
+        InternalScanner scanner = region.getScanner(scan);
+        if (toggle) {
+          flushThread.flush();
+        }
+        while (scanner.next(res)) ;
+        if (!toggle) {
+          flushThread.flush();
+        }
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        toggle = !toggle;
+      }
+    }
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
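+  /**
+   * Helper thread that flushes the region's cache each time it is signalled
+   * via flush() and exits once done() is called.
+   */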
+  protected class FlushThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      while (!done) {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+            if (done) {
+              break;
+            }
+          }
+        }
+        try {
+          region.flushcache();
+        } catch (IOException e) {
+          if (!done) {
+            LOG.error("Error while flushing cache", e);
+            error = e;
+          }
+          break;
+        }
+      }
+
+    }
+
+    public void flush() {
+      synchronized (this) {
+        notify();
+      }
+
+    }
+  }
+
+  /**
+   * Writes very wide records and scans for the latest values every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException          by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileScanning()
+    throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+    int testCount = 100;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 7;
+    int compactInterval = 5 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileScanning";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+      new BinaryComparator(Bytes.toBytes("row0"))));
+
+    int expectedCount = numFamilies * numQualifiers;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("scan iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = res.isEmpty();
+      res.clear();
+      InternalScanner scanner = region.getScanner(scan);
+      while (scanner.next(res)) ;
+      if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        long timestamp = res.get(0).getTimestamp();
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
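+  /**
+   * Helper thread that continuously writes (and occasionally deletes) wide
+   * rows until done() is called.
+   */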
+  protected class PutThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+    private int numRows;
+    private byte[][] families;
+    private byte[][] qualifiers;
+
+    private PutThread(int numRows, byte[][] families,
+      byte[][] qualifiers) {
+      this.numRows = numRows;
+      this.families = families;
+      this.qualifiers = qualifiers;
+    }
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      int val = 0;
+      while (!done) {
+        try {
+          for (int r = 0; r < numRows; r++) {
+            byte[] row = Bytes.toBytes("row" + r);
+            Put put = new Put(row);
+            for (int f = 0; f < families.length; f++) {
+              for (int q = 0; q < qualifiers.length; q++) {
+                put.add(families[f], qualifiers[q], (long) val,
+                  Bytes.toBytes(val));
+              }
+            }
+            region.put(put);
+            if (val > 0 && val % 47 == 0){
+              //System.out.println("put iteration = " + val);
+              Delete delete = new Delete(row, (long)val-30, null);
+              region.delete(delete, null, true);
+            }
+            val++;
+          }
+        } catch (IOException e) {
+          LOG.error("error while putting records", e);
+          error = e;
+          break;
+        }
+      }
+
+    }
+
+  }
+
+
+  /**
+   * Writes very wide records and gets the latest row every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException          by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileGetting()
+    throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileGetting");
+    int testCount = 200;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 10;
+    int compactInterval = 10 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileGetting";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Get get = new Get(Bytes.toBytes("row0"));
+    Result result = null;
+
+    int expectedCount = numFamilies * numQualifiers;
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = result == null || result.isEmpty();
+      result = region.get(get, null);
+      if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, result.size());
+        long timestamp =
+          result.getCellValue(families[0], qualifiers[0]).getTimestamp();
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+
+  public void testIndexesScanWithOneDeletedRow() throws IOException {
+    byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
+    byte[] family = Bytes.toBytes("family");
+
+    //Setting up region
+    String method = "testIndexesScanWithOneDeletedRow";
+    initHRegion(tableName, method, new HBaseConfiguration(), family);
+
+    Put put = new Put(Bytes.toBytes(1L));
+    put.add(family, qual1, 1L, Bytes.toBytes(1L));
+    region.put(put);
+
+    region.flushcache();
+
+    Delete delete = new Delete(Bytes.toBytes(1L), 1L, null);
+    //delete.deleteColumn(family, qual1);
+    region.delete(delete, null, true);
+
+    put = new Put(Bytes.toBytes(2L));
+    put.add(family, qual1, 2L, Bytes.toBytes(2L));
+    region.put(put);
+
+    Scan idxScan = new Scan();
+    idxScan.addFamily(family);
+    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+      Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
+        CompareFilter.CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes(0L))),
+        new SingleColumnValueFilter(family, qual1,
+          CompareFilter.CompareOp.LESS_OR_EQUAL,
+          new BinaryComparator(Bytes.toBytes(3L)))
+      )));
+    InternalScanner scanner = region.getScanner(idxScan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    //long start = System.nanoTime();
+    while (scanner.next(res)) ;
+    //long end = System.nanoTime();
+    //System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D);
+    assertEquals(1L, res.size());
+
+  }
+
+
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {
@@ -1887,16 +2257,22 @@
   throws IOException {
     initHRegion(tableName, callingMethod, new HBaseConfiguration(), families);
   }
-  
-  private void initHRegion (byte [] tableName, String callingMethod,
-    HBaseConfiguration conf, byte [] ... families)
-  throws IOException{
+
+  protected void initHRegion(byte[] tableName, String callingMethod,
+    HBaseConfiguration conf, byte[]... families)
+    throws IOException {
+    HTableDescriptor htd = constructTableDescriptor(tableName, families);
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    region = HRegion.createHRegion(info, path, conf);
+  }
+
+  protected HTableDescriptor constructTableDescriptor(byte[] tableName,
+    byte[]... families) {
     HTableDescriptor htd = new HTableDescriptor(tableName);
-    for(byte [] family : families) {
+    for (byte[] family : families) {
       htd.addFamily(new HColumnDescriptor(family));
     }
-    HRegionInfo info = new HRegionInfo(htd, null, null, false);
-    Path path = new Path(DIR + callingMethod); 
-    region = HRegion.createHRegion(info, path, conf);
+    return htd;
   }
 }

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Sat Jan  9 21:09:59 2010
@@ -217,7 +217,7 @@
 
   private void flush(int storeFilessize) throws IOException{
     this.store.snapshot();
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(storeFilessize, this.store.getStorefiles().size());
     assertEquals(0, this.store.memstore.kvset.size());
   }
@@ -260,7 +260,7 @@
     assertTrue(ret > 0);
 
     // then flush.
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
     assertEquals(2, this.store.memstore.kvset.size());
@@ -286,4 +286,11 @@
     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
 
   }
+
+  private static void flushStore(Store store, long id) throws IOException {
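+    // Flush through the three-phase StoreFlusher API: prepare, flush, commit.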
+    StoreFlusher storeFlusher = store.getStoreFlusher(id);
+    storeFlusher.prepare();
+    storeFlusher.flushCache();
+    storeFlusher.commit();
+  }
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestBytes.java Sat Jan  9 21:09:59 2010
@@ -21,8 +21,10 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.math.BigDecimal;
 
 import junit.framework.TestCase;
+import junit.framework.Assert;
 
 public class TestBytes extends TestCase {
   public void testNullHashCode() {
@@ -70,6 +72,27 @@
     assertTrue(Bytes.equals(parts[1], middle));
   }
 
+  public void testToChars() throws Exception {
+    char[] chars = new char[]{'b', 'l', 'a', 'b', 'l', 'a', 'b', 'l', 'a'};
+    byte[] bytes = Bytes.toBytes(chars);
+    char[] chars2 = Bytes.toChars(bytes);
+    assertTrue(Arrays.equals(chars, chars2));
+  }
+
+  public void testToBigDecimal() throws Exception {
+    BigDecimal bd1 = new BigDecimal("3.14");
+    byte[] bytes = Bytes.toBytes(bd1);
+    BigDecimal bd2 = Bytes.toBigDecimal(bytes);
+    assertEquals(bd1, bd2);
+  }
+
+  public void testToChar() throws Exception {
+    for (char c = 0; c < Character.MAX_VALUE; c++) {
+      byte[] b = Bytes.toBytes(c);
+      assertEquals(c, Bytes.toChar(b));
+    }
+  }
+
   public void testToLong() throws Exception {
     long [] longs = {-1l, 123l, 122232323232l};
     for (int i = 0; i < longs.length; i++) {

Added: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java?rev=897547&view=auto
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java (added)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/util/TestPair.java Sat Jan  9 21:09:59 2010
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hbase.HBaseTestCase;
+
+import java.io.*;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for the Pair class.
+ */
+public class TestPair extends HBaseTestCase {
+
+  /**
+   * Test of factory method.
+   * Verifies that <code>null</code> args are permitted.
+   */
+  public void testOf() {
+    Pair<Integer, Double> p1 = Pair.of(1, 1.2);
+    Assert.assertEquals(1, (int) p1.getFirst());
+    Assert.assertEquals(1.2, p1.getSecond());
+
+    Pair<String, Integer> p2 = Pair.of("a", 1);
+    Assert.assertEquals("a", p2.getFirst());
+    Assert.assertEquals(1, (int) p2.getSecond());
+
+    p2 = Pair.of(null, (Integer) null);
+    Assert.assertNull(p2.getFirst());
+    Assert.assertNull(p2.getSecond());
+  }
+
+
+  /**
+   * Test equals() method works as expected.
+   */
+  public void testEquals() {
+    // Set up test data
+    Pair<?, ?> item1a = Pair.of(1, 1);
+    Pair<?, ?> item1b = Pair.of(1, 1);
+    Pair<?, ?> item2 = Pair.of(1, 2);
+    Pair<?, ?> item3 = Pair.of(2, 1);
+
+    // One should be equal
+    Assert.assertTrue(item1a.equals(item1b));
+
+    // Others should not be
+    Assert.assertFalse(item1a.equals(item2));
+    Assert.assertFalse(item1a.equals(item3));
+
+    // check array equals
+    Pair<?,?> item4 = Pair.of(new byte[]{3,27}, new String[]{"foo","bar"});
+    Pair<?,?> item5 = Pair.of(new byte[]{3,27}, new String[]{"foo","bar"});
+    Assert.assertTrue(item4.equals(item5));
+
+  }
+
+  /**
+   * Test equals() method works as expected when the first element is null.
+   */
+  public void testEqualsNullFirst() {
+    // Set up test data
+    Pair<?, ?> item1 = Pair.of(1, 1);
+    Pair<?, ?> item2 = Pair.of(null, 2);
+
+    Assert.assertFalse(item1.equals(item2));
+    Assert.assertFalse(item2.equals(item1));
+  }
+
+  /**
+   * Test equals() method works as expected when the second element is null.
+   */
+  public void testEqualsNullSecond() {
+    // Set up test data
+    Pair<?, ?> item1 = Pair.of(1, 1);
+    Pair<?, ?> item2 = Pair.of(1, null);
+
+    Assert.assertFalse(item1.equals(item2));
+    Assert.assertFalse(item2.equals(item1));
+  }
+
+  /**
+   * Test equals() method works as expected when both elements are null.
+   */
+  public void testEqualsNullBoth() {
+    // Set up test data
+    Pair<?, ?> item1 = Pair.of(null, null);
+    Pair<?, ?> item2 = Pair.of(null, null);
+
+    Assert.assertTrue(item1.equals(item2));
+    Assert.assertTrue(item2.equals(item1));
+  }
+
+  /**
+   * Test hashCode() method works correctly.
+   */
+  public void testHashCode() {
+    // Set up test data
+    Set<Pair<?, ?>> testSet = new HashSet<Pair<?, ?>>();
+    Pair<?, ?> item1a = Pair.of(1, 1);
+    Pair<?, ?> item1b = Pair.of(1, 1);
+    Pair<?, ?> item2 = Pair.of(1, 2);
+    Pair<?, ?> item3 = Pair.of(2, 1);
+
+    // item1b should be found since it has the same content as item1a.
+    testSet.add(item1a);
+    Assert.assertTrue(testSet.contains(item1b));
+
+    // Others should not be found
+    Assert.assertFalse(testSet.contains(item2));
+    Assert.assertFalse(testSet.contains(item3));
+  }
+
+  /**
+   * Test hashCode() method works correctly with primitive arrays.
+   */
+  public void testHashCodeWithPrimitiveArray() {
+    // Set up test data
+    Pair<?, ?> pair1 = Pair.of(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
+    Pair<?, ?> pair2 = Pair.of(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
+
+    Assert.assertEquals(pair1.hashCode(), pair2.hashCode());
+    Assert.assertFalse(pair1.hashCode() == Pair.of(1, 2).hashCode());
+  }
+
+  /**
+   * Test hashCode() method works correctly with object arrays.
+   */
+  public void testHashCodeWithObjectArray() {
+    // Set up test data
+    Pair<?, ?> pair1 = Pair.of(new Object[] {"one", "two", "three"}, new Object[] {"four", "five", "six"});
+    Pair<?, ?> pair2 = Pair.of(new Object[] {"one", "two", "three"}, new Object[] {"four", "five", "six"});
+
+    Assert.assertEquals(pair1.hashCode(), pair2.hashCode());
+    Assert.assertFalse(pair1.hashCode() == Pair.of(1, 2).hashCode());
+  }
+
+  /**
+   * Tests that the Pair class serializes and deserializes without a problem.
+   *
+   * @throws IOException            exception - should not occur
+   * @throws ClassNotFoundException exception - should not occur
+   */
+  public void testSerializable() throws IOException, ClassNotFoundException {
+    Pair<?, ?> item1 = Pair.of(5, "bla");
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(bout);
+    out.writeObject(item1);
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bout.toByteArray()));
+    Assert.assertEquals(item1, in.readObject());
+  }
+
+}

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/master.jsp Sat Jan  9 21:09:59 2010
@@ -3,6 +3,7 @@
   import="java.net.URLEncoder" 
   import="org.apache.hadoop.io.Text"
   import="org.apache.hadoop.hbase.util.Bytes"
+  import="org.apache.hadoop.hbase.util.FSUtils"
   import="org.apache.hadoop.hbase.master.HMaster"
   import="org.apache.hadoop.hbase.HConstants"
   import="org.apache.hadoop.hbase.master.MetaRegion"
@@ -23,6 +24,7 @@
   if (interval == 0) {
       interval = 1;
   }
+  Map<String, Integer> frags = master.getTableFragmentation();
 %><?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
   "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> 
@@ -48,6 +50,7 @@
 <tr><td>HBase Root Directory</td><td><%= master.getRootDir().toString() %></td><td>Location of HBase home directory</td></tr>
 <tr><td>Load average</td><td><%= master.getAverageLoad() %></td><td>Average number of regions per regionserver. Naive computation.</td></tr>
 <tr><td>Regions On FS</td><td><%= master.countRegionsOnFS() %></td><td>Number of regions on FileSystem. Rough count.</td></tr>
+<tr><td>Fragmentation</td><td><%= frags.get("-TOTAL-") != null ? frags.get("-TOTAL-").intValue() + "%" : "n/a" %></td><td>Overall fragmentation of all tables, including .META. and -ROOT-.</td></tr>
 <tr><td>Zookeeper Quorum</td><td><%= master.getZooKeeperWrapper().getQuorumServers() %></td><td>Addresses of all registered ZK servers. For more, see <a href="/zk.jsp">zk dump</a>.</td></tr>
 </table>
 
@@ -55,11 +58,17 @@
 <% 
   if (rootLocation != null) { %>
 <table>
-<tr><th>Table</th><th>Description</th></tr>
-<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %>"><%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %></a></td><td>The -ROOT- table holds references to all .META. regions.</td></tr>
+<tr><th>Table</th><th title="Fragmentation - Will be 0% after a major compaction and fluctuate during normal usage.">Frag.</th><th>Description</th></tr>
+<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %>"><%= Bytes.toString(HConstants.ROOT_TABLE_NAME) %></a></td>
+<td align="center"><%= frags.get("-ROOT-") != null ? frags.get("-ROOT-").intValue() + "%" : "n/a" %></td>
+<td>The -ROOT- table holds references to all .META. regions.</td>
+</tr>
 <%
     if (onlineRegions != null && onlineRegions.size() > 0) { %>
-<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.META_TABLE_NAME) %>"><%= Bytes.toString(HConstants.META_TABLE_NAME) %></a></td><td>The .META. table holds references to all User Table regions</td></tr>
+<tr><td><a href="/table.jsp?name=<%= Bytes.toString(HConstants.META_TABLE_NAME) %>"><%= Bytes.toString(HConstants.META_TABLE_NAME) %></a></td>
+<td align="center"><%= frags.get(".META.") != null ? frags.get(".META.").intValue() + "%" : "n/a" %></td>
+<td>The .META. table holds references to all User Table regions</td>
+</tr>
   
 <%  } %>
 </table>
@@ -69,9 +78,12 @@
 <% HTableDescriptor[] tables = new HBaseAdmin(conf).listTables(); 
    if(tables != null && tables.length > 0) { %>
 <table>
-<tr><th>Table</th><th>Description</th></tr>
+<tr><th>Table</th><th title="Fragmentation - Will be 0% after a major compaction and fluctuate during normal usage.">Frag.</th><th>Description</th></tr>
 <%   for(HTableDescriptor htDesc : tables ) { %>
-<tr><td><a href=/table.jsp?name=<%= htDesc.getNameAsString() %>><%= htDesc.getNameAsString() %></a> </td><td><%= htDesc.toString() %></td></tr>
+<tr><td><a href=/table.jsp?name=<%= htDesc.getNameAsString() %>><%= htDesc.getNameAsString() %></a> </td>
+<td align="center"><%= frags.get(htDesc.getNameAsString()) != null ? frags.get(htDesc.getNameAsString()).intValue() + "%" : "n/a" %></td>
+<td><%= htDesc.toString() %></td>
+</tr>
 <%   }  %>
 
 <p> <%= tables.length %> table(s) in set.</p>

Modified: hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp?rev=897547&r1=897546&r2=897547&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp (original)
+++ hadoop/hbase/branches/0.20_on_hadoop-0.18.3/src/webapps/master/table.jsp Sat Jan  9 21:09:59 2010
@@ -27,6 +27,7 @@
       master.getServerAddressToServerInfo();
   String tableHeader = "<h2>Table Regions</h2><table><tr><th>Name</th><th>Region Server</th><th>Encoded Name</th><th>Start Key</th><th>End Key</th></tr>";
   HServerAddress rootLocation = master.getRootRegionLocation();
+  Map<String, Integer> frags = master.getTableFragmentation(); 
 %>
 
 <?xml version="1.0" encoding="UTF-8" ?>
@@ -120,6 +121,7 @@
 <table>
   <tr><th>Attribute Name</th><th>Value</th><th>Description</th></tr>
   <tr><td>Enabled</td><td><%= hbadmin.isTableEnabled(table.getTableName()) %></td><td>Is the table enabled</td></tr>
+  <tr><td>Fragmentation</td><td><%= frags.get(tableName) != null ? frags.get(tableName).intValue() + "%" : "n/a" %></td><td>How fragmented is the table. After a major compaction it is 0%.</td></tr>
 </table>
 <%
   Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();