Posted to commits@hbase.apache.org by la...@apache.org on 2012/12/23 21:54:15 UTC

svn commit: r1425525 [3/7] - in /hbase/branches/0.94-test: ./ bin/ conf/ security/src/main/java/org/apache/hadoop/hbase/ipc/ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/...

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Dec 23 20:54:12 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -55,12 +56,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -228,15 +232,34 @@ public class HRegion implements HeapSize
    * The directory for the table this region is part of.
    * This directory contains the directory for this region.
    */
-  final Path tableDir;
+  private final Path tableDir;
 
-  final HLog log;
-  final FileSystem fs;
-  final Configuration conf;
-  final int rowLockWaitDuration;
+  private final HLog log;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-  final HRegionInfo regionInfo;
-  final Path regiondir;
+
+  // The internal wait duration to acquire a lock before read/update
+  // from the region. It is not per row. The purpose of this wait time
+  // is to avoid waiting a long time while the region is busy, so that
+  // we can release the IPC handler soon enough to improve the
+  // availability of the region server. It can be adjusted by
+  // tuning configuration "hbase.busy.wait.duration".
+  final long busyWaitDuration;
+  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+
+  // If updating multiple rows in one call, wait longer,
+  // i.e. up to busyWaitDuration * the number of rows. However,
+  // the multiplier is capped at maxBusyWaitMultiplier.
+  final int maxBusyWaitMultiplier;
+
+  // Max busy wait duration. There is no point in waiting longer than the RPC
+  // purge timeout, after which the RPC call will be terminated by the RPC engine.
+  final long maxBusyWaitDuration;
+
+  private final HRegionInfo regionInfo;
+  private final Path regiondir;
   KeyValue.KVComparator comparator;
 
   private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
@@ -354,6 +377,10 @@ public class HRegion implements HeapSize
     this.coprocessorHost = null;
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.opMetrics = new OperationMetrics();
+
+    this.maxBusyWaitDuration = 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.busyWaitDuration = DEFAULT_BUSY_WAIT_DURATION;
+    this.maxBusyWaitMultiplier = 2;
   }
 
   /**
@@ -400,6 +427,17 @@ public class HRegion implements HeapSize
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.opMetrics = new OperationMetrics(conf, this.regionInfo);
 
+    this.busyWaitDuration = conf.getLong(
+      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
+    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
+    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
+      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
+        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
+        + maxBusyWaitMultiplier + "). Their product should be positive");
+    }
+    this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
+      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
     /*
      * timestamp.slop provides a server-side constraint on the timestamp. This
      * assumes that you base your TS around currentTimeMillis(). In this case,
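The two new settings combine multiplicatively: a single-row operation may wait up to hbase.busy.wait.duration milliseconds, a multi-row batch up to that duration times min(rows, hbase.busy.wait.multiplier.max), and everything is further capped by the RPC purge timeout. A minimal sketch of setting and validating them, assuming only the property names above (the values here are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class BusyWaitConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("hbase.busy.wait.duration", 60000L);  // per-operation wait, ms
        conf.setInt("hbase.busy.wait.multiplier.max", 2);  // cap for multi-row batches

        long busyWaitDuration = conf.getLong("hbase.busy.wait.duration", 60000L);
        int maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
        // Same sanity check the HRegion constructor performs: the product must be positive.
        if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
          throw new IllegalArgumentException("busy wait settings must be positive");
        }
        int rows = 5; // hypothetical batch size
        long wait = busyWaitDuration * Math.min(rows, maxBusyWaitMultiplier);
        System.out.println("a " + rows + "-row batch may wait up to " + wait + "ms");
      }
    }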
@@ -688,7 +726,7 @@ public class HRegion implements HeapSize
   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
     if (this.rsAccounting != null) {
       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
-    }  
+    }
     return this.memstoreSize.getAndAdd(memStoreSize);
   }
 
@@ -714,7 +752,7 @@ public class HRegion implements HeapSize
 
     // and then create the file
     Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
-    
+
     // if datanode crashes or if the RS goes down just before the close is called while trying to
     // close the created regioninfo file in the .tmp directory then on next
     // creation we will be getting AlreadyCreatedException.
@@ -722,7 +760,7 @@ public class HRegion implements HeapSize
     if (FSUtils.isExists(fs, tmpPath)) {
       FSUtils.delete(fs, tmpPath, true);
     }
-    
+
     FSDataOutputStream out = FSUtils.create(fs, tmpPath, perms);
 
     try {
@@ -739,6 +777,26 @@ public class HRegion implements HeapSize
     }
   }
 
+  /**
+   * @param fs
+   * @param dir
+   * @return An HRegionInfo instance gotten from the <code>.regioninfo</code> file under region dir
+   * @throws IOException
+   */
+  public static HRegionInfo loadDotRegionInfoFileContent(final FileSystem fs, final Path dir)
+  throws IOException {
+    Path regioninfo = new Path(dir, HRegion.REGIONINFO_FILE);
+    if (!fs.exists(regioninfo)) throw new FileNotFoundException(regioninfo.toString());
+    FSDataInputStream in = fs.open(regioninfo);
+    try {
+      HRegionInfo hri = new HRegionInfo();
+      hri.readFields(in);
+      return hri;
+    } finally {
+      in.close();
+    }
+  }
+
   /** @return a HRegionInfo object for this region */
   public HRegionInfo getRegionInfo() {
     return this.regionInfo;
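The new loadDotRegionInfoFileContent() helper can be exercised as below; the region directory path is hypothetical, and a FileNotFoundException is thrown when no .regioninfo file exists under it:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    public class RegionInfoLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path regionDir = new Path("/hbase/mytable/1028785192"); // hypothetical
        HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir);
        System.out.println("loaded: " + hri.getRegionNameAsString());
      }
    }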
@@ -883,6 +941,7 @@ public class HRegion implements HeapSize
 
     this.closing.set(true);
     status.setStatus("Disabling writes for close");
+    // block waiting for the lock for closing
     lock.writeLock().lock();
     try {
       if (this.isClosed()) {
@@ -984,19 +1043,16 @@ public class HRegion implements HeapSize
     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
   }
 
-  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+  static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
       final String threadNamePrefix) {
-    ThreadPoolExecutor openAndCloseThreadPool = Threads
-        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-            new ThreadFactory() {
-              private int count = 1;
-
-              public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
-                return t;
-              }
-            });
-    return openAndCloseThreadPool;
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+      new ThreadFactory() {
+        private int count = 1;
+
+        public Thread newThread(Runnable r) {
+          return new Thread(r, threadNamePrefix + "-" + count++);
+        }
+      });
   }
 
    /**
@@ -1192,6 +1248,7 @@ public class HRegion implements HeapSize
       return false;
     }
     Preconditions.checkArgument(cr.getHRegion().equals(this));
+    // block waiting for the lock for compaction
     lock.readLock().lock();
     MonitoredTask status = TaskMonitor.get().createStatus(
         "Compacting " + cr.getStore() + " in " + this);
@@ -1271,6 +1328,7 @@ public class HRegion implements HeapSize
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
+    // block waiting for the lock for flushing cache
     lock.readLock().lock();
     try {
       if (this.closed.get()) {
@@ -1406,6 +1464,7 @@ public class HRegion implements HeapSize
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
+    // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
@@ -1664,11 +1723,23 @@ public class HRegion implements HeapSize
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @param delete delete object
+   * @param writeToWAL append to the write ahead lock or not
+   * @throws IOException read exceptions
+   */
+  public void delete(Delete delete, boolean writeToWAL)
+  throws IOException {
+    delete(delete, null, writeToWAL);
+  }
+
   /**
    * @param delete delete object
    * @param lockid existing lock id, or null for grab a lock
   * @param writeToWAL append to the write ahead log or not
    * @throws IOException read exceptions
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
   throws IOException {
@@ -1784,7 +1855,7 @@ public class HRegion implements HeapSize
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
-    updatesLock.readLock().lock();
+    lock(updatesLock.readLock());
     try {
       prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
 
@@ -1843,6 +1914,7 @@ public class HRegion implements HeapSize
    * @param put
    * @param lockid
    * @throws IOException
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void put(Put put, Integer lockid) throws IOException {
     this.put(put, lockid, put.getWriteToWAL());
@@ -1855,6 +1927,7 @@ public class HRegion implements HeapSize
    * @param lockid
    * @param writeToWAL
    * @throws IOException
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public void put(Put put, Integer lockid, boolean writeToWAL)
   throws IOException {
@@ -1940,7 +2013,7 @@ public class HRegion implements HeapSize
     System.arraycopy(putsAndLocks, 0, mutationsAndLocks, 0, putsAndLocks.length);
     return batchMutate(mutationsAndLocks);
   }
-  
+
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -2163,7 +2236,7 @@ public class HRegion implements HeapSize
         }
       }
 
-      this.updatesLock.readLock().lock();
+      lock(this.updatesLock.readLock(), numReadyToWrite);
       locked = true;
 
       //
@@ -2294,7 +2367,7 @@ public class HRegion implements HeapSize
 
       // do after lock
       final long netTimeMs = EnvironmentEdgeManager.currentTimeMillis()- startTimeMs;
-            
+
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
@@ -2328,6 +2401,24 @@ public class HRegion implements HeapSize
   //the getting of the lock happens before, so that you would just pass it into
   //the methods. So in the case of checkAndMutate you could just do lockRow,
   //get, put, unlockRow or something
+ /**
+  *
+  * @param row
+  * @param family
+  * @param qualifier
+  * @param compareOp
+  * @param comparator
+  * @param writeToWAL
+  * @throws IOException
+  * @return true if the new put was execute, false otherwise
+  */
+ public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
+     CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
+     boolean writeToWAL)
+ throws IOException {
+   return checkAndMutate(row, family, qualifier, compareOp, comparator, w, null, writeToWAL);
+ }
+  
   /**
    *
    * @param row
@@ -2339,6 +2430,7 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @throws IOException
   * @return true if the new put was executed, false otherwise
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
       CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
@@ -2457,7 +2549,8 @@ public class HRegion implements HeapSize
    * this and the synchronize on 'this' inside in internalFlushCache to send
    * the notify.
    */
-  private void checkResources() {
+  private void checkResources()
+      throws RegionTooBusyException, InterruptedIOException {
 
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
@@ -2475,12 +2568,30 @@ public class HRegion implements HeapSize
           " is >= than blocking " +
           StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
       }
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      long timeToWait = startTime + busyWaitDuration - now;
+      if (timeToWait <= 0L) {
+        final long totalTime = now - startTime;
+        this.updatesBlockedMs.add(totalTime);
+        LOG.info("Failed to unblock updates for region " + this + " '"
+          + Thread.currentThread().getName() + "' in " + totalTime
+          + "ms. The region is still busy.");
+        throw new RegionTooBusyException("region is flushing");
+      }
       blocked = true;
       synchronized(this) {
         try {
-          wait(threadWakeFrequency);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
+          wait(Math.min(timeToWait, threadWakeFrequency));
+        } catch (InterruptedException ie) {
+          final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+          if (totalTime > 0) {
+            this.updatesBlockedMs.add(totalTime);
+          }
+          LOG.info("Interrupted while waiting to unblock updates for region "
+            + this + " '" + Thread.currentThread().getName() + "'");
+          InterruptedIOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
         }
       }
     }
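The reworked loop replaces an unbounded wait with a deadline: each wait slice is the smaller of threadWakeFrequency and the time remaining until startTime + busyWaitDuration, and RegionTooBusyException is thrown once the deadline passes. The shape of the loop, distilled into a standalone sketch (class and field names here are illustrative):

    import org.apache.hadoop.hbase.RegionTooBusyException;

    public class BoundedBlockSketch {
      private final Object monitor = new Object();
      private volatile boolean blocked = true; // illustrative condition

      void waitUntilUnblocked(long busyWaitDuration, long threadWakeFrequency)
          throws RegionTooBusyException, InterruptedException {
        long startTime = System.currentTimeMillis();
        synchronized (monitor) {
          while (blocked) {
            long timeToWait = startTime + busyWaitDuration - System.currentTimeMillis();
            if (timeToWait <= 0L) {
              // Give up instead of tying up an IPC handler indefinitely.
              throw new RegionTooBusyException("region is flushing");
            }
            monitor.wait(Math.min(timeToWait, threadWakeFrequency));
          }
        }
      }
    }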
@@ -2547,7 +2658,7 @@ public class HRegion implements HeapSize
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
-    this.updatesLock.readLock().lock();
+    lock(this.updatesLock.readLock());
     try {
       checkFamilies(familyMap.keySet());
       checkTimestamps(familyMap, now);
@@ -2578,7 +2689,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-    
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -3172,6 +3283,7 @@ public class HRegion implements HeapSize
    * @param lockId  The lock ID to release.
    */
   public void releaseRowLock(final Integer lockId) {
+    if (lockId == null) return; // null lock id, do nothing
     HashedBytes rowKey = lockIds.remove(lockId);
     if (rowKey == null) {
       LOG.warn("Release unknown lockId: " + lockId);
@@ -3412,6 +3524,10 @@ public class HRegion implements HeapSize
       this(scan, null);
     }
 
+    @Override
+    public long getMvccReadPoint() {
+      return this.readPt;
+    }
     /**
      * Reset both the filter and the old filter.
      */
@@ -3422,7 +3538,7 @@ public class HRegion implements HeapSize
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit)
+    public boolean next(List<KeyValue> outResults, int limit)
         throws IOException {
       return next(outResults, limit, null);
     }
@@ -3442,30 +3558,42 @@ public class HRegion implements HeapSize
         // This could be a new thread from the last time we called next().
         MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
 
-        results.clear();
-
-        boolean returnResult = nextInternal(limit, metric);
-
-        outResults.addAll(results);
-        resetFilters();
-        if (isFilterDone()) {
-          return false;
-        }
-        return returnResult;
+        return nextRaw(outResults, limit, metric);
       } finally {
         closeRegionOperation();
       }
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults)
+    public boolean nextRaw(List<KeyValue> outResults, String metric)
+        throws IOException {
+      return nextRaw(outResults, batch, metric);
+    }
+
+    @Override
+    public boolean nextRaw(List<KeyValue> outResults, int limit,
+        String metric) throws IOException {
+      results.clear();
+
+      boolean returnResult = nextInternal(limit, metric);
+
+      outResults.addAll(results);
+      resetFilters();
+      if (isFilterDone()) {
+        return false;
+      }
+      return returnResult;
+    }
+
+    @Override
+    public boolean next(List<KeyValue> outResults)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, null);
     }
 
     @Override
-    public synchronized boolean next(List<KeyValue> outResults, String metric)
+    public boolean next(List<KeyValue> outResults, String metric)
         throws IOException {
       // apply the batching limit by default
       return next(outResults, batch, metric);
@@ -3489,8 +3617,16 @@ public class HRegion implements HeapSize
           rpcCall.throwExceptionIfCallerDisconnected();
         }
 
-        byte [] currentRow = peekRow();
-        if (isStopRow(currentRow)) {
+        KeyValue current = this.storeHeap.peek();
+        byte[] currentRow = null;
+        int offset = 0;
+        short length = 0;
+        if (current != null) {
+          currentRow = current.getBuffer();
+          offset = current.getRowOffset();
+          length = current.getRowLength();
+        }
+        if (isStopRow(currentRow, offset, length)) {
           if (filter != null && filter.hasFilterRow()) {
             filter.filterRow(results);
           }
@@ -3499,10 +3635,10 @@ public class HRegion implements HeapSize
           }
 
           return false;
-        } else if (filterRowKey(currentRow)) {
-          nextRow(currentRow);
+        } else if (filterRowKey(currentRow, offset, length)) {
+          nextRow(currentRow, offset, length);
         } else {
-          byte [] nextRow;
+          KeyValue nextKv;
           do {
             this.storeHeap.next(results, limit - results.size(), metric);
             if (limit > 0 && results.size() == limit) {
@@ -3512,9 +3648,10 @@ public class HRegion implements HeapSize
               }
               return true; // we are expecting more yes, but also limited to how many we can return.
             }
-          } while (Bytes.equals(currentRow, nextRow = peekRow()));
+            nextKv = this.storeHeap.peek();
+          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
 
-          final boolean stopRow = isStopRow(nextRow);
+          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
 
           // now that we have an entire row, lets process with a filters:
 
@@ -3529,7 +3666,7 @@ public class HRegion implements HeapSize
             // the reasons for calling this method are:
             // 1. reset the filters.
             // 2. provide a hook to fast forward the row (used by subclasses)
-            nextRow(currentRow);
+            nextRow(currentRow, offset, length);
 
             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on.
@@ -3545,29 +3682,25 @@ public class HRegion implements HeapSize
       return filter != null
           && filter.filterRow();
     }
-    private boolean filterRowKey(byte[] row) {
+    private boolean filterRowKey(byte[] row, int offset, short length) {
       return filter != null
-          && filter.filterRowKey(row, 0, row.length);
+          && filter.filterRowKey(row, offset, length);
     }
 
-    protected void nextRow(byte [] currentRow) throws IOException {
-      while (Bytes.equals(currentRow, peekRow())) {
-        this.storeHeap.next(MOCKED_LIST);
+    protected void nextRow(byte [] currentRow, int offset, short length) throws IOException {
+      KeyValue next;
+      while((next = this.storeHeap.peek()) != null && next.matchingRow(currentRow, offset, length)) {
+        this.storeHeap.next(MOCKED_LIST);
       }
       results.clear();
       resetFilters();
     }
 
-    private byte[] peekRow() {
-      KeyValue kv = this.storeHeap.peek();
-      return kv == null ? null : kv.getRow();
-    }
-
-    private boolean isStopRow(byte [] currentRow) {
+    private boolean isStopRow(byte [] currentRow, int offset, short length) {
       return currentRow == null ||
           (stopRow != null &&
           comparator.compareRows(stopRow, 0, stopRow.length,
-              currentRow, 0, currentRow.length) <= isScan);
+              currentRow, offset, length) <= isScan);
     }
 
     @Override
@@ -3695,6 +3828,7 @@ public class HRegion implements HeapSize
    * @param conf
    * @param hTableDescriptor
    * @param hlog shared HLog
+   * @param initialize true to initialize the region
    * @return new HRegion
    *
    * @throws IOException
@@ -3702,7 +3836,36 @@ public class HRegion implements HeapSize
   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                       final Configuration conf,
                                       final HTableDescriptor hTableDescriptor,
-                                      final HLog hlog)
+                                      final HLog hlog,
+                                      final boolean initialize)
+      throws IOException {
+    return createHRegion(info, rootDir, conf, hTableDescriptor,
+        hlog, initialize, false);
+  }
+
+  /**
+   * Convenience method creating new HRegions. Used by createTable.
+   * The {@link HLog} for the created region needs to be closed
+   * explicitly, if it is not null.
+   * Use {@link HRegion#getLog()} to get access.
+   *
+   * @param info Info for region to create.
+   * @param rootDir Root directory for HBase instance
+   * @param conf
+   * @param hTableDescriptor
+   * @param hlog shared HLog
+   * @param initialize true to initialize the region
+   * @param ignoreHLog true to skip creating a new hlog if it is null,
+   *   mostly for createTable
+   * @return new HRegion
+   *
+   * @throws IOException
+   */
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+                                      final Configuration conf,
+                                      final HTableDescriptor hTableDescriptor,
+                                      final HLog hlog,
+                                      final boolean initialize, final boolean ignoreHLog)
       throws IOException {
     LOG.info("creating HRegion " + info.getTableNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@@ -3714,16 +3877,26 @@ public class HRegion implements HeapSize
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     HLog effectiveHLog = hlog;
-    if (hlog == null) {
+    if (hlog == null && !ignoreHLog) {
       effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
     }
     HRegion region = HRegion.newHRegion(tableDir,
         effectiveHLog, fs, conf, info, hTableDescriptor, null);
-    region.initialize();
+    if (initialize) {
+      region.initialize();
+    }
     return region;
   }
 
+  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
+                                      final Configuration conf,
+                                      final HTableDescriptor hTableDescriptor,
+                                      final HLog hlog)
+    throws IOException {
+    return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
+  }
+
   /**
    * Open a Region.
    * @param info Info for region to be opened.
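With the new overload, table creation can lay out a region on disk without initializing it and without spinning up a throwaway HLog. A sketch of a call, assuming info, rootDir, conf and htd already exist in the caller's scope:

    HRegion region = HRegion.createHRegion(info, rootDir, conf, htd,
        null,   // hlog: none supplied
        false,  // initialize: skip region initialization
        true);  // ignoreHLog: do not create a replacement log either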
@@ -4178,9 +4351,19 @@ public class HRegion implements HeapSize
   //
   /**
    * @param get get object
+   * @return result
+   * @throws IOException read exceptions
+   */
+  public Result get(final Get get) throws IOException {
+    return get(get, null);
+  }
+
+  /**
+   * @param get get object
    * @param lockid existing lock id, or null for no previous lock
    * @return result
    * @throws IOException read exceptions
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public Result get(final Get get, final Integer lockid) throws IOException {
     checkRow(get.getRow(), "Get");
@@ -4235,7 +4418,7 @@ public class HRegion implements HeapSize
     // do after lock
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateGetMetrics(get.familySet(), after - now);
-    
+
     return results;
   }
 
@@ -4303,7 +4486,7 @@ public class HRegion implements HeapSize
       }
 
       // 3. acquire the region lock
-      this.updatesLock.readLock().lock();
+      lock(this.updatesLock.readLock(), acquiredLocks.size());
       locked = true;
 
       // 4. Get a mvcc write number
@@ -4425,6 +4608,23 @@ public class HRegion implements HeapSize
 
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
+
+  /**
+   *
+   * Perform one or more append operations on a row.
+   * <p>
+   * Appends performed are done under row lock but reads do not take locks out
+   * so this can be seen partially complete by gets and scans.
+   *
+   * @param append
+   * @param writeToWAL
+   * @return new keyvalues after append
+   * @throws IOException
+   */
+  public Result append(Append append, boolean writeToWAL)
+      throws IOException {
+    return append(append, null, writeToWAL);
+  }
   /**
    *
    * Perform one or more append operations on a row.
@@ -4437,6 +4637,7 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
    */
   public Result append(Append append, Integer lockid, boolean writeToWAL)
       throws IOException {
@@ -4456,7 +4657,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
-      this.updatesLock.readLock().lock();
+      lock(this.updatesLock.readLock());
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4563,10 +4764,10 @@ public class HRegion implements HeapSize
       closeRegionOperation();
     }
 
-    
+
     long after = EnvironmentEdgeManager.currentTimeMillis();
     this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
-    
+
     if (flush) {
       // Request a cache flush. Do it outside update lock.
       requestFlush();
@@ -4576,6 +4777,22 @@ public class HRegion implements HeapSize
   }
 
   /**
+   *
+   * Perform one or more increment operations on a row.
+   * <p>
+   * Increments performed are done under row lock but reads do not take locks
+   * out so this can be seen partially complete by gets and scans.
+   * @param increment
+   * @param writeToWAL
+   * @return new keyvalues after increment
+   * @throws IOException
+   */
+  public Result increment(Increment increment, boolean writeToWAL)
+  throws IOException {
+    return increment(increment, null, writeToWAL);
+  }
+
+  /**
    *
    * Perform one or more increment operations on a row.
    * <p>
@@ -4586,6 +4803,8 @@ public class HRegion implements HeapSize
    * @param writeToWAL
    * @return new keyvalues after increment
    * @throws IOException
+   * @deprecated row locks (lockId) held outside the extent of the operation are deprecated.
+
    */
   public Result increment(Increment increment, Integer lockid,
       boolean writeToWAL)
@@ -4607,7 +4826,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = getLock(lockid, row, true);
-      this.updatesLock.readLock().lock();
+      lock(this.updatesLock.readLock());
       try {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
@@ -4691,7 +4910,7 @@ public class HRegion implements HeapSize
       long after = EnvironmentEdgeManager.currentTimeMillis();
       this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
     }
-    
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -4725,7 +4944,7 @@ public class HRegion implements HeapSize
     this.writeRequestsCount.increment();
     try {
       Integer lid = obtainRowLock(row);
-      this.updatesLock.readLock().lock();
+      lock(this.updatesLock.readLock());
       try {
         Store store = stores.get(family);
 
@@ -4818,8 +5037,8 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      35 * ClassSize.REFERENCE + Bytes.SIZEOF_INT +
-      (5 * Bytes.SIZEOF_LONG) +
+      35 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (7 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5119,13 +5338,16 @@ public class HRegion implements HeapSize
    * #closeRegionOperation needs to be called in the try's finally block
    * Acquires a read lock and checks if the region is closing or closed.
    * @throws NotServingRegionException when the region is closing or closed
+   * @throws RegionTooBusyException if failed to get the lock in time
+   * @throws InterruptedIOException if interrupted while waiting for a lock
    */
-  private void startRegionOperation() throws NotServingRegionException {
+  public void startRegionOperation()
+      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing");
     }
-    lock.readLock().lock();
+    lock(lock.readLock());
     if (this.closed.get()) {
       lock.readLock().unlock();
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
@@ -5137,7 +5359,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startRegionOperation
    */
-  private void closeRegionOperation(){
+  public void closeRegionOperation(){
     lock.readLock().unlock();
   }
 
@@ -5147,15 +5369,17 @@ public class HRegion implements HeapSize
    * #closeBulkRegionOperation needs to be called in the try's finally block
    * Acquires a writelock and checks if the region is closing or closed.
    * @throws NotServingRegionException when the region is closing or closed
+   * @throws RegionTooBusyException if failed to get the lock in time
+   * @throws InterruptedIOException if interrupted while waiting for a lock
    */
   private void startBulkRegionOperation(boolean writeLockNeeded)
-  throws NotServingRegionException {
+      throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
     if (this.closing.get()) {
       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing");
     }
-    if (writeLockNeeded) lock.writeLock().lock();
-    else lock.readLock().lock();
+    if (writeLockNeeded) lock(lock.writeLock());
+    else lock(lock.readLock());
     if (this.closed.get()) {
       if (writeLockNeeded) lock.writeLock().unlock();
       else lock.readLock().unlock();
@@ -5168,7 +5392,7 @@ public class HRegion implements HeapSize
    * Closes the lock. This needs to be called in the finally block corresponding
    * to the try block of #startRegionOperation
    */
-  private void closeBulkRegionOperation(){
+  private void closeBulkRegionOperation() {
     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
     else lock.readLock().unlock();
   }
@@ -5179,7 +5403,7 @@ public class HRegion implements HeapSize
    */
   private void recordPutWithoutWal(final Map<byte [], List<KeyValue>> familyMap) {
     if (numPutsWithoutWAL.getAndIncrement() == 0) {
-      LOG.info("writing data to region " + this + 
+      LOG.info("writing data to region " + this +
                " with WAL disabled. Data may be lost in the event of a crash.");
     }
 
@@ -5193,6 +5417,33 @@ public class HRegion implements HeapSize
     dataInMemoryWithoutWAL.addAndGet(putSize);
   }
 
+  private void lock(final Lock lock)
+      throws RegionTooBusyException, InterruptedIOException {
+    lock(lock, 1);
+  }
+
+  /**
+   * Try to acquire a lock.  Throw RegionTooBusyException
+   * if failed to get the lock in time. Throw InterruptedIOException
+   * if interrupted while waiting for the lock.
+   */
+  private void lock(final Lock lock, final int multiplier)
+      throws RegionTooBusyException, InterruptedIOException {
+    try {
+      final long waitTime = Math.min(maxBusyWaitDuration,
+        busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
+      if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
+        throw new RegionTooBusyException(
+          "failed to get a lock in " + waitTime + "ms");
+      }
+    } catch (InterruptedException ie) {
+      LOG.info("Interrupted while waiting for a lock");
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
+    }
+  }
+
   /**
    * Calls sync with the given transaction ID if the region's table is not
    * deferring it.
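The lock(Lock, int) helper above is what turns each internal lock acquisition into a timed one. Its pattern in isolation, as a sketch (the wait value and the exception type at the call site are illustrative; the patch itself throws RegionTooBusyException):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TimedLockSketch {
      static void lockOrGiveUp(Lock lock, long waitTimeMs) throws IOException {
        try {
          if (!lock.tryLock(waitTimeMs, TimeUnit.MILLISECONDS)) {
            throw new IOException("failed to get a lock in " + waitTimeMs + "ms");
          }
        } catch (InterruptedException ie) {
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }

      public static void main(String[] args) throws Exception {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        lockOrGiveUp(rwl.readLock(), 100L);
        try {
          // ... read under the lock ...
        } finally {
          rwl.readLock().unlock();
        }
      }
    }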
@@ -5232,7 +5483,6 @@ public class HRegion implements HeapSize
     }
   };
 
-
   /**
    * Facility for dumping and compacting catalog tables.
    * Only does catalog tables since these are only tables we for sure know
@@ -5265,11 +5515,11 @@ public class HRegion implements HeapSize
     final HLog log = new HLog(fs, logdir, oldLogDir, c);
     try {
       processTable(fs, tableDir, log, c, majorCompact);
-     } finally {
+    } finally {
        log.close();
        // TODO: is this still right?
        BlockCache bc = new CacheConfig(c).getBlockCache();
        if (bc != null) bc.shutdown();
-     }
+    }
   }
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec 23 20:54:12 2012
@@ -44,6 +44,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
@@ -165,6 +167,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
@@ -233,7 +236,7 @@ public class HRegionServer implements HR
   // Server to handle client requests. Default access so can be accessed by
   // unit tests.
   RpcServer rpcServer;
-  
+
   // Server to handle client requests.
   private HBaseServer server;  
 
@@ -363,6 +366,8 @@ public class HRegionServer implements HR
    */
   private ClusterId clusterId = null;
 
+  private RegionServerCoprocessorHost rsHost;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -434,6 +439,10 @@ public class HRegionServer implements HR
     this.rpcServer.setQosFunction(new QosFunction());
     this.startcode = System.currentTimeMillis();
 
+    // login the zookeeper client principal (if using security)
+    ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
+      "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
+
     // login the server principal (if using secure Hadoop)
     User.login(this.conf, "hbase.regionserver.keytab.file",
       "hbase.regionserver.kerberos.principal", this.isa.getHostName());
@@ -1013,6 +1022,7 @@ public class HRegionServer implements HR
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
+      this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
       startServiceThreads();
       LOG.info("Serving as " + this.serverNameFromMasterPOV +
         ", RPC listening on " + this.isa +
@@ -1020,6 +1030,7 @@ public class HRegionServer implements HR
         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
       isOnline = true;
     } catch (Throwable e) {
+      LOG.warn("Exception in region server : ", e);
       this.isOnline = false;
       stop("Failed initialization");
       throw convertThrowableToIOE(cleanup(e, "Failed init"),
@@ -1095,8 +1106,7 @@ public class HRegionServer implements HR
         storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, rootIndexSizeKB,
         totalStaticIndexSizeKB, totalStaticBloomSizeKB,
         (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(),
-        totalCompactingKVs, currentCompactedKVs,
-        r.getCoprocessorHost().getCoprocessors());
+        totalCompactingKVs, currentCompactedKVs);
   }
 
   /**
@@ -1576,6 +1586,7 @@ public class HRegionServer implements HR
     this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
         this.getConfiguration(), this.getServerName().toString());
     splitLogWorker.start();
+
   }
 
   /**
@@ -1643,10 +1654,15 @@ public class HRegionServer implements HR
 
   @Override
   public void stop(final String msg) {
-    this.stopped = true;
-    LOG.info("STOPPED: " + msg);
-    // Wakes run() if it is sleeping
-    sleeper.skipSleepCycle();
+    try {
+      this.rsHost.preStop(msg);
+      this.stopped = true;
+      LOG.info("STOPPED: " + msg);
+      // Wakes run() if it is sleeping
+      sleeper.skipSleepCycle();
+    } catch (IOException exp) {
+      LOG.warn("The region server did not stop", exp);
+    }
   }
 
   public void waitForServerOnline(){
@@ -2430,23 +2446,32 @@ public class HRegionServer implements HR
         }
       }
 
-      for (int i = 0; i < nbRows
-          && currentScanResultSize < maxScannerResultSize; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE);
-        if (!values.isEmpty()) {
-          for (KeyValue kv : values) {
-            currentScanResultSize += kv.heapSize();
+      MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
+      region.startRegionOperation();
+      try {
+        int i = 0;
+        synchronized(s) {
+          for (; i < nbRows
+              && currentScanResultSize < maxScannerResultSize; i++) {
+            // Collect values to be returned here
+            boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
+            if (!values.isEmpty()) {
+              for (KeyValue kv : values) {
+                currentScanResultSize += kv.heapSize();
+              }
+              results.add(new Result(values));
+            }
+            if (!moreRows) {
+              break;
+            }
+            values.clear();
           }
-          results.add(new Result(values));
         }
-        if (!moreRows) {
-          break;
-        }
-        values.clear();
+        requestCount.addAndGet(i);
+        region.readRequestsCount.add(i);
+      } finally {
+        region.closeRegionOperation();
       }
-
       // coprocessor postNext hook
       if (region != null && region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerNext(s, results, nbRows, true);
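The calling convention the rewritten scan loop follows (set the thread's MVCC read point, open a region operation, and synchronize on the scanner before calling nextRaw()) is the same one documented in the RegionScanner javadoc further down. Condensed, with scanner s and region assumed in scope:

    MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
    region.startRegionOperation();
    try {
      synchronized (s) {
        List<KeyValue> values = new ArrayList<KeyValue>();
        boolean moreRows = s.nextRaw(values, SchemaMetrics.METRIC_NEXTSIZE);
        // ... wrap values in a Result, repeat until enough rows ...
      }
    } finally {
      region.closeRegionOperation();
    }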
@@ -2589,6 +2614,9 @@ public class HRegionServer implements HR
     return -1;
   }
 
+  /**
+   * @deprecated {@link RowLock} and associated operations are deprecated.
+   */
   public long lockRow(byte[] regionName, byte[] row) throws IOException {
     checkOpen();
     NullPointerException npe = null;
@@ -2605,6 +2633,9 @@ public class HRegionServer implements HR
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().preLockRow(regionName, row);
+      }
       Integer r = region.obtainRowLock(row);
       long lockId = addRowLock(r, region);
       LOG.debug("Row lock " + lockId + " explicitly acquired by client");
@@ -2648,6 +2679,9 @@ public class HRegionServer implements HR
     return rl;
   }
 
+  /**
+   * @deprecated {@link RowLock} and associated operations are deprecated.
+   */
   @Override
   @QosPriority(priority=HConstants.HIGH_QOS)
   public void unlockRow(byte[] regionName, long lockId) throws IOException {
@@ -2666,6 +2700,9 @@ public class HRegionServer implements HR
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().preUnLockRow(regionName, lockId);
+      }
       String lockName = String.valueOf(lockId);
       Integer r = rowlocks.remove(lockName);
       if (r == null) {
@@ -2842,6 +2879,11 @@ public class HRegionServer implements HR
     final int versionOfClosingNode)
   throws IOException {
     checkOpen();
+    //Check for permissions to close.
+    HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+    if (actualRegion.getCoprocessorHost() != null) {
+      actualRegion.getCoprocessorHost().preClose(false);
+    }
     LOG.info("Received close region: " + region.getRegionNameAsString() +
       ". Version of ZK closing node:" + versionOfClosingNode);
     boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
@@ -2889,6 +2931,17 @@ public class HRegionServer implements HR
    */
   protected boolean closeRegion(HRegionInfo region, final boolean abort,
       final boolean zk, final int versionOfClosingNode) {
+
+    HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
+      try {
+        actualRegion.getCoprocessorHost().preClose(abort);
+      } catch (IOException e) {
+        LOG.warn(e);
+        return false;
+      }
+    }
+
     if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
       LOG.warn("Received close for region we are already opening or closing; " +
           region.getEncodedName());
@@ -3589,6 +3642,10 @@ public class HRegionServer implements HR
     return this.zooKeeper;
   }
 
+  public RegionServerCoprocessorHost getCoprocessorHost(){
+    return this.rsHost;
+  }
+
 
   public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
     return this.regionsInTransitionInRS;
@@ -3766,8 +3823,13 @@ public class HRegionServer implements HR
 
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getCoprocessors() {
-    HServerLoad hsl = buildServerLoad();
-    return hsl == null? null: hsl.getCoprocessors();
+    TreeSet<String> coprocessors = new TreeSet<String>(
+        this.hlog.getCoprocessorHost().getCoprocessors());
+    Collection<HRegion> regions = getOnlineRegionsLocalContext();
+    for (HRegion region: regions) {
+      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
+    }
+    return coprocessors.toArray(new String[0]);
   }
 
   /**

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Sun Dec 23 20:54:12 2012
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -265,7 +264,7 @@ public class RegionCoprocessorHost
   /**
    * Invoked before a region open
    */
-  public void preOpen() {
+  public void preOpen(){
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -285,7 +284,7 @@ public class RegionCoprocessorHost
   /**
    * Invoked after a region open
    */
-  public void postOpen() {
+  public void postOpen(){
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -306,7 +305,7 @@ public class RegionCoprocessorHost
    * Invoked before a region is closed
    * @param abortRequested true if the server is aborting
    */
-  public void preClose(boolean abortRequested) {
+  public void preClose(boolean abortRequested) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -314,7 +313,7 @@ public class RegionCoprocessorHost
         try {
           ((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
         } catch (Throwable e) {
-          handleCoprocessorThrowableNoRethrow(env, e);
+          handleCoprocessorThrowable(env, e);
         }
       }
     }
@@ -324,7 +323,7 @@ public class RegionCoprocessorHost
    * Invoked after a region is closed
    * @param abortRequested true if the server is aborting
    */
-  public void postClose(boolean abortRequested) {
+  public void postClose(boolean abortRequested){
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -1483,5 +1482,31 @@ public class RegionCoprocessorHost
 
     return hasLoaded;
   }
+
+  public void preLockRow(byte[] regionName, byte[] row) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row);
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+  }
+
+  public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId);
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+  }
 
 }
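A coprocessor can now intercept explicit row-lock requests through these hooks. A minimal sketch of an observer that vetoes them, assuming the hooks are overridable from BaseRegionObserver:

    import java.io.IOException;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    public class DenyRowLocksObserver extends BaseRegionObserver {
      @Override
      public void preLockRow(ObserverContext<RegionCoprocessorEnvironment> ctx,
          byte[] regionName, byte[] row) throws IOException {
        // Veto explicit row locks; the exception propagates to the client.
        throw new IOException("explicit row locks are disabled");
      }
    }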

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sun Dec 23 20:54:12 2012
@@ -20,7 +20,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
 
 /**
  * RegionScanner describes iterators over rows in an HRegion.
@@ -49,4 +52,50 @@ public interface RegionScanner extends I
    */
   public boolean reseek(byte[] row) throws IOException;
 
+  /**
+   * @return The Scanner's MVCC readPt, see {@link MultiVersionConsistencyControl}
+   */
+  public long getMvccReadPoint();
+
+  /**
+   * Grab the next row's worth of values with the default limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+   * See {@link #nextRaw(List, int, String)}
+   * @param result return output array
+   * @param metric the metric name
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result, String metric) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values
+   * to return.
+   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
+   * Caller must set the thread's readpoint, start and close a region operation, and synchronize on the scanner object.
+   * Example:
+   * <code><pre>
+   * HRegion region = ...;
+   * RegionScanner scanner = ...
+   * MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+   * region.startRegionOperation();
+   * try {
+   *   synchronized(scanner) {
+   *     ...
+   *     boolean moreRows = scanner.nextRaw(values);
+   *     ...
+   *   }
+   * } finally {
+   *   region.closeRegionOperation();
+   * }
+   * </pre></code>
+   * @param result return output array
+   * @param limit limit on row count to get
+   * @param metric the metric name
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException e
+   */
+  public boolean nextRaw(List<KeyValue> result, int limit, String metric) throws IOException;
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sun Dec 23 20:54:12 2012
@@ -82,6 +82,8 @@ public class ScanQueryMatcher {
   /* row is not private for tests */
   /** Row the query is on */
   byte [] row;
+  int rowOffset;
+  short rowLength;
   
   /**
    * Oldest put in any of the involved store files
@@ -222,7 +224,7 @@ public class ScanQueryMatcher {
     short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
     offset += Bytes.SIZEOF_SHORT;
 
-    int ret = this.rowComparator.compareRows(row, 0, row.length,
+    int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
         bytes, offset, rowLength);
     if (ret <= -1) {
       return MatchCode.DONE;
@@ -385,8 +387,10 @@ public class ScanQueryMatcher {
    * Set current row
    * @param row
    */
-  public void setRow(byte [] row) {
+  public void setRow(byte [] row, int offset, short length) {
     this.row = row;
+    this.rowOffset = offset;
+    this.rowLength = length;
     reset();
   }
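setRow() takes (row, offset, length) so the matcher can reference the row bytes in place inside a KeyValue's backing buffer rather than copying them out with getRow(). A sketch of the access pattern:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowSliceSketch {
      public static void main(String[] args) {
        KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("v"));
        byte[] buf = kv.getBuffer();   // shared backing array, no copy
        int offset = kv.getRowOffset();
        short length = kv.getRowLength();
        byte[] other = Bytes.toBytes("row1");
        // Compare the row slice in place, as the scanner code now does.
        boolean same = Bytes.compareTo(buf, offset, length, other, 0, other.length) == 0;
        System.out.println(same);
      }
    }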
 

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Dec 23 20:54:12 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -132,9 +131,6 @@ public class Store extends SchemaConfigu
   private volatile long totalUncompressedBytes = 0L;
   private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final String storeNameStr;
-  private CompactionProgress progress;
-  private final int compactionKVMax;
   private final boolean verifyBulkLoads;
 
   /* The default priority for user-specified compaction requests.
@@ -158,10 +154,6 @@ public class Store extends SchemaConfigu
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
   private final int blocksize;
-  /** Compression algorithm for flush files and minor compaction */
-  private final Compression.Algorithm compression;
-  /** Compression algorithm for major compaction */
-  private final Compression.Algorithm compactionCompression;
   private HFileDataBlockEncoder dataBlockEncoder;
 
   /** Checksum configuration */
@@ -171,6 +163,8 @@ public class Store extends SchemaConfigu
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
+  private final Compactor compactor;
+
   /**
    * Constructor
    * @param basedir qualified path under which the region directory lives;
@@ -185,25 +179,16 @@ public class Store extends SchemaConfigu
   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
     FileSystem fs, Configuration conf)
   throws IOException {
-    super(conf, region.getTableDesc().getNameAsString(),
+    super(conf, region.getRegionInfo().getTableNameAsString(),
         Bytes.toString(family.getName()));
-    HRegionInfo info = region.regionInfo;
+    HRegionInfo info = region.getRegionInfo();
     this.fs = fs;
-    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
-    if (!this.fs.exists(this.homedir)) {
-      if (!this.fs.mkdirs(this.homedir))
-        throw new IOException("Failed create of: " + this.homedir.toString());
-    }
+    Path p = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
+    this.homedir = createStoreHomeDir(this.fs, p);
     this.region = region;
     this.family = family;
     this.conf = conf;
     this.blocksize = family.getBlocksize();
-    this.compression = family.getCompression();
-    // avoid overriding compression setting for major compactions if the user
-    // has not specified it separately
-    this.compactionCompression =
-      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
-        family.getCompactionCompression() : this.compression;
 
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncodingOnDisk(),
@@ -228,7 +213,6 @@ public class Store extends SchemaConfigu
         "ms in store " + this);
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.storeNameStr = getColumnFamilyName();
 
     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
@@ -245,10 +229,8 @@ public class Store extends SchemaConfigu
       this.region.memstoreFlushSize);
     this.maxCompactSize
       = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-    this.compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
 
-    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify",
-        false);
+    this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
@@ -260,6 +242,47 @@ public class Store extends SchemaConfigu
     this.checksumType = getChecksumType(conf);
     // initialize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
+    // Create a compaction tool instance
+    this.compactor = new Compactor(this.conf);
+  }
+
+  /**
+   * @param family descriptor to read the TTL setting from
+   * @return ttl in milliseconds (Long.MAX_VALUE when unlimited)
+   */
+  long getTTL(final HColumnDescriptor family) {
+    // HCD.getTimeToLive returns ttl in seconds.  Convert to milliseconds.
+    long ttl = family.getTimeToLive();
+    if (ttl == HConstants.FOREVER) {
+      // Default is unlimited ttl.
+      ttl = Long.MAX_VALUE;
+    } else if (ttl == -1) {
+      // -1 also means unset; treat as unlimited.
+      ttl = Long.MAX_VALUE;
+    } else {
+      // Second -> ms adjust for user data
+      ttl *= 1000;
+    }
+    return ttl;
+  }
+
+  /**
+   * Create this store's homedir if it does not already exist.
+   * @param fs filesystem to create the directory on
+   * @param homedir the store directory to create
+   * @return the <code>homedir</code> path
+   * @throws IOException
+   */
+  Path createStoreHomeDir(final FileSystem fs,
+      final Path homedir) throws IOException {
+    if (!fs.exists(homedir)) {
+      if (!fs.mkdirs(homedir))
+        throw new IOException("Failed create of: " + homedir.toString());
+    }
+    return homedir;
+  }
+
+  FileSystem getFileSystem() {
+    return this.fs;
   }
 
   /**
@@ -320,7 +343,7 @@ public class Store extends SchemaConfigu
    * Return the directory in which this store stores its
    * StoreFiles
    */
-  public Path getHomedir() {
+  Path getHomedir() {
     return homedir;
   }
 
@@ -339,6 +362,10 @@ public class Store extends SchemaConfigu
     this.dataBlockEncoder = blockEncoder;
   }
 
+  FileStatus [] getStoreFiles() throws IOException {
+    return FSUtils.listStatus(this.fs, this.homedir, null);
+  }
+
   /**
    * Creates an unsorted list of StoreFile loaded in parallel
    * from the given directory.
@@ -346,7 +373,7 @@ public class Store extends SchemaConfigu
    */
   private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
-    FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
+    FileStatus files[] = getStoreFiles();
 
     if (files == null || files.length == 0) {
       return results;
@@ -637,7 +664,7 @@ public class Store extends SchemaConfigu
           storeFileCloserThreadPool.shutdownNow();
         }
       }
-      LOG.debug("closed " + this.storeNameStr);
+      LOG.info("Closed " + this);
       return result;
     } finally {
       this.lock.writeLock().unlock();
@@ -723,6 +750,7 @@ public class Store extends SchemaConfigu
       scanner = cpScanner;
     }
     try {
+      // Limit to "hbase.hstore.compaction.kv.max" (default 10) per next() call to avoid OOME
+      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
       // if we fail.
@@ -736,7 +764,7 @@ public class Store extends SchemaConfigu
           List<KeyValue> kvs = new ArrayList<KeyValue>();
           boolean hasMore;
           do {
-            hasMore = scanner.next(kvs, this.compactionKVMax);
+            hasMore = scanner.next(kvs, compactionKVMax);
             if (!kvs.isEmpty()) {
               for (KeyValue kv : kvs) {
                 // If we know that this KV is going to be included always, then let us
@@ -828,7 +856,7 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression, false);
+    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
   }
 
   /*
@@ -981,16 +1009,12 @@ public class Store extends SchemaConfigu
    * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
+   * @return StoreFile we compacted into, or null if we failed or opted out early.
    */
-  void compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) {
-      return;
-    }
-    Preconditions.checkArgument(cr.getStore().toString()
-        .equals(this.toString()));
-
+  StoreFile compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) return null;
+    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
     List<StoreFile> filesToCompact = cr.getFiles();
-
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -1002,19 +1026,26 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this.storeNameStr + " of "
+        + this + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
     StoreFile sf = null;
     try {
-      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
-          maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
-      sf = completeCompaction(filesToCompact, writer);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postCompact(this, sf);
+      if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+        sf = completeCompaction(filesToCompact, writer);
+        if (region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postCompact(this, sf);
+        }
+      } else {
+        // Create storefile around what we wrote with a reader on it.
+        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
+          this.family.getBloomFilterType(), this.dataBlockEncoder);
+        sf.createReader();
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1023,7 +1054,7 @@ public class Store extends SchemaConfigu
     }
 
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+        + filesToCompact.size() + " file(s) in " + this + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " +
         (sf == null ? "none" : sf.getPath().getName()) +
@@ -1031,6 +1062,7 @@ public class Store extends SchemaConfigu
           StringUtils.humanReadableInt(sf.getReader().length()))
         + "; total size for store is "
         + StringUtils.humanReadableInt(storeSize));
+    return sf;
   }
 
   /**
@@ -1070,7 +1102,8 @@ public class Store extends SchemaConfigu
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
+      StoreFile.Writer writer =
+        this.compactor.compact(this, filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
       if (region.getCoprocessorHost() != null) {
@@ -1119,10 +1152,10 @@ public class Store extends SchemaConfigu
   }
 
   /** getter for CompactionProgress object
-   * @return CompactionProgress object
+   * @return CompactionProgress object; can be null
    */
   public CompactionProgress getCompactionProgress() {
-    return this.progress;
+    return this.compactor.getProgress();
   }
 
   /*
@@ -1174,19 +1207,19 @@ public class Store extends SchemaConfigu
         if (sf.isMajorCompaction() &&
             (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this.storeNameStr +
+            LOG.debug("Skipping major compaction of " + this +
                 " because one (major) compacted file only and oldestTime " +
                 oldest + "ms is < ttl=" + this.ttl);
           }
         } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
-          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+          LOG.debug("Major compaction triggered on store " + this +
             ", because keyvalues outdated; time since last major compaction " +
             (now - lowTimestamp) + "ms");
           result = true;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+          LOG.debug("Major compaction triggered on store " + this +
               "; time since last major compaction " + (now - lowTimestamp) + "ms");
         }
         result = true;
@@ -1376,12 +1409,12 @@ public class Store extends SchemaConfigu
              compactSelection.getFilesToCompact().get(pos).getReader().length()
                > maxCompactSize &&
              !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      compactSelection.clearSubList(0, pos);
+      if (pos != 0) compactSelection.clearSubList(0, pos);
     }
 
     if (compactSelection.getFilesToCompact().isEmpty()) {
       LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
-        this.storeNameStr + ": no store files to compact");
+        this + ": no store files to compact");
       compactSelection.emptyFileList();
       return compactSelection;
     }
@@ -1468,7 +1501,7 @@ public class Store extends SchemaConfigu
       // if we don't have enough files to compact, just wait
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this.storeNameStr
+          LOG.debug("Skipped compaction of " + this
             + ".  Only " + (end - start) + " file(s) of size "
             + StringUtils.humanReadableInt(totalSize)
             + " have met compaction criteria.");
@@ -1495,149 +1528,6 @@ public class Store extends SchemaConfigu
   }
 
   /**
-   * Do a minor/major compaction on an explicit set of storefiles in a Store.
-   * Uses the scan infrastructure to make it easy.
-   *
-   * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
-   * @param maxId Readers maximum sequence id.
-   * @return Product of compaction or null if all cells expired or deleted and
-   * nothing made it through the compaction.
-   * @throws IOException
-   */
-  StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
-                               final boolean majorCompaction, final long maxId)
-      throws IOException {
-    // calculate maximum key count after compaction (for blooms)
-    int maxKeyCount = 0;
-    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
-    for (StoreFile file : filesToCompact) {
-      StoreFile.Reader r = file.getReader();
-      if (r != null) {
-        // NOTE: getFilterEntries could cause under-sized blooms if the user
-        //       switches bloom type (e.g. from ROW to ROWCOL)
-        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
-          ? r.getFilterEntries() : r.getEntries();
-        maxKeyCount += keyCount;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Compacting " + file +
-            ", keycount=" + keyCount +
-            ", bloomtype=" + r.getBloomFilterType().toString() +
-            ", size=" + StringUtils.humanReadableInt(r.length()) +
-            ", encoding=" + r.getHFileReader().getEncodingOnDisk());
-        }
-      }
-      // For major compactions calculate the earliest put timestamp
-      // of all involved storefiles. This is used to remove 
-      // family delete marker during the compaction.
-      if (majorCompaction) {
-        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
-        if (tmp == null) {
-          // there's a file with no information, must be an old one
-          // assume we have very old puts
-          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
-        } else {
-          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
-        }
-      }
-    }
-
-    // keep track of compaction progress
-    progress = new CompactionProgress(maxKeyCount);
-
-    // For each file, obtain a scanner:
-    List<StoreFileScanner> scanners = StoreFileScanner
-      .getScannersForStoreFiles(filesToCompact, false, false, true);
-
-    // Make the instantiation lazy in case compaction produces no product; i.e.
-    // where all source cells are expired or deleted.
-    StoreFile.Writer writer = null;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
-    try {
-      InternalScanner scanner = null;
-      try {
-        if (getHRegion().getCoprocessorHost() != null) {
-          scanner = getHRegion()
-              .getCoprocessorHost()
-              .preCompactScannerOpen(this, scanners,
-                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
-        }
-        if (scanner == null) {
-          Scan scan = new Scan();
-          scan.setMaxVersions(getFamily().getMaxVersions());
-          /* Include deletes, unless we are doing a major compaction */
-          scanner = new StoreScanner(this, getScanInfo(), scan, scanners,
-            majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
-            smallestReadPoint, earliestPutTs);
-        }
-        if (getHRegion().getCoprocessorHost() != null) {
-          InternalScanner cpScanner =
-            getHRegion().getCoprocessorHost().preCompact(this, scanner);
-          // NULL scanner returned from coprocessor hooks means skip normal processing
-          if (cpScanner == null) {
-            return null;
-          }
-          scanner = cpScanner;
-        }
-
-        int bytesWritten = 0;
-        // since scanner.next() can return 'false' but still be delivering data,
-        // we have to use a do/while loop.
-        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-        // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-        boolean hasMore;
-        do {
-          hasMore = scanner.next(kvs, this.compactionKVMax);
-          if (writer == null && !kvs.isEmpty()) {
-            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
-                true);
-          }
-          if (writer != null) {
-            // output to writer:
-            for (KeyValue kv : kvs) {
-              if (kv.getMemstoreTS() <= smallestReadPoint) {
-                kv.setMemstoreTS(0);
-              }
-              writer.append(kv);
-              // update progress per key
-              ++progress.currentCompactedKVs;
-
-              // check periodically to see if a system stop is requested
-              if (Store.closeCheckInterval > 0) {
-                bytesWritten += kv.getLength();
-                if (bytesWritten > Store.closeCheckInterval) {
-                  bytesWritten = 0;
-                  if (!this.region.areWritesEnabled()) {
-                    writer.close();
-                    fs.delete(writer.getPath(), false);
-                    throw new InterruptedIOException(
-                        "Aborting compaction of store " + this +
-                        " in region " + this.region +
-                        " because user requested stop.");
-                  }
-                }
-              }
-            }
-          }
-          kvs.clear();
-        } while (hasMore);
-      } finally {
-        if (scanner != null) {
-          scanner.close();
-        }
-      }
-    } finally {
-      if (writer != null) {
-        writer.appendMetadata(maxId, majorCompaction);
-        writer.close();
-      }
-    }
-    return writer;
-  }
-
-  /**
    * Validates a store file by opening and closing it. In HFileV2 this should
    * not be an expensive operation.
    *
@@ -1741,7 +1631,7 @@ public class Store extends SchemaConfigu
 
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
-      LOG.error("Failed replacing compacted files in " + this.storeNameStr +
+      LOG.error("Failed replacing compacted files in " + this +
         ". Compacted file is " + (result == null? "none": result.toString()) +
         ".  Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
@@ -2027,7 +1917,7 @@ public class Store extends SchemaConfigu
         return mk.getRow();
       }
     } catch(IOException e) {
-      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+      LOG.warn("Failed getting store size for " + this, e);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -2080,7 +1970,7 @@ public class Store extends SchemaConfigu
 
   @Override
   public String toString() {
-    return this.storeNameStr;
+    return getColumnFamilyName();
   }
 
   /**
@@ -2196,7 +2086,7 @@ public class Store extends SchemaConfigu
   }
 
   HRegionInfo getHRegionInfo() {
-    return this.region.regionInfo;
+    return this.region.getRegionInfo();
   }
 
   /**
@@ -2324,8 +2214,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
-          + (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+          + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+          + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

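The new "hbase.hstore.compaction.complete" flag (default true) controls whether
compact() swaps the compaction product into the store or only wraps it in a
StoreFile with a reader. A hedged sketch of how a tool or test might opt out
(surrounding setup hypothetical):

    Configuration conf = HBaseConfiguration.create();
    // Write the compaction product but leave the store's file set untouched.
    conf.setBoolean("hbase.hstore.compaction.complete", false);
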
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Dec 23 20:54:12 2012
@@ -320,7 +320,7 @@ public class StoreFile extends SchemaCon
    * @return Calculated path to parent region file.
    * @throws IOException
    */
-  static Path getReferredToFile(final Path p) {
+  public static Path getReferredToFile(final Path p) {
     Matcher m = REF_NAME_PARSER.matcher(p.getName());
     if (m == null || !m.matches()) {
       LOG.warn("Failed match of store file name " + p.toString());

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Dec 23 20:54:12 2012
@@ -340,8 +340,11 @@ public class StoreScanner extends NonLaz
 
     // only call setRow if the row changes; avoids confusing the query matcher
     // if scanning intra-row
-    if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
-      matcher.setRow(peeked.getRow());
+    byte[] row = peeked.getBuffer();
+    int offset = peeked.getRowOffset();
+    short length = peeked.getRowLength();
+    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
+      matcher.setRow(row, offset, length);
     }
 
     KeyValue kv;
@@ -521,9 +524,12 @@ public class StoreScanner extends NonLaz
     if (kv == null) {
       kv = lastTopKey;
     }
-    if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
+    byte[] row = kv.getBuffer();
+    int offset = kv.getRowOffset();
+    short length = kv.getRowLength();
+    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
       matcher.reset();
-      matcher.setRow(kv.getRow());
+      matcher.setRow(row, offset, length);
     }
   }
 

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java Sun Dec 23 20:54:12 2012
@@ -49,5 +49,4 @@ public class CompactionProgress {
   public float getProgressPct() {
     // Cast before dividing: long/long division truncates the fraction to zero.
     return (float) currentCompactedKVs / totalCompactingKVs;
   }
-
 }

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java Sun Dec 23 20:54:12 2012
@@ -143,7 +143,12 @@ public class Compressor {
       // the status byte also acts as the higher order byte of the dictionary
       // entry
       short dictIdx = toShort(status, in.readByte());
-      byte[] entry = dict.getEntry(dictIdx);
+      byte[] entry;
+      try {
+        entry = dict.getEntry(dictIdx);
+      } catch (Exception ex) {
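+        // A corrupt or truncated WAL can yield an out-of-range dictionary
+        // index; rethrow as IOException so replay treats it like other corruption.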
+        throw new IOException("Unable to uncompress the log entry", ex);
+      }
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index "
             + dictIdx);

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sun Dec 23 20:54:12 2012
@@ -167,6 +167,7 @@ public class HLog implements Syncable {
     Entry next(Entry reuse) throws IOException;
     void seek(long pos) throws IOException;
     long getPosition() throws IOException;
+    void reset() throws IOException;
   }
 
   public interface Writer {
@@ -695,15 +696,18 @@ public class HLog implements Syncable {
 
   /**
    * Get a reader for the WAL.
+   * The proper way to tail a log that may still be under construction is to first use this
+   * method to get a reader, then call {@link HLog.Reader#reset()} to see newly appended data.
+   * The reader keeps implementation-specific context (such as compression dictionaries)
+   * across resets.
    * @param fs
    * @param path
    * @param conf
    * @return A WAL reader.  Close when done with it.
    * @throws IOException
    */
-  public static Reader getReader(final FileSystem fs,
-    final Path path, Configuration conf)
-  throws IOException {
+  public static Reader getReader(final FileSystem fs, final Path path,
+                                 Configuration conf)
+      throws IOException {
     try {
 
       if (logReaderClass == null) {

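To illustrate the tailing pattern described in the javadoc above, a minimal
sketch, assuming it runs in a method declared to throw IOException and
InterruptedException (the loop condition, handler, and poll interval are
hypothetical; getReader(), next(), reset(), and close() are the Reader
interface shown in this commit):

    HLog.Reader reader = HLog.getReader(fs, walPath, conf);
    try {
      HLog.Entry entry;
      while (keepTailing) {                  // hypothetical loop condition
        while ((entry = reader.next()) != null) {
          handle(entry);                     // hypothetical entry handler
        }
        Thread.sleep(pollIntervalMs);        // hypothetical back-off
        reader.reset();                      // pick up edits appended since open
      }
    } finally {
      reader.close();                        // close when done, per the javadoc
    }
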
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sun Dec 23 20:54:12 2012
@@ -139,15 +139,17 @@ public class SequenceFileLogReader imple
 
   Configuration conf;
   WALReader reader;
+  FileSystem fs;
 
   // Needed logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+  // True until a read has started populating the compression dictionaries.
+  boolean emptyCompressionContext = true;
   /**
    * Compression context to use reading.  Can be null if no compression.
    */
-  private CompressionContext compressionContext = null;
+  protected CompressionContext compressionContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
@@ -173,6 +175,7 @@ public class SequenceFileLogReader imple
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+    this.fs = fs;
 
     // If compression is enabled, new dictionaries are created here.
     boolean compression = reader.isWALCompressionEnabled();
@@ -237,11 +240,22 @@ public class SequenceFileLogReader imple
       throw addFileInfoToException(ioe);
     }
     edit++;
+    if (compressionContext != null && emptyCompressionContext) {
+      emptyCompressionContext = false;
+    }
     return b? e: null;
   }
 
   @Override
   public void seek(long pos) throws IOException {
+    if (compressionContext != null && emptyCompressionContext) {
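+      // Dictionary compression is stateful: an entry can only be decoded after
+      // all prior entries have populated the dictionaries, so walk the file
+      // from the start up to the requested position before seeking.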
+      while (next() != null) {
+        if (getPosition() == pos) {
+          emptyCompressionContext = false;
+          break;
+        }
+      }
+    }
     try {
       reader.seek(pos);
     } catch (IOException ioe) {
@@ -286,4 +300,11 @@ public class SequenceFileLogReader imple
 
     return ioe;
   }
+
+  @Override
+  public void reset() throws IOException {
+    // Re-opening the underlying reader lets us see data appended since the last open,
+    // while reusing the compressionContext already populated for this file.
+    reader = new WALReader(fs, path, conf);
+  }
 }
\ No newline at end of file