Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC

svn commit: r1446147 [10/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/...

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Thu Feb 14 12:58:12 2013
@@ -24,9 +24,11 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
 import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 /**
  * Defines how value for specific column is interpreted and provides utility
@@ -35,7 +37,8 @@ import com.google.protobuf.ByteString;
  * handle null case gracefully. Refer to {@link LongColumnInterpreter} for an
  * example.
  * <p>
- * Takes two generic parameters. The cell value type of the interpreter is <T>.
+ * Takes two generic parameters and three Message parameters. 
+ * The cell value type of the interpreter is <T>.
  * During some computations like sum, average, the return type can be different
  * than the cell value data type, for eg, sum of int cell values might overflow
  * in case of a int result, we should use Long for its result. Therefore, this
@@ -44,12 +47,19 @@ import com.google.protobuf.ByteString;
  * <S>. There is a conversion method
  * {@link ColumnInterpreter#castToReturnType(Object)} which takes a <T> type and
  * returns a <S> type.
+ * The {@link AggregateImplementation} uses PB messages to initialize the 
+ * user's ColumnInterpreter implementation, and for sending the responses
+ * back to {@link AggregationClient}.
  * @param <T> Cell value data type
  * @param <S> Promoted data type
+ * @param <P> PB message that is used to transport initializer specific bytes
+ * @param <Q> PB message that is used to transport Cell (<T>) instance
+ * @param <R> PB message that is used to transport Promoted (<S>) instance
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public interface ColumnInterpreter<T, S> {
+public abstract class ColumnInterpreter<T, S, P extends Message, 
+Q extends Message, R extends Message> {
 
   /**
    * @param colFamily
@@ -58,7 +68,7 @@ public interface ColumnInterpreter<T, S>
    * @return value of type T
    * @throws IOException
    */
-  T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+  public abstract T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
       throws IOException;
 
   /**
@@ -67,36 +77,36 @@ public interface ColumnInterpreter<T, S>
    * @return sum or non null value among (if either of them is null); otherwise
    * returns a null.
    */
-  public S add(S l1, S l2);
+  public abstract S add(S l1, S l2);
 
   /**
    * returns the maximum value for this type T
    * @return max
    */
 
-  T getMaxValue();
+  public abstract T getMaxValue();
 
-  T getMinValue();
+  public abstract T getMinValue();
 
   /**
    * @param o1
    * @param o2
    * @return multiplication
    */
-  S multiply(S o1, S o2);
+  public abstract S multiply(S o1, S o2);
 
   /**
    * @param o
    * @return increment
    */
-  S increment(S o);
+  public abstract S increment(S o);
 
   /**
    * provides casting opportunity between the data types.
    * @param o
    * @return cast
    */
-  S castToReturnType(T o);
+  public abstract S castToReturnType(T o);
 
   /**
    * This takes care if either of arguments are null. returns 0 if they are
@@ -105,7 +115,7 @@ public interface ColumnInterpreter<T, S>
    * <li>>0 if l1 > l2 or l1 is not null and l2 is null.
    * <li>< 0 if l1 < l2 or l1 is null and l2 is not null.
    */
-  int compare(final T l1, final T l2);
+  public abstract int compare(final T l1, final T l2);
 
   /**
    * used for computing average of <S> data values. Not providing the divide
@@ -114,51 +124,58 @@ public interface ColumnInterpreter<T, S>
    * @param l
    * @return Average
    */
-  double divideForAvg(S o, Long l);
+  public abstract double divideForAvg(S o, Long l);
 
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the ColumnInterpreter. The server
-   * will pass this to the {@link #initialize(ByteString)}
+   * will pass this to the {@link #initialize}
    * method. If there is no ColumnInterpreter specific data (for e.g.,
    * {@link LongColumnInterpreter}) then null should be returned.
    * @return the PB message
    */
-  ByteString columnInterpreterSpecificData();
+  public abstract P getRequestData();
 
   /**
-   * Return the PB for type T
+   * This method should initialize any field(s) of the ColumnInterpreter by
+   * parsing the passed protobuf message (used on the server side).
+   * @param msg
+   */
+  public abstract void initialize(P msg);
+  
+  /**
+   * This method gets the PB message corresponding to the cell type
    * @param t
-   * @return PB-message
+   * @return the PB message for the cell-type instance
    */
-  ByteString getProtoForCellType(T t);
+  public abstract Q getProtoForCellType(T t);
 
   /**
-   * Return the PB for type S
-   * @param s
-   * @return PB-message
+   * This method gets the cell-type value from the passed PB message
+   * @param q
+   * @return the cell-type instance from the PB message
    */
-  ByteString getProtoForPromotedType(S s);
+  public abstract T getCellValueFromProto(Q q);
 
   /**
-   * This method should initialize any field(s) of the ColumnInterpreter with
-   * a parsing of the passed message bytes (used on the server side).
-   * @param bytes
+   * This method gets the PB message corresponding to the promoted type
+   * @param s
+   * @return the PB message for the promoted-type instance
    */
-  void initialize(ByteString bytes);
-  
+  public abstract R getProtoForPromotedType(S s);
+
   /**
-   * Converts the bytes in the server's response to the expected type S
-   * @param response
-   * @return response of type S constructed from the message
+   * This method gets the promoted type from the proto message
+   * @param r
+   * @return the promoted-type instance from the PB message
    */
-  S parseResponseAsPromotedType(byte[] response);
-  
+  public abstract S getPromotedValueFromProto(R r);
+
   /**
    * The response message comes as type S. This will convert/cast it to T.
    * In some sense, performs the opposite of {@link #castToReturnType(Object)}
    * @param response
    * @return cast
    */
-  T castToCellType(S response);
+  public abstract T castToCellType(S response);
 }
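
For illustration, a minimal sketch of an implementation against the new abstract class, which is now parameterized by three protobuf Message types: P carries interpreter-specific initialization data, Q transports cell (<T>) values, and R transports promoted (<S>) values. EmptyMsg and LongMsg below are hypothetical generated PB messages (LongMsg assumed to carry a single long field); the branch's actual LongColumnInterpreter and its generated classes may differ.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch only: EmptyMsg and LongMsg stand in for generated PB message classes.
public class SketchLongColumnInterpreter
    extends ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> {

  @Override
  public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
      throws IOException {
    if (kv == null || kv.getValueLength() != Bytes.SIZEOF_LONG) {
      return null;
    }
    return Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
  }

  @Override
  public Long add(Long l1, Long l2) {
    if (l1 == null || l2 == null) {
      return l1 == null ? l2 : l1;   // the non-null value, or null if both are null
    }
    return l1 + l2;
  }

  @Override public Long getMaxValue() { return Long.MAX_VALUE; }
  @Override public Long getMinValue() { return Long.MIN_VALUE; }

  @Override public Long multiply(Long o1, Long o2) {
    return (o1 == null || o2 == null) ? null : o1 * o2;
  }

  @Override public Long increment(Long o) { return o == null ? null : o + 1; }
  @Override public Long castToReturnType(Long o) { return o; }
  @Override public Long castToCellType(Long response) { return response; }

  @Override public int compare(Long l1, Long l2) {
    if (l1 == null || l2 == null) {
      return l1 == l2 ? 0 : (l1 == null ? -1 : 1);
    }
    return l1.compareTo(l2);
  }

  @Override public double divideForAvg(Long sum, Long count) {
    return (sum == null || count == null) ? Double.NaN
        : sum.doubleValue() / count.doubleValue();
  }

  // No interpreter-specific state, so nothing travels in the request message.
  @Override public EmptyMsg getRequestData() { return EmptyMsg.getDefaultInstance(); }
  @Override public void initialize(EmptyMsg msg) { }

  // PB conversions for cell (<T>) and promoted (<S>) values; both are Long here.
  @Override public LongMsg getProtoForCellType(Long t) {
    return LongMsg.newBuilder().setLongMsg(t).build();
  }
  @Override public Long getCellValueFromProto(LongMsg q) { return q.getLongMsg(); }
  @Override public LongMsg getProtoForPromotedType(Long s) {
    return LongMsg.newBuilder().setLongMsg(s).build();
  }
  @Override public Long getPromotedValueFromProto(LongMsg r) { return r.getLongMsg(); }
}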

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Feb 14 12:58:12 2013
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
@@ -48,6 +47,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.jar.JarEntry;
@@ -73,6 +74,10 @@ public abstract class CoprocessorHost<E 
   public static final String WAL_COPROCESSOR_CONF_KEY =
     "hbase.coprocessor.wal.classes";
 
+  //coprocessor jars are put under ${hbase.local.dir}/coprocessor/jars/
+  private static final String COPROCESSOR_JARS_DIR = File.separator
+      + "coprocessor" + File.separator + "jars" + File.separator;
+
   private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
   /** Ordered set of loaded coprocessors with lock */
   protected SortedSet<E> coprocessors =
@@ -131,7 +136,7 @@ public abstract class CoprocessorHost<E 
   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
     Class<?> implClass = null;
 
-    // load default coprocessors from configure file    
+    // load default coprocessors from configure file
     String[] defaultCPClasses = conf.getStrings(confKey);
     if (defaultCPClasses == null || defaultCPClasses.length == 0)
       return;
@@ -175,7 +180,7 @@ public abstract class CoprocessorHost<E 
   public E load(Path path, String className, int priority,
       Configuration conf) throws IOException {
     Class<?> implClass = null;
-    LOG.debug("Loading coprocessor class " + className + " with path " + 
+    LOG.debug("Loading coprocessor class " + className + " with path " +
         path + " and priority " + priority);
 
     ClassLoader cl = null;
@@ -210,13 +215,13 @@ public abstract class CoprocessorHost<E 
       if (!path.toString().endsWith(".jar")) {
         throw new IOException(path.toString() + ": not a jar file?");
       }
-      FileSystem fs = path.getFileSystem(HBaseConfiguration.create());
-      Path dst = new Path(System.getProperty("java.io.tmpdir") +
-          java.io.File.separator +"." + pathPrefix +
+      FileSystem fs = path.getFileSystem(this.conf);
+      File parentDir = new File(this.conf.get("hbase.local.dir") + COPROCESSOR_JARS_DIR);
+      parentDir.mkdirs();
+      File dst = new File(parentDir, "." + pathPrefix +
           "." + className + "." + System.currentTimeMillis() + ".jar");
-      fs.copyToLocalFile(path, dst);
-      File tmpLocal = new File(dst.toString());
-      tmpLocal.deleteOnExit();
+      fs.copyToLocalFile(path, new Path(dst.toString()));
+      dst.deleteOnExit();
 
       // TODO: code weaving goes here
 
@@ -229,8 +234,8 @@ public abstract class CoprocessorHost<E 
       // NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader
       // unsurprisingly wants URLs, not URIs; so we will use the deprecated
       // method which returns URLs for as long as it is available
-      List<URL> paths = new ArrayList<URL>();
-      URL url = new File(dst.toString()).getCanonicalFile().toURL();
+      final List<URL> paths = new ArrayList<URL>();
+      URL url = dst.getCanonicalFile().toURL();
       paths.add(url);
 
       JarFile jarFile = new JarFile(dst.toString());
@@ -238,8 +243,7 @@ public abstract class CoprocessorHost<E 
       while (entries.hasMoreElements()) {
         JarEntry entry = entries.nextElement();
         if (entry.getName().matches("/lib/[^/]+\\.jar")) {
-          File file = new File(System.getProperty("java.io.tmpdir") +
-              java.io.File.separator +"." + pathPrefix +
+          File file = new File(parentDir, "." + pathPrefix +
               "." + className + "." + System.currentTimeMillis() + "." + entry.getName().substring(5));
           IOUtils.copyBytes(jarFile.getInputStream(entry), new FileOutputStream(file), conf, true);
           file.deleteOnExit();
@@ -248,7 +252,13 @@ public abstract class CoprocessorHost<E 
       }
       jarFile.close();
 
-      cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
+      cl = AccessController.doPrivileged(new PrivilegedAction<CoprocessorClassLoader>() {
+              @Override
+              public CoprocessorClassLoader run() {
+                return new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
+              }
+            });
+
       // cache cp classloader as a weak value, will be GC'ed when no reference left
       ClassLoader prev = classLoadersCache.putIfAbsent(path, cl);
       if (prev != null) {
@@ -470,6 +480,10 @@ public abstract class CoprocessorHost<E 
         return table.exists(get);
       }
 
+      public Boolean[] exists(List<Get> gets) throws IOException{
+        return table.exists(gets);
+      }
+
       public void put(Put put) throws IOException {
         table.put(put);
       }
@@ -547,16 +561,6 @@ public abstract class CoprocessorHost<E 
         return tableName;
       }
 
-      public RowLock lockRow(byte[] row) throws IOException {
-        throw new RuntimeException(
-          "row locking is not allowed within the coprocessor environment");
-      }
-
-      public void unlockRow(RowLock rl) throws IOException {
-        throw new RuntimeException(
-          "row locking is not allowed within the coprocessor environment");
-      }
-
       @Override
       public void batch(List<? extends Row> actions, Object[] results)
           throws IOException, InterruptedException {
@@ -587,26 +591,6 @@ public abstract class CoprocessorHost<E 
       }
 
       @Override
-      public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol,
-          byte[] startKey, byte[] endKey, Batch.Call<T, R> callable,
-          Batch.Callback<R> callback) throws IOException, Throwable {
-        table.coprocessorExec(protocol, startKey, endKey, callable, callback);
-      }
-
-      @Override
-      public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
-          Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
-          throws IOException, Throwable {
-        return table.coprocessorExec(protocol, startKey, endKey, callable);
-      }
-
-      @Override
-      public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
-          byte[] row) {
-        return table.coprocessorProxy(protocol, row);
-      }
-
-      @Override
       public CoprocessorRpcChannel coprocessorService(byte[] row) {
         return table.coprocessorService(row);
       }
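
Two things change in the jar-loading path above: coprocessor jars are staged under ${hbase.local.dir}/coprocessor/jars/ rather than java.io.tmpdir, and the class loader is now created inside AccessController.doPrivileged so creation still succeeds when a SecurityManager would deny it to callers further up the stack. A minimal sketch of that privileged-creation pattern, using a plain URLClassLoader in place of the branch's CoprocessorClassLoader:

import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;

final class PrivilegedLoaderSketch {
  // Creates a class loader for the staged coprocessor jar inside a privileged
  // block; "parent" stands in for the host's own class loader.
  static ClassLoader create(final List<URL> jarUrls, final ClassLoader parent) {
    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
      @Override
      public ClassLoader run() {
        return new URLClassLoader(jarUrls.toArray(new URL[jarUrls.size()]), parent);
      }
    });
  }
}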

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Thu Feb 14 12:58:12 2013
@@ -167,12 +167,14 @@ public interface RegionObserver extends 
    * @param store the store being compacted
    * @param scanner the scanner over existing data used in the store file
    * rewriting
+   * @param scanType type of Scan
    * @return the scanner to use during compaction.  Should not be {@code null}
    * unless the implementation is writing new store files on its own.
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final InternalScanner scanner) throws IOException;
+      final HStore store, final InternalScanner scanner,
+      final ScanType scanType) throws IOException;
 
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into
@@ -735,6 +737,27 @@ public interface RegionObserver extends 
     throws IOException;
 
   /**
+   * This will be called by the scan flow when the current scanned row is being filtered out by the
+   * filter. The filter may be filtering out the row via any of the below scenarios
+   * <ol>
+   * <li>
+   * <code>boolean filterRowKey(byte [] buffer, int offset, int length)</code> returning true</li>
+   * <li>
+   * <code>boolean filterRow()</code> returning true</li>
+   * <li>
+   * <code>void filterRow(List<KeyValue> kvs)</code> removing all the kvs from the passed List</li>
+   * </ol>
+   * @param c the environment provided by the region server
+   * @param s the scanner
+   * @param currentRow The current rowkey which got filtered out
+   * @param hasMore the 'has more' indication
+   * @return whether more rows are available for the scanner or not
+   * @throws IOException
+   */
+  boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException;
+  
+  /**
    * Called before the client closes a scanner.
    * <p>
    * Call CoprocessorEnvironment#bypass to skip default actions
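
A sketch of a coprocessor using the two updated hooks: preCompact now receives the ScanType of the compaction scan, and postScannerFilterRow fires whenever the scan's filter drops the current row. Extending BaseRegionObserver is assumed here; the counter is purely illustrative.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;

public class FilterCountingObserver extends BaseRegionObserver {
  private final AtomicLong filteredRows = new AtomicLong();

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
      HStore store, InternalScanner scanner, ScanType scanType) throws IOException {
    // scanType now says what kind of compaction scan this is; returning the
    // passed scanner keeps the default behaviour.
    return scanner;
  }

  @Override
  public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> c,
      InternalScanner s, byte[] currentRow, boolean hasMore) throws IOException {
    // Invoked when the filter drops currentRow; count it and leave the
    // "has more" answer unchanged.
    filteredRows.incrementAndGet();
    return hasMore;
  }
}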

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Thu Feb 14 12:58:12 2013
@@ -67,7 +67,7 @@ public abstract class EventHandler imple
   protected Server server;
 
   // sequence id generator for default FIFO ordering of events
-  protected static AtomicLong seqids = new AtomicLong(0);
+  protected static final AtomicLong seqids = new AtomicLong(0);
 
   // sequence id for this event
   private final long seqid;
@@ -120,7 +120,7 @@ public abstract class EventHandler imple
 
     // Messages originating from Master to RS
     M_RS_OPEN_REGION          (20, ExecutorType.RS_OPEN_REGION),  // Master asking RS to open a region
-    M_RS_OPEN_ROOT            (21, ExecutorType.RS_OPEN_REGION),  // Master asking RS to open root
+    M_RS_OPEN_ROOT            (21, ExecutorType.RS_OPEN_ROOT),  // Master asking RS to open root
     M_RS_OPEN_META            (22, ExecutorType.RS_OPEN_META),  // Master asking RS to open meta
     M_RS_CLOSE_REGION         (23, ExecutorType.RS_CLOSE_REGION),  // Master asking RS to close a region
     M_RS_CLOSE_ROOT           (24, ExecutorType.RS_CLOSE_ROOT),  // Master asking RS to close root

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java Thu Feb 14 12:58:12 2013
@@ -75,11 +75,10 @@ public abstract class ByteArrayComparabl
    * @return true if and only if the fields of the comparator that are serialized
    * are equal to the corresponding fields in other.  Used for testing.
    */
-  boolean areSerializedFieldsEqual(ByteArrayComparable o) {
-    if (o == this) return true;
-    if (!(o instanceof ByteArrayComparable)) return false;
+  boolean areSerializedFieldsEqual(ByteArrayComparable other) {
+    if (other == this) return true;
 
-    return Bytes.equals(this.getValue(), o.getValue());
+    return Bytes.equals(this.getValue(), other.getValue());
   }
 
   @Override

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java Thu Feb 14 12:58:12 2013
@@ -156,7 +156,7 @@ public abstract class CompareFilter exte
 
   /**
    *
-   * @param other
+   * @param o
    * @return true if and only if the fields of the filter that are serialized
    * are equal to the corresponding fields in other.  Used for testing.
    */

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java Thu Feb 14 12:58:12 2013
@@ -261,10 +261,12 @@ public class DependentColumnFilter exten
   }
 
   /**
-   * @param other
+   * @param o
    * @return true if and only if the fields of the filter that are serialized
    * are equal to the corresponding fields in other.  Used for testing.
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
   boolean areSerializedFieldsEqual(Filter o) {
     if (o == this) return true;
     if (!(o instanceof DependentColumnFilter)) return false;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Thu Feb 14 12:58:12 2013
@@ -172,6 +172,14 @@ public abstract class Filter {
   abstract public KeyValue getNextKeyHint(final KeyValue currentKV);
 
   /**
+   * Check that the given column family is essential for the filter to check the
+   * row. Most filters always return true here, but some could have more
+   * sophisticated logic which significantly reduces the scanning work by not even
+   * touching a column family until it is certain that its data is needed in the result.
+   */
+  abstract public boolean isFamilyEssential(byte[] name);
+
+  /**
    * @return The filter serialized using pb
    */
   abstract public byte [] toByteArray();
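
The new isFamilyEssential hook (given a default of true in FilterBase below, and delegated by FilterList, FilterWrapper, SkipFilter and WhileMatchFilter) lets a filter declare which column families it must see up front, so non-essential families can be loaded lazily, only for rows the filter accepts. A hypothetical filter that needs a single family to make its decision might look like this:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical example: only the "meta" family is needed to decide whether a
// row passes, so it is the only family reported as essential.
public class MetaFamilyEssentialFilter extends FilterBase {
  private static final byte[] ESSENTIAL_FAMILY = Bytes.toBytes("meta");

  @Override
  public boolean isFamilyEssential(byte[] name) {
    return Bytes.equals(name, ESSENTIAL_FAMILY);
  }

  @Override
  public ReturnCode filterKeyValue(KeyValue kv) {
    // Real decision logic would inspect cells from the "meta" family here;
    // INCLUDE keeps everything in this sketch.
    return ReturnCode.INCLUDE;
  }

  @Override
  public byte[] toByteArray() {
    return new byte[0];   // nothing to serialize in this sketch
  }
}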

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java Thu Feb 14 12:58:12 2013
@@ -134,6 +134,16 @@ public abstract class FilterBase extends
   }
 
   /**
+   * By default, we require all of the scan's column families to be present.
+   * Subclasses may be more precise.
+   *
+   * @inheritDoc
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return true;
+  }
+
+  /**
    * Given the filter's arguments it constructs the filter
    * <p>
    * @param filterArguments the filter's arguments

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Thu Feb 14 12:58:12 2013
@@ -361,6 +361,16 @@ public class FilterList extends Filter {
   }
 
   @Override
+  public boolean isFamilyEssential(byte[] name) {
+    for (Filter filter : filters) {
+      if (filter.isFamilyEssential(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
   public String toString() {
     return toString(MAX_LOG_FILTERS);
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java Thu Feb 14 12:58:12 2013
@@ -136,6 +136,11 @@ public class FilterWrapper extends Filte
     }
   }
 
+  @Override
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  };
+
   /**
    * @param other
    * @return true if and only if the fields of the filter that are serialized

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java Thu Feb 14 12:58:12 2013
@@ -90,13 +90,13 @@ public class FirstKeyOnlyFilter extends 
    */
   public static FirstKeyOnlyFilter parseFrom(final byte [] pbBytes)
   throws DeserializationException {
-    FilterProtos.FirstKeyOnlyFilter proto;
+    // There is nothing to deserialize.  Why do this at all?
     try {
-      proto = FilterProtos.FirstKeyOnlyFilter.parseFrom(pbBytes);
+      FilterProtos.FirstKeyOnlyFilter.parseFrom(pbBytes);
     } catch (InvalidProtocolBufferException e) {
       throw new DeserializationException(e);
     }
-
+    // Just return a new instance.
     return new FirstKeyOnlyFilter();
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java Thu Feb 14 12:58:12 2013
@@ -18,20 +18,17 @@
 
 package org.apache.hadoop.hbase.filter;
 
-import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.mapreduce.RowCounter;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * The filter looks for the given columns in KeyValue. Once there is a match for
@@ -40,7 +37,8 @@ import com.google.protobuf.InvalidProtoc
  * <p>
  * Note : It may emit KVs which do not have the given columns in them, if
  * these KVs happen to occur before a KV which does have a match. Given this
- * caveat, this filter is only useful for special cases like {@link RowCounter}.
+ * caveat, this filter is only useful for special cases
+ * like {@link org.apache.hadoop.hbase.mapreduce.RowCounter}.
  * <p>
  */
 @InterfaceAudience.Public

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java Thu Feb 14 12:58:12 2013
@@ -44,6 +44,11 @@ public class NullComparator extends Byte
   }
 
   @Override
+  public boolean equals(Object obj) {
+    return obj == null;
+  }
+
+  @Override
   public int compareTo(byte[] value, int offset, int length) {
     throw new UnsupportedOperationException();
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Thu Feb 14 12:58:12 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
 /**
  * A {@link Filter} that checks a single column value, but does not emit the
@@ -84,28 +86,31 @@ public class SingleColumnValueExcludeFil
    * @param qualifier
    * @param compareOp
    * @param comparator
-   * @param foundColumn
-   * @param matchedColumn
    * @param filterIfMissing
    * @param latestVersionOnly
    */
-  protected SingleColumnValueExcludeFilter(final byte[] family, final byte [] qualifier,
-    final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
-    final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
-    super(family,qualifier,compareOp,comparator,foundColumn,
-      matchedColumn,filterIfMissing,latestVersionOnly);
-  }
-
-  public ReturnCode filterKeyValue(KeyValue keyValue) {
-    ReturnCode superRetCode = super.filterKeyValue(keyValue);
-    if (superRetCode == ReturnCode.INCLUDE) {
+  protected SingleColumnValueExcludeFilter(final byte[] family, final byte[] qualifier,
+      final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+      final boolean latestVersionOnly) {
+    super(family, qualifier, compareOp, comparator, filterIfMissing, latestVersionOnly);
+  }
+
+  // We clean the result row in filterRow() to stay consistent with the scanning process.
+  public boolean hasFilterRow() {
+    return true;
+  }
+
+  // Here we remove from the row all key values belonging to the tested column.
+  public void filterRow(List<KeyValue> kvs) {
+    Iterator it = kvs.iterator();
+    while (it.hasNext()) {
+      KeyValue kv = (KeyValue)it.next();
       // If the current column is actually the tested column,
       // we will skip it instead.
-      if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
-        return ReturnCode.SKIP;
+      if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+        it.remove();
       }
     }
-    return superRetCode;
   }
 
   public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
@@ -157,11 +162,10 @@ public class SingleColumnValueExcludeFil
       throw new DeserializationException(ioe);
     }
 
-    return new SingleColumnValueExcludeFilter(
-      parentProto.hasColumnFamily()?parentProto.getColumnFamily().toByteArray():null,
-      parentProto.hasColumnQualifier()?parentProto.getColumnQualifier().toByteArray():null,
-      compareOp, comparator, parentProto.getFoundColumn(),parentProto.getMatchedColumn(),
-      parentProto.getFilterIfMissing(),parentProto.getLatestVersionOnly());
+    return new SingleColumnValueExcludeFilter(parentProto.hasColumnFamily() ? parentProto
+        .getColumnFamily().toByteArray() : null, parentProto.hasColumnQualifier() ? parentProto
+        .getColumnQualifier().toByteArray() : null, compareOp, comparator, parentProto
+        .getFilterIfMissing(), parentProto.getLatestVersionOnly());
   }
 
   /**
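
The rewrite above moves the exclusion out of filterKeyValue (which used to return SKIP) and into hasFilterRow/filterRow, so the tested column is still evaluated against the condition but is stripped from the row before it is returned. A usage sketch; the family and qualifier names are made up:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ExcludeFilterUsageSketch {
  // Keep only rows whose d:status equals "ok"; the d:status cell itself is
  // removed from each returned row by filterRow().
  static Scan buildScan() {
    SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(
        Bytes.toBytes("d"), Bytes.toBytes("status"),
        CompareOp.EQUAL, Bytes.toBytes("ok"));
    filter.setFilterIfMissing(true);   // also drop rows with no d:status cell
    Scan scan = new Scan();
    scan.setFilter(filter);
    return scan;
  }
}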

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Thu Feb 14 12:58:12 2013
@@ -128,17 +128,13 @@ public class SingleColumnValueFilter ext
    * @param qualifier
    * @param compareOp
    * @param comparator
-   * @param foundColumn
-   * @param matchedColumn
    * @param filterIfMissing
    * @param latestVersionOnly
    */
-  protected SingleColumnValueFilter(final byte[] family, final byte [] qualifier,
-    final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
-    final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
-    this(family,qualifier,compareOp,comparator);
-    this.foundColumn = foundColumn;
-    this.matchedColumn = matchedColumn;
+  protected SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
+      final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+      final boolean latestVersionOnly) {
+    this(family, qualifier, compareOp, comparator);
     this.filterIfMissing = filterIfMissing;
     this.latestVersionOnly = latestVersionOnly;
   }
@@ -313,8 +309,6 @@ public class SingleColumnValueFilter ext
     HBaseProtos.CompareType compareOp = CompareType.valueOf(this.compareOp.name());
     builder.setCompareOp(compareOp);
     builder.setComparator(ProtobufUtil.toComparator(this.comparator));
-    builder.setFoundColumn(this.foundColumn);
-    builder.setMatchedColumn(this.matchedColumn);
     builder.setFilterIfMissing(this.filterIfMissing);
     builder.setLatestVersionOnly(this.latestVersionOnly);
 
@@ -352,11 +346,10 @@ public class SingleColumnValueFilter ext
       throw new DeserializationException(ioe);
     }
 
-    return new SingleColumnValueFilter(
-      proto.hasColumnFamily()?proto.getColumnFamily().toByteArray():null,
-      proto.hasColumnQualifier()?proto.getColumnQualifier().toByteArray():null,
-      compareOp, comparator, proto.getFoundColumn(),proto.getMatchedColumn(),
-      proto.getFilterIfMissing(),proto.getLatestVersionOnly());
+    return new SingleColumnValueFilter(proto.hasColumnFamily() ? proto.getColumnFamily()
+        .toByteArray() : null, proto.hasColumnQualifier() ? proto.getColumnQualifier()
+        .toByteArray() : null, compareOp, comparator, proto.getFilterIfMissing(), proto
+        .getLatestVersionOnly());
   }
 
   /**
@@ -373,12 +366,19 @@ public class SingleColumnValueFilter ext
       && Bytes.equals(this.getQualifier(), other.getQualifier())
       && this.compareOp.equals(other.compareOp)
       && this.getComparator().areSerializedFieldsEqual(other.getComparator())
-      && this.foundColumn == other.foundColumn
-      && this.matchedColumn == other.matchedColumn
       && this.getFilterIfMissing() == other.getFilterIfMissing()
       && this.getLatestVersionOnly() == other.getLatestVersionOnly();
   }
 
+  /**
+   * The only CF this filter needs is the given column family. So, it is the only
+   * essential column in the whole scan. If filterIfMissing == false, all families
+   * are essential, because otherwise rows without any data in the filtered CF
+   * would be skipped.
+   */
+  public boolean isFamilyEssential(byte[] name) {
+    return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+  }
+
   @Override
   public String toString() {
     return String.format("%s (%s, %s, %s, %s)",

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Thu Feb 14 12:58:12 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Thu Feb 14 12:58:12 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
     return getFilter().areSerializedFieldsEqual(other.getFilter());
   }
 
+  public boolean isFamilyEssential(byte[] name) {
+    return filter.isFamilyEssential(name);
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName() + " " + this.filter.toString();

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Thu Feb 14 12:58:12 2013
@@ -33,13 +33,12 @@ import java.net.URI;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,8 +46,9 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * An encapsulation for the FileSystem object that hbase uses to access
@@ -259,7 +259,7 @@ public class HFileSystem extends FilterF
       final ReorderBlocks lrb, final Configuration conf) {
     return (ClientProtocol) Proxy.newProxyInstance
         (cp.getClass().getClassLoader(),
-            new Class[]{ClientProtocol.class},
+            new Class[]{ClientProtocol.class, Closeable.class},
             new InvocationHandler() {
               public Object invoke(Object proxy, Method method,
                                    Object[] args) throws Throwable {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java Thu Feb 14 12:58:12 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io;
 import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
 
 import com.google.protobuf.ByteString;
 
@@ -56,7 +54,7 @@ import com.google.protobuf.ByteString;
  * references.  References are cleaned up by compactions.
  */
 @InterfaceAudience.Private
-public class Reference implements Writable {
+public class Reference {
   private byte [] splitkey;
   private Range region;
 
@@ -99,7 +97,6 @@ public class Reference implements Writab
 
   /**
    * Used by serializations.
-   * @deprecated Use the pb serializations instead.  Writables are going away.
    */
   @Deprecated
   // Make this private when it comes time to let go of this constructor.  Needed by pb serialization.
@@ -130,18 +127,14 @@ public class Reference implements Writab
     return "" + this.region;
   }
 
-  /**
-   * @deprecated Writables are going away. Use the pb serialization methods instead.
-   */
-  @Deprecated
-  public void write(DataOutput out) throws IOException {
-    // Write true if we're doing top of the file.
-    out.writeBoolean(isTopFileRegion(this.region));
-    Bytes.writeByteArray(out, this.splitkey);
+  public static boolean isTopFileRegion(final Range r) {
+    return r.equals(Range.top);
   }
 
   /**
    * @deprecated Writables are going away. Use the pb serialization methods instead.
+   * Remove in a release after 0.96 goes out.  This is here only to migrate
+   * old Reference files written with Writables before 0.96.
    */
   @Deprecated
   public void readFields(DataInput in) throws IOException {
@@ -151,10 +144,6 @@ public class Reference implements Writab
     this.splitkey = Bytes.readByteArray(in);
   }
 
-  public static boolean isTopFileRegion(final Range r) {
-    return r.equals(Range.top);
-  }
-
   public Path write(final FileSystem fs, final Path p)
   throws IOException {
     FSDataOutputStream out = fs.create(p, false);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Thu Feb 14 12:58:12 2013
@@ -19,12 +19,8 @@
 
 package org.apache.hadoop.hbase.io;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,7 +35,7 @@ import org.apache.hadoop.hbase.util.Byte
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class TimeRange implements Writable {
+public class TimeRange {
   private long minStamp = 0L;
   private long maxStamp = Long.MAX_VALUE;
   private boolean allTime = false;
@@ -184,17 +180,4 @@ public class TimeRange implements Writab
     sb.append(this.minStamp);
     return sb.toString();
   }
-
-  //Writable
-  public void readFields(final DataInput in) throws IOException {
-    this.minStamp = in.readLong();
-    this.maxStamp = in.readLong();
-    this.allTime = in.readBoolean();
-  }
-
-  public void write(final DataOutput out) throws IOException {
-    out.writeLong(minStamp);
-    out.writeLong(maxStamp);
-    out.writeBoolean(this.allTime);
-  }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Thu Feb 14 12:58:12 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
  * Cache Key for use with implementations of {@link BlockCache}
  */
 @InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
   private final String hfileName;
   private final long offset;
   private final DataBlockEncoding encoding;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Thu Feb 14 12:58:12 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
   public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
       "hbase.rs.evictblocksonclose";
 
+  /**
+   * Configuration keys for Bucket cache
+   */
+  public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+  public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = 
+      "hbase.bucketcache.persistent.path";
+  public static final String BUCKET_CACHE_COMBINED_KEY = 
+      "hbase.bucketcache.combinedcache.enabled";
+  public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = 
+      "hbase.bucketcache.percentage.in.combinedcache";
+  public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+  public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = 
+      "hbase.bucketcache.writer.queuelength";
+  /**
+   * Defaults for Bucket cache
+   */
+  public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+  public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
   // Defaults
 
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
 
     // Calculate the amount of heap to give the heap.
     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-    long cacheSize = (long)(mu.getMax() * cachePercentage);
+    long lruCacheSize = (long) (mu.getMax() * cachePercentage);
     int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
         HFile.DEFAULT_BLOCKSIZE);
     long offHeapCacheSize =
       (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
           DirectMemoryUtils.getDirectMemorySize());
-    LOG.info("Allocating LruBlockCache with maximum size " +
-      StringUtils.humanReadableInt(cacheSize));
     if (offHeapCacheSize <= 0) {
-      globalBlockCache = new LruBlockCache(cacheSize,
-          StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+      String bucketCacheIOEngineName = conf
+          .get(BUCKET_CACHE_IOENGINE_KEY, null);
+      float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+      // A percentage of max heap size, or an absolute value in megabytes
+      long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+          * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+          DEFAULT_BUCKET_CACHE_COMBINED);
+      BucketCache bucketCache = null;
+      if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+        int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+        int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+        String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+        float combinedPercentage = conf.getFloat(
+            BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+            DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+        if (combinedWithLru) {
+          lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+          bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+        }
+        try {
+          int ioErrorsTolerationDuration = conf.getInt(
+              "hbase.bucketcache.ioengine.errors.tolerated.duration",
+              BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+          bucketCache = new BucketCache(bucketCacheIOEngineName,
+              bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+              ioErrorsTolerationDuration);
+        } catch (IOException ioex) {
+          LOG.error("Can't instantiate bucket cache", ioex);
+          throw new RuntimeException(ioex);
+        }
+      }
+      LOG.info("Allocating LruBlockCache with maximum size "
+          + StringUtils.humanReadableInt(lruCacheSize));
+      LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+          StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+      lruCache.setVictimCache(bucketCache);
+      if (bucketCache != null && combinedWithLru) {
+        globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+      } else {
+        globalBlockCache = lruCache;
+      }
     } else {
-      globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+      globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
           StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
     }
     return globalBlockCache;
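
The block-cache setup above now honours the new bucket-cache keys: when hbase.bucketcache.ioengine and hbase.bucketcache.size are set, a BucketCache is created and either combined with the on-heap LRU cache (the default) or attached only as its victim cache. A configuration sketch; the values, including the "offheap" engine name, are illustrative assumptions rather than recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BucketCacheConfigSketch {
  static Configuration exampleConf() {
    Configuration conf = HBaseConfiguration.create();
    // IOEngine for the bucket cache; "offheap" assumed here, a file path is another option.
    conf.set("hbase.bucketcache.ioengine", "offheap");
    // < 1 is read as a fraction of the max heap, >= 1 as an absolute size in MB.
    conf.setFloat("hbase.bucketcache.size", 0.4f);
    // Combine with the LRU cache and give the bucket cache 90% of the combined size.
    conf.setBoolean("hbase.bucketcache.combinedcache.enabled", true);
    conf.setFloat("hbase.bucketcache.percentage.in.combinedcache", 0.9f);
    conf.setInt("hbase.bucketcache.writer.threads", 3);
    conf.setInt("hbase.bucketcache.writer.queuelength", 64);
    return conf;
  }
}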

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Thu Feb 14 12:58:12 2013
@@ -171,6 +171,22 @@ public class CacheStats {
     windowIndex = (windowIndex + 1) % numPeriodsInWindow;
   }
 
+  public long getSumHitCountsPastNPeriods() {
+    return sum(hitCounts);
+  }
+
+  public long getSumRequestCountsPastNPeriods() {
+    return sum(requestCounts);
+  }
+
+  public long getSumHitCachingCountsPastNPeriods() {
+    return sum(hitCachingCounts);
+  }
+
+  public long getSumRequestCachingCountsPastNPeriods() {
+    return sum(requestCachingCounts);
+  }
+
   public double getHitRatioPastNPeriods() {
     double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
     return Double.isNaN(ratio) ? 0 : ratio;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Thu Feb 14 12:58:12 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
    */
   public CacheableDeserializer<Cacheable> getDeserializer();
 
+  /**
+   * @return the block type of this cached HFile block
+   */
+  public BlockType getBlockType();
+
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Thu Feb 14 12:58:12 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
    * @return T the deserialized object.
    */
   public T deserialize(ByteBuffer b) throws IOException;
+
+  /**
+   * Deserialize the given byte buffer into a Cacheable object.
+   * @param b the buffer holding the serialized content
+   * @param reuse true if Cacheable object can use the given buffer as its
+   *          content
+   * @return T the deserialized object.
+   * @throws IOException
+   */
+  public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+  /**
+   * Get the identifier of this deserializer. The identifier is unique for each
+   * deserializer and is generated by {@link CacheableDeserializerIdManager}.
+   * @return the identifier number of this cacheable deserializer
+   */
+  public int getDeserialiserIdentifier();
 }

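The two new CacheableDeserializer methods let a cache hand its backing buffer straight to the deserialized object (reuse) and look a deserializer up by a numeric identifier. A minimal sketch of an implementation is below; SomeCacheable is a hypothetical Cacheable implementation, and the registration call mirrors what HFileBlock does later in this commit.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.hfile.Cacheable;
    import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
    import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;

    public class SomeCacheableDeserializerExample {
      static final CacheableDeserializer<Cacheable> DESERIALIZER =
          new CacheableDeserializer<Cacheable>() {
            @Override
            public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException {
              ByteBuffer content;
              if (reuse) {
                // Caller lets the object keep a view onto its buffer: no copy.
                content = b.slice();
              } else {
                // Defensive copy so the caller may recycle the source buffer.
                content = ByteBuffer.allocate(b.remaining());
                content.put(b);
                content.flip();
              }
              return new SomeCacheable(content);  // SomeCacheable is hypothetical
            }

            @Override
            public Cacheable deserialize(ByteBuffer b) throws IOException {
              return deserialize(b, false);       // default to the copying path
            }

            @Override
            public int getDeserialiserIdentifier() {
              return DESERIALIZER_ID;
            }
          };

      // Identifiers are handed out by CacheableDeserializerIdManager, just as
      // HFileBlock registers its block deserializer further down this commit.
      static final int DESERIALIZER_ID =
          CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
    }
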
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Thu Feb 14 12:58:12 2013
@@ -96,11 +96,24 @@ public class CachedBlock implements Heap
     return size;
   }
 
+  @Override
   public int compareTo(CachedBlock that) {
     if(this.accessTime == that.accessTime) return 0;
     return this.accessTime < that.accessTime ? 1 : -1;
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    CachedBlock other = (CachedBlock) obj;
+    return compareTo(other) == 0;
+  }
+
   public Cacheable getBuffer() {
     return this.buf;
   }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java Thu Feb 14 12:58:12 2013
@@ -54,7 +54,7 @@ public class ChecksumUtil {
    *                    compute checkums from
    * @param endOffset ending offset in the indata stream upto
    *                   which checksums needs to be computed
-   * @param outData the output buffer where checksum values are written
+   * @param outdata the output buffer where checksum values are written
    * @param outOffset the starting offset in the outdata where the
    *                  checksum values are written
    * @param checksumType type of checksum

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Thu Feb 14 12:58:12 2013
@@ -18,13 +18,10 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,7 +30,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.RawComparator;
 
@@ -57,6 +56,9 @@ public class FixedFileTrailer {
 
   private static final Log LOG = LogFactory.getLog(FixedFileTrailer.class);
 
+  /** HFile minor version that introduced the protobuf-encoded file trailer */
+  private static final int PBUF_TRAILER_MINOR_VERSION = 2;
+
   /**
    * We store the comparator class name as a fixed-length field in the trailer.
    */
@@ -113,7 +115,7 @@ public class FixedFileTrailer {
   private long lastDataBlockOffset;
 
   /** Raw key comparator class name in version 2 */
-  private String comparatorClassName = RawComparator.class.getName();
+  private String comparatorClassName = KeyValue.KEY_COMPARATOR.getClass().getName();
 
   /** The {@link HFile} format major version. */
   private final int majorVersion;
@@ -129,11 +131,10 @@ public class FixedFileTrailer {
 
   private static int[] computeTrailerSizeByVersion() {
     int versionToSize[] = new int[HFile.MAX_FORMAT_VERSION + 1];
-    for (int version = MIN_FORMAT_VERSION;
-         version <= MAX_FORMAT_VERSION;
+    for (int version = HFile.MIN_FORMAT_VERSION;
+         version <= HFile.MAX_FORMAT_VERSION;
          ++version) {
-      FixedFileTrailer fft = new FixedFileTrailer(version, 
-                                   HFileBlock.MINOR_VERSION_NO_CHECKSUM);
+      FixedFileTrailer fft = new FixedFileTrailer(version, HFileBlock.MINOR_VERSION_NO_CHECKSUM);
       DataOutputStream dos = new DataOutputStream(new NullOutputStream());
       try {
         fft.serialize(dos);
@@ -148,8 +149,8 @@ public class FixedFileTrailer {
 
   private static int getMaxTrailerSize() {
     int maxSize = 0;
-    for (int version = MIN_FORMAT_VERSION;
-         version <= MAX_FORMAT_VERSION;
+    for (int version = HFile.MIN_FORMAT_VERSION;
+         version <= HFile.MAX_FORMAT_VERSION;
          ++version)
       maxSize = Math.max(getTrailerSize(version), maxSize);
     return maxSize;
@@ -158,6 +159,8 @@ public class FixedFileTrailer {
   private static final int TRAILER_SIZE[] = computeTrailerSizeByVersion();
   private static final int MAX_TRAILER_SIZE = getMaxTrailerSize();
 
+  private static final int NOT_PB_SIZE = BlockType.MAGIC_LENGTH + Bytes.SIZEOF_INT;
+
   static int getTrailerSize(int version) {
     return TRAILER_SIZE[version];
   }
@@ -178,42 +181,89 @@ public class FixedFileTrailer {
     HFile.checkFormatVersion(majorVersion);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutput baosDos = new DataOutputStream(baos);
+    DataOutputStream baosDos = new DataOutputStream(baos);
 
     BlockType.TRAILER.write(baosDos);
-    baosDos.writeLong(fileInfoOffset);
-    baosDos.writeLong(loadOnOpenDataOffset);
-    baosDos.writeInt(dataIndexCount);
+    if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+      serializeAsPB(baosDos);
+    } else {
+      serializeAsWritable(baosDos);
+    }
+
+    // The last 4 bytes of the file encode the major and minor version universally
+    baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
+
+    outputStream.write(baos.toByteArray());
+  }
+
+  /**
+   * Write the trailer data as protobuf.
+   * @param output the stream to write the trailer to
+   * @throws IOException
+   */
+  void serializeAsPB(DataOutputStream output) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    HFileProtos.FileTrailerProto.newBuilder()
+      .setFileInfoOffset(fileInfoOffset)
+      .setLoadOnOpenDataOffset(loadOnOpenDataOffset)
+      .setUncompressedDataIndexSize(uncompressedDataIndexSize)
+      .setTotalUncompressedBytes(totalUncompressedBytes)
+      .setDataIndexCount(dataIndexCount)
+      .setMetaIndexCount(metaIndexCount)
+      .setEntryCount(entryCount)
+      .setNumDataIndexLevels(numDataIndexLevels)
+      .setFirstDataBlockOffset(firstDataBlockOffset)
+      .setLastDataBlockOffset(lastDataBlockOffset)
+      .setComparatorClassName(comparatorClassName)
+      .setCompressionCodec(compressionCodec.ordinal())
+      .build().writeDelimitedTo(baos);
+    output.write(baos.toByteArray());
+    // Pad to make up the difference between the variable PB encoding length and the
+    // fixed length used when encoding as writable under earlier V2 formats. Failing to
+    // pad properly, or a PB encoding that is too big, would mean the trailer won't be
+    // read back correctly by HFile.
+    int padding = getTrailerSize() - NOT_PB_SIZE - baos.size();
+    if (padding < 0) {
+      throw new IOException("Pbuf encoding size exceeded fixed trailer size limit");
+    }
+    for (int i = 0; i < padding; i++) {
+      output.write(0);
+    }
+  }
+
+  /**
+   * Write the trailer data as writable.
+   * @param output the stream to write the trailer to
+   * @throws IOException
+   */
+  void serializeAsWritable(DataOutputStream output) throws IOException {
+    output.writeLong(fileInfoOffset);
+    output.writeLong(loadOnOpenDataOffset);
+    output.writeInt(dataIndexCount);
 
     if (majorVersion == 1) {
       // This used to be metaIndexOffset, but it was not used in version 1.
-      baosDos.writeLong(0);
+      output.writeLong(0);
     } else {
-      baosDos.writeLong(uncompressedDataIndexSize);
+      output.writeLong(uncompressedDataIndexSize);
     }
 
-    baosDos.writeInt(metaIndexCount);
-    baosDos.writeLong(totalUncompressedBytes);
+    output.writeInt(metaIndexCount);
+    output.writeLong(totalUncompressedBytes);
     if (majorVersion == 1) {
-      baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
+      output.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
     } else {
       // This field is long from version 2 onwards.
-      baosDos.writeLong(entryCount);
+      output.writeLong(entryCount);
     }
-    baosDos.writeInt(compressionCodec.ordinal());
+    output.writeInt(compressionCodec.ordinal());
 
     if (majorVersion > 1) {
-      baosDos.writeInt(numDataIndexLevels);
-      baosDos.writeLong(firstDataBlockOffset);
-      baosDos.writeLong(lastDataBlockOffset);
-      Bytes.writeStringFixedSize(baosDos, comparatorClassName,
-          MAX_COMPARATOR_NAME_LENGTH);
+      output.writeInt(numDataIndexLevels);
+      output.writeLong(firstDataBlockOffset);
+      output.writeLong(lastDataBlockOffset);
+      Bytes.writeStringFixedSize(output, comparatorClassName, MAX_COMPARATOR_NAME_LENGTH);
     }
-
-    // serialize the major and minor versions
-    baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
-
-    outputStream.write(baos.toByteArray());
   }
 
   /**
@@ -222,7 +272,6 @@ public class FixedFileTrailer {
    * {@link #serialize(DataOutputStream)}.
    *
    * @param inputStream
-   * @param version
    * @throws IOException
    */
   void deserialize(DataInputStream inputStream) throws IOException {
@@ -230,33 +279,100 @@ public class FixedFileTrailer {
 
     BlockType.TRAILER.readAndCheck(inputStream);
 
-    fileInfoOffset = inputStream.readLong();
-    loadOnOpenDataOffset = inputStream.readLong();
-    dataIndexCount = inputStream.readInt();
-
-    if (majorVersion == 1) {
-      inputStream.readLong(); // Read and skip metaIndexOffset.
+    if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+      deserializeFromPB(inputStream);
     } else {
-      uncompressedDataIndexSize = inputStream.readLong();
-    }
-    metaIndexCount = inputStream.readInt();
-
-    totalUncompressedBytes = inputStream.readLong();
-    entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
-    compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
-    if (majorVersion > 1) {
-      numDataIndexLevels = inputStream.readInt();
-      firstDataBlockOffset = inputStream.readLong();
-      lastDataBlockOffset = inputStream.readLong();
-      comparatorClassName =
-          Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
+      deserializeFromWritable(inputStream);
     }
 
+    // The last 4 bytes of the file encode the major and minor version universally
     int version = inputStream.readInt();
     expectMajorVersion(extractMajorVersion(version));
     expectMinorVersion(extractMinorVersion(version));
   }
 
+  /**
+   * Deserialize the file trailer as protobuf.
+   * @param inputStream the stream to read the trailer from
+   * @throws IOException
+   */
+  void deserializeFromPB(DataInputStream inputStream) throws IOException {
+    // read PB and skip padding
+    int start = inputStream.available();
+    HFileProtos.FileTrailerProto.Builder builder = HFileProtos.FileTrailerProto.newBuilder();
+    builder.mergeDelimitedFrom(inputStream);
+    int size = start - inputStream.available();
+    inputStream.skip(getTrailerSize() - NOT_PB_SIZE - size);
+
+    // process the PB
+    if (builder.hasFileInfoOffset()) {
+      fileInfoOffset = builder.getFileInfoOffset();
+    }
+    if (builder.hasLoadOnOpenDataOffset()) {
+      loadOnOpenDataOffset = builder.getLoadOnOpenDataOffset();
+    }
+    if (builder.hasUncompressedDataIndexSize()) {
+      uncompressedDataIndexSize = builder.getUncompressedDataIndexSize();
+    }
+    if (builder.hasTotalUncompressedBytes()) {
+      totalUncompressedBytes = builder.getTotalUncompressedBytes();
+    }
+    if (builder.hasDataIndexCount()) {
+      dataIndexCount = builder.getDataIndexCount();
+    }
+    if (builder.hasMetaIndexCount()) {
+      metaIndexCount = builder.getMetaIndexCount();
+    }
+    if (builder.hasEntryCount()) {
+      entryCount = builder.getEntryCount();
+    }
+    if (builder.hasNumDataIndexLevels()) {
+      numDataIndexLevels = builder.getNumDataIndexLevels();
+    }
+    if (builder.hasFirstDataBlockOffset()) {
+      firstDataBlockOffset = builder.getFirstDataBlockOffset();
+    }
+    if (builder.hasLastDataBlockOffset()) {
+      lastDataBlockOffset = builder.getLastDataBlockOffset();
+    }
+    if (builder.hasComparatorClassName()) {
+      setComparatorClass(getComparatorClass(builder.getComparatorClassName()));
+    }
+    if (builder.hasCompressionCodec()) {
+      compressionCodec = Compression.Algorithm.values()[builder.getCompressionCodec()];
+    } else {
+      compressionCodec = Compression.Algorithm.NONE;
+    }
+  }
+
+  /**
+   * Deserialize the file trailer as writable data.
+   * @param input the stream to read the trailer from
+   * @throws IOException
+   */
+  void deserializeFromWritable(DataInput input) throws IOException {
+    fileInfoOffset = input.readLong();
+    loadOnOpenDataOffset = input.readLong();
+    dataIndexCount = input.readInt();
+    if (majorVersion == 1) {
+      input.readLong(); // Read and skip metaIndexOffset.
+    } else {
+      uncompressedDataIndexSize = input.readLong();
+    }
+    metaIndexCount = input.readInt();
+
+    totalUncompressedBytes = input.readLong();
+    entryCount = majorVersion == 1 ? input.readInt() : input.readLong();
+    compressionCodec = Compression.Algorithm.values()[input.readInt()];
+    if (majorVersion > 1) {
+      numDataIndexLevels = input.readInt();
+      firstDataBlockOffset = input.readLong();
+      lastDataBlockOffset = input.readLong();
+      setComparatorClass(getComparatorClass(Bytes.readStringFixedSize(input,
+        MAX_COMPARATOR_NAME_LENGTH)));
+    }
+  }
+
   private void append(StringBuilder sb, String s) {
     if (sb.length() > 0)
       sb.append(", ");
@@ -450,6 +566,10 @@ public class FixedFileTrailer {
     this.firstDataBlockOffset = firstDataBlockOffset;
   }
 
+  public String getComparatorClassName() {
+    return comparatorClassName;
+  }
+
   /**
    * Returns the major version of this HFile format
    */
@@ -466,7 +586,13 @@ public class FixedFileTrailer {
 
   @SuppressWarnings("rawtypes")
   public void setComparatorClass(Class<? extends RawComparator> klass) {
-    expectAtLeastMajorVersion(2);
+    // Check that the comparator is instantiable
+    try {
+      klass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException("Comparator class " + klass.getName() +
+        " is not instantiable", e);
+    }
     comparatorClassName = klass.getName();
   }
 
@@ -486,9 +612,11 @@ public class FixedFileTrailer {
     try {
       return getComparatorClass(comparatorClassName).newInstance();
     } catch (InstantiationException e) {
-      throw new IOException(e);
+      throw new IOException("Comparator class " + comparatorClassName +
+        " is not instantiable", e);
     } catch (IllegalAccessException e) {
-      throw new IOException(e);
+      throw new IOException("Comparator class " + comparatorClassName +
+        " is not instantiable", e);
     }
   }
 

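The protobuf trailer above keeps the fixed on-disk trailer size by writing the delimited message and then zero-padding up to the slot size, with the packed major/minor version always occupying the last four bytes of the file. The class below is an illustrative, standalone sketch of that fixed-slot padding idea and is not HBase code.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FixedSlotPaddingExample {
      // Write a variable-length payload into a slot of exactly slotSize bytes,
      // zero-padding the remainder, as serializeAsPB() does for the trailer PB.
      static byte[] writePadded(byte[] payload, int slotSize) throws IOException {
        int padding = slotSize - payload.length;
        if (padding < 0) {
          throw new IOException("payload exceeds the fixed slot size");
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.write(payload);
        for (int i = 0; i < padding; i++) {
          out.write(0);  // readers skip this padding after parsing the payload
        }
        out.flush();
        return baos.toByteArray();
      }
    }
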
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Feb 14 12:58:12 2013
@@ -165,6 +165,9 @@ public class HFile {
   public final static String DEFAULT_COMPRESSION =
     DEFAULT_COMPRESSION_ALGORITHM.getName();
 
+  /** Meta data block name for bloom filter bits. */
+  public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
   /**
    * We assume that HFile path ends with
    * ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
@@ -447,8 +450,6 @@ public class HFile {
       CacheConfig cacheConf) {
     int version = getFormatVersion(conf);
     switch (version) {
-    case 1:
-      return new HFileWriterV1.WriterFactoryV1(conf, cacheConf);
     case 2:
       return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
     default:
@@ -557,9 +558,6 @@ public class HFile {
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
     }
     switch (trailer.getMajorVersion()) {
-    case 1:
-      return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
-          cacheConf);
     case 2:
       return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
           size, closeIStream,

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Thu Feb 14 12:58:12 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
 
-  static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
-      Bytes.SIZEOF_INT;
+  // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+  public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
 
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-        public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-          ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-          buf.limit(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-          newByteBuffer.put(buf);
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, 
-                                   MINOR_VERSION_NO_CHECKSUM);
-
+        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+          ByteBuffer newByteBuffer;
+          if (reuse) {
+            newByteBuffer = buf.slice();
+          } else {
+            newByteBuffer = ByteBuffer.allocate(buf.limit());
+            newByteBuffer.put(buf);
+          }
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          int minorVersion = buf.getInt();
+          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
           ourBuffer.offset = buf.getLong();
           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
           return ourBuffer;
         }
+
+        @Override
+        public int getDeserialiserIdentifier() {
+          return deserializerIdentifier;
+        }
+
+        @Override
+        public HFileBlock deserialize(ByteBuffer b) throws IOException {
+          return deserialize(b, false);
+        }
       };
+  private static final int deserializerIdentifier;
+  static {
+    deserializerIdentifier = CacheableDeserializerIdManager
+        .registerDeserializer(blockDeserializer);
+  }
 
   private BlockType blockType;
 
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
   }
 
   /**
+   * Returns the buffer of this block, including header data. Clients must
+   * not modify the buffer object. This method has to be public because it is
+   * used in {@link BucketCache} to avoid a buffer copy.
+   * 
+   * @return the byte buffer with header included for read-only operations
+   */
+  public ByteBuffer getBufferReadOnlyWithHeader() {
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+  }
+
+  /**
    * Returns a byte buffer of this block, including header data, positioned at
    * the beginning of header. The underlying data array is not copied.
    *
@@ -1287,110 +1317,6 @@ public class HFileBlock implements Cache
   }
 
   /**
-   * Reads version 1 blocks from the file system. In version 1 blocks,
-   * everything is compressed, including the magic record, if compression is
-   * enabled. Everything might be uncompressed if no compression is used. This
-   * reader returns blocks represented in the uniform version 2 format in
-   * memory.
-   */
-  static class FSReaderV1 extends AbstractFSReader {
-
-    /** Header size difference between version 1 and 2 */
-    private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM - 
-                                            MAGIC_LENGTH;
-
-    public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize) throws IOException {
-      super(istream, istream, compressAlgo, fileSize, 0, null, null);
-    }
-
-    /**
-     * Read a version 1 block. There is no uncompressed header, and the block
-     * type (the magic record) is part of the compressed data. This
-     * implementation assumes that the bounded range file input stream is
-     * needed to stop the decompressor reading into next block, because the
-     * decompressor just grabs a bunch of data without regard to whether it is
-     * coming to end of the compressed section.
-     *
-     * The block returned is still a version 2 block, and in particular, its
-     * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
-     *
-     * @param offset the offset of the block to read in the file
-     * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
-     *          including the magic record, which is the part of compressed
-     *          data if using compression
-     * @param uncompressedSizeWithMagic uncompressed size of the version 1
-     *          block, including the magic record
-     */
-    @Override
-    public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
-        int uncompressedSizeWithMagic, boolean pread) throws IOException {
-      if (uncompressedSizeWithMagic <= 0) {
-        throw new IOException("Invalid uncompressedSize="
-            + uncompressedSizeWithMagic + " for a version 1 block");
-      }
-
-      if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
-      {
-        throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
-            + " (maximum allowed: " + Integer.MAX_VALUE + ")");
-      }
-
-      int onDiskSize = (int) onDiskSizeWithMagic;
-
-      if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
-        throw new IOException("Uncompressed size for a version 1 block is "
-            + uncompressedSizeWithMagic + " but must be at least "
-            + MAGIC_LENGTH);
-      }
-
-      // The existing size already includes magic size, and we are inserting
-      // a version 2 header.
-      ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
-          + HEADER_DELTA);
-
-      int onDiskSizeWithoutHeader;
-      if (compressAlgo == Compression.Algorithm.NONE) {
-        // A special case when there is no compression.
-        if (onDiskSize != uncompressedSizeWithMagic) {
-          throw new IOException("onDiskSize=" + onDiskSize
-              + " and uncompressedSize=" + uncompressedSizeWithMagic
-              + " must be equal for version 1 with no compression");
-        }
-
-        // The first MAGIC_LENGTH bytes of what this will read will be
-        // overwritten.
-        readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
-            onDiskSize, false, offset, pread);
-
-        onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
-      } else {
-        InputStream bufferedBoundedStream = createBufferedBoundedStream(
-            offset, onDiskSize, pread);
-        Compression.decompress(buf.array(), buf.arrayOffset()
-            + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
-            uncompressedSizeWithMagic, this.compressAlgo);
-
-        // We don't really have a good way to exclude the "magic record" size
-        // from the compressed block's size, since it is compressed as well.
-        onDiskSizeWithoutHeader = onDiskSize;
-      }
-
-      BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
-          + HEADER_DELTA, MAGIC_LENGTH);
-
-      // We set the uncompressed size of the new HFile block we are creating
-      // to the size of the data portion of the block without the magic record,
-      // since the magic record gets moved to the header.
-      HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
-          uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
-          offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
-          onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
-      return b;
-    }
-  }
-
-  /**
    * We always prefetch the header of the next block, so that we know its
    * on-disk size in advance and can read it in one operation.
    */
@@ -1780,7 +1706,17 @@ public class HFileBlock implements Cache
 
   @Override
   public void serialize(ByteBuffer destination) {
-    destination.put(this.buf.duplicate());
+    ByteBuffer dupBuf = this.buf.duplicate();
+    dupBuf.rewind();
+    destination.put(dupBuf);
+    destination.putInt(this.minorVersion);
+    destination.putLong(this.offset);
+    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+    destination.rewind();
+  }
+
+  public void serializeExtraInfo(ByteBuffer destination) {
+    destination.putInt(this.minorVersion);
     destination.putLong(this.offset);
     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
     destination.rewind();

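serialize() and the block deserializer above agree on a layout in which the raw block bytes are followed by EXTRA_SERIALIZATION_SPACE bytes carrying minorVersion, offset and nextBlockOnDiskSizeWithHeader. The standalone class below sketches that layout and the limit/position juggling the deserializer performs; it is illustrative only and not part of the patch.

    import java.nio.ByteBuffer;

    public class BlockSerializationLayoutExample {
      // Two ints plus one long, matching EXTRA_SERIALIZATION_SPACE above.
      static final int EXTRA = 2 * 4 + 8;

      static ByteBuffer pack(byte[] blockBytes, int minorVersion, long offset,
          int nextBlockOnDiskSizeWithHeader) {
        ByteBuffer dest = ByteBuffer.allocate(blockBytes.length + EXTRA);
        dest.put(blockBytes);
        dest.putInt(minorVersion);
        dest.putLong(offset);
        dest.putInt(nextBlockOnDiskSizeWithHeader);
        dest.rewind();
        return dest;
      }

      static void unpack(ByteBuffer buf) {
        // Shrink the limit to just the block bytes, as the deserializer does.
        buf.limit(buf.limit() - EXTRA).rewind();
        ByteBuffer blockView = buf.slice();        // the reuse == true path
        // Step past the block bytes and read back the extra fields.
        buf.position(buf.limit());
        buf.limit(buf.limit() + EXTRA);
        int minorVersion = buf.getInt();
        long offset = buf.getLong();
        int nextOnDiskSize = buf.getInt();
        System.out.println("minorVersion=" + minorVersion + " offset=" + offset
            + " nextOnDiskSize=" + nextOnDiskSize
            + " blockBytes=" + blockView.remaining());
      }
    }
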
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Feb 14 12:58:12 2013
@@ -77,8 +77,8 @@ public class HFileReaderV2 extends Abstr
   static final int MIN_MINOR_VERSION = 0;
 
   /** Maximum minor version supported by this HFile format */
-  // We went to version 2 when we moved to pb'ing the fileinfo trailer on the file. This version can read Writables
-  // version 1 too.
+  // We went to minor version 2 when we moved to pb'ing the fileinfo and the
+  // trailer on the file. This version can also read the Writables-encoded
+  // minor version 1.
   static final int MAX_MINOR_VERSION = 2;
 
   /**