Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/14 13:58:21 UTC
svn commit: r1446147 [10/35] - in /hbase/branches/hbase-7290v2: ./ bin/
conf/ dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/...
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Thu Feb 14 12:58:12 2013
@@ -24,9 +24,11 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
/**
* Defines how value for specific column is interpreted and provides utility
@@ -35,7 +37,8 @@ import com.google.protobuf.ByteString;
* handle null case gracefully. Refer to {@link LongColumnInterpreter} for an
* example.
* <p>
- * Takes two generic parameters. The cell value type of the interpreter is <T>.
+ * Takes two generic parameters and three Message parameters.
+ * The cell value type of the interpreter is <T>.
* During some computations like sum or average, the return type can differ
* from the cell value data type; for example, a sum of int cell values might
* overflow an int result, so we should use Long for the result. Therefore, this
@@ -44,12 +47,19 @@ import com.google.protobuf.ByteString;
* <S>. There is a conversion method
* {@link ColumnInterpreter#castToReturnType(Object)} which takes a <T> type and
* returns a <S> type.
+ * The {@link AggregateImplementation} uses PB messages to initialize the
+ * user's ColumnInterpreter implementation, and for sending the responses
+ * back to {@link AggregationClient}.
* @param <T> Cell value data type
* @param <S> Promoted data type
+ * @param <P> PB message that is used to transport initializer specific bytes
+ * @param <Q> PB message that is used to transport Cell (<T>) instance
+ * @param <R> PB message that is used to transport Promoted (<S>) instance
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public interface ColumnInterpreter<T, S> {
+public abstract class ColumnInterpreter<T, S, P extends Message,
+Q extends Message, R extends Message> {
/**
* @param colFamily
@@ -58,7 +68,7 @@ public interface ColumnInterpreter<T, S>
* @return value of type T
* @throws IOException
*/
- T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+ public abstract T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
throws IOException;
/**
@@ -67,36 +77,36 @@ public interface ColumnInterpreter<T, S>
* @return the sum, or the non-null value if exactly one of them is null;
* otherwise returns null.
*/
- public S add(S l1, S l2);
+ public abstract S add(S l1, S l2);
/**
* returns the maximum value for this type T
* @return max
*/
- T getMaxValue();
+ public abstract T getMaxValue();
- T getMinValue();
+ public abstract T getMinValue();
/**
* @param o1
* @param o2
* @return multiplication
*/
- S multiply(S o1, S o2);
+ public abstract S multiply(S o1, S o2);
/**
* @param o
* @return increment
*/
- S increment(S o);
+ public abstract S increment(S o);
/**
* provides casting opportunity between the data types.
* @param o
* @return cast
*/
- S castToReturnType(T o);
+ public abstract S castToReturnType(T o);
/**
* This takes care if either of arguments are null. returns 0 if they are
@@ -105,7 +115,7 @@ public interface ColumnInterpreter<T, S>
* <li>>0 if l1 > l2 or l1 is not null and l2 is null.
* <li>< 0 if l1 < l2 or l1 is null and l2 is not null.
*/
- int compare(final T l1, final T l2);
+ public abstract int compare(final T l1, final T l2);
/**
* used for computing average of <S> data values. Not providing the divide
@@ -114,51 +124,58 @@ public interface ColumnInterpreter<T, S>
* @param l
* @return Average
*/
- double divideForAvg(S o, Long l);
+ public abstract double divideForAvg(S o, Long l);
/**
* This method should return any additional data that is needed on the
* server side to construct the ColumnInterpreter. The server
- * will pass this to the {@link #initialize(ByteString)}
+ * will pass this to the {@link #initialize}
* method. If there is no ColumnInterpreter-specific data (e.g.,
* {@link LongColumnInterpreter}) then null should be returned.
* @return the PB message
*/
- ByteString columnInterpreterSpecificData();
+ public abstract P getRequestData();
/**
- * Return the PB for type T
+ * This method should initialize any field(s) of the ColumnInterpreter by
+ * parsing the passed protobuf message (used on the server side).
+ * @param msg
+ */
+ public abstract void initialize(P msg);
+
+ /**
+ * This method gets the PB message corresponding to the cell type
* @param t
- * @return PB-message
+ * @return the PB message for the cell-type instance
*/
- ByteString getProtoForCellType(T t);
+ public abstract Q getProtoForCellType(T t);
/**
- * Return the PB for type S
- * @param s
- * @return PB-message
+ * This method gets the cell-type value back from the PB message
+ * @param q
+ * @return the cell-type instance from the PB message
*/
- ByteString getProtoForPromotedType(S s);
+ public abstract T getCellValueFromProto(Q q);
/**
- * This method should initialize any field(s) of the ColumnInterpreter with
- * a parsing of the passed message bytes (used on the server side).
- * @param bytes
+ * This method gets the PB message corresponding to the promoted type
+ * @param s
+ * @return the PB message for the promoted-type instance
*/
- void initialize(ByteString bytes);
-
+ public abstract R getProtoForPromotedType(S s);
+
/**
- * Converts the bytes in the server's response to the expected type S
- * @param response
- * @return response of type S constructed from the message
+ * This method gets the promoted type from the proto message
+ * @param r
+ * @return the promoted-type instance from the PB message
*/
- S parseResponseAsPromotedType(byte[] response);
-
+ public abstract S getPromotedValueFromProto(R r);
+
/**
* The response message comes as type S. This will convert/cast it to T.
* In some sense, performs the opposite of {@link #castToReturnType(Object)}
* @param response
* @return cast
*/
- T castToCellType(S response);
+ public abstract T castToCellType(S response);
}
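
To make the new contract concrete, a minimal interpreter against this abstract class might look like the sketch below. EmptyMsg and LongMsg stand in for protobuf-generated message classes, and their accessors are hypothetical; only the abstract-method set comes from the class above.

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
    import org.apache.hadoop.hbase.util.Bytes;

    // EmptyMsg and LongMsg are placeholders for protobuf-generated messages.
    public class MyLongInterpreter
        extends ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> {

      @Override
      public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
          throws IOException {
        if (kv == null || kv.getValueLength() != Bytes.SIZEOF_LONG) return null;
        return Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
      }

      @Override
      public Long add(Long l1, Long l2) {
        if (l1 == null) return l2;           // null when both are null
        if (l2 == null) return l1;
        return l1 + l2;
      }

      @Override public Long getMaxValue() { return Long.MAX_VALUE; }
      @Override public Long getMinValue() { return Long.MIN_VALUE; }

      @Override
      public Long multiply(Long o1, Long o2) {
        return (o1 == null || o2 == null) ? null : o1 * o2;
      }

      @Override public Long increment(Long o) { return o == null ? null : o + 1; }
      @Override public Long castToReturnType(Long o) { return o; }
      @Override public Long castToCellType(Long response) { return response; }

      @Override
      public int compare(Long l1, Long l2) {
        if (l1 == null) return l2 == null ? 0 : -1;  // null-safe, per the contract
        if (l2 == null) return 1;
        return l1.compareTo(l2);
      }

      @Override
      public double divideForAvg(Long o, Long l) {
        return (o == null || l == null) ? Double.NaN
            : o.doubleValue() / l.doubleValue();
      }

      // PB plumbing: this interpreter keeps no state, so P is a trivial message.
      @Override public EmptyMsg getRequestData() { return EmptyMsg.getDefaultInstance(); }
      @Override public void initialize(EmptyMsg msg) { /* nothing to restore */ }

      @Override
      public LongMsg getProtoForCellType(Long t) {
        return LongMsg.newBuilder().setValue(t).build();  // hypothetical accessor
      }
      @Override public Long getCellValueFromProto(LongMsg q) { return q.getValue(); }

      @Override
      public LongMsg getProtoForPromotedType(Long s) {
        return LongMsg.newBuilder().setValue(s).build();
      }
      @Override public Long getPromotedValueFromProto(LongMsg r) { return r.getValue(); }
    }

The P message travels from client to server through getRequestData()/initialize(P), while Q and R carry the cell-type and promoted-type values in responses.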
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Feb 14 12:58:12 2013
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
@@ -48,6 +47,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.jar.JarEntry;
@@ -73,6 +74,10 @@ public abstract class CoprocessorHost<E
public static final String WAL_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.wal.classes";
+ //coprocessor jars are put under ${hbase.local.dir}/coprocessor/jars/
+ private static final String COPROCESSOR_JARS_DIR = File.separator
+ + "coprocessor" + File.separator + "jars" + File.separator;
+
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
/** Ordered set of loaded coprocessors with lock */
protected SortedSet<E> coprocessors =
@@ -131,7 +136,7 @@ public abstract class CoprocessorHost<E
protected void loadSystemCoprocessors(Configuration conf, String confKey) {
Class<?> implClass = null;
- // load default coprocessors from configure file
+ // load default coprocessors from configure file
String[] defaultCPClasses = conf.getStrings(confKey);
if (defaultCPClasses == null || defaultCPClasses.length == 0)
return;
@@ -175,7 +180,7 @@ public abstract class CoprocessorHost<E
public E load(Path path, String className, int priority,
Configuration conf) throws IOException {
Class<?> implClass = null;
- LOG.debug("Loading coprocessor class " + className + " with path " +
+ LOG.debug("Loading coprocessor class " + className + " with path " +
path + " and priority " + priority);
ClassLoader cl = null;
@@ -210,13 +215,13 @@ public abstract class CoprocessorHost<E
if (!path.toString().endsWith(".jar")) {
throw new IOException(path.toString() + ": not a jar file?");
}
- FileSystem fs = path.getFileSystem(HBaseConfiguration.create());
- Path dst = new Path(System.getProperty("java.io.tmpdir") +
- java.io.File.separator +"." + pathPrefix +
+ FileSystem fs = path.getFileSystem(this.conf);
+ File parentDir = new File(this.conf.get("hbase.local.dir") + COPROCESSOR_JARS_DIR);
+ parentDir.mkdirs();
+ File dst = new File(parentDir, "." + pathPrefix +
"." + className + "." + System.currentTimeMillis() + ".jar");
- fs.copyToLocalFile(path, dst);
- File tmpLocal = new File(dst.toString());
- tmpLocal.deleteOnExit();
+ fs.copyToLocalFile(path, new Path(dst.toString()));
+ dst.deleteOnExit();
// TODO: code weaving goes here
@@ -229,8 +234,8 @@ public abstract class CoprocessorHost<E
// NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader
// unsurprisingly wants URLs, not URIs; so we will use the deprecated
// method which returns URLs for as long as it is available
- List<URL> paths = new ArrayList<URL>();
- URL url = new File(dst.toString()).getCanonicalFile().toURL();
+ final List<URL> paths = new ArrayList<URL>();
+ URL url = dst.getCanonicalFile().toURL();
paths.add(url);
JarFile jarFile = new JarFile(dst.toString());
@@ -238,8 +243,7 @@ public abstract class CoprocessorHost<E
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
if (entry.getName().matches("/lib/[^/]+\\.jar")) {
- File file = new File(System.getProperty("java.io.tmpdir") +
- java.io.File.separator +"." + pathPrefix +
+ File file = new File(parentDir, "." + pathPrefix +
"." + className + "." + System.currentTimeMillis() + "." + entry.getName().substring(5));
IOUtils.copyBytes(jarFile.getInputStream(entry), new FileOutputStream(file), conf, true);
file.deleteOnExit();
@@ -248,7 +252,13 @@ public abstract class CoprocessorHost<E
}
jarFile.close();
- cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
+ cl = AccessController.doPrivileged(new PrivilegedAction<CoprocessorClassLoader>() {
+ @Override
+ public CoprocessorClassLoader run() {
+ return new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
+ }
+ });
+
// cache cp classloader as a weak value, will be GC'ed when no reference left
ClassLoader prev = classLoadersCache.putIfAbsent(path, cl);
if (prev != null) {
@@ -470,6 +480,10 @@ public abstract class CoprocessorHost<E
return table.exists(get);
}
+ public Boolean[] exists(List<Get> gets) throws IOException{
+ return table.exists(gets);
+ }
+
public void put(Put put) throws IOException {
table.put(put);
}
@@ -547,16 +561,6 @@ public abstract class CoprocessorHost<E
return tableName;
}
- public RowLock lockRow(byte[] row) throws IOException {
- throw new RuntimeException(
- "row locking is not allowed within the coprocessor environment");
- }
-
- public void unlockRow(RowLock rl) throws IOException {
- throw new RuntimeException(
- "row locking is not allowed within the coprocessor environment");
- }
-
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
@@ -587,26 +591,6 @@ public abstract class CoprocessorHost<E
}
@Override
- public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol,
- byte[] startKey, byte[] endKey, Batch.Call<T, R> callable,
- Batch.Callback<R> callback) throws IOException, Throwable {
- table.coprocessorExec(protocol, startKey, endKey, callable, callback);
- }
-
- @Override
- public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
- Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
- throws IOException, Throwable {
- return table.coprocessorExec(protocol, startKey, endKey, callable);
- }
-
- @Override
- public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
- byte[] row) {
- return table.coprocessorProxy(protocol, row);
- }
-
- @Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
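
The doPrivileged wrapping added above is the standard pattern for constructing a classloader while a SecurityManager is installed: the creation runs with the host's own permissions instead of requiring every caller on the stack to hold createClassLoader. A standalone sketch with the JDK's URLClassLoader:

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.util.List;

    final class LoaderFactory {
      // Constructing a ClassLoader needs the "createClassLoader" runtime
      // permission; doPrivileged stops the permission check at this frame.
      static URLClassLoader newLoader(final List<URL> paths, final ClassLoader parent) {
        return AccessController.doPrivileged(new PrivilegedAction<URLClassLoader>() {
          @Override
          public URLClassLoader run() {
            return new URLClassLoader(paths.toArray(new URL[paths.size()]), parent);
          }
        });
      }
    }

One subtlety in the patch itself: inside the anonymous PrivilegedAction, this.getClass().getClassLoader() resolves against the anonymous class rather than CoprocessorHost. The two are defined by the same loader, so behavior is unchanged, but passing the parent loader in explicitly (as in the sketch) sidesteps the ambiguity.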
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Thu Feb 14 12:58:12 2013
@@ -167,12 +167,14 @@ public interface RegionObserver extends
* @param store the store being compacted
* @param scanner the scanner over existing data used in the store file
* rewriting
+ * @param scanType type of Scan
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
- final HStore store, final InternalScanner scanner) throws IOException;
+ final HStore store, final InternalScanner scanner,
+ final ScanType scanType) throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into
@@ -735,6 +737,27 @@ public interface RegionObserver extends
throws IOException;
/**
+ * This will be called by the scan flow when the current scanned row is being filtered out by the
+ * filter. The filter may be filtering out the row via any of the below scenarios
+ * <ol>
+ * <li>
+ * <code>boolean filterRowKey(byte [] buffer, int offset, int length)</code> returning true</li>
+ * <li>
+ * <code>boolean filterRow()</code> returning true</li>
+ * <li>
+ * <code>void filterRow(List<KeyValue> kvs)</code> removing all the kvs from the passed List</li>
+ * </ol>
+ * @param c the environment provided by the region server
+ * @param s the scanner
+ * @param currentRow The current rowkey which got filtered out
+ * @param hasMore the 'has more' indication
+ * @return whether more rows are available for the scanner or not
+ * @throws IOException
+ */
+ boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException;
+
+ /**
* Called before the client closes a scanner.
* <p>
* Call CoprocessorEnvironment#bypass to skip default actions
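
From a coprocessor author's point of view, the two changes above land as overrides like the following sketch. It assumes the usual BaseRegionObserver convenience base class and that ScanType is the regionserver enum passed in by the compaction code; both are assumptions, not shown in this diff.

    import java.io.IOException;

    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.HStore;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScanType;

    public class AuditingObserver extends BaseRegionObserver {

      // New signature: scanType tells the observer what kind of compaction
      // scan produced the scanner it is being offered.
      @Override
      public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
          HStore store, InternalScanner scanner, ScanType scanType) throws IOException {
        return scanner;  // returning it unchanged keeps default compaction behavior
      }

      // New hook: fires when the row currently being scanned is filtered out.
      @Override
      public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> c,
          InternalScanner s, byte[] currentRow, boolean hasMore) throws IOException {
        return hasMore;  // pass the 'has more' indication through untouched
      }
    }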
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Thu Feb 14 12:58:12 2013
@@ -67,7 +67,7 @@ public abstract class EventHandler imple
protected Server server;
// sequence id generator for default FIFO ordering of events
- protected static AtomicLong seqids = new AtomicLong(0);
+ protected static final AtomicLong seqids = new AtomicLong(0);
// sequence id for this event
private final long seqid;
@@ -120,7 +120,7 @@ public abstract class EventHandler imple
// Messages originating from Master to RS
M_RS_OPEN_REGION (20, ExecutorType.RS_OPEN_REGION), // Master asking RS to open a region
- M_RS_OPEN_ROOT (21, ExecutorType.RS_OPEN_REGION), // Master asking RS to open root
+ M_RS_OPEN_ROOT (21, ExecutorType.RS_OPEN_ROOT), // Master asking RS to open root
M_RS_OPEN_META (22, ExecutorType.RS_OPEN_META), // Master asking RS to open meta
M_RS_CLOSE_REGION (23, ExecutorType.RS_CLOSE_REGION), // Master asking RS to close a region
M_RS_CLOSE_ROOT (24, ExecutorType.RS_CLOSE_ROOT), // Master asking RS to close root
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java Thu Feb 14 12:58:12 2013
@@ -75,11 +75,10 @@ public abstract class ByteArrayComparabl
* @return true if and only if the fields of the comparator that are serialized
* are equal to the corresponding fields in other. Used for testing.
*/
- boolean areSerializedFieldsEqual(ByteArrayComparable o) {
- if (o == this) return true;
- if (!(o instanceof ByteArrayComparable)) return false;
+ boolean areSerializedFieldsEqual(ByteArrayComparable other) {
+ if (other == this) return true;
- return Bytes.equals(this.getValue(), o.getValue());
+ return Bytes.equals(this.getValue(), other.getValue());
}
@Override
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/CompareFilter.java Thu Feb 14 12:58:12 2013
@@ -156,7 +156,7 @@ public abstract class CompareFilter exte
/**
*
- * @param other
+ * @param o
* @return true if and only if the fields of the filter that are serialized
* are equal to the corresponding fields in other. Used for testing.
*/
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/DependentColumnFilter.java Thu Feb 14 12:58:12 2013
@@ -261,10 +261,12 @@ public class DependentColumnFilter exten
}
/**
- * @param other
+ * @param o
* @return true if and only if the fields of the filter that are serialized
* are equal to the corresponding fields in other. Used for testing.
*/
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
boolean areSerializedFieldsEqual(Filter o) {
if (o == this) return true;
if (!(o instanceof DependentColumnFilter)) return false;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Thu Feb 14 12:58:12 2013
@@ -172,6 +172,14 @@ public abstract class Filter {
abstract public KeyValue getNextKeyHint(final KeyValue currentKV);
/**
+ * Check whether the given column family is essential for the filter to decide
+ * whether a row passes. Most filters always return true here, but some have
+ * more sophisticated logic that can significantly speed up scanning by not
+ * even touching a column family until it is certain its data is needed in the result.
+ */
+ abstract public boolean isFamilyEssential(byte[] name);
+
+ /**
* @return The filter serialized using pb
*/
abstract public byte [] toByteArray();
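
A sketch of a filter exploiting the new hook: it declares only one family essential, so the region server can defer loading other families until a row is known to pass. The class and family names are made up, and the PB serialization a real filter also needs is stubbed out.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MetaOnlyFilter extends FilterBase {
      private static final byte[] ESSENTIAL = Bytes.toBytes("meta");

      // Only the small "meta" family is needed to decide the row; a bulky
      // "payload" family is then fetched lazily, for passing rows only.
      @Override
      public boolean isFamilyEssential(byte[] name) {
        return Bytes.equals(name, ESSENTIAL);
      }

      @Override
      public ReturnCode filterKeyValue(KeyValue kv) {
        return ReturnCode.INCLUDE;  // decision logic elided in this sketch
      }

      @Override
      public byte[] toByteArray() {
        return new byte[0];         // a real filter would emit its PB serialization
      }
    }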
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterBase.java Thu Feb 14 12:58:12 2013
@@ -134,6 +134,16 @@ public abstract class FilterBase extends
}
/**
+ * By default, we require all of the scan's column families to be present.
+ * Subclasses may be more precise.
+ *
+ * @inheritDoc
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return true;
+ }
+
+ /**
* Given the filter's arguments it constructs the filter
* <p>
* @param filterArguments the filter's arguments
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterList.java Thu Feb 14 12:58:12 2013
@@ -361,6 +361,16 @@ public class FilterList extends Filter {
}
@Override
+ public boolean isFamilyEssential(byte[] name) {
+ for (Filter filter : filters) {
+ if (filter.isFamilyEssential(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
public String toString() {
return toString(MAX_LOG_FILTERS);
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FilterWrapper.java Thu Feb 14 12:58:12 2013
@@ -136,6 +136,11 @@ public class FilterWrapper extends Filte
}
}
+ @Override
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ };
+
/**
* @param other
* @return true if and only if the fields of the filter that are serialized
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyOnlyFilter.java Thu Feb 14 12:58:12 2013
@@ -90,13 +90,13 @@ public class FirstKeyOnlyFilter extends
*/
public static FirstKeyOnlyFilter parseFrom(final byte [] pbBytes)
throws DeserializationException {
- FilterProtos.FirstKeyOnlyFilter proto;
+ // There is nothing to deserialize. Why do this at all?
try {
- proto = FilterProtos.FirstKeyOnlyFilter.parseFrom(pbBytes);
+ FilterProtos.FirstKeyOnlyFilter.parseFrom(pbBytes);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
-
+ // Just return a new instance.
return new FirstKeyOnlyFilter();
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/FirstKeyValueMatchingQualifiersFilter.java Thu Feb 14 12:58:12 2013
@@ -18,20 +18,17 @@
package org.apache.hadoop.hbase.filter;
-import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Set;
+import java.util.TreeSet;
/**
* The filter looks for the given columns in KeyValue. Once there is a match for
@@ -40,7 +37,8 @@ import com.google.protobuf.InvalidProtoc
* <p>
* Note : It may emit KVs which do not have the given columns in them, if
* these KVs happen to occur before a KV which does have a match. Given this
- * caveat, this filter is only useful for special cases like {@link RowCounter}.
+ * caveat, this filter is only useful for special cases
+ * like {@link org.apache.hadoop.hbase.mapreduce.RowCounter}.
* <p>
*/
@InterfaceAudience.Public
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/NullComparator.java Thu Feb 14 12:58:12 2013
@@ -44,6 +44,11 @@ public class NullComparator extends Byte
}
@Override
+ public boolean equals(Object obj) {
+ return obj == null;
+ }
+
+ @Override
public int compareTo(byte[] value, int offset, int length) {
throw new UnsupportedOperationException();
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Thu Feb 14 12:58:12 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
/**
* A {@link Filter} that checks a single column value, but does not emit the
@@ -84,28 +86,31 @@ public class SingleColumnValueExcludeFil
* @param qualifier
* @param compareOp
* @param comparator
- * @param foundColumn
- * @param matchedColumn
* @param filterIfMissing
* @param latestVersionOnly
*/
- protected SingleColumnValueExcludeFilter(final byte[] family, final byte [] qualifier,
- final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
- final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
- super(family,qualifier,compareOp,comparator,foundColumn,
- matchedColumn,filterIfMissing,latestVersionOnly);
- }
-
- public ReturnCode filterKeyValue(KeyValue keyValue) {
- ReturnCode superRetCode = super.filterKeyValue(keyValue);
- if (superRetCode == ReturnCode.INCLUDE) {
+ protected SingleColumnValueExcludeFilter(final byte[] family, final byte[] qualifier,
+ final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+ final boolean latestVersionOnly) {
+ super(family, qualifier, compareOp, comparator, filterIfMissing, latestVersionOnly);
+ }
+
+ // We clean the result row in filterRow() to stay consistent with the scanning process.
+ public boolean hasFilterRow() {
+ return true;
+ }
+
+ // Here we remove from the row all KeyValues belonging to the tested column
+ public void filterRow(List<KeyValue> kvs) {
+ Iterator it = kvs.iterator();
+ while (it.hasNext()) {
+ KeyValue kv = (KeyValue)it.next();
// If the current column is actually the tested column,
// we will skip it instead.
- if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
- return ReturnCode.SKIP;
+ if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+ it.remove();
}
}
- return superRetCode;
}
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
@@ -157,11 +162,10 @@ public class SingleColumnValueExcludeFil
throw new DeserializationException(ioe);
}
- return new SingleColumnValueExcludeFilter(
- parentProto.hasColumnFamily()?parentProto.getColumnFamily().toByteArray():null,
- parentProto.hasColumnQualifier()?parentProto.getColumnQualifier().toByteArray():null,
- compareOp, comparator, parentProto.getFoundColumn(),parentProto.getMatchedColumn(),
- parentProto.getFilterIfMissing(),parentProto.getLatestVersionOnly());
+ return new SingleColumnValueExcludeFilter(parentProto.hasColumnFamily() ? parentProto
+ .getColumnFamily().toByteArray() : null, parentProto.hasColumnQualifier() ? parentProto
+ .getColumnQualifier().toByteArray() : null, compareOp, comparator, parentProto
+ .getFilterIfMissing(), parentProto.getLatestVersionOnly());
}
/**
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Thu Feb 14 12:58:12 2013
@@ -128,17 +128,13 @@ public class SingleColumnValueFilter ext
* @param qualifier
* @param compareOp
* @param comparator
- * @param foundColumn
- * @param matchedColumn
* @param filterIfMissing
* @param latestVersionOnly
*/
- protected SingleColumnValueFilter(final byte[] family, final byte [] qualifier,
- final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
- final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
- this(family,qualifier,compareOp,comparator);
- this.foundColumn = foundColumn;
- this.matchedColumn = matchedColumn;
+ protected SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
+ final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+ final boolean latestVersionOnly) {
+ this(family, qualifier, compareOp, comparator);
this.filterIfMissing = filterIfMissing;
this.latestVersionOnly = latestVersionOnly;
}
@@ -313,8 +309,6 @@ public class SingleColumnValueFilter ext
HBaseProtos.CompareType compareOp = CompareType.valueOf(this.compareOp.name());
builder.setCompareOp(compareOp);
builder.setComparator(ProtobufUtil.toComparator(this.comparator));
- builder.setFoundColumn(this.foundColumn);
- builder.setMatchedColumn(this.matchedColumn);
builder.setFilterIfMissing(this.filterIfMissing);
builder.setLatestVersionOnly(this.latestVersionOnly);
@@ -352,11 +346,10 @@ public class SingleColumnValueFilter ext
throw new DeserializationException(ioe);
}
- return new SingleColumnValueFilter(
- proto.hasColumnFamily()?proto.getColumnFamily().toByteArray():null,
- proto.hasColumnQualifier()?proto.getColumnQualifier().toByteArray():null,
- compareOp, comparator, proto.getFoundColumn(),proto.getMatchedColumn(),
- proto.getFilterIfMissing(),proto.getLatestVersionOnly());
+ return new SingleColumnValueFilter(proto.hasColumnFamily() ? proto.getColumnFamily()
+ .toByteArray() : null, proto.hasColumnQualifier() ? proto.getColumnQualifier()
+ .toByteArray() : null, compareOp, comparator, proto.getFilterIfMissing(), proto
+ .getLatestVersionOnly());
}
/**
@@ -373,12 +366,19 @@ public class SingleColumnValueFilter ext
&& Bytes.equals(this.getQualifier(), other.getQualifier())
&& this.compareOp.equals(other.compareOp)
&& this.getComparator().areSerializedFieldsEqual(other.getComparator())
- && this.foundColumn == other.foundColumn
- && this.matchedColumn == other.matchedColumn
&& this.getFilterIfMissing() == other.getFilterIfMissing()
&& this.getLatestVersionOnly() == other.getLatestVersionOnly();
}
+ /**
+ * The only column family this filter needs is the given one, so it is the
+ * only essential family in the whole scan. If filterIfMissing == false, all
+ * families are essential, because rows with no data in the filtered CF might
+ * otherwise be skipped.
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+ }
+
@Override
public String toString() {
return String.format("%s (%s, %s, %s, %s)",
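
In client terms the override means that a scan like the following sketch treats only "cf" as essential when filterIfMissing is set; the family, qualifier, and value here are made-up examples.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class EssentialFamilyScan {
      static Scan build() {
        SingleColumnValueFilter f = new SingleColumnValueFilter(
            Bytes.toBytes("cf"), Bytes.toBytes("q"),
            CompareOp.EQUAL, Bytes.toBytes("v"));
        // With filterIfMissing == true only "cf" is essential: other families
        // are loaded lazily, just for rows the filter lets through. With the
        // default (false) every family stays essential, so rows lacking "cf"
        // are not silently skipped.
        f.setFilterIfMissing(true);
        Scan scan = new Scan();
        scan.setFilter(f);
        return scan;
      }
    }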
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Thu Feb 14 12:58:12 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Thu Feb 14 12:58:12 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Thu Feb 14 12:58:12 2013
@@ -33,13 +33,12 @@ import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,8 +46,9 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* An encapsulation for the FileSystem object that hbase uses to access
@@ -259,7 +259,7 @@ public class HFileSystem extends FilterF
final ReorderBlocks lrb, final Configuration conf) {
return (ClientProtocol) Proxy.newProxyInstance
(cp.getClass().getClassLoader(),
- new Class[]{ClientProtocol.class},
+ new Class[]{ClientProtocol.class, Closeable.class},
new InvocationHandler() {
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
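
Listing Closeable among the proxy's interfaces (the patch uses Hadoop's org.apache.hadoop.io.Closeable) means code that casts the client protocol to Closeable in order to close it no longer fails with a ClassCastException. The multi-interface proxy pattern in isolation, sketched with java.io.Closeable:

    import java.io.Closeable;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    final class ProxyUtil {
      // Wraps target so the proxy implements both the service interface and
      // Closeable; every call, close() included, is forwarded to target
      // (which is assumed to actually implement the forwarded methods).
      static <T> Object wrap(final T target, Class<T> iface) {
        return Proxy.newProxyInstance(
            target.getClass().getClassLoader(),
            new Class<?>[] { iface, Closeable.class },
            new InvocationHandler() {
              @Override
              public Object invoke(Object proxy, Method method, Object[] args)
                  throws Throwable {
                return method.invoke(target, args);
              }
            });
      }
    }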
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java Thu Feb 14 12:58:12 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io;
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
import com.google.protobuf.ByteString;
@@ -56,7 +54,7 @@ import com.google.protobuf.ByteString;
* references. References are cleaned up by compactions.
*/
@InterfaceAudience.Private
-public class Reference implements Writable {
+public class Reference {
private byte [] splitkey;
private Range region;
@@ -99,7 +97,6 @@ public class Reference implements Writab
/**
* Used by serializations.
- * @deprecated Use the pb serializations instead. Writables are going away.
*/
@Deprecated
// Make this private when it comes time to let go of this constructor. Needed by pb serialization.
@@ -130,18 +127,14 @@ public class Reference implements Writab
return "" + this.region;
}
- /**
- * @deprecated Writables are going away. Use the pb serialization methods instead.
- */
- @Deprecated
- public void write(DataOutput out) throws IOException {
- // Write true if we're doing top of the file.
- out.writeBoolean(isTopFileRegion(this.region));
- Bytes.writeByteArray(out, this.splitkey);
+ public static boolean isTopFileRegion(final Range r) {
+ return r.equals(Range.top);
}
/**
* @deprecated Writables are going away. Use the pb serialization methods instead.
+ * Remove in a release after 0.96 goes out. This is here only to migrate
+ * old Reference files written with Writables before 0.96.
*/
@Deprecated
public void readFields(DataInput in) throws IOException {
@@ -151,10 +144,6 @@ public class Reference implements Writab
this.splitkey = Bytes.readByteArray(in);
}
- public static boolean isTopFileRegion(final Range r) {
- return r.equals(Range.top);
- }
-
public Path write(final FileSystem fs, final Path p)
throws IOException {
FSDataOutputStream out = fs.create(p, false);
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Thu Feb 14 12:58:12 2013
@@ -19,12 +19,8 @@
package org.apache.hadoop.hbase.io;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
@@ -39,7 +35,7 @@ import org.apache.hadoop.hbase.util.Byte
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TimeRange implements Writable {
+public class TimeRange {
private long minStamp = 0L;
private long maxStamp = Long.MAX_VALUE;
private boolean allTime = false;
@@ -184,17 +180,4 @@ public class TimeRange implements Writab
sb.append(this.minStamp);
return sb.toString();
}
-
- //Writable
- public void readFields(final DataInput in) throws IOException {
- this.minStamp = in.readLong();
- this.maxStamp = in.readLong();
- this.allTime = in.readBoolean();
- }
-
- public void write(final DataOutput out) throws IOException {
- out.writeLong(minStamp);
- out.writeLong(maxStamp);
- out.writeBoolean(this.allTime);
- }
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Thu Feb 14 12:58:12 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
* Cache Key for use with implementations of {@link BlockCache}
*/
@InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
private final String hfileName;
private final long offset;
private final DataBlockEncoding encoding;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Thu Feb 14 12:58:12 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
"hbase.rs.evictblocksonclose";
+ /**
+ * Configuration keys for Bucket cache
+ */
+ public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+ public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
+ "hbase.bucketcache.persistent.path";
+ public static final String BUCKET_CACHE_COMBINED_KEY =
+ "hbase.bucketcache.combinedcache.enabled";
+ public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY =
+ "hbase.bucketcache.percentage.in.combinedcache";
+ public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+ public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
+ "hbase.bucketcache.writer.queuelength";
+ /**
+ * Defaults for Bucket cache
+ */
+ public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+ public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
// Calculate the amount of heap to give the block cache.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- long cacheSize = (long)(mu.getMax() * cachePercentage);
+ long lruCacheSize = (long) (mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize =
(long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
DirectMemoryUtils.getDirectMemorySize());
- LOG.info("Allocating LruBlockCache with maximum size " +
- StringUtils.humanReadableInt(cacheSize));
if (offHeapCacheSize <= 0) {
- globalBlockCache = new LruBlockCache(cacheSize,
- StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+ String bucketCacheIOEngineName = conf
+ .get(BUCKET_CACHE_IOENGINE_KEY, null);
+ float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+ // A percentage of max heap size, or an absolute value in megabytes
+ long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+ * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+ boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED);
+ BucketCache bucketCache = null;
+ if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+ int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+ int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+ String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+ float combinedPercentage = conf.getFloat(
+ BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+ if (combinedWithLru) {
+ lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+ bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+ }
+ try {
+ int ioErrorsTolerationDuration = conf.getInt(
+ "hbase.bucketcache.ioengine.errors.tolerated.duration",
+ BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+ bucketCache = new BucketCache(bucketCacheIOEngineName,
+ bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+ ioErrorsTolerationDuration);
+ } catch (IOException ioex) {
+ LOG.error("Can't instantiate bucket cache", ioex);
+ throw new RuntimeException(ioex);
+ }
+ }
+ LOG.info("Allocating LruBlockCache with maximum size "
+ + StringUtils.humanReadableInt(lruCacheSize));
+ LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+ StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+ lruCache.setVictimCache(bucketCache);
+ if (bucketCache != null && combinedWithLru) {
+ globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+ } else {
+ globalBlockCache = lruCache;
+ }
} else {
- globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+ globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
}
return globalBlockCache;
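
Tying the new keys together, enabling a bucket cache in combined mode could look like the sketch below. The "offheap" engine name is an assumption (legal IOEngine names are not part of this hunk); the keys, defaults, and sizing semantics come from the code above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class BucketCacheSetup {
      static Configuration build() {
        Configuration conf = HBaseConfiguration.create();
        // hbase.bucketcache.size: < 1 means a fraction of max heap, >= 1 megabytes.
        conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap");  // assumed engine name
        conf.setFloat(CacheConfig.BUCKET_CACHE_SIZE_KEY, 0.4f);
        // Combined mode (the default): 90% of that space backs the bucket cache
        // and the remaining 10% becomes the on-heap LRU cache.
        conf.setBoolean(CacheConfig.BUCKET_CACHE_COMBINED_KEY, true);
        conf.setFloat(CacheConfig.BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, 0.9f);
        conf.setInt(CacheConfig.BUCKET_CACHE_WRITER_THREADS_KEY, 3);
        conf.setInt(CacheConfig.BUCKET_CACHE_WRITER_QUEUE_KEY, 64);
        return conf;
      }
    }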
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Thu Feb 14 12:58:12 2013
@@ -171,6 +171,22 @@ public class CacheStats {
windowIndex = (windowIndex + 1) % numPeriodsInWindow;
}
+ public long getSumHitCountsPastNPeriods() {
+ return sum(hitCounts);
+ }
+
+ public long getSumRequestCountsPastNPeriods() {
+ return sum(requestCounts);
+ }
+
+ public long getSumHitCachingCountsPastNPeriods() {
+ return sum(hitCachingCounts);
+ }
+
+ public long getSumRequestCachingCountsPastNPeriods() {
+ return sum(requestCachingCounts);
+ }
+
public double getHitRatioPastNPeriods() {
double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
return Double.isNaN(ratio) ? 0 : ratio;
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Thu Feb 14 12:58:12 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
*/
public CacheableDeserializer<Cacheable> getDeserializer();
+ /**
+ * @return the block type of this cached HFile block
+ */
+ public BlockType getBlockType();
+
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Thu Feb 14 12:58:12 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
* @return T the deserialized object.
*/
public T deserialize(ByteBuffer b) throws IOException;
+
+ /**
+ *
+ * @param b
+ * @param reuse true if Cacheable object can use the given buffer as its
+ * content
+ * @return T the deserialized object.
+ * @throws IOException
+ */
+ public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+ /**
+ * Get the identifier of this deserializer. The identifier is unique for each
+ * deserializer and is generated by {@link CacheableDeserializerIdManager}.
+ * @return identifier number of this cacheable deserializer
+ */
+ public int getDeserialiserIdentifier();
}
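
An implementer of the widened interface would look roughly like this sketch. MyCacheable stands in for a concrete Cacheable, and the identifier is assumed to be issued by CacheableDeserializerIdManager as the new javadoc requires.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class MyDeserializer implements CacheableDeserializer<MyCacheable> {

      // Assumed to be assigned once via CacheableDeserializerIdManager so a
      // persisted bucket cache can find this deserializer again after restart.
      private static final int DESERIALIZER_ID = 42;

      @Override
      public MyCacheable deserialize(ByteBuffer b) throws IOException {
        return deserialize(b, false);  // safe default: never adopt the buffer
      }

      @Override
      public MyCacheable deserialize(ByteBuffer b, boolean reuse) throws IOException {
        ByteBuffer content;
        if (reuse) {
          content = b;                            // adopt the caller's buffer
        } else {
          content = ByteBuffer.allocate(b.remaining());
          content.put(b.duplicate());             // defensive copy
          content.flip();
        }
        return new MyCacheable(content);
      }

      @Override
      public int getDeserialiserIdentifier() {
        return DESERIALIZER_ID;
      }
    }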
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Thu Feb 14 12:58:12 2013
@@ -96,11 +96,24 @@ public class CachedBlock implements Heap
return size;
}
+ @Override
public int compareTo(CachedBlock that) {
if(this.accessTime == that.accessTime) return 0;
return this.accessTime < that.accessTime ? 1 : -1;
}
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ CachedBlock other = (CachedBlock) obj;
+ return compareTo(other) == 0;
+ }
+
public Cacheable getBuffer() {
return this.buf;
}
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java Thu Feb 14 12:58:12 2013
@@ -54,7 +54,7 @@ public class ChecksumUtil {
* compute checksums from
* @param endOffset ending offset in the indata stream up to
* which checksums need to be computed
- * @param outData the output buffer where checksum values are written
+ * @param outdata the output buffer where checksum values are written
* @param outOffset the starting offset in the outdata where the
* checksum values are written
* @param checksumType type of checksum
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Thu Feb 14 12:58:12 2013
@@ -18,13 +18,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,7 +30,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
@@ -57,6 +56,9 @@ public class FixedFileTrailer {
private static final Log LOG = LogFactory.getLog(FixedFileTrailer.class);
+ /** HFile minor version that introduced the protobuf file trailer */
+ private static final int PBUF_TRAILER_MINOR_VERSION = 2;
+
/**
* We store the comparator class name as a fixed-length field in the trailer.
*/
@@ -113,7 +115,7 @@ public class FixedFileTrailer {
private long lastDataBlockOffset;
/** Raw key comparator class name in version 2 */
- private String comparatorClassName = RawComparator.class.getName();
+ private String comparatorClassName = KeyValue.KEY_COMPARATOR.getClass().getName();
/** The {@link HFile} format major version. */
private final int majorVersion;
@@ -129,11 +131,10 @@ public class FixedFileTrailer {
private static int[] computeTrailerSizeByVersion() {
int versionToSize[] = new int[HFile.MAX_FORMAT_VERSION + 1];
- for (int version = MIN_FORMAT_VERSION;
- version <= MAX_FORMAT_VERSION;
+ for (int version = HFile.MIN_FORMAT_VERSION;
+ version <= HFile.MAX_FORMAT_VERSION;
++version) {
- FixedFileTrailer fft = new FixedFileTrailer(version,
- HFileBlock.MINOR_VERSION_NO_CHECKSUM);
+ FixedFileTrailer fft = new FixedFileTrailer(version, HFileBlock.MINOR_VERSION_NO_CHECKSUM);
DataOutputStream dos = new DataOutputStream(new NullOutputStream());
try {
fft.serialize(dos);
@@ -148,8 +149,8 @@ public class FixedFileTrailer {
private static int getMaxTrailerSize() {
int maxSize = 0;
- for (int version = MIN_FORMAT_VERSION;
- version <= MAX_FORMAT_VERSION;
+ for (int version = HFile.MIN_FORMAT_VERSION;
+ version <= HFile.MAX_FORMAT_VERSION;
++version)
maxSize = Math.max(getTrailerSize(version), maxSize);
return maxSize;
@@ -158,6 +159,8 @@ public class FixedFileTrailer {
private static final int TRAILER_SIZE[] = computeTrailerSizeByVersion();
private static final int MAX_TRAILER_SIZE = getMaxTrailerSize();
+ private static final int NOT_PB_SIZE = BlockType.MAGIC_LENGTH + Bytes.SIZEOF_INT;
+
static int getTrailerSize(int version) {
return TRAILER_SIZE[version];
}
@@ -178,42 +181,89 @@ public class FixedFileTrailer {
HFile.checkFormatVersion(majorVersion);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutput baosDos = new DataOutputStream(baos);
+ DataOutputStream baosDos = new DataOutputStream(baos);
BlockType.TRAILER.write(baosDos);
- baosDos.writeLong(fileInfoOffset);
- baosDos.writeLong(loadOnOpenDataOffset);
- baosDos.writeInt(dataIndexCount);
+ if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+ serializeAsPB(baosDos);
+ } else {
+ serializeAsWritable(baosDos);
+ }
+
+ // The last 4 bytes of the file encode the major and minor version universally
+ baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
+
+ outputStream.write(baos.toByteArray());
+ }
+
+ /**
+ * Write the trailer data as protobuf.
+ * @param output the stream to write the trailer to
+ * @throws IOException
+ */
+ void serializeAsPB(DataOutputStream output) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ HFileProtos.FileTrailerProto.newBuilder()
+ .setFileInfoOffset(fileInfoOffset)
+ .setLoadOnOpenDataOffset(loadOnOpenDataOffset)
+ .setUncompressedDataIndexSize(uncompressedDataIndexSize)
+ .setTotalUncompressedBytes(totalUncompressedBytes)
+ .setDataIndexCount(dataIndexCount)
+ .setMetaIndexCount(metaIndexCount)
+ .setEntryCount(entryCount)
+ .setNumDataIndexLevels(numDataIndexLevels)
+ .setFirstDataBlockOffset(firstDataBlockOffset)
+ .setLastDataBlockOffset(lastDataBlockOffset)
+ .setComparatorClassName(comparatorClassName)
+ .setCompressionCodec(compressionCodec.ordinal())
+ .build().writeDelimitedTo(baos);
+ output.write(baos.toByteArray());
+ // Pad to make up the difference between the variable PB encoding length and
+ // the fixed length used when encoding as a writable under earlier V2 formats.
+ // Failing to pad properly, or producing a PB encoding that is too big, would
+ // mean the trailer won't be read in properly by HFile.
+ int padding = getTrailerSize() - NOT_PB_SIZE - baos.size();
+ if (padding < 0) {
+ throw new IOException("Pbuf encoding size exceeded fixed trailer size limit");
+ }
+ for (int i = 0; i < padding; i++) {
+ output.write(0);
+ }
+ }
+
+ /**
+ * Write the trailer data as a writable.
+ * @param output the stream to write the trailer to
+ * @throws IOException
+ */
+ void serializeAsWritable(DataOutputStream output) throws IOException {
+ output.writeLong(fileInfoOffset);
+ output.writeLong(loadOnOpenDataOffset);
+ output.writeInt(dataIndexCount);
if (majorVersion == 1) {
// This used to be metaIndexOffset, but it was not used in version 1.
- baosDos.writeLong(0);
+ output.writeLong(0);
} else {
- baosDos.writeLong(uncompressedDataIndexSize);
+ output.writeLong(uncompressedDataIndexSize);
}
- baosDos.writeInt(metaIndexCount);
- baosDos.writeLong(totalUncompressedBytes);
+ output.writeInt(metaIndexCount);
+ output.writeLong(totalUncompressedBytes);
if (majorVersion == 1) {
- baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
+ output.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
} else {
// This field is long from version 2 onwards.
- baosDos.writeLong(entryCount);
+ output.writeLong(entryCount);
}
- baosDos.writeInt(compressionCodec.ordinal());
+ output.writeInt(compressionCodec.ordinal());
if (majorVersion > 1) {
- baosDos.writeInt(numDataIndexLevels);
- baosDos.writeLong(firstDataBlockOffset);
- baosDos.writeLong(lastDataBlockOffset);
- Bytes.writeStringFixedSize(baosDos, comparatorClassName,
- MAX_COMPARATOR_NAME_LENGTH);
+ output.writeInt(numDataIndexLevels);
+ output.writeLong(firstDataBlockOffset);
+ output.writeLong(lastDataBlockOffset);
+ Bytes.writeStringFixedSize(output, comparatorClassName, MAX_COMPARATOR_NAME_LENGTH);
}
-
- // serialize the major and minor versions
- baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
-
- outputStream.write(baos.toByteArray());
}
/**
@@ -222,7 +272,6 @@ public class FixedFileTrailer {
* {@link #serialize(DataOutputStream)}.
*
* @param inputStream
- * @param version
* @throws IOException
*/
void deserialize(DataInputStream inputStream) throws IOException {
@@ -230,33 +279,100 @@ public class FixedFileTrailer {
BlockType.TRAILER.readAndCheck(inputStream);
- fileInfoOffset = inputStream.readLong();
- loadOnOpenDataOffset = inputStream.readLong();
- dataIndexCount = inputStream.readInt();
-
- if (majorVersion == 1) {
- inputStream.readLong(); // Read and skip metaIndexOffset.
+ if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+ deserializeFromPB(inputStream);
} else {
- uncompressedDataIndexSize = inputStream.readLong();
- }
- metaIndexCount = inputStream.readInt();
-
- totalUncompressedBytes = inputStream.readLong();
- entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
- compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
- if (majorVersion > 1) {
- numDataIndexLevels = inputStream.readInt();
- firstDataBlockOffset = inputStream.readLong();
- lastDataBlockOffset = inputStream.readLong();
- comparatorClassName =
- Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
+ deserializeFromWritable(inputStream);
}
+ // The last 4 bytes of the file encode the major and minor version universally
int version = inputStream.readInt();
expectMajorVersion(extractMajorVersion(version));
expectMinorVersion(extractMinorVersion(version));
}
+ /**
+ * Deserialize the file trailer as protobuf.
+ * @param inputStream the stream to read the trailer from
+ * @throws IOException
+ */
+ void deserializeFromPB(DataInputStream inputStream) throws IOException {
+ // read PB and skip padding
+ int start = inputStream.available();
+ HFileProtos.FileTrailerProto.Builder builder = HFileProtos.FileTrailerProto.newBuilder();
+ builder.mergeDelimitedFrom(inputStream);
+ int size = start - inputStream.available();
+ inputStream.skip(getTrailerSize() - NOT_PB_SIZE - size);
+
+ // process the PB
+ if (builder.hasFileInfoOffset()) {
+ fileInfoOffset = builder.getFileInfoOffset();
+ }
+ if (builder.hasLoadOnOpenDataOffset()) {
+ loadOnOpenDataOffset = builder.getLoadOnOpenDataOffset();
+ }
+ if (builder.hasUncompressedDataIndexSize()) {
+ uncompressedDataIndexSize = builder.getUncompressedDataIndexSize();
+ }
+ if (builder.hasTotalUncompressedBytes()) {
+ totalUncompressedBytes = builder.getTotalUncompressedBytes();
+ }
+ if (builder.hasDataIndexCount()) {
+ dataIndexCount = builder.getDataIndexCount();
+ }
+ if (builder.hasMetaIndexCount()) {
+ metaIndexCount = builder.getMetaIndexCount();
+ }
+ if (builder.hasEntryCount()) {
+ entryCount = builder.getEntryCount();
+ }
+ if (builder.hasNumDataIndexLevels()) {
+ numDataIndexLevels = builder.getNumDataIndexLevels();
+ }
+ if (builder.hasFirstDataBlockOffset()) {
+ firstDataBlockOffset = builder.getFirstDataBlockOffset();
+ }
+ if (builder.hasLastDataBlockOffset()) {
+ lastDataBlockOffset = builder.getLastDataBlockOffset();
+ }
+ if (builder.hasComparatorClassName()) {
+ setComparatorClass(getComparatorClass(builder.getComparatorClassName()));
+ }
+ if (builder.hasCompressionCodec()) {
+ compressionCodec = Compression.Algorithm.values()[builder.getCompressionCodec()];
+ } else {
+ compressionCodec = Compression.Algorithm.NONE;
+ }
+ }
+
+ /**
+ * Deserialize the file trailer as writable data.
+ * @param input the stream to read the trailer from
+ * @throws IOException
+ */
+ void deserializeFromWritable(DataInput input) throws IOException {
+ fileInfoOffset = input.readLong();
+ loadOnOpenDataOffset = input.readLong();
+ dataIndexCount = input.readInt();
+ if (majorVersion == 1) {
+ input.readLong(); // Read and skip metaIndexOffset.
+ } else {
+ uncompressedDataIndexSize = input.readLong();
+ }
+ metaIndexCount = input.readInt();
+
+ totalUncompressedBytes = input.readLong();
+ entryCount = majorVersion == 1 ? input.readInt() : input.readLong();
+ compressionCodec = Compression.Algorithm.values()[input.readInt()];
+ if (majorVersion > 1) {
+ numDataIndexLevels = input.readInt();
+ firstDataBlockOffset = input.readLong();
+ lastDataBlockOffset = input.readLong();
+ setComparatorClass(getComparatorClass(Bytes.readStringFixedSize(input,
+ MAX_COMPARATOR_NAME_LENGTH)));
+ }
+ }
+
private void append(StringBuilder sb, String s) {
if (sb.length() > 0)
sb.append(", ");
@@ -450,6 +566,10 @@ public class FixedFileTrailer {
this.firstDataBlockOffset = firstDataBlockOffset;
}
+ public String getComparatorClassName() {
+ return comparatorClassName;
+ }
+
/**
* Returns the major version of this HFile format
*/
@@ -466,7 +586,13 @@ public class FixedFileTrailer {
@SuppressWarnings("rawtypes")
public void setComparatorClass(Class<? extends RawComparator> klass) {
- expectAtLeastMajorVersion(2);
+ // Is the comparator instantiable
+ try {
+ klass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Comparator class " + klass.getName() +
+ " is not instantiable", e);
+ }
comparatorClassName = klass.getName();
}
@@ -486,9 +612,11 @@ public class FixedFileTrailer {
try {
return getComparatorClass(comparatorClassName).newInstance();
} catch (InstantiationException e) {
- throw new IOException(e);
+ throw new IOException("Comparator class " + comparatorClassName +
+ " is not instantiable", e);
} catch (IllegalAccessException e) {
- throw new IOException(e);
+ throw new IOException("Comparator class " + comparatorClassName +
+ " is not instantiable", e);
}
}
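
The key property of the new trailer format: protobuf encodings are variable-length, so serializeAsPB() pads the output to the fixed per-version trailer size, and the packed version int always occupies the last four bytes of the file. A reader can therefore locate and identify the trailer with nothing but the file size. A sketch of that bootstrap read, not part of this commit; the bit layout of the version word is an assumption here (the actual packing lives in materializeVersion()/extractMajorVersion()/extractMinorVersion(), which are not shown in this diff):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.util.Bytes;

static int[] readVersionWord(FSDataInputStream in, long fileSize, int trailerSize)
    throws IOException {
  byte[] buf = new byte[trailerSize];
  in.seek(fileSize - trailerSize);
  in.readFully(buf);
  // The last 4 bytes of the file encode major and minor version universally.
  int word = Bytes.toInt(buf, trailerSize - Bytes.SIZEOF_INT);
  int major = word & 0x00ffffff; // assumed layout: low three bytes = major version
  int minor = word >>> 24;       // assumed layout: high byte = minor version
  return new int[] { major, minor };
}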
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Feb 14 12:58:12 2013
@@ -165,6 +165,9 @@ public class HFile {
public final static String DEFAULT_COMPRESSION =
DEFAULT_COMPRESSION_ALGORITHM.getName();
+ /** Meta data block name for bloom filter bits. */
+ public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
/**
* We assume that HFile path ends with
* ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
@@ -447,8 +450,6 @@ public class HFile {
CacheConfig cacheConf) {
int version = getFormatVersion(conf);
switch (version) {
- case 1:
- return new HFileWriterV1.WriterFactoryV1(conf, cacheConf);
case 2:
return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
default:
@@ -557,9 +558,6 @@ public class HFile {
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
}
switch (trailer.getMajorVersion()) {
- case 1:
- return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
- cacheConf);
case 2:
return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
size, closeIStream,
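
With the version-1 cases removed from both factory switches, only format version 2 files can be written, and a v1 trailer now falls through to the default (error) case when reading. A sketch of how a caller selects the format, not part of this commit, assuming "hfile.format.version" is the key read by getFormatVersion(conf) (per the FORMAT_VERSION_KEY constant defined elsewhere in this class) and that getWriterFactory(conf, cacheConf) is the enclosing factory method shown above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

static HFile.WriterFactory v2WriterFactory() {
  Configuration conf = HBaseConfiguration.create();
  conf.setInt("hfile.format.version", 2); // setting 1 here would now hit the default case
  return HFile.getWriterFactory(conf, new CacheConfig(conf));
}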
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Thu Feb 14 12:58:12 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
- static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
- Bytes.SIZEOF_INT;
+ // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+ public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+ + Bytes.SIZEOF_LONG;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
private static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
- public HFileBlock deserialize(ByteBuffer buf) throws IOException{
- ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE);
- buf.limit(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
- newByteBuffer.put(buf);
- HFileBlock ourBuffer = new HFileBlock(newByteBuffer,
- MINOR_VERSION_NO_CHECKSUM);
-
+ public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+ buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+ ByteBuffer newByteBuffer;
+ if (reuse) {
+ newByteBuffer = buf.slice();
+ } else {
+ newByteBuffer = ByteBuffer.allocate(buf.limit());
+ newByteBuffer.put(buf);
+ }
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ int minorVersion = buf.getInt();
+ HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
return ourBuffer;
}
+
+ @Override
+ public int getDeserialiserIdentifier() {
+ return deserializerIdentifier;
+ }
+
+ @Override
+ public HFileBlock deserialize(ByteBuffer b) throws IOException {
+ return deserialize(b, false);
+ }
};
+ private static final int deserializerIdentifier;
+ static {
+ deserializerIdentifier = CacheableDeserializerIdManager
+ .registerDeserializer(blockDeserializer);
+ }
private BlockType blockType;
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
}
/**
+ * Returns the buffer of this block, including header data. Clients must
+ * not modify the buffer object. This method has to be public because it is
+ * used in {@link BucketCache} to avoid a buffer copy.
+ *
+ * @return the byte buffer with header included for read-only operations
+ */
+ public ByteBuffer getBufferReadOnlyWithHeader() {
+ return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+ }
+
+ /**
* Returns a byte buffer of this block, including header data, positioned at
* the beginning of header. The underlying data array is not copied.
*
@@ -1287,110 +1317,6 @@ public class HFileBlock implements Cache
}
/**
- * Reads version 1 blocks from the file system. In version 1 blocks,
- * everything is compressed, including the magic record, if compression is
- * enabled. Everything might be uncompressed if no compression is used. This
- * reader returns blocks represented in the uniform version 2 format in
- * memory.
- */
- static class FSReaderV1 extends AbstractFSReader {
-
- /** Header size difference between version 1 and 2 */
- private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM -
- MAGIC_LENGTH;
-
- public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
- long fileSize) throws IOException {
- super(istream, istream, compressAlgo, fileSize, 0, null, null);
- }
-
- /**
- * Read a version 1 block. There is no uncompressed header, and the block
- * type (the magic record) is part of the compressed data. This
- * implementation assumes that the bounded range file input stream is
- * needed to stop the decompressor reading into next block, because the
- * decompressor just grabs a bunch of data without regard to whether it is
- * coming to end of the compressed section.
- *
- * The block returned is still a version 2 block, and in particular, its
- * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
- *
- * @param offset the offset of the block to read in the file
- * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
- * including the magic record, which is the part of compressed
- * data if using compression
- * @param uncompressedSizeWithMagic uncompressed size of the version 1
- * block, including the magic record
- */
- @Override
- public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
- int uncompressedSizeWithMagic, boolean pread) throws IOException {
- if (uncompressedSizeWithMagic <= 0) {
- throw new IOException("Invalid uncompressedSize="
- + uncompressedSizeWithMagic + " for a version 1 block");
- }
-
- if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
- {
- throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
- + " (maximum allowed: " + Integer.MAX_VALUE + ")");
- }
-
- int onDiskSize = (int) onDiskSizeWithMagic;
-
- if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
- throw new IOException("Uncompressed size for a version 1 block is "
- + uncompressedSizeWithMagic + " but must be at least "
- + MAGIC_LENGTH);
- }
-
- // The existing size already includes magic size, and we are inserting
- // a version 2 header.
- ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
- + HEADER_DELTA);
-
- int onDiskSizeWithoutHeader;
- if (compressAlgo == Compression.Algorithm.NONE) {
- // A special case when there is no compression.
- if (onDiskSize != uncompressedSizeWithMagic) {
- throw new IOException("onDiskSize=" + onDiskSize
- + " and uncompressedSize=" + uncompressedSizeWithMagic
- + " must be equal for version 1 with no compression");
- }
-
- // The first MAGIC_LENGTH bytes of what this will read will be
- // overwritten.
- readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
- onDiskSize, false, offset, pread);
-
- onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
- } else {
- InputStream bufferedBoundedStream = createBufferedBoundedStream(
- offset, onDiskSize, pread);
- Compression.decompress(buf.array(), buf.arrayOffset()
- + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
- uncompressedSizeWithMagic, this.compressAlgo);
-
- // We don't really have a good way to exclude the "magic record" size
- // from the compressed block's size, since it is compressed as well.
- onDiskSizeWithoutHeader = onDiskSize;
- }
-
- BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
- + HEADER_DELTA, MAGIC_LENGTH);
-
- // We set the uncompressed size of the new HFile block we are creating
- // to the size of the data portion of the block without the magic record,
- // since the magic record gets moved to the header.
- HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
- uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
- offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
- onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
- return b;
- }
- }
-
- /**
* We always prefetch the header of the next block, so that we know its
* on-disk size in advance and can read it in one operation.
*/
@@ -1780,7 +1706,17 @@ public class HFileBlock implements Cache
@Override
public void serialize(ByteBuffer destination) {
- destination.put(this.buf.duplicate());
+ ByteBuffer dupBuf = this.buf.duplicate();
+ dupBuf.rewind();
+ destination.put(dupBuf);
+ destination.putInt(this.minorVersion);
+ destination.putLong(this.offset);
+ destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+ destination.rewind();
+ }
+
+ public void serializeExtraInfo(ByteBuffer destination) {
+ destination.putInt(this.minorVersion);
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();
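
serialize() now appends three fields after the block bytes -- the minor version (int), the file offset (long), and nextBlockOnDiskSizeWithHeader (int) -- which is exactly what the enlarged EXTRA_SERIALIZATION_SPACE accounts for and what the deserializer reads back after trimming the limit. A sketch of the round trip a block cache performs, not part of this commit, assuming getSerializedLength() (from Cacheable) already includes EXTRA_SERIALIZATION_SPACE:

import java.io.IOException;
import java.nio.ByteBuffer;

static HFileBlock roundTrip(HFileBlock block) throws IOException {
  ByteBuffer dest = ByteBuffer.allocate(block.getSerializedLength());
  // Writes the block bytes, then minorVersion, offset and
  // nextBlockOnDiskSizeWithHeader, and rewinds the buffer.
  block.serialize(dest);
  // BucketCache-style rehydration: reuse=true wraps the stored bytes via
  // slice() instead of allocating and copying a fresh buffer.
  return (HFileBlock) block.getDeserializer().deserialize(dest, true);
}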
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Feb 14 12:58:12 2013
@@ -77,8 +77,8 @@ public class HFileReaderV2 extends Abstr
static final int MIN_MINOR_VERSION = 0;
/** Maximum minor version supported by this HFile format */
- // We went to version 2 when we moved to pb'ing the fileinfo trailer on the file. This version can read Writables
- // version 1 too.
+ // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
+ // the file. This version can read Writables version 1.
static final int MAX_MINOR_VERSION = 2;
/**