You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 21:58:32 UTC

svn commit: r1445918 [10/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/ma...

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java Wed Feb 13 20:58:23 2013
@@ -61,7 +61,6 @@ public abstract class TableEventHandler 
   protected final MasterServices masterServices;
   protected final byte [] tableName;
   protected final String tableNameStr;
-  protected boolean persistedToZk = false;
 
   public TableEventHandler(EventType eventType, byte [] tableName, Server server,
       MasterServices masterServices)
@@ -111,10 +110,7 @@ public abstract class TableEventHandler 
       LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
     } catch (KeeperException e) {
       LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
-    } finally {
-      // notify the waiting thread that we're done persisting the request
-      setPersist();
-    }
+    } 
   }
 
   public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
@@ -165,29 +161,6 @@ public abstract class TableEventHandler 
     return done;
   }
 
-  /**
-   * Table modifications are processed asynchronously, but provide an API for
-   * you to query their status.
-   *
-   * @throws IOException
-   */
-  public synchronized void waitForPersist() throws IOException {
-    if (!persistedToZk) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw (IOException) new InterruptedIOException().initCause(ie);
-      }
-      assert persistedToZk;
-    }
-  }
-
-  private synchronized void setPersist() {
-    if (!persistedToZk) {
-      persistedToZk = true;
-      notify();
-    }
-  }
 
   /**
    * Gets a TableDescriptor from the masterServices.  Can Throw exceptions.

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -55,11 +55,8 @@ public class TableModifyFamilyHandler ex
     if (cpHost != null) {
       cpHost.preModifyColumnHandler(this.tableName, this.familyDesc);
     }
-    // Update table descriptor in HDFS
-    HTableDescriptor htd =
-      this.masterServices.getMasterFileSystem().modifyColumn(tableName, familyDesc);
-    // Update in-memory descriptor cache
-    this.masterServices.getTableDescriptors().add(htd);
+    // Update table descriptor
+    this.masterServices.getMasterFileSystem().modifyColumn(tableName, familyDesc);
     if (cpHost != null) {
       cpHost.postModifyColumnHandler(this.tableName, this.familyDesc);
     }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/LogMonitoring.java Wed Feb 13 20:58:23 2013
@@ -79,17 +79,18 @@ public abstract class LogMonitoring {
   private static void dumpTailOfLog(File f, PrintWriter out, long tailKb)
       throws IOException {
     FileInputStream fis = new FileInputStream(f);
+    BufferedReader r = null;
     try {
       FileChannel channel = fis.getChannel();
       channel.position(Math.max(0, channel.size() - tailKb*1024));
-      BufferedReader r = new BufferedReader(
-          new InputStreamReader(fis));
+      r = new BufferedReader(new InputStreamReader(fis));
       r.readLine(); // skip the first partial line
       String line;
       while ((line = r.readLine()) != null) {
         out.println(line);
       }
     } finally {
+      if (r != null) IOUtils.closeStream(r);
       IOUtils.closeStream(fis);
     }
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Feb 13 20:58:23 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -66,10 +66,9 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
@@ -77,7 +76,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -116,7 +114,6 @@ import com.google.protobuf.ByteString;
  * or build components for protocol buffer requests.
  */
 @InterfaceAudience.Private
-@SuppressWarnings("deprecation")
 public final class RequestConverter {
 
   private RequestConverter() {
@@ -170,7 +167,7 @@ public final class RequestConverter {
    * @param regionName the name of the region to get
    * @param get the client Get
    * @param existenceOnly indicate if check row existence only
-   * @return a protocol buffer GetReuqest
+   * @return a protocol buffer GetRequest
    */
   public static GetRequest buildGetRequest(final byte[] regionName,
       final Get get, final boolean existenceOnly) throws IOException {
@@ -184,6 +181,27 @@ public final class RequestConverter {
   }
 
   /**
+   * Create a protocol buffer MultiGetRequest for client Gets All gets are going to be run against
+   * the same region.
+   * @param regionName the name of the region to get from
+   * @param gets the client Gets
+   * @param existenceOnly indicate if check rows existence only
+   * @return a protocol buffer MultiGetRequest
+   */
+  public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
+      final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
+    MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
+    RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    builder.setExistenceOnly(existenceOnly);
+    builder.setClosestRowBefore(closestRowBefore);
+    builder.setRegion(region);
+    for (Get get : gets) {
+      builder.addGet(ProtobufUtil.toGet(get));
+    }
+    return builder.build();
+  }
+
+  /**
    * Create a protocol buffer MutateRequest for a client increment
    *
    * @param regionName
@@ -437,40 +455,6 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer LockRowRequest
-   *
-   * @param regionName
-   * @param row
-   * @return a lock row request
-   */
-  public static LockRowRequest buildLockRowRequest(
-      final byte[] regionName, final byte[] row) {
-    LockRowRequest.Builder builder = LockRowRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    builder.addRow(ByteString.copyFrom(row));
-    return builder.build();
-  }
-
-  /**
-   * Create a protocol buffer UnlockRowRequest
-   *
-   * @param regionName
-   * @param lockId
-   * @return a unlock row request
-   */
-  public static UnlockRowRequest buildUnlockRowRequest(
-      final byte[] regionName, final long lockId) {
-    UnlockRowRequest.Builder builder = UnlockRowRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    builder.setLockId(lockId);
-    return builder.build();
-  }
-
-  /**
    * Create a protocol buffer bulk load request
    *
    * @param familyPaths
@@ -496,24 +480,6 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer coprocessor exec request
-   *
-   * @param regionName
-   * @param exec
-   * @return a coprocessor exec request
-   * @throws IOException
-   */
-  public static ExecCoprocessorRequest buildExecCoprocessorRequest(
-      final byte[] regionName, final Exec exec) throws IOException {
-    ExecCoprocessorRequest.Builder builder = ExecCoprocessorRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
-      RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    builder.setCall(ProtobufUtil.toExec(exec));
-    return builder.build();
-  }
-
-  /**
    * Create a protocol buffer multi request for a list of actions.
    * RowMutations in the list (if any) will be ignored.
    *
@@ -538,8 +504,6 @@ public final class RequestConverter {
         protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
       } else if (row instanceof Delete) {
         protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
-      } else if (row instanceof Exec) {
-        protoAction.setExec(ProtobufUtil.toExec((Exec)row));
       } else if (row instanceof Append) {
         protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
       } else if (row instanceof Increment) {
@@ -1054,9 +1018,10 @@ public final class RequestConverter {
    * @param table
    * @return a GetSchemaAlterStatusRequest
    */
-  public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(final byte [] table) {
+  public static GetSchemaAlterStatusRequest buildGetSchemaAlterStatusRequest(
+      final byte [] tableName) {
     GetSchemaAlterStatusRequest.Builder builder = GetSchemaAlterStatusRequest.newBuilder();
-    builder.setTableName(ByteString.copyFrom(table));
+    builder.setTableName(ByteString.copyFrom(tableName));
     return builder.build();
   }
 
@@ -1151,6 +1116,78 @@ public final class RequestConverter {
   }
 
   /**
+   * Create a request to grant user permissions.
+   *
+   * @param username the short user name who to grant permissions
+   * @param table optional table name the permissions apply
+   * @param family optional column family
+   * @param qualifier optional qualifier
+   * @param actions the permissions to be granted
+   * @return A {@link AccessControlProtos} GrantRequest
+   */
+  public static AccessControlProtos.GrantRequest buildGrantRequest(
+      String username, byte[] table, byte[] family, byte[] qualifier,
+      AccessControlProtos.Permission.Action... actions) {
+    AccessControlProtos.Permission.Builder permissionBuilder =
+        AccessControlProtos.Permission.newBuilder();
+    for (AccessControlProtos.Permission.Action a : actions) {
+      permissionBuilder.addAction(a);
+    }
+    if (table != null) {
+      permissionBuilder.setTable(ByteString.copyFrom(table));
+    }
+    if (family != null) {
+      permissionBuilder.setFamily(ByteString.copyFrom(family));
+    }
+    if (qualifier != null) {
+      permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+    }
+
+    return AccessControlProtos.GrantRequest.newBuilder()
+      .setPermission(
+          AccessControlProtos.UserPermission.newBuilder()
+              .setUser(ByteString.copyFromUtf8(username))
+              .setPermission(permissionBuilder.build())
+      ).build();
+  }
+
+  /**
+   * Create a request to revoke user permissions.
+   *
+   * @param username the short user name whose permissions to be revoked
+   * @param table optional table name the permissions apply
+   * @param family optional column family
+   * @param qualifier optional qualifier
+   * @param actions the permissions to be revoked
+   * @return A {@link AccessControlProtos} RevokeRequest
+   */
+  public static AccessControlProtos.RevokeRequest buildRevokeRequest(
+      String username, byte[] table, byte[] family, byte[] qualifier,
+      AccessControlProtos.Permission.Action... actions) {
+    AccessControlProtos.Permission.Builder permissionBuilder =
+        AccessControlProtos.Permission.newBuilder();
+    for (AccessControlProtos.Permission.Action a : actions) {
+      permissionBuilder.addAction(a);
+    }
+    if (table != null) {
+      permissionBuilder.setTable(ByteString.copyFrom(table));
+    }
+    if (family != null) {
+      permissionBuilder.setFamily(ByteString.copyFrom(family));
+    }
+    if (qualifier != null) {
+      permissionBuilder.setQualifier(ByteString.copyFrom(qualifier));
+    }
+
+    return AccessControlProtos.RevokeRequest.newBuilder()
+      .setPermission(
+          AccessControlProtos.UserPermission.newBuilder()
+              .setUser(ByteString.copyFromUtf8(username))
+              .setPermission(permissionBuilder.build())
+      ).build();
+  }
+
+  /**
    * Create a RegionOpenInfo based on given region info and version of offline node
    */
   private static RegionOpenInfo buildRegionOpenInfo(

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Wed Feb 13 20:58:23 2013
@@ -91,7 +91,8 @@ public final class ResponseConverter {
       if (result.hasException()) {
         results.add(ProtobufUtil.toException(result.getException()));
       } else if (result.hasValue()) {
-        Object value = ProtobufUtil.toObject(result.getValue());
+        ClientProtos.Result r = result.getValue();
+        Object value = ProtobufUtil.toResult(r);
         if (value instanceof ClientProtos.Result) {
           results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
         } else {

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Wed Feb 13 20:58:23 2013
@@ -250,9 +250,12 @@ public class CompactSplitThread implemen
     while (!done) {
       try {
         done = t.awaitTermination(60, TimeUnit.SECONDS);
-        LOG.debug("Waiting for " + name + " to finish...");
+        LOG.info("Waiting for " + name + " to finish...");
+        if (!done) {
+          t.shutdownNow();
+        }
       } catch (InterruptedException ie) {
-        LOG.debug("Interrupted waiting for " + name + " to finish...");
+        LOG.warn("Interrupted waiting for " + name + " to finish...");
       }
     }
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Wed Feb 13 20:58:23 2013
@@ -157,10 +157,12 @@ public class CompactionTool extends Conf
       HStore store = getStore(region, familyDir);
       do {
         CompactionRequest cr = store.requestCompaction();
-        StoreFile storeFile = store.compact(cr);
-        if (storeFile != null) {
+        List<StoreFile> storeFiles = store.compact(cr);
+        if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
-            fs.delete(storeFile.getPath(), false);
+            for (StoreFile storeFile: storeFiles) {
+              fs.delete(storeFile.getPath(), false);
+            }
           }
         }
       } while (store.needsCompaction() && !compactOnce);

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 
 /**
  * A {@link RegionSplitPolicy} implementation which splits a region
@@ -36,14 +38,15 @@ public class ConstantSizeRegionSplitPoli
   @Override
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
-    long maxFileSize = region.getTableDesc().getMaxFileSize();
-
-    // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
-    if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
-      maxFileSize = getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+    Configuration conf = getConf();
+    HTableDescriptor desc = region.getTableDesc();
+    if (desc != null) {
+      this.desiredMaxFileSize = desc.getMaxFileSize();
+    }
+    if (this.desiredMaxFileSize <= 0) {
+      this.desiredMaxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
         HConstants.DEFAULT_MAX_FILE_SIZE);
     }
-    this.desiredMaxFileSize = maxFileSize;
   }
 
   @Override

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -45,10 +46,15 @@ extends ConstantSizeRegionSplitPolicy {
   @Override
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
-    this.flushSize = region.getTableDesc() != null?
-      region.getTableDesc().getMemStoreFlushSize():
-      getConf().getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+    Configuration conf = getConf();
+    HTableDescriptor desc = region.getTableDesc();
+    if (desc != null) {
+      this.flushSize = desc.getMemStoreFlushSize();
+    }
+    if (this.flushSize <= 0) {
+      this.flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
+    }
   }
 
   @Override

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyPrefixRegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.classification.
 public class KeyPrefixRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
   private static final Log LOG = LogFactory
       .getLog(KeyPrefixRegionSplitPolicy.class);
-  public static String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
+  public static final String PREFIX_LENGTH_KEY = "prefix_split_key_policy.prefix_length";
 
   private int prefixLength = 0;
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Wed Feb 13 20:58:23 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -47,7 +48,7 @@ class LogRoller extends HasThread implem
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
   private final Server server;
-  private final RegionServerServices services;
+  protected final RegionServerServices services;
   private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
@@ -92,7 +93,7 @@ class LogRoller extends HasThread implem
       try {
         this.lastrolltime = now;
         // This is array of actual region names.
-        byte [][] regionsToFlush = this.services.getWAL().rollWriter(rollLog.get());
+        byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
         if (regionsToFlush != null) {
           for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
@@ -159,6 +160,10 @@ class LogRoller extends HasThread implem
     }
   }
 
+  protected HLog getWAL() throws IOException {
+    return this.services.getWAL(null);
+  }
+
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
     // Not interested

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Feb 13 20:58:23 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -29,10 +30,10 @@ import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -59,7 +61,7 @@ import com.google.common.base.Preconditi
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread 
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition flushOccurred = lock.newCondition();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object blockSignal = new Object();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread 
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread 
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@ class MemStoreFlusher extends HasThread 
     return true;
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      FlushQueueEntry fqe = null;
-      try {
-        wakeupPending.set(false); // allow someone to wake us up again
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null || fqe instanceof WakeupFlushThread) {
-          if (isAboveLowWaterMark()) {
-            LOG.debug("Flush thread woke up because memory above low water=" +
-              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-            if (!flushOneForGlobalPressure()) {
-              // Wasn't able to flush any region, but we're above low water mark
-              // This is unlikely to happen, but might happen when closing the
-              // entire server - another thread is flushing regions. We'll just
-              // sleep a little bit to avoid spinning, and then pretend that
-              // we flushed one, so anyone blocked will check again
-              lock.lock();
-              try {
+  private class FlushHandler extends HasThread {
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        FlushQueueEntry fqe = null;
+        try {
+          wakeupPending.set(false); // allow someone to wake us up again
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null || fqe instanceof WakeupFlushThread) {
+            if (isAboveLowWaterMark()) {
+              LOG.debug("Flush thread woke up because memory above low water="
+                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+              if (!flushOneForGlobalPressure()) {
+                // Wasn't able to flush any region, but we're above low water mark
+                // This is unlikely to happen, but might happen when closing the
+                // entire server - another thread is flushing regions. We'll just
+                // sleep a little bit to avoid spinning, and then pretend that
+                // we flushed one, so anyone blocked will check again
                 Thread.sleep(1000);
-                flushOccurred.signalAll();
-              } finally {
-                lock.unlock();
+                wakeUpIfBlocking();
               }
+              // Enqueue another one of these tokens so we'll wake up again
+              wakeupFlushThread();
             }
-            // Enqueue another one of these tokens so we'll wake up again
-            wakeupFlushThread();
+            continue;
+          }
+          FlushRegionEntry fre = (FlushRegionEntry) fqe;
+          if (!flushRegion(fre)) {
+            break;
           }
+        } catch (InterruptedException ex) {
           continue;
-        }
-        FlushRegionEntry fre = (FlushRegionEntry)fqe;
-        if (!flushRegion(fre)) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flusher failed for entry " + fqe, ex);
-        if (!server.checkFileSystem()) {
-          break;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flusher failed for entry " + fqe, ex);
+          if (!server.checkFileSystem()) {
+            break;
+          }
         }
       }
-    }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
+      synchronized (regionsInQueue) {
+        regionsInQueue.clear();
+        flushQueue.clear();
+      }
 
-    // Signal anyone waiting, so they see the close flag
-    lock.lock();
-    try {
-      flushOccurred.signalAll();
-    } finally {
-      lock.unlock();
+      // Signal anyone waiting, so they see the close flag
+      wakeUpIfBlocking();
+      LOG.info(getName() + " exiting");
     }
-    LOG.info(getName() + " exiting");
   }
 
+
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread 
           continue;
         }
 
+        if (region.writestate.flushing || !region.writestate.writesEnabled) {
+          continue;
+        }
+
         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
           continue;
         }
@@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread 
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHander : flushHandlers) {
+        if (flushHander != null) flushHander.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
+    }
+  }
+
+  synchronized void start(UncaughtExceptionHandler eh) {
+    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+        server.getServerName().toString() + "-MemStoreFlusher", eh);
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler();
+      flusherThreadFactory.newThread(flushHandlers[i]);
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null && flushHander.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null) {
+        Threads.shutdown(flushHander.getThread());
+      }
     }
   }
 
@@ -365,7 +400,8 @@ class MemStoreFlusher extends HasThread 
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestCompaction(region, getName());
+              this.server.compactSplitThread.requestCompaction(region, Thread
+                  .currentThread().getName());
             } catch (IOException e) {
               LOG.error(
                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread 
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
      }
-     lock.lock();
     }
+    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread 
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread 
         return false;
       }
     } finally {
-      try {
-        flushOccurred.signalAll();
-      } finally {
-        lock.unlock();
-      }
+      lock.readLock().unlock();
+      wakeUpIfBlocking();
     }
     return true;
   }
 
+  private void wakeUpIfBlocking() {
+    synchronized (blockSignal) {
+      blockSignal.notifyAll();
+    }
+  }
+
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread 
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      lock.lock();
-      try {
+      long start = System.currentTimeMillis();
+      synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if(!blocked){
+          if (!blocked) {
             startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() +
             ": the global memstore size " +
@@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread 
           try {
             // we should be able to wait forever, but we've seen a bug where
             // we miss a notify, so put a 5 second bound on it at least.
-            flushOccurred.await(5, TimeUnit.SECONDS);
+            blockSignal.wait(5 * 1000);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
+          long took = System.currentTimeMillis() - start;
+          LOG.warn("Memstore is above high water mark and block " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread 
           }
           LOG.info("Unblocking updates for server " + server.toString());
         }
-      } finally {
-        lock.unlock();
       }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();
@@ -500,21 +539,21 @@ class MemStoreFlusher extends HasThread 
     return "flush_queue="
         + flushQueue.size();
   }
-  
+
   public String dumpQueue() {
     StringBuilder queueList = new StringBuilder();
     queueList.append("Flush Queue Queue dump:\n");
     queueList.append("  Flush Queue:\n");
     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
-    
+
     while(it.hasNext()){
       queueList.append("    "+it.next().toString());
       queueList.append("\n");
     }
-    
+
     return queueList.toString();
   }
-  
+
   interface FlushQueueEntry extends Delayed {}
 
   /**
@@ -530,6 +569,12 @@ class MemStoreFlusher extends HasThread 
     public int compareTo(Delayed o) {
       return -1;
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (this == obj);
+    }
+
   }
 
   /**
@@ -597,5 +642,17 @@ class MemStoreFlusher extends HasThread 
     public String toString() {
       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      Delayed other = (Delayed) obj;
+      return compareTo(other) == 0;
+    }
   }
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java Wed Feb 13 20:58:23 2013
@@ -40,11 +40,11 @@ interface OnlineRegions extends Server {
   /**
    * This method removes HRegion corresponding to hri from the Map of onlineRegions.
    *
-   * @param encodedRegionName
-   * @param destination - destination, if any. Null otherwise
+   * @param r Region to remove.
+   * @param destination Destination, if any, null otherwise.
    * @return True if we removed a region from online list.
    */
-  public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination);
+  public boolean removeFromOnlineRegions(final HRegion r, ServerName destination);
 
   /**
    * Return {@link HRegion} instance.

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Feb 13 20:58:23 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -135,6 +134,7 @@ public class RegionCoprocessorHost
    */
   public RegionCoprocessorHost(final HRegion region,
       final RegionServerServices rsServices, final Configuration conf) {
+    this.conf = conf;
     this.rsServices = rsServices;
     this.region = region;
     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
@@ -214,11 +214,6 @@ public class RegionCoprocessorHost
     // It uses a visitor pattern to invoke registered Endpoint
     // method.
     for (Class c : implClass.getInterfaces()) {
-      if (CoprocessorProtocol.class.isAssignableFrom(c)) {
-        region.registerProtocol(c, (CoprocessorProtocol)instance);
-      }
-      // we allow endpoints to register as both CoproocessorProtocols and Services
-      // for ease of transition
       if (CoprocessorService.class.isAssignableFrom(c)) {
         region.registerService( ((CoprocessorService)instance).getService() );
       }
@@ -430,9 +425,11 @@ public class RegionCoprocessorHost
    * Called prior to rewriting the store files selected for compaction
    * @param store the store being compacted
    * @param scanner the scanner used to read store data during compaction
-   * @throws IOException 
+   * @param scanType type of Scan
+   * @throws IOException
    */
-  public InternalScanner preCompact(HStore store, InternalScanner scanner) throws IOException {
+  public InternalScanner preCompact(HStore store, InternalScanner scanner,
+      ScanType scanType) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
@@ -440,7 +437,7 @@ public class RegionCoprocessorHost
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
           scanner = ((RegionObserver)env.getInstance()).preCompact(
-              ctx, store, scanner);
+              ctx, store, scanner, scanType);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env,e);
         }
@@ -503,7 +500,7 @@ public class RegionCoprocessorHost
 
   /**
    * Invoked before a memstore flush
-   * @throws IOException 
+   * @throws IOException
    */
   public void preFlush() throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -524,7 +521,7 @@ public class RegionCoprocessorHost
 
   /**
    * See
-   * {@link RegionObserver#preFlush(ObserverContext<RegionCoprocessorEnvironment>, HStore, KeyValueScanner)}
+   * {@link RegionObserver#preFlushScannerOpen(ObserverContext, HStore, KeyValueScanner, InternalScanner)}
    */
   public InternalScanner preFlushScannerOpen(HStore store, KeyValueScanner memstoreScanner) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -607,7 +604,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just before a split
    * @throws IOException
@@ -633,7 +630,7 @@ public class RegionCoprocessorHost
    * Invoked just after a split
    * @param l the new left-hand daughter region
    * @param r the new right-hand daughter region
-   * @throws IOException 
+   * @throws IOException
    */
   public void postSplit(HRegion l, HRegion r) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@@ -651,7 +648,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just before the rollback of a failed split is started
    * @throws IOException
@@ -672,7 +669,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked just after the rollback of a failed split is done
    * @throws IOException
@@ -693,7 +690,7 @@ public class RegionCoprocessorHost
       }
     }
   }
-  
+
   /**
    * Invoked after a split is completed irrespective of a failure or success.
    * @throws IOException
@@ -1355,6 +1352,35 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * This will be called by the scan flow when the current scanned row is being filtered out by the
+   * filter.
+   * @param s the scanner
+   * @param currentRow The current rowkey which got filtered out
+   * @return whether more rows are available for the scanner or not
+   * @throws IOException
+   */
+  public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
+      throws IOException {
+    boolean hasMore = true; // By default assume more rows there.
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
+              hasMore);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return hasMore;
+  }
+  
+  /**
    * @param s the scanner
    * @return true if default behavior should be bypassed, false otherwise
    * @exception IOException Exception

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Wed Feb 13 20:58:23 2013
@@ -19,10 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -38,8 +39,9 @@ public interface RegionServerServices ex
    */
   public boolean isStopping();
 
-  /** @return the HLog */
-  public HLog getWAL();
+  /** @return the HLog for a particular region. Pass null for getting the 
+   * default (common) WAL */
+  public HLog getWAL(HRegionInfo regionInfo) throws IOException;
 
   /**
    * @return Implementation of {@link CompactionRequestor} or null.
@@ -79,7 +81,7 @@ public interface RegionServerServices ex
    * Get the regions that are currently being opened or closed in the RS
    * @return map of regions in transition in this RS
    */
-  public Map<byte[], Boolean> getRegionsInTransitionInRS();
+  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS();
 
   /**
    * @return Return the FileSystem object used by the regionserver

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java Wed Feb 13 20:58:23 2013
@@ -91,10 +91,9 @@ public abstract class RegionSplitPolicy 
 
   /**
    * Create the RegionSplitPolicy configured for the given table.
-   * Each
    * @param region
    * @param conf
-   * @return
+   * @return a RegionSplitPolicy
    * @throws IOException
    */
   public static RegionSplitPolicy create(HRegion region,

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Feb 13 20:58:23 2013
@@ -119,7 +119,7 @@ public interface RowProcessor<S extends 
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the RowProcessor. The server will pass this to
-   * the {@link #initialize(ByteString)} method. If there is no RowProcessor
+   * the {@link #initialize(Message msg)} method. If there is no RowProcessor
    * specific data then null should be returned.
    * @return the PB message
    * @throws IOException

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Feb 13 20:58:23 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DeserializationException;
-import org.apache.hadoop.hbase.RegionServerStatusProtocol;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
@@ -80,7 +79,7 @@ public class SplitLogWorker extends ZooK
   private volatile String currentTask = null;
   private int currentVersion;
   private volatile boolean exitWorker;
-  private Object grabTaskLock = new Object();
+  private final Object grabTaskLock = new Object();
   private boolean workerInGrabTask = false;
 
 
@@ -109,8 +108,8 @@ public class SplitLogWorker extends ZooK
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          if (HLogSplitter.splitLogFile(rootdir,
-              fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker) == false) {
+          if (!HLogSplitter.splitLogFile(rootdir,
+              fs.getFileStatus(new Path(filename)), fs, conf, p, sequenceIdChecker)) {
             return Status.PREEMPTED;
           }
         } catch (InterruptedIOException iioe) {
@@ -249,13 +248,13 @@ public class SplitLogWorker extends ZooK
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
         return;
       }
-      if (slt.isUnassigned() == false) {
+      if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
         return;
       }
 
       currentVersion = stat.getVersion();
-      if (attemptToOwnTask(true) == false) {
+      if (!attemptToOwnTask(true)) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
         return;
       }
@@ -277,7 +276,7 @@ public class SplitLogWorker extends ZooK
 
         @Override
         public boolean progress() {
-          if (attemptToOwnTask(false) == false) {
+          if (!attemptToOwnTask(false)) {
             LOG.warn("Failed to heartbeat the task" + currentTask);
             return false;
           }
@@ -323,7 +322,6 @@ public class SplitLogWorker extends ZooK
         Thread.interrupted();
       }
     }
-    return;
   }
 
   /**
@@ -395,8 +393,7 @@ public class SplitLogWorker extends ZooK
     } catch (KeeperException e) {
       LOG.warn("failed to end task, " + path + " " + slt, e);
     }
-    SplitLogCounters.tot_wkr_final_transistion_failed.incrementAndGet();
-    return;
+    SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
   }
 
   void getDataSetWatchAsync() {
@@ -531,7 +528,6 @@ public class SplitLogWorker extends ZooK
     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
     exitWorker = false;
     worker.start();
-    return;
   }
 
   /**
@@ -558,7 +554,6 @@ public class SplitLogWorker extends ZooK
       }
       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
       getDataSetWatchSuccess(path, data);
-      return;
     }
   }
 
@@ -574,7 +569,7 @@ public class SplitLogWorker extends ZooK
       DONE(),
       ERR(),
       RESIGNED(),
-      PREEMPTED();
+      PREEMPTED()
     }
     public Status exec(String name, CancelableProgressable p);
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Wed Feb 13 20:58:23 2013
@@ -296,7 +296,7 @@ public class SplitTransaction {
       throw new IOException(errorMsg);
     }
     if (!testing) {
-      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null);
+      services.removeFromOnlineRegions(this.parent, null);
     }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Wed Feb 13 20:58:23 2013
@@ -219,7 +219,7 @@ public class StoreFileScanner implements
    *
    * @param s
    * @param k
-   * @return
+   * @return false if not found or if k is after the end.
    * @throws IOException
    */
   public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
@@ -279,7 +279,7 @@ public class StoreFileScanner implements
     boolean haveToSeek = true;
     if (useBloom) {
       // check ROWCOL Bloom filter first.
-      if (reader.getBloomFilterType() == StoreFile.BloomType.ROWCOL) {
+      if (reader.getBloomFilterType() == BloomType.ROWCOL) {
         haveToSeek = reader.passesGeneralBloomFilter(kv.getBuffer(),
             kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
             kv.getQualifierOffset(), kv.getQualifierLength());

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Feb 13 20:58:23 2013
@@ -44,38 +44,38 @@ import org.apache.hadoop.hbase.util.Envi
 public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
-  private HStore store;
-  private ScanQueryMatcher matcher;
-  private KeyValueHeap heap;
-  private boolean cacheBlocks;
-
-  private int countPerRow = 0;
-  private int storeLimit = -1;
-  private int storeOffset = 0;
+  protected HStore store;
+  protected ScanQueryMatcher matcher;
+  protected KeyValueHeap heap;
+  protected boolean cacheBlocks;
+
+  protected int countPerRow = 0;
+  protected int storeLimit = -1;
+  protected int storeOffset = 0;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   // Doesnt need to be volatile because it's always accessed via synchronized methods
-  private boolean closing = false;
-  private final boolean isGet;
-  private final boolean explicitColumnQuery;
-  private final boolean useRowColBloom;
-  private final Scan scan;
-  private final NavigableSet<byte[]> columns;
-  private final long oldestUnexpiredTS;
-  private final int minVersions;
+  protected boolean closing = false;
+  protected final boolean isGet;
+  protected final boolean explicitColumnQuery;
+  protected final boolean useRowColBloom;
+  protected final Scan scan;
+  protected final NavigableSet<byte[]> columns;
+  protected final long oldestUnexpiredTS;
+  protected final int minVersions;
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
 
   /** Used during unit testing to ensure that lazy seek does save seek ops */
-  private static boolean lazySeekEnabledGlobally =
+  protected static boolean lazySeekEnabledGlobally =
       LAZY_SEEK_ENABLED_BY_DEFAULT;
 
   // if heap == null and lastTop != null, you need to reseek given the key below
-  private KeyValue lastTop = null;
+  protected KeyValue lastTop = null;
 
   /** An internal constructor. */
-  private StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
+  protected StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
       final NavigableSet<byte[]> columns, long ttl, int minVersions) {
     this.store = store;
     this.cacheBlocks = cacheBlocks;
@@ -203,7 +203,7 @@ public class StoreScanner extends NonLaz
    * Get a filtered list of scanners. Assumes we are not in a compaction.
    * @return list of scanners to seek
    */
-  private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
+  protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
     final boolean isCompaction = false;
     return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
         isCompaction, matcher));
@@ -213,7 +213,7 @@ public class StoreScanner extends NonLaz
    * Filters the given list of scanners using Bloom filter, time range, and
    * TTL.
    */
-  private List<KeyValueScanner> selectScannersFrom(
+  protected List<KeyValueScanner> selectScannersFrom(
       final List<? extends KeyValueScanner> allScanners) {
     boolean memOnly;
     boolean filesOnly;
@@ -277,13 +277,8 @@ public class StoreScanner extends NonLaz
 
   @Override
   public synchronized boolean seek(KeyValue key) throws IOException {
-    if (this.heap == null) {
-
-      List<KeyValueScanner> scanners = getScannersNoCompaction();
-
-      heap = new KeyValueHeap(scanners, store.comparator);
-    }
-
+    // reset matcher state, in case that underlying store changed
+    checkReseek();
     return this.heap.seek(key);
   }
 
@@ -491,7 +486,7 @@ public class StoreScanner extends NonLaz
    *         next KV)
    * @throws IOException
    */
-  private boolean checkReseek() throws IOException {
+  protected boolean checkReseek() throws IOException {
     if (this.heap == null && this.lastTop != null) {
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
@@ -507,7 +502,7 @@ public class StoreScanner extends NonLaz
     return false;
   }
 
-  private void resetScannerStack(KeyValue lastTopKey) throws IOException {
+  protected void resetScannerStack(KeyValue lastTopKey) throws IOException {
     if (heap != null) {
       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
     }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Wed Feb 13 20:58:23 2013
@@ -148,6 +148,4 @@ public class TimeRangeTracker implements
   public String toString() {
     return "[" + minimumTimestamp + "," + maximumTimestamp + "]";
   }
-
 }
-

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Wed Feb 13 20:58:23 2013
@@ -19,391 +19,133 @@
 
 package org.apache.hadoop.hbase.regionserver.compactions;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * The default (and only, as of now) algorithm for selecting files for compaction.
- * Combines the compaction configuration and the provisional file selection that
- * it's given to produce the list of suitable candidates for compaction.
+ * A compaction policy determines how to select files for compaction,
+ * how to compact them, and how to generate the compacted files.
  */
 @InterfaceAudience.Private
-public class CompactionPolicy {
+public abstract class CompactionPolicy extends Configured {
+
+  /**
+   * The name of the configuration parameter that specifies
+   * the class of a compaction policy that is used to compact
+   * HBase store files.
+   */
+  public static final String COMPACTION_POLICY_KEY =
+    "hbase.hstore.compaction.policy";
 
-  private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
-  private final static Calendar calendar = new GregorianCalendar();
+  private static final Class<? extends CompactionPolicy>
+    DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
 
   CompactionConfiguration comConf;
-  StoreConfiguration storeConfig;
-
-  public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
-    updateConfiguration(configuration, storeConfig);
-  }
+  Compactor compactor;
+  HStore store;
 
   /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
-  public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
-      boolean isUserCompaction, boolean forceMajor)
-    throws IOException {
-    // Prelimanry compaction subject to filters
-    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
-    long cfTtl = this.storeConfig.getStoreFileTtl();
-    if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = selectExpiredStoreFiles(
-          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      candidateSelection = skipLargeFiles(candidateSelection);
-    }
-
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction.
-    // Or, if there are any references among the candidates.
-    boolean majorCompaction = (
-      (forceMajor && isUserCompaction)
-      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
-          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
-      || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
-      );
-
-    if (!majorCompaction) {
-      // we're doing a minor compaction, let's see what files are applicable
-      candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection);
-      candidateSelection = checkMinFilesCriteria(candidateSelection);
-    }
-    candidateSelection =
-        removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
-    return candidateSelection;
-  }
+  public abstract CompactSelection selectCompaction(
+    final List<StoreFile> candidateFiles, final boolean isUserCompaction,
+    final boolean forceMajor) throws IOException;
 
   /**
-   * Updates the compaction configuration. Used for tests.
-   * TODO: replace when HBASE-3909 is completed in some form.
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
    */
-  public void updateConfiguration(Configuration configuration,
-      StoreConfiguration storeConfig) {
-    this.comConf = new CompactionConfiguration(configuration, storeConfig);
-    this.storeConfig = storeConfig;
-  }
+  public abstract boolean isMajorCompaction(
+    final List<StoreFile> filesToCompact) throws IOException;
 
   /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
    */
-  private CompactSelection selectExpiredStoreFiles(
-      CompactSelection candidates, long maxExpiredTimeStamp) {
-    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
-    if (filesToCompact == null || filesToCompact.size() == 0)
-      return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-    boolean hasExpiredStoreFiles = false;
-    CompactSelection expiredSFSelection = null;
-
-    for (StoreFile storeFile : filesToCompact) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (!hasExpiredStoreFiles) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-          hasExpiredStoreFiles = true;
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-
-    if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
-    }
-    return expiredSFSelection;
-  }
+  public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all files above maxCompactSize
-   * Also save all references. We MUST compact them
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
    */
-  private CompactSelection skipLargeFiles(CompactSelection candidates) {
-    int pos = 0;
-    while (pos < candidates.getFilesToCompact().size() &&
-      candidates.getFilesToCompact().get(pos).getReader().length() >
-        comConf.getMaxCompactSize() &&
-      !candidates.getFilesToCompact().get(pos).isReference()) {
-      ++pos;
-    }
-    if (pos > 0) {
-      LOG.debug("Some files are too large. Excluding " + pos
-          + " files from compaction candidates");
-      candidates.clearSubList(0, pos);
-    }
-    return candidates;
-  }
+  public abstract boolean needsCompaction(int numCandidates);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all bulk load files if configured
+   * Inform the policy that some configuration has been change,
+   * so cached value should be updated it any.
    */
-  private CompactSelection filterBulk(CompactSelection candidates) {
-    candidates.getFilesToCompact().removeAll(Collections2.filter(
-        candidates.getFilesToCompact(),
-        new Predicate<StoreFile>() {
-          @Override
-          public boolean apply(StoreFile input) {
-            return input.excludeFromMinorCompaction();
-          }
-        }));
-    return candidates;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * take upto maxFilesToCompact from the start
-   */
-  private CompactSelection removeExcessFiles(CompactSelection candidates,
-      boolean isUserCompaction, boolean isMajorCompaction) {
-    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
-    if (excess > 0) {
-      if (isMajorCompaction && isUserCompaction) {
-        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
-            " files because of a user-requested major compaction");
-      } else {
-        LOG.debug("Too many admissible files. Excluding " + excess
-          + " files from compaction candidates");
-        candidates.clearSubList(comConf.getMaxFilesToCompact(),
-          candidates.getFilesToCompact().size());
-      }
+  public void updateConfiguration() {
+    if (getConf() != null && store != null) {
+      comConf = new CompactionConfiguration(getConf(), store);
     }
-    return candidates;
   }
+
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * forget the compactionSelection if we don't have enough files
+   * Get the compactor for this policy
+   * @return the compactor for this policy
    */
-  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
-    int minFiles = comConf.getMinFilesToCompact();
-    if (candidates.getFilesToCompact().size() < minFiles) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting files because we only have " +
-            candidates.getFilesToCompact().size() +
-          " files ready for compaction.  Need " + minFiles + " to initiate.");
-      }
-      candidates.emptyFileList();
-    }
-    return candidates;
+  public Compactor getCompactor() {
+    return compactor;
   }
 
   /**
-    * @param candidates pre-filtrate
-    * @return filtered subset
-    * -- Default minor compaction selection algorithm:
-    * choose CompactSelection from candidates --
-    * First exclude bulk-load files if indicated in configuration.
-    * Start at the oldest file and stop when you find the first file that
-    * meets compaction criteria:
-    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
-    * OR
-    * (2) within the compactRatio of sum(newer_files)
-    * Given normal skew, any newer files will also meet this criteria
-    * <p/>
-    * Additional Note:
-    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-    * compact().  Consider the oldest files first to avoid a
-    * situation where we always compact [end-threshold,end).  Then, the
-    * last file becomes an aggregate of the previous compactions.
-    *
-    * normal skew:
-    *
-    *         older ----> newer (increasing seqID)
-    *     _
-    *    | |   _
-    *    | |  | |   _
-    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-    *    | |  | |  | |  | |  _  | |
-    *    | |  | |  | |  | | | | | |
-    *    | |  | |  | |  | | | | | |
-    */
-  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
-    if (candidates.getFilesToCompact().isEmpty()) {
-      return candidates;
-    }
-
-    // we're doing a minor compaction, let's see what files are applicable
-    int start = 0;
-    double ratio = comConf.getCompactionRatio();
-    if (isOffPeakHour() && candidates.trySetOffpeak()) {
-      ratio = comConf.getCompactionRatioOffPeak();
-      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
-          + ", numOutstandingOffPeakCompactions is now "
-          + CompactSelection.getNumOutStandingOffPeakCompactions());
-    }
-
-    // get store file sizes for incremental compacting selection.
-    int countOfFiles = candidates.getFilesToCompact().size();
-    long[] fileSizes = new long[countOfFiles];
-    long[] sumSize = new long[countOfFiles];
-    for (int i = countOfFiles - 1; i >= 0; --i) {
-      StoreFile file = candidates.getFilesToCompact().get(i);
-      fileSizes[i] = file.getReader().length();
-      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
-      sumSize[i] = fileSizes[i]
-        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
-        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-    }
-
-
-    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
-      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
-          (long) (sumSize[start + 1] * ratio))) {
-      ++start;
-    }
-    if (start < countOfFiles) {
-      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
-        + " files from " + countOfFiles + " candidates");
-    }
-
-    candidates = candidates.getSubList(start, countOfFiles);
-
-    return candidates;
-  }
-
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
+   * Set the new configuration
    */
-  public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
-      throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime(filesToCompact);
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      long cfTtl = this.storeConfig.getStoreFileTtl();
-      if (filesToCompact.size() == 1) {
-        // Single file
-        StoreFile sf = filesToCompact.get(0);
-        Long minTimestamp = sf.getMinimumTimestamp();
-        long oldest = (minTimestamp == null)
-            ? Long.MIN_VALUE
-            : now - minTimestamp.longValue();
-        if (sf.isMajorCompaction() &&
-            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
-                " because one (major) compacted file only and oldestTime " +
-                oldest + "ms is < ttl=" + cfTtl);
-          }
-        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
-          LOG.debug("Major compaction triggered on store " + this +
-            ", because keyvalues outdated; time since last major compaction " +
-            (now - lowTimestamp) + "ms");
-          result = true;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
-              "; time since last major compaction " + (now - lowTimestamp) + "ms");
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
-  public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
-    // default = 24hrs
-    long ret = comConf.getMajorCompactionPeriod();
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct = comConf.getMajorCompactionJitter();
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
-        if (seed != null) {
-          double rnd = (new Random(seed)).nextDouble();
-          ret += jitter - Math.round(2L * jitter * rnd);
-        } else {
-          ret = 0; // no storefiles == no major compaction
-        }
-      }
-    }
-    return ret;
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    updateConfiguration();
   }
 
   /**
-   * @param compactionSize Total size of some compaction
-   * @return whether this should be a large or small compaction
+   * Upon construction, this method will be called with the HStore
+   * to be governed. It will be called once and only once.
    */
-  public boolean throttleCompaction(long compactionSize) {
-    return compactionSize > comConf.getThrottlePoint();
+  protected void configureForStore(HStore store) {
+    this.store = store;
+    updateConfiguration();
   }
 
   /**
-   * @param numCandidates Number of candidate store files
-   * @return whether a compactionSelection is possible
+   * Create the CompactionPolicy configured for the given HStore.
+   * @param store
+   * @param conf
+   * @return a CompactionPolicy
+   * @throws IOException
    */
-  public boolean needsCompaction(int numCandidates) {
-    return numCandidates > comConf.getMinFilesToCompact();
+  public static CompactionPolicy create(HStore store,
+      Configuration conf) throws IOException {
+    Class<? extends CompactionPolicy> clazz =
+      getCompactionPolicyClass(store.getFamily(), conf);
+    CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+    policy.configureForStore(store);
+    return policy;
   }
 
-  /**
-   * @return whether this is off-peak hour
-   */
-  private boolean isOffPeakHour() {
-    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-    int startHour = comConf.getOffPeakStartHour();
-    int endHour = comConf.getOffPeakEndHour();
-    // If offpeak time checking is disabled just return false.
-    if (startHour == endHour) {
-      return false;
-    }
-    if (startHour < endHour) {
-      return (currentHour >= startHour && currentHour < endHour);
+  static Class<? extends CompactionPolicy> getCompactionPolicyClass(
+      HColumnDescriptor family, Configuration conf) throws IOException {
+    String className = conf.get(COMPACTION_POLICY_KEY,
+      DEFAULT_COMPACTION_POLICY_CLASS.getName());
+
+    try {
+      Class<? extends CompactionPolicy> clazz =
+        Class.forName(className).asSubclass(CompactionPolicy.class);
+      return clazz;
+    } catch (Exception  e) {
+      throw new IOException(
+        "Unable to load configured region compaction policy '"
+        + className + "' for column '" + family.getNameAsString()
+        + "'", e);
     }
-    return (currentHour >= startHour || currentHour < endHour);
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed Feb 13 20:58:23 2013
@@ -89,7 +89,7 @@ public class CompactionRequest implement
      * Find out if a given region is in compaction now.
      *
      * @param regionId
-     * @return
+     * @return a CompactionState
      */
     public static CompactionState getCompactionState(
         final long regionId) {
@@ -174,6 +174,11 @@ public class CompactionRequest implement
       return this.hashCode() - request.hashCode();
     }
 
+    @Override
+    public boolean equals(Object obj) {
+      return (this == obj);
+    }
+
     /** Gets the HRegion for the request */
     public HRegion getHRegion() {
       return r;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java Wed Feb 13 20:58:23 2013
@@ -126,13 +126,21 @@ public class CloseRegionHandler extends 
       // Check that this region is being served here
       HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
       if (region == null) {
-        LOG.warn("Received CLOSE for region " + name +
-            " but currently not serving");
+        LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
+        if (zk){
+          LOG.error("The znode is not modified as we are not serving " + name);
+        }
+        // TODO: do better than a simple warning
         return;
       }
 
       // Close the region
       try {
+        if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
+          // bad znode state
+          return; // We're node deleting the znode, but it's not ours...
+        }
+
         // TODO: If we need to keep updating CLOSING stamp to prevent against
         // a timeout if this is long-running, need to spin up a thread?
         if (region.close(abort) == null) {
@@ -152,7 +160,7 @@ public class CloseRegionHandler extends 
         throw new RuntimeException(t);
       }
 
-      this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination);
+      this.rsServices.removeFromOnlineRegions(region, destination);
 
       if (this.zk) {
         if (setClosedState(this.expectedVersion, region)) {