Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 21:58:32 UTC
svn commit: r1445918 [8/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/
dev-support/ hbase-client/ hbase-common/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/
hbase-common/src/mai...
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueExcludeFilter.java Wed Feb 13 20:58:23 2013
@@ -31,6 +31,8 @@ import com.google.protobuf.InvalidProtoc
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
/**
* A {@link Filter} that checks a single column value, but does not emit the
@@ -84,28 +86,31 @@ public class SingleColumnValueExcludeFil
* @param qualifier
* @param compareOp
* @param comparator
- * @param foundColumn
- * @param matchedColumn
* @param filterIfMissing
* @param latestVersionOnly
*/
- protected SingleColumnValueExcludeFilter(final byte[] family, final byte [] qualifier,
- final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
- final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
- super(family,qualifier,compareOp,comparator,foundColumn,
- matchedColumn,filterIfMissing,latestVersionOnly);
- }
-
- public ReturnCode filterKeyValue(KeyValue keyValue) {
- ReturnCode superRetCode = super.filterKeyValue(keyValue);
- if (superRetCode == ReturnCode.INCLUDE) {
+ protected SingleColumnValueExcludeFilter(final byte[] family, final byte[] qualifier,
+ final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+ final boolean latestVersionOnly) {
+ super(family, qualifier, compareOp, comparator, filterIfMissing, latestVersionOnly);
+ }
+
+ // We clean the result row in filterRow() to be consistent with the scanning process.
+ public boolean hasFilterRow() {
+ return true;
+ }
+
+ // Remove from the row all KeyValues belonging to the tested column
+ public void filterRow(List<KeyValue> kvs) {
+ Iterator<KeyValue> it = kvs.iterator();
+ while (it.hasNext()) {
+ KeyValue kv = it.next();
// If the current column is actually the tested column,
// we will skip it instead.
- if (keyValue.matchingColumn(this.columnFamily, this.columnQualifier)) {
- return ReturnCode.SKIP;
+ if (kv.matchingColumn(this.columnFamily, this.columnQualifier)) {
+ it.remove();
}
}
- return superRetCode;
}
public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
@@ -157,11 +162,10 @@ public class SingleColumnValueExcludeFil
throw new DeserializationException(ioe);
}
- return new SingleColumnValueExcludeFilter(
- parentProto.hasColumnFamily()?parentProto.getColumnFamily().toByteArray():null,
- parentProto.hasColumnQualifier()?parentProto.getColumnQualifier().toByteArray():null,
- compareOp, comparator, parentProto.getFoundColumn(),parentProto.getMatchedColumn(),
- parentProto.getFilterIfMissing(),parentProto.getLatestVersionOnly());
+ return new SingleColumnValueExcludeFilter(parentProto.hasColumnFamily() ? parentProto
+ .getColumnFamily().toByteArray() : null, parentProto.hasColumnQualifier() ? parentProto
+ .getColumnQualifier().toByteArray() : null, compareOp, comparator, parentProto
+ .getFilterIfMissing(), parentProto.getLatestVersionOnly());
}
/**
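The rework above moves the column exclusion out of filterKeyValue() and into the new filterRow(List<KeyValue>) hook: matching rows are returned with the tested cells stripped after the row has been assembled. A minimal client-side sketch of how the filter is driven (column and value names are illustrative; the constructor and Scan API are the standard hbase-client ones):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter;
    import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    // Return rows whose cf:q equals "value"; filterRow() above then removes
    // the cf:q cells themselves from each returned row.
    SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(
        Bytes.toBytes("cf"), Bytes.toBytes("q"),
        CompareFilter.CompareOp.EQUAL, Bytes.toBytes("value"));
    filter.setFilterIfMissing(true);  // drop rows that lack cf:q entirely
    scan.setFilter(filter);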
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Wed Feb 13 20:58:23 2013
@@ -128,17 +128,13 @@ public class SingleColumnValueFilter ext
* @param qualifier
* @param compareOp
* @param comparator
- * @param foundColumn
- * @param matchedColumn
* @param filterIfMissing
* @param latestVersionOnly
*/
- protected SingleColumnValueFilter(final byte[] family, final byte [] qualifier,
- final CompareOp compareOp, ByteArrayComparable comparator, final boolean foundColumn,
- final boolean matchedColumn, final boolean filterIfMissing, final boolean latestVersionOnly) {
- this(family,qualifier,compareOp,comparator);
- this.foundColumn = foundColumn;
- this.matchedColumn = matchedColumn;
+ protected SingleColumnValueFilter(final byte[] family, final byte[] qualifier,
+ final CompareOp compareOp, ByteArrayComparable comparator, final boolean filterIfMissing,
+ final boolean latestVersionOnly) {
+ this(family, qualifier, compareOp, comparator);
this.filterIfMissing = filterIfMissing;
this.latestVersionOnly = latestVersionOnly;
}
@@ -313,8 +309,6 @@ public class SingleColumnValueFilter ext
HBaseProtos.CompareType compareOp = CompareType.valueOf(this.compareOp.name());
builder.setCompareOp(compareOp);
builder.setComparator(ProtobufUtil.toComparator(this.comparator));
- builder.setFoundColumn(this.foundColumn);
- builder.setMatchedColumn(this.matchedColumn);
builder.setFilterIfMissing(this.filterIfMissing);
builder.setLatestVersionOnly(this.latestVersionOnly);
@@ -352,11 +346,10 @@ public class SingleColumnValueFilter ext
throw new DeserializationException(ioe);
}
- return new SingleColumnValueFilter(
- proto.hasColumnFamily()?proto.getColumnFamily().toByteArray():null,
- proto.hasColumnQualifier()?proto.getColumnQualifier().toByteArray():null,
- compareOp, comparator, proto.getFoundColumn(),proto.getMatchedColumn(),
- proto.getFilterIfMissing(),proto.getLatestVersionOnly());
+ return new SingleColumnValueFilter(proto.hasColumnFamily() ? proto.getColumnFamily()
+ .toByteArray() : null, proto.hasColumnQualifier() ? proto.getColumnQualifier()
+ .toByteArray() : null, compareOp, comparator, proto.getFilterIfMissing(), proto
+ .getLatestVersionOnly());
}
/**
@@ -373,12 +366,19 @@ public class SingleColumnValueFilter ext
&& Bytes.equals(this.getQualifier(), other.getQualifier())
&& this.compareOp.equals(other.compareOp)
&& this.getComparator().areSerializedFieldsEqual(other.getComparator())
- && this.foundColumn == other.foundColumn
- && this.matchedColumn == other.matchedColumn
&& this.getFilterIfMissing() == other.getFilterIfMissing()
&& this.getLatestVersionOnly() == other.getLatestVersionOnly();
}
+ /**
+ * The only CF this filter needs is the given column family. So it's the only
+ * essential column in the whole scan. If filterIfMissing == false, all
+ * families are essential, because a row with no data in the filtered CF
+ * must still be emitted rather than skipped.
+ */
+ public boolean isFamilyEssential(byte[] name) {
+ return !this.filterIfMissing || Bytes.equals(name, this.columnFamily);
+ }
+
@Override
public String toString() {
return String.format("%s (%s, %s, %s, %s)",
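The new isFamilyEssential(byte[]) hook lets a scan load only the families a filter actually needs and lazily join the rest for rows that pass. A hedged sketch of a custom filter using the same hook (MetaOnlyFilter and the "meta" family are made up for illustration; FilterBase is assumed to supply defaults for the other methods, as the delegating overrides below suggest):

    import org.apache.hadoop.hbase.filter.FilterBase;
    import org.apache.hadoop.hbase.util.Bytes;

    // Only "meta" is needed to evaluate this filter, so all other families
    // can be loaded lazily, and only for rows that survive filtering.
    public class MetaOnlyFilter extends FilterBase {
      private static final byte[] META = Bytes.toBytes("meta");

      public boolean isFamilyEssential(byte[] name) {
        return Bytes.equals(name, META);
      }
    }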
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/SkipFilter.java Wed Feb 13 20:58:23 2013
@@ -138,6 +138,10 @@ public class SkipFilter extends FilterBa
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/filter/WhileMatchFilter.java Wed Feb 13 20:58:23 2013
@@ -138,6 +138,10 @@ public class WhileMatchFilter extends Fi
return getFilter().areSerializedFieldsEqual(other.getFilter());
}
+ public boolean isFamilyEssential(byte[] name) {
+ return filter.isFamilyEssential(name);
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + this.filter.toString();
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Wed Feb 13 20:58:23 2013
@@ -33,13 +33,12 @@ import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,8 +46,9 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* An encapsulation for the FileSystem object that hbase uses to access
@@ -259,7 +259,7 @@ public class HFileSystem extends FilterF
final ReorderBlocks lrb, final Configuration conf) {
return (ClientProtocol) Proxy.newProxyInstance
(cp.getClass().getClassLoader(),
- new Class[]{ClientProtocol.class},
+ new Class[]{ClientProtocol.class, Closeable.class},
new InvocationHandler() {
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
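Adding Closeable to the proxy's interface list does not change dispatch; it only lets callers cast the returned ClientProtocol proxy to Closeable and close it cleanly. The JDK mechanics in miniature (the Service interface is invented for the demo, and java.io.Closeable stands in for the Hadoop Closeable imported above):

    import java.io.Closeable;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    public class MultiInterfaceProxyDemo {
      interface Service { String ping(); }

      public static void main(String[] args) {
        InvocationHandler handler = new InvocationHandler() {
          public Object invoke(Object proxy, Method method, Object[] a) {
            return "ping".equals(method.getName()) ? "pong" : null;
          }
        };
        Object p = Proxy.newProxyInstance(
            Service.class.getClassLoader(),
            new Class<?>[] { Service.class, Closeable.class },  // both
            handler);
        System.out.println(((Service) p).ping());    // "pong"
        System.out.println(p instanceof Closeable);  // true
      }
    }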
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java Wed Feb 13 20:58:23 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.io;
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
import com.google.protobuf.ByteString;
@@ -56,7 +54,7 @@ import com.google.protobuf.ByteString;
* references. References are cleaned up by compactions.
*/
@InterfaceAudience.Private
-public class Reference implements Writable {
+public class Reference {
private byte [] splitkey;
private Range region;
@@ -99,7 +97,6 @@ public class Reference implements Writab
/**
* Used by serializations.
- * @deprecated Use the pb serializations instead. Writables are going away.
*/
@Deprecated
// Make this private when it comes time to let go of this constructor. Needed by pb serialization.
@@ -130,18 +127,14 @@ public class Reference implements Writab
return "" + this.region;
}
- /**
- * @deprecated Writables are going away. Use the pb serialization methods instead.
- */
- @Deprecated
- public void write(DataOutput out) throws IOException {
- // Write true if we're doing top of the file.
- out.writeBoolean(isTopFileRegion(this.region));
- Bytes.writeByteArray(out, this.splitkey);
+ public static boolean isTopFileRegion(final Range r) {
+ return r.equals(Range.top);
}
/**
* @deprecated Writables are going away. Use the pb serialization methods instead.
+ * Remove in a release after 0.96 goes out. This is here only to migrate
+ * old Reference files written with Writables before 0.96.
*/
@Deprecated
public void readFields(DataInput in) throws IOException {
@@ -151,10 +144,6 @@ public class Reference implements Writab
this.splitkey = Bytes.readByteArray(in);
}
- public static boolean isTopFileRegion(final Range r) {
- return r.equals(Range.top);
- }
-
public Path write(final FileSystem fs, final Path p)
throws IOException {
FSDataOutputStream out = fs.create(p, false);
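With write(DataOutput) gone, new Reference files are protobuf-only, and readFields(DataInput) survives solely for pre-0.96 files. A hedged sketch of the detection such a migration read typically starts with (the magic handling is illustrative, not the exact HBase code; note the BufferedInputStream import retained above):

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class PbDetect {
      // Peek at the header without consuming it: pb magic means parse as
      // protobuf, anything else falls back to Writable readFields().
      static boolean looksLikePb(BufferedInputStream in, byte[] pbMagic)
          throws IOException {
        byte[] header = new byte[pbMagic.length];
        in.mark(pbMagic.length);
        int read = in.read(header);
        in.reset();  // rewind so the real parser sees the full stream
        return read == pbMagic.length && Arrays.equals(header, pbMagic);
      }
    }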
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Wed Feb 13 20:58:23 2013
@@ -19,12 +19,8 @@
package org.apache.hadoop.hbase.io;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
@@ -39,7 +35,7 @@ import org.apache.hadoop.hbase.util.Byte
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TimeRange implements Writable {
+public class TimeRange {
private long minStamp = 0L;
private long maxStamp = Long.MAX_VALUE;
private boolean allTime = false;
@@ -184,17 +180,4 @@ public class TimeRange implements Writab
sb.append(this.minStamp);
return sb.toString();
}
-
- //Writable
- public void readFields(final DataInput in) throws IOException {
- this.minStamp = in.readLong();
- this.maxStamp = in.readLong();
- this.allTime = in.readBoolean();
- }
-
- public void write(final DataOutput out) throws IOException {
- out.writeLong(minStamp);
- out.writeLong(maxStamp);
- out.writeBoolean(this.allTime);
- }
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Wed Feb 13 20:58:23 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
* Cache Key for use with implementations of {@link BlockCache}
*/
@InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
private final String hfileName;
private final long offset;
private final DataBlockEncoding encoding;
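Making BlockCacheKey Serializable supports the persistent bucket cache configured below (see BUCKET_CACHE_PERSISTENT_PATH_KEY in CacheConfig): the key-to-bucket index can be written with plain Java serialization on shutdown and reloaded on startup. A toy round trip, with a String key standing in for BlockCacheKey:

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.HashMap;

    public class PersistIndexDemo {
      public static void main(String[] args) throws Exception {
        HashMap<String, Integer> index = new HashMap<String, Integer>();
        index.put("hfile-123:0", 42);  // stand-in for BlockCacheKey -> bucket
        ObjectOutputStream out = new ObjectOutputStream(
            new FileOutputStream("cache.index"));
        out.writeObject(index);        // every key type must be Serializable
        out.close();
        ObjectInputStream in = new ObjectInputStream(
            new FileInputStream("cache.index"));
        @SuppressWarnings("unchecked")
        HashMap<String, Integer> restored =
            (HashMap<String, Integer>) in.readObject();
        in.close();
        System.out.println(restored.get("hfile-123:0"));  // 42
      }
    }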
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Wed Feb 13 20:58:23 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
"hbase.rs.evictblocksonclose";
+ /**
+ * Configuration keys for Bucket cache
+ */
+ public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+ public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
+ "hbase.bucketcache.persistent.path";
+ public static final String BUCKET_CACHE_COMBINED_KEY =
+ "hbase.bucketcache.combinedcache.enabled";
+ public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY =
+ "hbase.bucketcache.percentage.in.combinedcache";
+ public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+ public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
+ "hbase.bucketcache.writer.queuelength";
+ /**
+ * Defaults for Bucket cache
+ */
+ public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+ public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
// Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- long cacheSize = (long)(mu.getMax() * cachePercentage);
+ long lruCacheSize = (long) (mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize =
(long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
DirectMemoryUtils.getDirectMemorySize());
- LOG.info("Allocating LruBlockCache with maximum size " +
- StringUtils.humanReadableInt(cacheSize));
if (offHeapCacheSize <= 0) {
- globalBlockCache = new LruBlockCache(cacheSize,
- StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+ String bucketCacheIOEngineName = conf
+ .get(BUCKET_CACHE_IOENGINE_KEY, null);
+ float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+ // A percentage of max heap size, or an absolute value in megabytes
+ long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+ * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+ boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED);
+ BucketCache bucketCache = null;
+ if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+ int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+ int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+ String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+ float combinedPercentage = conf.getFloat(
+ BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+ if (combinedWithLru) {
+ lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+ bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+ }
+ try {
+ int ioErrorsTolerationDuration = conf.getInt(
+ "hbase.bucketcache.ioengine.errors.tolerated.duration",
+ BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+ bucketCache = new BucketCache(bucketCacheIOEngineName,
+ bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+ ioErrorsTolerationDuration);
+ } catch (IOException ioex) {
+ LOG.error("Can't instantiate bucket cache", ioex);
+ throw new RuntimeException(ioex);
+ }
+ }
+ LOG.info("Allocating LruBlockCache with maximum size "
+ + StringUtils.humanReadableInt(lruCacheSize));
+ LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+ StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+ lruCache.setVictimCache(bucketCache);
+ if (bucketCache != null && combinedWithLru) {
+ globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+ } else {
+ globalBlockCache = lruCache;
+ }
} else {
- globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+ globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
}
return globalBlockCache;
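Taken together, the new keys make the combined LRU + bucket cache a configuration-only feature. A minimal sketch using the constants introduced above (the engine name and sizes are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    Configuration conf = HBaseConfiguration.create();
    conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    // Values below 1 are read as a fraction of max heap, 1 or more as MB.
    conf.setFloat(CacheConfig.BUCKET_CACHE_SIZE_KEY, 4096f);
    // With the combined cache (the default), 90% of that space backs the
    // bucket cache and the remaining 10% stays with the on-heap LRU.
    conf.setFloat(CacheConfig.BUCKET_CACHE_COMBINED_PERCENTAGE_KEY, 0.9f);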
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Wed Feb 13 20:58:23 2013
@@ -171,6 +171,22 @@ public class CacheStats {
windowIndex = (windowIndex + 1) % numPeriodsInWindow;
}
+ public long getSumHitCountsPastNPeriods() {
+ return sum(hitCounts);
+ }
+
+ public long getSumRequestCountsPastNPeriods() {
+ return sum(requestCounts);
+ }
+
+ public long getSumHitCachingCountsPastNPeriods() {
+ return sum(hitCachingCounts);
+ }
+
+ public long getSumRequestCachingCountsPastNPeriods() {
+ return sum(requestCachingCounts);
+ }
+
public double getHitRatioPastNPeriods() {
double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
return Double.isNaN(ratio) ? 0 : ratio;
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Wed Feb 13 20:58:23 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
*/
public CacheableDeserializer<Cacheable> getDeserializer();
+ /**
+ * @return the block type of this cached HFile block
+ */
+ public BlockType getBlockType();
+
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Wed Feb 13 20:58:23 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
* @return T the deserialized object.
*/
public T deserialize(ByteBuffer b) throws IOException;
+
+ /**
+ * @param b the buffer to deserialize from
+ * @param reuse true if the Cacheable object may use the given buffer as its
+ * content
+ * @return T the deserialized object.
+ * @throws IOException
+ */
+ public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+ /**
+ * Get the identifier of this deserializer. The identifier is unique for each
+ * deserializer and is generated by {@link CacheableDeserializerIdManager}.
+ * @return identifier number of this cacheable deserializer
+ */
+ public int getDeserialiserIdentifier();
}
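The identifier exists so a cache that persists raw bytes can record which deserializer produced each block and look it up again after a restart. A self-contained model of the registration pattern (the types below mirror, but are not, the HBase interfaces):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class DeserializerIdDemo {
      interface Deserializer<T> {
        T deserialize(ByteBuffer b, boolean reuse);
        int getIdentifier();
      }

      // Minimal stand-in for CacheableDeserializerIdManager.
      static final List<Deserializer<?>> REGISTRY =
          new ArrayList<Deserializer<?>>();
      static synchronized int register(Deserializer<?> d) {
        REGISTRY.add(d);
        return REGISTRY.size() - 1;  // the identifier is the registry index
      }

      static final Deserializer<String> STRING_DESER =
          new Deserializer<String>() {
            public String deserialize(ByteBuffer b, boolean reuse) {
              // reuse == true could share b's storage; a String copies anyway.
              byte[] bytes = new byte[b.remaining()];
              b.get(bytes);
              return new String(bytes);
            }
            public int getIdentifier() { return ID; }
          };
      static final int ID = register(STRING_DESER);

      public static void main(String[] args) {
        int storedId = STRING_DESER.getIdentifier();  // persisted with bytes
        Deserializer<?> d = REGISTRY.get(storedId);   // recovered on restart
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
        System.out.println(d.deserialize(buf, false));  // "hello"
      }
    }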
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Wed Feb 13 20:58:23 2013
@@ -96,11 +96,24 @@ public class CachedBlock implements Heap
return size;
}
+ @Override
public int compareTo(CachedBlock that) {
if(this.accessTime == that.accessTime) return 0;
return this.accessTime < that.accessTime ? 1 : -1;
}
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ CachedBlock other = (CachedBlock) obj;
+ return compareTo(other) == 0;
+ }
+
public Cacheable getBuffer() {
return this.buf;
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java Wed Feb 13 20:58:23 2013
@@ -54,7 +54,7 @@ public class ChecksumUtil {
* compute checkums from
* @param endOffset ending offset in the indata stream upto
* which checksums needs to be computed
- * @param outData the output buffer where checksum values are written
+ * @param outdata the output buffer where checksum values are written
* @param outOffset the starting offset in the outdata where the
* checksum values are written
* @param checksumType type of checksum
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java Wed Feb 13 20:58:23 2013
@@ -18,13 +18,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MAX_FORMAT_VERSION;
-import static org.apache.hadoop.hbase.io.hfile.HFile.MIN_FORMAT_VERSION;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,7 +30,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;
@@ -57,6 +56,9 @@ public class FixedFileTrailer {
private static final Log LOG = LogFactory.getLog(FixedFileTrailer.class);
+ /** HFile minor version that introduced pbuf filetrailer */
+ private static final int PBUF_TRAILER_MINOR_VERSION = 2;
+
/**
* We store the comparator class name as a fixed-length field in the trailer.
*/
@@ -113,7 +115,7 @@ public class FixedFileTrailer {
private long lastDataBlockOffset;
/** Raw key comparator class name in version 2 */
- private String comparatorClassName = RawComparator.class.getName();
+ private String comparatorClassName = KeyValue.KEY_COMPARATOR.getClass().getName();
/** The {@link HFile} format major version. */
private final int majorVersion;
@@ -129,11 +131,10 @@ public class FixedFileTrailer {
private static int[] computeTrailerSizeByVersion() {
int versionToSize[] = new int[HFile.MAX_FORMAT_VERSION + 1];
- for (int version = MIN_FORMAT_VERSION;
- version <= MAX_FORMAT_VERSION;
+ for (int version = HFile.MIN_FORMAT_VERSION;
+ version <= HFile.MAX_FORMAT_VERSION;
++version) {
- FixedFileTrailer fft = new FixedFileTrailer(version,
- HFileBlock.MINOR_VERSION_NO_CHECKSUM);
+ FixedFileTrailer fft = new FixedFileTrailer(version, HFileBlock.MINOR_VERSION_NO_CHECKSUM);
DataOutputStream dos = new DataOutputStream(new NullOutputStream());
try {
fft.serialize(dos);
@@ -148,8 +149,8 @@ public class FixedFileTrailer {
private static int getMaxTrailerSize() {
int maxSize = 0;
- for (int version = MIN_FORMAT_VERSION;
- version <= MAX_FORMAT_VERSION;
+ for (int version = HFile.MIN_FORMAT_VERSION;
+ version <= HFile.MAX_FORMAT_VERSION;
++version)
maxSize = Math.max(getTrailerSize(version), maxSize);
return maxSize;
@@ -158,6 +159,8 @@ public class FixedFileTrailer {
private static final int TRAILER_SIZE[] = computeTrailerSizeByVersion();
private static final int MAX_TRAILER_SIZE = getMaxTrailerSize();
+ private static final int NOT_PB_SIZE = BlockType.MAGIC_LENGTH + Bytes.SIZEOF_INT;
+
static int getTrailerSize(int version) {
return TRAILER_SIZE[version];
}
@@ -178,42 +181,89 @@ public class FixedFileTrailer {
HFile.checkFormatVersion(majorVersion);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutput baosDos = new DataOutputStream(baos);
+ DataOutputStream baosDos = new DataOutputStream(baos);
BlockType.TRAILER.write(baosDos);
- baosDos.writeLong(fileInfoOffset);
- baosDos.writeLong(loadOnOpenDataOffset);
- baosDos.writeInt(dataIndexCount);
+ if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+ serializeAsPB(baosDos);
+ } else {
+ serializeAsWritable(baosDos);
+ }
+
+ // The last 4 bytes of the file encode the major and minor version universally
+ baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
+
+ outputStream.write(baos.toByteArray());
+ }
+
+ /**
+ * Write trailer data as protobuf
+ * @param output
+ * @throws IOException
+ */
+ void serializeAsPB(DataOutputStream output) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ HFileProtos.FileTrailerProto.newBuilder()
+ .setFileInfoOffset(fileInfoOffset)
+ .setLoadOnOpenDataOffset(loadOnOpenDataOffset)
+ .setUncompressedDataIndexSize(uncompressedDataIndexSize)
+ .setTotalUncompressedBytes(totalUncompressedBytes)
+ .setDataIndexCount(dataIndexCount)
+ .setMetaIndexCount(metaIndexCount)
+ .setEntryCount(entryCount)
+ .setNumDataIndexLevels(numDataIndexLevels)
+ .setFirstDataBlockOffset(firstDataBlockOffset)
+ .setLastDataBlockOffset(lastDataBlockOffset)
+ .setComparatorClassName(comparatorClassName)
+ .setCompressionCodec(compressionCodec.ordinal())
+ .build().writeDelimitedTo(baos);
+ output.write(baos.toByteArray());
+ // Pad to make up the difference between variable PB encoding length and the
+ // length when encoded as writable under earlier V2 formats. If the padding
+ // is wrong, or the PB encoding is too big for the slot, the trailer won't
+ // be read back properly by HFile.
+ int padding = getTrailerSize() - NOT_PB_SIZE - baos.size();
+ if (padding < 0) {
+ throw new IOException("Pbuf encoding size exceeded fixed trailer size limit");
+ }
+ for (int i = 0; i < padding; i++) {
+ output.write(0);
+ }
+ }
+
+ /**
+ * Write trailer data as writable
+ * @param output
+ * @throws IOException
+ */
+ void serializeAsWritable(DataOutputStream output) throws IOException {
+ output.writeLong(fileInfoOffset);
+ output.writeLong(loadOnOpenDataOffset);
+ output.writeInt(dataIndexCount);
if (majorVersion == 1) {
// This used to be metaIndexOffset, but it was not used in version 1.
- baosDos.writeLong(0);
+ output.writeLong(0);
} else {
- baosDos.writeLong(uncompressedDataIndexSize);
+ output.writeLong(uncompressedDataIndexSize);
}
- baosDos.writeInt(metaIndexCount);
- baosDos.writeLong(totalUncompressedBytes);
+ output.writeInt(metaIndexCount);
+ output.writeLong(totalUncompressedBytes);
if (majorVersion == 1) {
- baosDos.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
+ output.writeInt((int) Math.min(Integer.MAX_VALUE, entryCount));
} else {
// This field is long from version 2 onwards.
- baosDos.writeLong(entryCount);
+ output.writeLong(entryCount);
}
- baosDos.writeInt(compressionCodec.ordinal());
+ output.writeInt(compressionCodec.ordinal());
if (majorVersion > 1) {
- baosDos.writeInt(numDataIndexLevels);
- baosDos.writeLong(firstDataBlockOffset);
- baosDos.writeLong(lastDataBlockOffset);
- Bytes.writeStringFixedSize(baosDos, comparatorClassName,
- MAX_COMPARATOR_NAME_LENGTH);
+ output.writeInt(numDataIndexLevels);
+ output.writeLong(firstDataBlockOffset);
+ output.writeLong(lastDataBlockOffset);
+ Bytes.writeStringFixedSize(output, comparatorClassName, MAX_COMPARATOR_NAME_LENGTH);
}
-
- // serialize the major and minor versions
- baosDos.writeInt(materializeVersion(majorVersion, minorVersion));
-
- outputStream.write(baos.toByteArray());
}
/**
@@ -222,7 +272,6 @@ public class FixedFileTrailer {
* {@link #serialize(DataOutputStream)}.
*
* @param inputStream
- * @param version
* @throws IOException
*/
void deserialize(DataInputStream inputStream) throws IOException {
@@ -230,33 +279,100 @@ public class FixedFileTrailer {
BlockType.TRAILER.readAndCheck(inputStream);
- fileInfoOffset = inputStream.readLong();
- loadOnOpenDataOffset = inputStream.readLong();
- dataIndexCount = inputStream.readInt();
-
- if (majorVersion == 1) {
- inputStream.readLong(); // Read and skip metaIndexOffset.
+ if (majorVersion > 2 || (majorVersion == 2 && minorVersion >= PBUF_TRAILER_MINOR_VERSION)) {
+ deserializeFromPB(inputStream);
} else {
- uncompressedDataIndexSize = inputStream.readLong();
- }
- metaIndexCount = inputStream.readInt();
-
- totalUncompressedBytes = inputStream.readLong();
- entryCount = majorVersion == 1 ? inputStream.readInt() : inputStream.readLong();
- compressionCodec = Compression.Algorithm.values()[inputStream.readInt()];
- if (majorVersion > 1) {
- numDataIndexLevels = inputStream.readInt();
- firstDataBlockOffset = inputStream.readLong();
- lastDataBlockOffset = inputStream.readLong();
- comparatorClassName =
- Bytes.readStringFixedSize(inputStream, MAX_COMPARATOR_NAME_LENGTH);
+ deserializeFromWritable(inputStream);
}
+ // The last 4 bytes of the file encode the major and minor version universally
int version = inputStream.readInt();
expectMajorVersion(extractMajorVersion(version));
expectMinorVersion(extractMinorVersion(version));
}
+ /**
+ * Deserialize the file trailer as protobuf
+ * @param inputStream
+ * @throws IOException
+ */
+ void deserializeFromPB(DataInputStream inputStream) throws IOException {
+ // read PB and skip padding
+ int start = inputStream.available();
+ HFileProtos.FileTrailerProto.Builder builder = HFileProtos.FileTrailerProto.newBuilder();
+ builder.mergeDelimitedFrom(inputStream);
+ int size = start - inputStream.available();
+ inputStream.skip(getTrailerSize() - NOT_PB_SIZE - size);
+
+ // process the PB
+ if (builder.hasFileInfoOffset()) {
+ fileInfoOffset = builder.getFileInfoOffset();
+ }
+ if (builder.hasLoadOnOpenDataOffset()) {
+ loadOnOpenDataOffset = builder.getLoadOnOpenDataOffset();
+ }
+ if (builder.hasUncompressedDataIndexSize()) {
+ uncompressedDataIndexSize = builder.getUncompressedDataIndexSize();
+ }
+ if (builder.hasTotalUncompressedBytes()) {
+ totalUncompressedBytes = builder.getTotalUncompressedBytes();
+ }
+ if (builder.hasDataIndexCount()) {
+ dataIndexCount = builder.getDataIndexCount();
+ }
+ if (builder.hasMetaIndexCount()) {
+ metaIndexCount = builder.getMetaIndexCount();
+ }
+ if (builder.hasEntryCount()) {
+ entryCount = builder.getEntryCount();
+ }
+ if (builder.hasNumDataIndexLevels()) {
+ numDataIndexLevels = builder.getNumDataIndexLevels();
+ }
+ if (builder.hasFirstDataBlockOffset()) {
+ firstDataBlockOffset = builder.getFirstDataBlockOffset();
+ }
+ if (builder.hasLastDataBlockOffset()) {
+ lastDataBlockOffset = builder.getLastDataBlockOffset();
+ }
+ if (builder.hasComparatorClassName()) {
+ setComparatorClass(getComparatorClass(builder.getComparatorClassName()));
+ }
+ if (builder.hasCompressionCodec()) {
+ compressionCodec = Compression.Algorithm.values()[builder.getCompressionCodec()];
+ } else {
+ compressionCodec = Compression.Algorithm.NONE;
+ }
+ }
+
+ /**
+ * Deserialize the file trailer as writable data
+ * @param input
+ * @throws IOException
+ */
+ void deserializeFromWritable(DataInput input) throws IOException {
+ fileInfoOffset = input.readLong();
+ loadOnOpenDataOffset = input.readLong();
+ dataIndexCount = input.readInt();
+ if (majorVersion == 1) {
+ input.readLong(); // Read and skip metaIndexOffset.
+ } else {
+ uncompressedDataIndexSize = input.readLong();
+ }
+ metaIndexCount = input.readInt();
+
+ totalUncompressedBytes = input.readLong();
+ entryCount = majorVersion == 1 ? input.readInt() : input.readLong();
+ compressionCodec = Compression.Algorithm.values()[input.readInt()];
+ if (majorVersion > 1) {
+ numDataIndexLevels = input.readInt();
+ firstDataBlockOffset = input.readLong();
+ lastDataBlockOffset = input.readLong();
+ setComparatorClass(getComparatorClass(Bytes.readStringFixedSize(input,
+ MAX_COMPARATOR_NAME_LENGTH)));
+ }
+ }
+
private void append(StringBuilder sb, String s) {
if (sb.length() > 0)
sb.append(", ");
@@ -450,6 +566,10 @@ public class FixedFileTrailer {
this.firstDataBlockOffset = firstDataBlockOffset;
}
+ public String getComparatorClassName() {
+ return comparatorClassName;
+ }
+
/**
* Returns the major version of this HFile format
*/
@@ -466,7 +586,13 @@ public class FixedFileTrailer {
@SuppressWarnings("rawtypes")
public void setComparatorClass(Class<? extends RawComparator> klass) {
- expectAtLeastMajorVersion(2);
+ // Is the comparator instantiable
+ try {
+ klass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Comparator class " + klass.getName() +
+ " is not instantiable", e);
+ }
comparatorClassName = klass.getName();
}
@@ -486,9 +612,11 @@ public class FixedFileTrailer {
try {
return getComparatorClass(comparatorClassName).newInstance();
} catch (InstantiationException e) {
- throw new IOException(e);
+ throw new IOException("Comparator class " + comparatorClassName +
+ " is not instantiable", e);
} catch (IllegalAccessException e) {
- throw new IOException(e);
+ throw new IOException("Comparator class " + comparatorClassName +
+ " is not instantiable", e);
}
}
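The essential discipline in serializeAsPB/deserializeFromPB is that a variable-length protobuf must still occupy the fixed-size trailer slot older readers expect: pad with zeros on write, skip the padding on read. The same idea in a self-contained sketch (slot size and payload are illustrative; in the real trailer the payload length comes from the delimited protobuf itself):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FixedSlotDemo {
      // Write a variable-length payload into a fixed slot, zero-padded.
      static void writeSlot(DataOutputStream out, byte[] payload, int slot)
          throws IOException {
        if (payload.length > slot) {
          throw new IOException("payload exceeds fixed slot size");
        }
        out.write(payload);
        for (int i = payload.length; i < slot; i++) {
          out.write(0);  // pad so the on-disk slot size never varies
        }
      }

      // Read the payload and discard padding, leaving the stream positioned
      // exactly at the end of the slot.
      static byte[] readSlot(DataInputStream in, int payloadLen, int slot)
          throws IOException {
        byte[] payload = new byte[payloadLen];
        in.readFully(payload);
        in.skipBytes(slot - payloadLen);
        return payload;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        writeSlot(new DataOutputStream(baos), "trailer".getBytes(), 32);
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(baos.toByteArray()));
        System.out.println(new String(readSlot(in, 7, 32)));  // "trailer"
      }
    }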
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Wed Feb 13 20:58:23 2013
@@ -165,6 +165,9 @@ public class HFile {
public final static String DEFAULT_COMPRESSION =
DEFAULT_COMPRESSION_ALGORITHM.getName();
+ /** Meta data block name for bloom filter bits. */
+ public static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
+
/**
* We assume that HFile path ends with
* ROOT_DIR/TABLE_NAME/REGION_NAME/CF_NAME/HFILE, so it has at least this
@@ -447,8 +450,6 @@ public class HFile {
CacheConfig cacheConf) {
int version = getFormatVersion(conf);
switch (version) {
- case 1:
- return new HFileWriterV1.WriterFactoryV1(conf, cacheConf);
case 2:
return new HFileWriterV2.WriterFactoryV2(conf, cacheConf);
default:
@@ -557,9 +558,6 @@ public class HFile {
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, iae);
}
switch (trailer.getMajorVersion()) {
- case 1:
- return new HFileReaderV1(path, trailer, fsdis, size, closeIStream,
- cacheConf);
case 2:
return new HFileReaderV2(path, trailer, fsdis, fsdisNoFsChecksum,
size, closeIStream,
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Wed Feb 13 20:58:23 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
- static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
- Bytes.SIZEOF_INT;
+ // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+ public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+ + Bytes.SIZEOF_LONG;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
private static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
- public HFileBlock deserialize(ByteBuffer buf) throws IOException{
- ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE);
- buf.limit(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
- newByteBuffer.put(buf);
- HFileBlock ourBuffer = new HFileBlock(newByteBuffer,
- MINOR_VERSION_NO_CHECKSUM);
-
+ public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+ buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+ ByteBuffer newByteBuffer;
+ if (reuse) {
+ newByteBuffer = buf.slice();
+ } else {
+ newByteBuffer = ByteBuffer.allocate(buf.limit());
+ newByteBuffer.put(buf);
+ }
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ int minorVersion = buf.getInt();
+ HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
return ourBuffer;
}
+
+ @Override
+ public int getDeserialiserIdentifier() {
+ return deserializerIdentifier;
+ }
+
+ @Override
+ public HFileBlock deserialize(ByteBuffer b) throws IOException {
+ return deserialize(b, false);
+ }
};
+ private static final int deserializerIdentifier;
+ static {
+ deserializerIdentifier = CacheableDeserializerIdManager
+ .registerDeserializer(blockDeserializer);
+ }
private BlockType blockType;
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
}
/**
+ * Returns the buffer of this block, including header data. Clients must
+ * not modify the buffer object. This method has to be public because it is
+ * used in {@link BucketCache} to avoid buffer copy.
+ *
+ * @return the byte buffer with header included for read-only operations
+ */
+ public ByteBuffer getBufferReadOnlyWithHeader() {
+ return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+ }
+
+ /**
* Returns a byte buffer of this block, including header data, positioned at
* the beginning of header. The underlying data array is not copied.
*
@@ -1287,110 +1317,6 @@ public class HFileBlock implements Cache
}
/**
- * Reads version 1 blocks from the file system. In version 1 blocks,
- * everything is compressed, including the magic record, if compression is
- * enabled. Everything might be uncompressed if no compression is used. This
- * reader returns blocks represented in the uniform version 2 format in
- * memory.
- */
- static class FSReaderV1 extends AbstractFSReader {
-
- /** Header size difference between version 1 and 2 */
- private static final int HEADER_DELTA = HEADER_SIZE_NO_CHECKSUM -
- MAGIC_LENGTH;
-
- public FSReaderV1(FSDataInputStream istream, Algorithm compressAlgo,
- long fileSize) throws IOException {
- super(istream, istream, compressAlgo, fileSize, 0, null, null);
- }
-
- /**
- * Read a version 1 block. There is no uncompressed header, and the block
- * type (the magic record) is part of the compressed data. This
- * implementation assumes that the bounded range file input stream is
- * needed to stop the decompressor reading into next block, because the
- * decompressor just grabs a bunch of data without regard to whether it is
- * coming to end of the compressed section.
- *
- * The block returned is still a version 2 block, and in particular, its
- * first {@link #HEADER_SIZE} bytes contain a valid version 2 header.
- *
- * @param offset the offset of the block to read in the file
- * @param onDiskSizeWithMagic the on-disk size of the version 1 block,
- * including the magic record, which is the part of compressed
- * data if using compression
- * @param uncompressedSizeWithMagic uncompressed size of the version 1
- * block, including the magic record
- */
- @Override
- public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
- int uncompressedSizeWithMagic, boolean pread) throws IOException {
- if (uncompressedSizeWithMagic <= 0) {
- throw new IOException("Invalid uncompressedSize="
- + uncompressedSizeWithMagic + " for a version 1 block");
- }
-
- if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE)
- {
- throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
- + " (maximum allowed: " + Integer.MAX_VALUE + ")");
- }
-
- int onDiskSize = (int) onDiskSizeWithMagic;
-
- if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
- throw new IOException("Uncompressed size for a version 1 block is "
- + uncompressedSizeWithMagic + " but must be at least "
- + MAGIC_LENGTH);
- }
-
- // The existing size already includes magic size, and we are inserting
- // a version 2 header.
- ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic
- + HEADER_DELTA);
-
- int onDiskSizeWithoutHeader;
- if (compressAlgo == Compression.Algorithm.NONE) {
- // A special case when there is no compression.
- if (onDiskSize != uncompressedSizeWithMagic) {
- throw new IOException("onDiskSize=" + onDiskSize
- + " and uncompressedSize=" + uncompressedSizeWithMagic
- + " must be equal for version 1 with no compression");
- }
-
- // The first MAGIC_LENGTH bytes of what this will read will be
- // overwritten.
- readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA,
- onDiskSize, false, offset, pread);
-
- onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
- } else {
- InputStream bufferedBoundedStream = createBufferedBoundedStream(
- offset, onDiskSize, pread);
- Compression.decompress(buf.array(), buf.arrayOffset()
- + HEADER_DELTA, bufferedBoundedStream, onDiskSize,
- uncompressedSizeWithMagic, this.compressAlgo);
-
- // We don't really have a good way to exclude the "magic record" size
- // from the compressed block's size, since it is compressed as well.
- onDiskSizeWithoutHeader = onDiskSize;
- }
-
- BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset()
- + HEADER_DELTA, MAGIC_LENGTH);
-
- // We set the uncompressed size of the new HFile block we are creating
- // to the size of the data portion of the block without the magic record,
- // since the magic record gets moved to the header.
- HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
- uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER,
- offset, MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
- onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
- return b;
- }
- }
-
- /**
* We always prefetch the header of the next block, so that we know its
* on-disk size in advance and can read it in one operation.
*/
@@ -1780,7 +1706,17 @@ public class HFileBlock implements Cache
@Override
public void serialize(ByteBuffer destination) {
- destination.put(this.buf.duplicate());
+ ByteBuffer dupBuf = this.buf.duplicate();
+ dupBuf.rewind();
+ destination.put(dupBuf);
+ destination.putInt(this.minorVersion);
+ destination.putLong(this.offset);
+ destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+ destination.rewind();
+ }
+
+ public void serializeExtraInfo(ByteBuffer destination) {
+ destination.putInt(this.minorVersion);
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();
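The reuse flag in the deserializer above chooses between aliasing the cache's buffer and copying out of it. The ByteBuffer mechanics it relies on, in miniature:

    import java.nio.ByteBuffer;

    public class SliceVsCopyDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        buf.limit(3);                      // trim trailing metadata, as above
        ByteBuffer shared = buf.slice();   // reuse=true: zero-copy view
        ByteBuffer copied = ByteBuffer.allocate(buf.limit());
        copied.put(buf);                   // reuse=false: private copy
        shared.put(0, (byte) 9);           // write through the shared view...
        System.out.println(buf.get(0));    // ...prints 9: the view aliases buf
        System.out.println(copied.get(0)); // prints 1: the copy is isolated
      }
    }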
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Wed Feb 13 20:58:23 2013
@@ -77,8 +77,8 @@ public class HFileReaderV2 extends Abstr
static final int MIN_MINOR_VERSION = 0;
/** Maximum minor version supported by this HFile format */
- // We went to version 2 when we moved to pb'ing the fileinfo trailer on the file. This version can read Writables
- // version 1 too.
+ // We went to version 2 when we moved to pb'ing fileinfo and the trailer on
+ // the file. This version can read Writables version 1.
static final int MAX_MINOR_VERSION = 2;
/**
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Feb 13 20:58:23 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
/** Overhead of the structure itself */
private long overhead;
+ /** Where to send victims (blocks evicted from the cache) */
+ private BucketCache victimHandler = null;
+
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
CachedBlock cb = map.get(cacheKey);
if(cb == null) {
if (!repeat) stats.miss(caching);
+ if (victimHandler != null)
+ return victimHandler.getBlock(cacheKey, caching, repeat);
return null;
}
stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
return cb.getBuffer();
}
+ /**
+ * Whether the cache contains the block with the specified cacheKey
+ * @param cacheKey
+ * @return true if the cache contains the block
+ */
+ public boolean containsBlock(BlockCacheKey cacheKey) {
+ return map.containsKey(cacheKey);
+ }
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
CachedBlock cb = map.get(cacheKey);
if (cb == null) return false;
- evictBlock(cb);
+ evictBlock(cb, false);
return true;
}
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
++numEvicted;
}
}
+ if (victimHandler != null) {
+ numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+ }
return numEvicted;
}
- protected long evictBlock(CachedBlock block) {
+ /**
+ * Evict the block; it will be cached by the victim handler if one exists
+ * and the block may be read again later.
+ * @param block
+ * @param evictedByEvictionProcess true if the given block is evicted by
+ * EvictionThread
+ * @return the heap size of the evicted block
+ */
+ protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
updateSizeMetrics(block, true);
elements.decrementAndGet();
stats.evicted();
+ if (evictedByEvictionProcess && victimHandler != null) {
+ boolean wait = getCurrentSize() < acceptableSize();
+ boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+ victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+ inMemory, wait);
+ }
return block.heapSize();
}
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
CachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
- freedBytes += evictBlock(cb);
+ freedBytes += evictBlock(cb, true);
if (freedBytes >= toFree) {
return freedBytes;
}
@@ -532,6 +564,16 @@ public class LruBlockCache implements Bl
if(this.overflow() == that.overflow()) return 0;
return this.overflow() > that.overflow() ? 1 : -1;
}
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null || !(that instanceof BlockBucket)){
+ return false;
+ }
+
+ return compareTo((BlockBucket) that) == 0;
+ }
+
}
/**
@@ -625,13 +667,13 @@ public class LruBlockCache implements Bl
public void evict() {
synchronized(this) {
- this.notify(); // FindBugs NN_NAKED_NOTIFY
+ this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
}
- void shutdown() {
+ synchronized void shutdown() {
this.go = false;
- interrupt();
+ this.notifyAll();
}
/**
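
The hunk above swaps interrupt()-based shutdown of the eviction thread for a volatile flag plus notifyAll(), so the thread wakes from wait() and exits on its own. A minimal sketch of that wake-up/shutdown pattern follows; EvictionWorker, go, and doEviction() are illustrative names, not the actual LruBlockCache inner class:

    // The worker sleeps until poked; shutdown() flips a flag and wakes all
    // waiters instead of interrupting the thread.
    public class EvictionWorker extends Thread {
      private volatile boolean go = true;

      @Override
      public void run() {
        while (go) {
          synchronized (this) {
            try {
              wait();              // sleep until evict() or shutdown() notifies
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
          if (go) {
            doEviction();          // placeholder for the real eviction pass
          }
        }
      }

      public void evict() {
        synchronized (this) {
          notifyAll();             // wake the worker to run one eviction pass
        }
      }

      public synchronized void shutdown() {
        this.go = false;
        notifyAll();               // wake the worker so it sees go == false
      }

      private void doEviction() {
        // the real cache frees blocks here
      }
    }
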
@@ -693,7 +735,7 @@ public class LruBlockCache implements Bl
}
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
- (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+ (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
+ ClassSize.OBJECT);
@@ -762,6 +804,8 @@ public class LruBlockCache implements Bl
}
public void shutdown() {
+ if (victimHandler != null)
+ victimHandler.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -812,4 +856,9 @@ public class LruBlockCache implements Bl
return counts;
}
+ public void setVictimCache(BucketCache handler) {
+ assert victimHandler == null;
+ victimHandler = handler;
+ }
+
}
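
The LruBlockCache changes above wire an optional BucketCache in as a "victim handler": misses fall through to it, and blocks evicted by the eviction process are handed off instead of dropped. A minimal sketch of this two-level pattern, assuming a LinkedHashMap-backed L1 and a hypothetical VictimHandler interface (the real BucketCache API differs):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // A hypothetical L2 that receives evicted entries and serves L1 misses.
    interface VictimHandler<K, V> {
      void cacheBlock(K key, V value);
      V getBlock(K key);
    }

    class VictimAwareLruCache<K, V> {
      private final int maxEntries;
      private VictimHandler<K, V> victimHandler;  // may be null until wired up
      private final LinkedHashMap<K, V> map;

      VictimAwareLruCache(int maxEntries) {
        this.maxEntries = maxEntries;
        // access-order LinkedHashMap gives simple LRU semantics
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            if (size() > VictimAwareLruCache.this.maxEntries) {
              // forward the victim to L2 instead of silently dropping it
              if (victimHandler != null) {
                victimHandler.cacheBlock(eldest.getKey(), eldest.getValue());
              }
              return true;
            }
            return false;
          }
        };
      }

      void setVictimCache(VictimHandler<K, V> handler) {
        assert victimHandler == null;
        victimHandler = handler;
      }

      synchronized void cacheBlock(K key, V value) {
        map.put(key, value);
      }

      synchronized V getBlock(K key) {
        V v = map.get(key);
        if (v == null && victimHandler != null) {
          return victimHandler.getBlock(key);   // miss in L1, try L2
        }
        return v;
      }
    }
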
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java Wed Feb 13 20:58:23 2013
@@ -70,4 +70,4 @@ public class BlockingRpcCallback<R> impl
}
return result;
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java Wed Feb 13 20:58:23 2013
@@ -25,11 +25,8 @@ import java.io.IOException;
* but is only used for logging on the server side, etc.
*/
public class CallerDisconnectedException extends IOException {
+ private static final long serialVersionUID = 1L;
public CallerDisconnectedException(String msg) {
super(msg);
}
-
- private static final long serialVersionUID = 1L;
-
-
}
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Delayable.java Wed Feb 13 20:58:23 2013
@@ -70,4 +70,4 @@ public interface Delayable {
* @throws IOException
*/
public void endDelayThrowing(Throwable t) throws IOException;
-}
+}
\ No newline at end of file
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Feb 13 20:58:23 2013
@@ -28,6 +28,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -41,6 +42,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,6 +55,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@@ -103,9 +106,11 @@ import com.google.protobuf.Message.Build
@InterfaceAudience.Private
public class HBaseClient {
- public static final Log LOG = LogFactory
- .getLog("org.apache.hadoop.ipc.HBaseClient");
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
+ private static final Map<String, Method> methodInstances =
+ new ConcurrentHashMap<String, Method>();
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -121,7 +126,6 @@ public class HBaseClient {
protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
protected String clusterId;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,12 +190,13 @@ public class HBaseClient {
}
public static class FailedServerException extends IOException {
+ private static final long serialVersionUID = -4744376109431464127L;
+
public FailedServerException(String s) {
super(s);
}
}
-
/**
* set the ping interval value in configuration
*
@@ -229,36 +234,11 @@ public class HBaseClient {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
}
- /**
- * Increment this client's reference count
- *
- */
- synchronized void incCount() {
- refCount++;
- }
-
- /**
- * Decrement this client's reference count
- *
- */
- synchronized void decCount() {
- refCount--;
- }
-
- /**
- * Return if this client has no reference
- *
- * @return true if this client has no reference; false otherwise
- */
- synchronized boolean isZeroReference() {
- return refCount==0;
- }
-
/** A call waiting for a value. */
protected class Call {
- final int id; // call id
- final RpcRequestBody param; // rpc request object
- Message value; // value, null if error
+ final int id; // call id
+ final RpcRequestBody param; // rpc request object
+ Message value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
@@ -302,6 +282,7 @@ public class HBaseClient {
return this.startTime;
}
}
+
protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
static {
@@ -335,9 +316,12 @@ public class HBaseClient {
private int reloginMaxBackoff; // max pause before relogin on sasl failure
// currently active calls
- protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
- protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
- protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
+ protected final ConcurrentSkipListMap<Integer, Call> calls =
+ new ConcurrentSkipListMap<Integer, Call>();
+ protected final AtomicLong lastActivity =
+ new AtomicLong(); // last I/O activity time
+ protected final AtomicBoolean shouldCloseConnection =
+ new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
Connection(ConnectionId remoteId) throws IOException {
@@ -414,16 +398,14 @@ public class HBaseClient {
return null;
}
UserInformation.Builder userInfoPB = UserInformation.newBuilder();
- if (ugi != null) {
- if (authMethod == AuthMethod.KERBEROS) {
- // Send effective user for Kerberos auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- } else if (authMethod == AuthMethod.SIMPLE) {
- //Send both effective user and real user for simple auth
- userInfoPB.setEffectiveUser(ugi.getUserName());
- if (ugi.getRealUser() != null) {
- userInfoPB.setRealUser(ugi.getRealUser().getUserName());
- }
+ if (authMethod == AuthMethod.KERBEROS) {
+ // Send effective user for Kerberos auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ } else if (authMethod == AuthMethod.SIMPLE) {
+ //Send both effective user and real user for simple auth
+ userInfoPB.setEffectiveUser(ugi.getUserName());
+ if (ugi.getRealUser() != null) {
+ userInfoPB.setRealUser(ugi.getRealUser().getUserName());
}
}
return userInfoPB.build();
@@ -845,11 +827,17 @@ public class HBaseClient {
start();
return;
}
- } catch (IOException e) {
+ } catch (Throwable t) {
failedServers.addToFailedServers(remoteId.address);
- markClosed(e);
+ IOException e;
+ if (t instanceof IOException) {
+ e = (IOException) t;
+ } else {
+ e = new IOException("Couldn't set up IO streams", t);
+ }
+ markClosed(e);
close();
-
throw e;
}
}
@@ -959,6 +947,24 @@ public class HBaseClient {
}
}
+
+ private Method getMethod(Class<? extends IpcProtocol> protocol,
+ String methodName) {
+ Method method = methodInstances.get(methodName);
+ if (method != null) {
+ return method;
+ }
+ Method[] methods = protocol.getMethods();
+ for (Method m : methods) {
+ if (m.getName().equals(methodName)) {
+ m.setAccessible(true);
+ methodInstances.put(methodName, m);
+ return m;
+ }
+ }
+ return null;
+ }
+
/* Receive a response.
* Because there is only one receiver, no synchronization on "in" is needed.
*/
@@ -990,9 +996,9 @@ public class HBaseClient {
if (status == Status.SUCCESS) {
Message rpcResponseType;
try {
- rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
- ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
- call.param.getMethodName()));
+ rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
+ getMethod(remoteId.getProtocol(),
+ call.param.getMethodName()));
} catch (Exception e) {
throw new RuntimeException(e); //local exception
}
@@ -1270,7 +1276,7 @@ public class HBaseClient {
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Message call(RpcRequestBody param, InetSocketAddress addr,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
Call call = new Call(param);
@@ -1317,7 +1323,6 @@ public class HBaseClient {
* @param exception the relevant exception
* @return an exception to throw
*/
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
protected IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
@@ -1340,25 +1345,9 @@ public class HBaseClient {
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
- * contains nulls for calls that timed out or errored.
- * @param params RpcRequestBody parameters
- * @param addresses socket addresses
- * @return RpcResponseBody[]
- * @throws IOException e
- * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
- */
- @Deprecated
- public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
- throws IOException, InterruptedException {
- return call(params, addresses, null, null);
- }
-
- /** Makes a set of calls in parallel. Each parameter is sent to the
- * corresponding address. When all values are available, or have timed out
- * or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket)
throws IOException, InterruptedException {
if (addresses.length == 0) return new RpcResponseBody[0];
@@ -1393,7 +1382,7 @@ public class HBaseClient {
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
protected Connection getConnection(InetSocketAddress addr,
- Class<? extends VersionedProtocol> protocol,
+ Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout,
Call call)
@@ -1436,11 +1425,10 @@ public class HBaseClient {
final InetSocketAddress address;
final User ticket;
final int rpcTimeout;
- Class<? extends VersionedProtocol> protocol;
+ Class<? extends IpcProtocol> protocol;
private static final int PRIME = 16777619;
- ConnectionId(InetSocketAddress address,
- Class<? extends VersionedProtocol> protocol,
+ ConnectionId(InetSocketAddress address, Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout) {
this.protocol = protocol;
@@ -1453,7 +1441,7 @@ public class HBaseClient {
return address;
}
- Class<? extends VersionedProtocol> getProtocol() {
+ Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
@@ -1479,4 +1467,4 @@ public class HBaseClient {
(ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
}
}
-}
+}
\ No newline at end of file
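
The new getMethod() helper in HBaseClient above caches java.lang.reflect.Method lookups in a ConcurrentHashMap so that decoding each RPC response does not pay for a linear scan of the protocol's methods. A standalone sketch of that caching idiom (MethodCache is an illustrative name, not HBase API):

    import java.lang.reflect.Method;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Caches Method objects by name so repeated reflective lookups are O(1).
    // Like the HBaseClient version, this keys on the method name alone, so it
    // assumes method names are unique across the protocols it is used with.
    public final class MethodCache {
      private static final Map<String, Method> METHODS =
          new ConcurrentHashMap<String, Method>();

      private MethodCache() {
      }

      public static Method getMethod(Class<?> protocol, String methodName) {
        Method method = METHODS.get(methodName);
        if (method != null) {
          return method;
        }
        for (Method m : protocol.getMethods()) {
          if (m.getName().equals(methodName)) {
            m.setAccessible(true);
            METHODS.put(methodName, m);
            return m;
          }
        }
        return null;    // caller must handle an unknown method
      }
    }

For example, MethodCache.getMethod(java.util.List.class, "size") scans once and then returns the same cached Method on every subsequent call.
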
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Feb 13 20:58:23 2013
@@ -46,18 +46,17 @@ import java.nio.channels.WritableByteCha
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@@ -68,28 +67,29 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.IpcProtocol;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,20 +97,20 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
import org.cliffc.high_scale_lib.Counter;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceInfo;
import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+// Uses Writables when doing SASL.
/** An abstract IPC service. IPC calls take a single Protobuf message as a
* parameter, and return a single Protobuf message as their value. A service runs on
@@ -169,22 +169,18 @@ public abstract class HBaseServer implem
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
- // For generated protocol classes which doesn't have VERSION field
- private static final Map<Class<?>, Long>
- PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
-
- private static final Map<String, Class<? extends VersionedProtocol>>
- PROTOCOL_CACHE =
- new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
+ private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
+ new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
- static Class<? extends VersionedProtocol> getProtocolClass(
+ @SuppressWarnings("unchecked")
+ static Class<? extends IpcProtocol> getProtocolClass(
String protocolName, Configuration conf)
throws ClassNotFoundException {
- Class<? extends VersionedProtocol> protocol =
+ Class<? extends IpcProtocol> protocol =
PROTOCOL_CACHE.get(protocolName);
if (protocol == null) {
- protocol = (Class<? extends VersionedProtocol>)
+ protocol = (Class<? extends IpcProtocol>)
conf.getClassByName(protocolName);
PROTOCOL_CACHE.put(protocolName, protocol);
}
@@ -192,7 +188,7 @@ public abstract class HBaseServer implem
}
/** Returns the server instance called under or null. May be called under
- * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+ * {@code #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
* and under protobuf methods of parameters and return values.
* Permits applications to access the server context.
* @return HBaseServer
@@ -263,8 +259,6 @@ public abstract class HBaseServer implem
protected int highPriorityLevel; // what level a high priority call is at
- private volatile int responseQueueLen; // size of response queue for this server
-
protected final List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
//maintain a list
@@ -278,7 +272,7 @@ public abstract class HBaseServer implem
protected BlockingQueue<Call> replicationQueue;
private int numOfReplicationHandlers = 0;
private Handler[] replicationHandlers = null;
-
+
protected HBaseRPCErrorHandler errorHandler = null;
/**
@@ -358,7 +352,7 @@ public abstract class HBaseServer implem
if (errorClass != null) {
this.isError = true;
}
-
+
ByteBufferOutputStream buf = null;
if (value != null) {
buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
@@ -460,7 +454,7 @@ public abstract class HBaseServer implem
public synchronized boolean isReturnValueDelayed() {
return this.delayReturnValue;
}
-
+
@Override
public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
if (!connection.channel.isOpen()) {
@@ -1000,7 +994,6 @@ public abstract class HBaseServer implem
return true;
}
if (!call.response.hasRemaining()) {
- responseQueueLen--;
call.connection.decRpcCount();
//noinspection RedundantIfStatement
if (numElements == 1) { // last call fully processes.
@@ -1070,7 +1063,6 @@ public abstract class HBaseServer implem
void doRespond(Call call) throws IOException {
// set the serve time when the response has to be sent later
call.timestamp = System.currentTimeMillis();
- responseQueueLen++;
boolean doRegister = false;
synchronized (call.connection.responseQueue) {
@@ -1120,7 +1112,7 @@ public abstract class HBaseServer implem
protected String hostAddress;
protected int remotePort;
ConnectionHeader header;
- Class<? extends VersionedProtocol> protocol;
+ Class<? extends IpcProtocol> protocol;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
@@ -1324,7 +1316,7 @@ public abstract class HBaseServer implem
LOG.debug("SASL server context established. Authenticated client: "
+ user + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
- }
+ }
metrics.authenticationSuccess();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
@@ -1437,7 +1429,7 @@ public abstract class HBaseServer implem
}
}
if (dataLength < 0) {
- throw new IllegalArgumentException("Unexpected data length "
+ throw new IllegalArgumentException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength);
@@ -1758,7 +1750,7 @@ public abstract class HBaseServer implem
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
- status.setConnection(call.connection.getHostAddress(),
+ status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
if (LOG.isDebugEnabled())
@@ -2019,11 +2011,12 @@ public abstract class HBaseServer implem
}
return handlers;
}
-
+
public SecretManager<? extends TokenIdentifier> getSecretManager() {
return this.secretManager;
}
+ @SuppressWarnings("unchecked")
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
}
@@ -2051,7 +2044,7 @@ public abstract class HBaseServer implem
}
}
}
-
+
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
@@ -2110,7 +2103,7 @@ public abstract class HBaseServer implem
connection.getProtocol());
}
authManager.authorize(user != null ? user : null,
- protocol, getConf(), addr);
+ protocol, getConf(), addr);
}
}
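
HBaseServer's PROTOCOL_CACHE above memoizes class lookups by protocol name in the same way. A sketch of the idiom, using plain Class.forName for self-containment where the real server uses conf.getClassByName() (which also resolves through the Configuration's classloader):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Memoizes class lookups by name; repeated resolutions of the same
    // protocol name hit the map instead of the classloader.
    public final class ProtocolClassCache {
      private static final Map<String, Class<?>> CACHE =
          new ConcurrentHashMap<String, Class<?>>();

      private ProtocolClassCache() {
      }

      public static Class<?> getProtocolClass(String protocolName)
          throws ClassNotFoundException {
        Class<?> protocol = CACHE.get(protocolName);
        if (protocol == null) {
          protocol = Class.forName(protocolName);
          CACHE.put(protocolName, protocol);  // benign race: last write wins
        }
        return protocol;
      }
    }
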
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java Wed Feb 13 20:58:23 2013
@@ -18,26 +18,20 @@
package org.apache.hadoop.hbase.ipc;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import java.io.IOException;
-
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s