You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/05/07 21:26:51 UTC
svn commit: r942186 [4/18] - in /hadoop/hbase/trunk: ./
contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/
core/src/main/java/org/apache/hadoop/hbase/
core/src/main/java/org/apache/hadoop/hbase/client/
core/src/main/java/org/apache/h...
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Fri May 7 19:26:45 2010
@@ -27,49 +27,49 @@ import org.apache.hadoop.hbase.util.Clas
/**
* Represents an entry in the {@link LruBlockCache}.
- *
+ *
* <p>Makes the block memory-aware with {@link HeapSize} and Comparable
* to sort by access time for the LRU. It also takes care of priority by
* either instantiating as in-memory or handling the transition from single
* to multiple access.
*/
public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
-
+
public final static long PER_BLOCK_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) +
ClassSize.STRING + ClassSize.BYTE_BUFFER);
-
- static enum BlockPriority {
+
+ static enum BlockPriority {
/**
* Accessed a single time (used for scan-resistance)
*/
- SINGLE,
+ SINGLE,
/**
* Accessed multiple times
*/
- MULTI,
+ MULTI,
/**
* Block from in-memory store
*/
MEMORY
};
-
+
private final String blockName;
private final ByteBuffer buf;
private volatile long accessTime;
private long size;
private BlockPriority priority;
-
+
public CachedBlock(String blockName, ByteBuffer buf, long accessTime) {
this(blockName, buf, accessTime, false);
}
-
+
public CachedBlock(String blockName, ByteBuffer buf, long accessTime,
boolean inMemory) {
this.blockName = blockName;
this.buf = buf;
this.accessTime = accessTime;
- this.size = ClassSize.align(blockName.length()) +
+ this.size = ClassSize.align(blockName.length()) +
ClassSize.align(buf.capacity()) + PER_BLOCK_OVERHEAD;
if(inMemory) {
this.priority = BlockPriority.MEMORY;
@@ -77,7 +77,7 @@ public class CachedBlock implements Heap
this.priority = BlockPriority.SINGLE;
}
}
-
+
/**
* Block has been accessed. Update its local access time.
*/
@@ -87,7 +87,7 @@ public class CachedBlock implements Heap
this.priority = BlockPriority.MULTI;
}
}
-
+
public long heapSize() {
return size;
}
@@ -96,15 +96,15 @@ public class CachedBlock implements Heap
if(this.accessTime == that.accessTime) return 0;
return this.accessTime < that.accessTime ? 1 : -1;
}
-
+
public ByteBuffer getBuffer() {
return this.buf;
}
-
+
public String getName() {
return this.blockName;
}
-
+
public BlockPriority getPriority() {
return this.priority;
}
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlockQueue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlockQueue.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlockQueue.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlockQueue.java Fri May 7 19:26:45 2010
@@ -28,22 +28,22 @@ import org.apache.hadoop.hbase.io.HeapSi
* A memory-bound queue that will grow until an element brings
* total size >= maxSize. From then on, only entries that are sorted larger
* than the smallest current entry will be inserted/replaced.
- *
+ *
* <p>Use this when you want to find the largest elements (according to their
* ordering, not their heap size) that consume as close to the specified
* maxSize as possible. Default behavior is to grow just above rather than
* just below specified max.
- *
+ *
* <p>Object used in this queue must implement {@link HeapSize} as well as
* {@link Comparable}.
*/
public class CachedBlockQueue implements HeapSize {
-
+
private PriorityQueue<CachedBlock> queue;
-
+
private long heapSize;
private long maxSize;
-
+
/**
* @param maxSize the target size of elements in the queue
* @param blockSize expected average size of blocks
@@ -55,10 +55,10 @@ public class CachedBlockQueue implements
heapSize = 0;
this.maxSize = maxSize;
}
-
+
/**
* Attempt to add the specified cached block to this queue.
- *
+ *
* <p>If the queue is smaller than the max size, or if the specified element
* is ordered before the smallest element in the queue, the element will be
* added to the queue. Otherwise, there is no side effect of this call.
@@ -82,7 +82,7 @@ public class CachedBlockQueue implements
}
}
}
-
+
/**
* Get a sorted List of all elements in this queue, in descending order.
* @return list of cached elements in descending order
@@ -94,7 +94,7 @@ public class CachedBlockQueue implements
}
return blocks.toArray(new CachedBlock[blocks.size()]);
}
-
+
/**
* Total size of all elements in this queue.
* @return size of all elements currently in queue, in bytes
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/Compression.java Fri May 7 19:26:45 2010
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Fri May 7 19:26:45 2010
@@ -102,7 +102,7 @@ import org.apache.hadoop.io.compress.Dec
* compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to
* decompress, comparing to "lzo".
* </ul>
- *
+ *
* For more on the background behind HFile, see <a
* href=https://issues.apache.org/jira/browse/HBASE-61>HBASE-61</a>.
* <p>
@@ -124,7 +124,7 @@ import org.apache.hadoop.io.compress.Dec
public class HFile {
static final Log LOG = LogFactory.getLog(HFile.class);
- /* These values are more or less arbitrary, and they are used as a
+ /* These values are more or less arbitrary, and they are used as a
* form of check to make sure the file isn't completely corrupt.
*/
final static byte [] DATABLOCKMAGIC =
@@ -133,14 +133,14 @@ public class HFile {
{ 'I', 'D', 'X', 'B', 'L', 'K', 41, 43 };
final static byte [] METABLOCKMAGIC =
{ 'M', 'E', 'T', 'A', 'B', 'L', 'K', 99 };
- final static byte [] TRAILERBLOCKMAGIC =
+ final static byte [] TRAILERBLOCKMAGIC =
{ 'T', 'R', 'A', 'B', 'L', 'K', 34, 36 };
/**
* Maximum length of key in HFile.
*/
public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
-
+
/**
* Default blocksize for hfile.
*/
@@ -269,7 +269,7 @@ public class HFile {
* @param blocksize
* @param compress
* @param comparator
- * @throws IOException
+ * @throws IOException
* @throws IOException
*/
public Writer(FileSystem fs, Path path, int blocksize,
@@ -314,7 +314,7 @@ public class HFile {
this(ostream, blocksize,
Compression.getCompressionAlgorithmByName(compress), c);
}
-
+
/**
* Constructor that takes a stream.
* @param ostream Stream to use.
@@ -399,13 +399,13 @@ public class HFile {
this.compressor, 0);
return new DataOutputStream(os);
}
-
+
/*
* Let go of block compressor and compressing stream gotten in call
* {@link #getCompressingStream}.
* @param dos
* @return How much was written on this stream since it was taken out.
- * @see #getCompressingStream()
+ * @see #getCompressingStream()
* @throws IOException
*/
private int releaseCompressingStream(final DataOutputStream dos)
@@ -435,7 +435,7 @@ public class HFile {
* from {@link Reader#loadFileInfo()}.
* @param k Key
* @param v Value
- * @throws IOException
+ * @throws IOException
*/
public void appendFileInfo(final byte [] k, final byte [] v)
throws IOException {
@@ -543,7 +543,7 @@ public class HFile {
private boolean checkKey(final byte [] key, final int offset, final int length)
throws IOException {
boolean dupKey = false;
-
+
if (key == null || length <= 0) {
throw new IOException("Key cannot be null or empty");
}
@@ -553,7 +553,7 @@ public class HFile {
}
if (this.lastKeyBuffer != null) {
int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
- this.lastKeyLength, key, offset, length);
+ this.lastKeyLength, key, offset, length);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than" +
" previous key=" + Bytes.toString(key, offset, length) +
@@ -587,7 +587,7 @@ public class HFile {
finishBlock();
FixedFileTrailer trailer = new FixedFileTrailer();
-
+
// Write out the metadata blocks if any.
ArrayList<Long> metaOffsets = null;
ArrayList<Integer> metaDataSizes = null;
@@ -618,10 +618,10 @@ public class HFile {
// Now finish off the trailer.
trailer.dataIndexCount = blockKeys.size();
trailer.metaIndexCount = metaNames.size();
-
+
trailer.totalUncompressedBytes = totalBytes;
trailer.entryCount = entryCount;
-
+
trailer.compressionCodec = this.compressAlgo.ordinal();
trailer.serialize(outputStream);
@@ -690,7 +690,7 @@ public class HFile {
private BlockIndex metaIndex;
FixedFileTrailer trailer;
private volatile boolean fileInfoLoaded = false;
-
+
// Filled when we read in the trailer.
private Compression.Algorithm compressAlgo;
@@ -699,7 +699,7 @@ public class HFile {
// Stats read in when we load file info.
private int avgKeyLen = -1;
private int avgValueLen = -1;
-
+
// Used to ensure we seek correctly.
RawComparator<byte []> comparator;
@@ -710,7 +710,7 @@ public class HFile {
private final BlockCache cache;
public int cacheHits = 0;
public int blockLoads = 0;
-
+
// Whether file is from in-memory store
private boolean inMemory = false;
@@ -727,8 +727,8 @@ public class HFile {
this(null, -1, null, false);
}
- /**
- * Opens a HFile. You must load the file info before you can
+ /**
+ * Opens a HFile. You must load the file info before you can
* use it by calling {@link #loadFileInfo()}.
*
* @param fs filesystem to load from
@@ -743,8 +743,8 @@ public class HFile {
this.name = path.toString();
}
- /**
- * Opens a HFile. You must load the index before you can
+ /**
+ * Opens a HFile. You must load the index before you can
* use it by calling {@link #loadFileInfo()}.
*
* @param fsdis input stream. Caller is responsible for closing the passed
@@ -788,7 +788,7 @@ public class HFile {
public long length() {
return this.fileSize;
}
-
+
public boolean inMemory() {
return this.inMemory;
}
@@ -909,7 +909,7 @@ public class HFile {
} else {
blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block];
}
-
+
ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
longToInt(blockSize), metaIndex.blockDataSizes[block], true);
byte [] magic = new byte[METABLOCKMAGIC.length];
@@ -1005,18 +1005,18 @@ public class HFile {
* @param offset
* @param compressedSize
* @param decompressedSize
- *
+ *
* @return
* @throws IOException
*/
private ByteBuffer decompress(final long offset, final int compressedSize,
- final int decompressedSize, final boolean pread)
+ final int decompressedSize, final boolean pread)
throws IOException {
Decompressor decompressor = null;
ByteBuffer buf = null;
try {
decompressor = this.compressAlgo.getDecompressor();
- // My guess is that the bounded range fis is needed to stop the
+ // My guess is that the bounded range fis is needed to stop the
// decompressor reading into next block -- IIRC, it just grabs a
// bunch of data w/o regard to whether decompressor is coming to end of a
// decompression.
@@ -1026,15 +1026,15 @@ public class HFile {
decompressor, 0);
buf = ByteBuffer.allocate(decompressedSize);
IOUtils.readFully(is, buf.array(), 0, buf.capacity());
- is.close();
+ is.close();
} finally {
if (null != decompressor) {
- this.compressAlgo.returnDecompressor(decompressor);
+ this.compressAlgo.returnDecompressor(decompressor);
}
}
return buf;
}
-
+
/**
* @return First key in the file. May be null if file has no entries.
*/
@@ -1076,7 +1076,7 @@ public class HFile {
return (this.blockIndex != null? this.blockIndex.heapSize(): 0) +
((this.metaIndex != null)? this.metaIndex.heapSize(): 0);
}
-
+
/**
* @return Midkey for this file. We work with block boundaries only so
* returned midkey is an approximation only.
@@ -1103,7 +1103,7 @@ public class HFile {
private final Reader reader;
private ByteBuffer block;
private int currBlock;
-
+
private final boolean cacheBlocks;
private final boolean pread;
@@ -1117,7 +1117,7 @@ public class HFile {
this.cacheBlocks = cacheBlocks;
this.pread = pread;
}
-
+
public KeyValue getKeyValue() {
if(this.block == null) {
return null;
@@ -1179,25 +1179,25 @@ public class HFile {
currValueLen = block.getInt();
return true;
}
-
+
public int seekTo(byte [] key) throws IOException {
return seekTo(key, 0, key.length);
}
-
+
public int seekTo(byte[] key, int offset, int length) throws IOException {
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b);
-
+
return blockSeek(key, offset, length, false);
}
/**
* Within a loaded block, seek looking for the first key
* that is smaller than (or equal to?) the key we are interested in.
- *
+ *
* A note on the seekBefore - if you have seekBefore = true, AND the
* first key in the block = key, then you'll get thrown exceptions.
* @param key to find
@@ -1245,7 +1245,7 @@ public class HFile {
public boolean seekBefore(byte [] key) throws IOException {
return seekBefore(key, 0, key.length);
}
-
+
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
int b = reader.blockContainingKey(key, offset, length);
@@ -1304,7 +1304,7 @@ public class HFile {
blockFetches++;
return true;
}
-
+
private void loadBlock(int bloc) throws IOException {
if (block == null) {
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
@@ -1327,7 +1327,7 @@ public class HFile {
return trailer.toString();
}
}
-
+
/*
* The RFile has a fixed trailer which contains offsets to other variable
* parts of the file. Also includes basic metadata on this file.
@@ -1347,14 +1347,14 @@ public class HFile {
int entryCount;
int compressionCodec;
int version = 1;
-
+
FixedFileTrailer() {
super();
}
static int trailerSize() {
// Keep this up to date...
- return
+ return
( Bytes.SIZEOF_INT * 5 ) +
( Bytes.SIZEOF_LONG * 4 ) +
TRAILERBLOCKMAGIC.length;
@@ -1386,7 +1386,7 @@ public class HFile {
metaIndexOffset = inputStream.readLong();
metaIndexCount = inputStream.readInt();
-
+
totalUncompressedBytes = inputStream.readLong();
entryCount = inputStream.readInt();
compressionCodec = inputStream.readInt();
@@ -1425,7 +1425,7 @@ public class HFile {
/* Needed doing lookup on blocks.
*/
final RawComparator<byte []> comparator;
-
+
/*
* Shutdown default constructor
*/
@@ -1453,7 +1453,7 @@ public class HFile {
/**
* Adds a new entry in the block index.
- *
+ *
* @param key Last key in the block
* @param offset file offset where the block is stored
* @param dataSize the uncompressed data size
@@ -1484,13 +1484,13 @@ public class HFile {
// the block with a firstKey < key. This means the value we want is potentially
// in the next block.
pos --; // in previous block.
-
+
return pos;
}
// wow, a perfect hit, how unlikely?
return pos;
}
-
+
/*
* @return File midkey. Inexact. Operates on block boundaries. Does
* not go into blocks.
@@ -1581,12 +1581,12 @@ public class HFile {
}
public long heapSize() {
- long heapsize = ClassSize.align(ClassSize.OBJECT +
+ long heapsize = ClassSize.align(ClassSize.OBJECT +
2 * Bytes.SIZEOF_INT + (3 + 1) * ClassSize.REFERENCE);
- //Calculating the size of blockKeys
+ //Calculating the size of blockKeys
if(blockKeys != null) {
//Adding array + references overhead
- heapsize += ClassSize.align(ClassSize.ARRAY +
+ heapsize += ClassSize.align(ClassSize.ARRAY +
blockKeys.length * ClassSize.REFERENCE);
//Adding bytes
for(byte [] bs : blockKeys) {
@@ -1594,17 +1594,17 @@ public class HFile {
}
}
if(blockOffsets != null) {
- heapsize += ClassSize.align(ClassSize.ARRAY +
+ heapsize += ClassSize.align(ClassSize.ARRAY +
blockOffsets.length * Bytes.SIZEOF_LONG);
}
if(blockDataSizes != null) {
- heapsize += ClassSize.align(ClassSize.ARRAY +
+ heapsize += ClassSize.align(ClassSize.ARRAY +
blockDataSizes.length * Bytes.SIZEOF_INT);
}
-
+
return ClassSize.align(heapsize);
}
-
+
}
/*
@@ -1631,7 +1631,7 @@ public class HFile {
/**
* Get names of supported compression algorithms. The names are acceptable by
* HFile.Writer.
- *
+ *
* @return Array of strings, each represents a supported compression
* algorithm. Currently, the following compression algorithms are
* supported.
@@ -1658,13 +1658,13 @@ public class HFile {
/**
* Returns all files belonging to the given region directory. Could return an
* empty list.
- *
+ *
* @param fs The file system reference.
* @param regionDir The region directory to scan.
* @return The list of files found.
* @throws IOException When scanning the files fails.
*/
- static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
+ static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
throws IOException {
List<Path> res = new ArrayList<Path>();
PathFilter dirFilter = new FSUtils.DirFilter(fs);
@@ -1679,7 +1679,7 @@ public class HFile {
}
return res;
}
-
+
public static void main(String []args) throws IOException {
try {
// create options
@@ -1725,7 +1725,7 @@ public class HFile {
Path regionDir = new Path(tableDir, Integer.toString(enc));
if (verbose) System.out.println("region dir -> " + regionDir);
List<Path> regionFiles = getStoreFiles(fs, regionDir);
- if (verbose) System.out.println("Number of region files found -> " +
+ if (verbose) System.out.println("Number of region files found -> " +
regionFiles.size());
if (verbose) {
int i = 1;
@@ -1742,7 +1742,7 @@ public class HFile {
System.err.println("ERROR, file doesnt exist: " + file);
continue;
}
- // create reader and load file info
+ // create reader and load file info
HFile.Reader reader = new HFile.Reader(fs, file, null, false);
Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
// scan over file and read key/value's and check if requested
@@ -1760,9 +1760,9 @@ public class HFile {
// check if rows are in order
if (checkRow && pkv != null) {
if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
- System.err.println("WARNING, previous row is greater then" +
- " current row\n\tfilename -> " + file +
- "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
+ System.err.println("WARNING, previous row is greater then" +
+ " current row\n\tfilename -> " + file +
+ "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
"\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
}
}
@@ -1770,14 +1770,14 @@ public class HFile {
if (checkFamily) {
String fam = Bytes.toString(kv.getFamily());
if (!file.toString().contains(fam)) {
- System.err.println("WARNING, filename does not match kv family," +
- "\n\tfilename -> " + file +
+ System.err.println("WARNING, filename does not match kv family," +
+ "\n\tfilename -> " + file +
"\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey()));
}
if (pkv != null && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
System.err.println("WARNING, previous kv has different family" +
- " compared to current key\n\tfilename -> " + file +
- "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
+ " compared to current key\n\tfilename -> " + file +
+ "\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
"\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
}
}
@@ -1787,7 +1787,7 @@ public class HFile {
if (verbose || printKeyValue) {
System.out.println("Scanned kv count -> " + count);
}
- // print meta data
+ // print meta data
if (printMeta) {
System.out.println("Block index size as per heapsize: " + reader.indexSize());
System.out.println(reader.toString());
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java Fri May 7 19:26:45 2010
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
/**
* A scanner allows you to position yourself within a HFile and
* scan through it. It allows you to reposition yourself as well.
- *
+ *
* <p>A scanner doesn't always have a key/value that it is pointing to
* when it is first created and before
* {@link #seekTo()}/{@link #seekTo(byte[])} are called.
@@ -40,7 +40,7 @@ public interface HFileScanner {
/**
* SeekTo or just before the passed <code>key</code>. Examine the return
* code to figure whether we found the key or not.
- * Consider the key stream of all the keys in the file,
+ * Consider the key stream of all the keys in the file,
* <code>k[0] .. k[n]</code>, where there are n keys in the file.
* @param key Key to find.
* @return -1, if key < k[0], no position;
@@ -53,7 +53,7 @@ public interface HFileScanner {
public int seekTo(byte[] key) throws IOException;
public int seekTo(byte[] key, int offset, int length) throws IOException;
/**
- * Consider the key stream of all the keys in the file,
+ * Consider the key stream of all the keys in the file,
* <code>k[0] .. k[n]</code>, where there are n keys in the file.
* @param key Key to find
* @return false if key <= k[0] or true with scanner in position 'i' such
@@ -87,7 +87,7 @@ public interface HFileScanner {
/**
* Gets a buffer view to the current value. You must call
* {@link #seekTo(byte[])} before this method.
- *
+ *
* @return byte buffer for the value. The limit is set to the value size, and
* the position is 0, the start of the buffer view.
*/
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Fri May 7 19:26:45 2010
@@ -40,35 +40,35 @@ import org.apache.hadoop.hbase.util.Clas
* memory-bound using an LRU eviction algorithm, and concurrent: backed by a
* {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
* constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
- *
+ *
* Contains three levels of block priority to allow for
* scan-resistance and in-memory families. A block is added with an inMemory
* flag if necessary, otherwise a block becomes a single access priority. Once
* a block is accessed again, it changes to multiple access. This is used
* to prevent scans from thrashing the cache, adding a least-frequently-used
* element to the eviction algorithm.<p>
- *
+ *
* Each priority is given its own chunk of the total cache to ensure
* fairness during eviction. Each priority will retain close to its maximum
* size, however, if any priority is not using its entire chunk the others
* are able to grow beyond their chunk size.<p>
- *
+ *
* Instantiated at a minimum with the total size and average block size.
- * All sizes are in bytes. The block size is not especially important as this
+ * All sizes are in bytes. The block size is not especially important as this
* cache is fully dynamic in its sizing of blocks. It is only used for
* pre-allocating data structures and in initial heap estimation of the map.<p>
- *
+ *
* The detailed constructor defines the sizes for the three priorities (they
* should total to the maximum size defined). It also sets the levels that
* trigger and control the eviction thread.<p>
- *
+ *
* The acceptable size is the cache size level which triggers the eviction
* process to start. It evicts enough blocks to get the size below the
* minimum size specified.<p>
- *
+ *
* Eviction happens in a separate thread and involves a single full-scan
* of the map. It determines how many bytes must be freed to reach the minimum
- * size, and then while scanning determines the fewest least-recently-used
+ * size, and then while scanning determines the fewest least-recently-used
* blocks necessary from each of the three priorities (would be 3 times bytes
* to free). It then uses the priority chunk sizes to evict fairly according
* to the relative sizes and usage.
@@ -76,81 +76,81 @@ import org.apache.hadoop.hbase.util.Clas
public class LruBlockCache implements BlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(LruBlockCache.class);
-
+
/** Default Configuration Parameters*/
-
+
/** Backing Concurrent Map Configuration */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
-
+
/** Eviction thresholds */
static final float DEFAULT_MIN_FACTOR = 0.75f;
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
-
+
/** Priority buckets */
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
static final float DEFAULT_MULTI_FACTOR = 0.50f;
static final float DEFAULT_MEMORY_FACTOR = 0.25f;
-
+
/** Statistics thread */
static final int statThreadPeriod = 60;
-
+
/** Concurrent map (the cache) */
private final ConcurrentHashMap<String,CachedBlock> map;
-
+
/** Eviction lock (locked when eviction in process) */
private final ReentrantLock evictionLock = new ReentrantLock(true);
-
+
/** Volatile boolean to track if we are in an eviction process or not */
private volatile boolean evictionInProgress = false;
-
+
/** Eviction thread */
private final EvictionThread evictionThread;
-
+
/** Statistics thread schedule pool (for heavy debugging, could remove) */
private final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1);
-
+
/** Current size of cache */
private final AtomicLong size;
-
+
/** Current number of cached elements */
private final AtomicLong elements;
-
+
/** Cache access count (sequential ID) */
private final AtomicLong count;
-
+
/** Cache statistics */
private final CacheStats stats;
-
+
/** Maximum allowable size of cache (block put if size > max, evict) */
private long maxSize;
/** Approximate block size */
private long blockSize;
-
+
/** Acceptable size of cache (no evictions if size < acceptable) */
private float acceptableFactor;
-
+
/** Minimum threshold of cache (when evicting, evict until size < min) */
private float minFactor;
-
+
/** Single access bucket size */
private float singleFactor;
-
+
/** Multiple access bucket size */
private float multiFactor;
-
+
/** In-memory bucket size */
private float memoryFactor;
-
+
/** Overhead of the structure itself */
private long overhead;
-
+
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
- *
+ *
* <p>All other factors will be calculated based on defaults specified in
* this class.
* @param maxSize maximum size of cache, in bytes
@@ -159,7 +159,7 @@ public class LruBlockCache implements Bl
public LruBlockCache(long maxSize, long blockSize) {
this(maxSize, blockSize, true);
}
-
+
/**
* Constructor used for testing. Allows disabling of the eviction thread.
*/
@@ -171,7 +171,7 @@ public class LruBlockCache implements Bl
DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR,
DEFAULT_MEMORY_FACTOR);
}
-
+
/**
* Configurable constructor. Use this constructor if not using defaults.
* @param maxSize maximum size of this cache, in bytes
@@ -191,7 +191,7 @@ public class LruBlockCache implements Bl
float minFactor, float acceptableFactor,
float singleFactor, float multiFactor, float memoryFactor) {
if(singleFactor + multiFactor + memoryFactor != 1) {
- throw new IllegalArgumentException("Single, multi, and memory factors " +
+ throw new IllegalArgumentException("Single, multi, and memory factors " +
" should total 1.0");
}
if(minFactor >= acceptableFactor) {
@@ -223,16 +223,16 @@ public class LruBlockCache implements Bl
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
-
+
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if(this.size.get() > acceptableSize() && !evictionInProgress) {
runEviction();
}
}
-
+
// BlockCache implementation
-
+
/**
* Cache the block with the specified name and buffer.
* <p>
@@ -295,7 +295,7 @@ public class LruBlockCache implements Bl
stats.evicted();
return block.heapSize();
}
-
+
/**
* Multi-threaded call to run the eviction process.
*/
@@ -306,7 +306,7 @@ public class LruBlockCache implements Bl
evictionThread.evict();
}
}
-
+
/**
* Eviction method.
*/
@@ -314,25 +314,25 @@ public class LruBlockCache implements Bl
// Ensure only one eviction at a time
if(!evictionLock.tryLock()) return;
-
+
try {
evictionInProgress = true;
-
+
long bytesToFree = size.get() - minSize();
-
- LOG.debug("Block cache LRU eviction started. Attempting to free " +
+
+ LOG.debug("Block cache LRU eviction started. Attempting to free " +
bytesToFree + " bytes");
-
+
if(bytesToFree <= 0) return;
-
+
// Instantiate priority buckets
- BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
+ BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
singleSize());
- BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
+ BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
multiSize());
- BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
+ BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
memorySize());
-
+
// Scan entire map putting into appropriate buckets
for(CachedBlock cachedBlock : map.values()) {
switch(cachedBlock.getPriority()) {
@@ -350,17 +350,17 @@ public class LruBlockCache implements Bl
}
}
}
-
- PriorityQueue<BlockBucket> bucketQueue =
+
+ PriorityQueue<BlockBucket> bucketQueue =
new PriorityQueue<BlockBucket>(3);
-
+
bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
-
+
int remainingBuckets = 3;
long bytesFreed = 0;
-
+
BlockBucket bucket;
while((bucket = bucketQueue.poll()) != null) {
long overflow = bucket.overflow();
@@ -368,28 +368,28 @@ public class LruBlockCache implements Bl
long bucketBytesToFree = Math.min(overflow,
(bytesToFree - bytesFreed) / remainingBuckets);
bytesFreed += bucket.free(bucketBytesToFree);
- }
+ }
remainingBuckets--;
}
-
+
float singleMB = ((float)bucketSingle.totalSize())/((float)(1024*1024));
float multiMB = ((float)bucketMulti.totalSize())/((float)(1024*1024));
float memoryMB = ((float)bucketMemory.totalSize())/((float)(1024*1024));
-
- LOG.debug("Block cache LRU eviction completed. " +
+
+ LOG.debug("Block cache LRU eviction completed. " +
"Freed " + bytesFreed + " bytes. " +
"Priority Sizes: " +
"Single=" + singleMB + "MB (" + bucketSingle.totalSize() + "), " +
"Multi=" + multiMB + "MB (" + bucketMulti.totalSize() + ")," +
"Memory=" + memoryMB + "MB (" + bucketMemory.totalSize() + ")");
-
+
} finally {
stats.evict();
evictionInProgress = false;
evictionLock.unlock();
}
}
-
+
/**
* Used to group blocks into priority buckets. There will be a BlockBucket
* for each priority (single, multi, memory). Once bucketed, the eviction
@@ -401,18 +401,18 @@ public class LruBlockCache implements Bl
private CachedBlockQueue queue;
private long totalSize = 0;
private long bucketSize;
-
+
public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
this.bucketSize = bucketSize;
queue = new CachedBlockQueue(bytesToFree, blockSize);
totalSize = 0;
}
-
+
public void add(CachedBlock block) {
totalSize += block.heapSize();
queue.add(block);
}
-
+
public long free(long toFree) {
CachedBlock [] blocks = queue.get();
long freedBytes = 0;
@@ -424,21 +424,21 @@ public class LruBlockCache implements Bl
}
return freedBytes;
}
-
+
public long overflow() {
return totalSize - bucketSize;
}
-
+
public long totalSize() {
return totalSize;
}
-
+
public int compareTo(BlockBucket that) {
if(this.overflow() == that.overflow()) return 0;
return this.overflow() > that.overflow() ? 1 : -1;
}
}
-
+
/**
* Get the maximum size of this cache.
* @return max size in bytes
@@ -446,7 +446,7 @@ public class LruBlockCache implements Bl
public long getMaxSize() {
return this.maxSize;
}
-
+
/**
* Get the current size of this cache.
* @return current size in bytes
@@ -454,7 +454,7 @@ public class LruBlockCache implements Bl
public long getCurrentSize() {
return this.size.get();
}
-
+
/**
* Get the current size of this cache.
* @return current size in bytes
@@ -462,7 +462,7 @@ public class LruBlockCache implements Bl
public long getFreeSize() {
return getMaxSize() - getCurrentSize();
}
-
+
/**
* Get the size of this cache (number of cached blocks)
* @return number of cached blocks
@@ -470,14 +470,14 @@ public class LruBlockCache implements Bl
public long size() {
return this.elements.get();
}
-
+
/**
* Get the number of eviction runs that have occurred
*/
public long getEvictionCount() {
return this.stats.getEvictionCount();
}
-
+
/**
* Get the number of blocks that have been evicted during the lifetime
* of this cache.
@@ -485,22 +485,22 @@ public class LruBlockCache implements Bl
public long getEvictedCount() {
return this.stats.getEvictedCount();
}
-
+
/*
* Eviction thread. Sits in waiting state until an eviction is triggered
* when the cache size grows above the acceptable level.<p>
- *
+ *
* Thread is triggered into action by {@link LruBlockCache#runEviction()}
*/
private static class EvictionThread extends Thread {
private WeakReference<LruBlockCache> cache;
-
+
public EvictionThread(LruBlockCache cache) {
super("LruBlockCache.EvictionThread");
setDaemon(true);
this.cache = new WeakReference<LruBlockCache>(cache);
}
-
+
@Override
public void run() {
while(true) {
@@ -520,7 +520,7 @@ public class LruBlockCache implements Bl
}
}
}
-
+
/*
* Statistics thread. Periodically prints the cache statistics to the log.
*/
@@ -537,7 +537,7 @@ public class LruBlockCache implements Bl
lru.logStats();
}
}
-
+
public void logStats() {
// Log size
long totalSize = heapSize();
@@ -545,7 +545,7 @@ public class LruBlockCache implements Bl
float sizeMB = ((float)totalSize)/((float)(1024*1024));
float freeMB = ((float)freeSize)/((float)(1024*1024));
float maxMB = ((float)maxSize)/((float)(1024*1024));
- LruBlockCache.LOG.debug("Cache Stats: Sizes: " +
+ LruBlockCache.LOG.debug("Cache Stats: Sizes: " +
"Total=" + sizeMB + "MB (" + totalSize + "), " +
"Free=" + freeMB + "MB (" + freeSize + "), " +
"Max=" + maxMB + "MB (" + maxSize +")" +
@@ -561,46 +561,46 @@ public class LruBlockCache implements Bl
"Miss Ratio=" + stats.getMissRatio()*100 + "%, " +
"Evicted/Run=" + stats.evictedPerEviction());
}
-
+
/**
* Get counter statistics for this cache.
- *
+ *
* <p>Includes: total accesses, hits, misses, evicted blocks, and runs
* of the eviction processes.
*/
public CacheStats getStats() {
return this.stats;
}
-
+
public static class CacheStats {
private final AtomicLong accessCount = new AtomicLong(0);
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong evictionCount = new AtomicLong(0);
private final AtomicLong evictedCount = new AtomicLong(0);
-
+
public void miss() {
missCount.incrementAndGet();
accessCount.incrementAndGet();
}
-
+
public void hit() {
hitCount.incrementAndGet();
accessCount.incrementAndGet();
}
-
+
public void evict() {
evictionCount.incrementAndGet();
}
-
+
public void evicted() {
evictedCount.incrementAndGet();
}
-
+
public long getRequestCount() {
return accessCount.get();
}
-
+
public long getMissCount() {
return missCount.get();
}
@@ -608,48 +608,48 @@ public class LruBlockCache implements Bl
public long getHitCount() {
return hitCount.get();
}
-
+
public long getEvictionCount() {
return evictionCount.get();
}
-
+
public long getEvictedCount() {
return evictedCount.get();
}
-
+
public double getHitRatio() {
return ((float)getHitCount()/(float)getRequestCount());
}
-
+
public double getMissRatio() {
return ((float)getMissCount()/(float)getRequestCount());
}
-
+
public double evictedPerEviction() {
return (float)((float)getEvictedCount()/(float)getEvictionCount());
}
}
-
+
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
- (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+ (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
+ ClassSize.OBJECT);
-
+
// HeapSize implementation
public long heapSize() {
return getCurrentSize();
}
-
+
public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
// FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
- ((long)Math.ceil(maxSize*1.2/blockSize)
+ ((long)Math.ceil(maxSize*1.2/blockSize)
* ClassSize.CONCURRENT_HASHMAP_ENTRY) +
(concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
}
-
+
// Simple calculators of sizes given factors and maxSize
-
+
private long acceptableSize() {
return (long)Math.floor(this.maxSize * this.acceptableFactor);
}
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java Fri May 7 19:26:45 2010
@@ -37,19 +37,19 @@ public class SimpleBlockCache implements
this.blockId = blockId;
}
}
- private Map<String,Ref> cache =
+ private Map<String,Ref> cache =
new HashMap<String,Ref>();
private ReferenceQueue q = new ReferenceQueue();
public int dumps = 0;
-
+
/**
* Constructor
*/
public SimpleBlockCache() {
super();
}
-
+
void processQueue() {
Ref r;
while ( (r = (Ref)q.poll()) != null) {
@@ -78,7 +78,7 @@ public class SimpleBlockCache implements
cache.put(blockName, new Ref(blockName, buf, q));
}
- public synchronized void cacheBlock(String blockName, ByteBuffer buf,
+ public synchronized void cacheBlock(String blockName, ByteBuffer buf,
boolean inMemory) {
cache.put(blockName, new Ref(blockName, buf, q));
}
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri May 7 19:26:45 2010
@@ -55,14 +55,14 @@ import java.util.concurrent.atomic.Atomi
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
- *
+ *
* <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
* moved into this package so can access package-private methods.
- *
+ *
* @see HBaseServer
*/
public class HBaseClient {
-
+
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final Hashtable<ConnectionId, Connection> connections =
@@ -82,14 +82,14 @@ public class HBaseClient {
protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
-
+
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
final static int PING_CALL_ID = -1;
-
+
/**
* set the ping interval value in configuration
- *
+ *
* @param conf Configuration
* @param pingInterval the ping interval
*/
@@ -101,14 +101,14 @@ public class HBaseClient {
/**
* Get the ping interval from configuration;
* If not set in the configuration, return the default value.
- *
+ *
* @param conf Configuration
* @return the ping interval
*/
static int getPingInterval(Configuration conf) {
return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
}
-
+
/**
* Increment this client's reference count
*
@@ -116,7 +116,7 @@ public class HBaseClient {
synchronized void incCount() {
refCount++;
}
-
+
/**
* Decrement this client's reference count
*
@@ -124,10 +124,10 @@ public class HBaseClient {
synchronized void decCount() {
refCount--;
}
-
+
/**
* Return if this client has no reference
- *
+ *
* @return true if this client has no reference; false otherwise
*/
synchronized boolean isZeroReference() {
@@ -158,17 +158,17 @@ public class HBaseClient {
/** Set the exception when there is an error.
* Notify the caller the call is done.
- *
+ *
* @param error exception thrown by the call; either local or remote
*/
public synchronized void setException(IOException error) {
this.error = error;
callComplete();
}
-
- /** Set the return value when there is no error.
+
+ /** Set the return value when there is no error.
* Notify the caller the call is done.
- *
+ *
* @param value return value of the call.
*/
public synchronized void setValue(Writable value) {
@@ -185,7 +185,7 @@ public class HBaseClient {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
-
+
// currently active calls
private final Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@@ -195,10 +195,10 @@ public class HBaseClient {
public Connection(InetSocketAddress address) throws IOException {
this(new ConnectionId(address, null));
}
-
+
public Connection(ConnectionId remoteId) throws IOException {
if (remoteId.getAddress().isUnresolved()) {
- throw new UnknownHostException("unknown host: " +
+ throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
}
this.remoteId = remoteId;
@@ -249,7 +249,7 @@ public class HBaseClient {
}
sendPing();
}
-
+
/** Read a byte from the stream.
* Send a ping if timeout on read. Retries if no failure is detected
* until a byte is read.
@@ -269,7 +269,7 @@ public class HBaseClient {
/** Read bytes into a buffer starting from offset <code>off</code>
* Send a ping if timeout on read. Retries if no failure is detected
* until a byte is read.
- *
+ *
* @return the total number of bytes read; -1 if the connection is closed.
*/
@Override
@@ -283,7 +283,7 @@ public class HBaseClient {
} while (true);
}
}
-
+
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
@@ -293,7 +293,7 @@ public class HBaseClient {
if (socket != null || shouldCloseConnection.get()) {
return;
}
-
+
short ioFailures = 0;
short timeoutFailures = 0;
try {
@@ -371,8 +371,8 @@ public class HBaseClient {
try {
Thread.sleep(failureSleep);
} catch (InterruptedException ignored) {}
-
- LOG.info("Retrying connect to server: " + remoteId.getAddress() +
+
+ LOG.info("Retrying connect to server: " + remoteId.getAddress() +
" after sleeping " + failureSleep + "ms. Already tried " + curRetries +
" time(s).");
}
@@ -385,17 +385,17 @@ public class HBaseClient {
out.write(HBaseServer.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
DataOutputBuffer buf = new DataOutputBuffer();
- ObjectWritable.writeObject(buf, remoteId.getTicket(),
+ ObjectWritable.writeObject(buf, remoteId.getTicket(),
UserGroupInformation.class, conf);
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
}
-
+
/* wait till someone signals us to start reading RPC response or
- * it is idle too long, it is marked as to be closed,
+ * it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
- *
+ *
* Return true if it is time to read a response; false otherwise.
*/
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
@@ -409,7 +409,7 @@ public class HBaseClient {
} catch (InterruptedException ignored) {}
}
}
-
+
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
} else if (shouldCloseConnection.get()) {
@@ -417,7 +417,7 @@ public class HBaseClient {
} else if (calls.isEmpty()) { // idle connection closed or stopped
markClosed(null);
return false;
- } else { // get stopped but there are still pending requests
+ } else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false;
@@ -428,7 +428,7 @@ public class HBaseClient {
return remoteId.getAddress();
}
- /* Send a ping to the server if the time elapsed
+ /* Send a ping to the server if the time elapsed
* since last I/O activity is equal to or greater than the ping interval
*/
protected synchronized void sendPing() throws IOException {
@@ -446,7 +446,7 @@ public class HBaseClient {
@Override
public void run() {
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting, having connections "
+ LOG.debug(getName() + ": starting, having connections "
+ connections.size());
try {
@@ -459,7 +459,7 @@ public class HBaseClient {
}
close();
-
+
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections "
+ connections.size());
@@ -480,7 +480,7 @@ public class HBaseClient {
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
-
+
//for serializing the
//data to be written
d = new DataOutputBuffer();
@@ -499,7 +499,7 @@ public class HBaseClient {
// close early
IOUtils.closeStream(d);
}
- }
+ }
/* Receive a response.
* Because only one receiver, so no synchronization on in.
@@ -509,7 +509,7 @@ public class HBaseClient {
return;
}
touch();
-
+
try {
int id = in.readInt(); // try to read an id
@@ -533,14 +533,14 @@ public class HBaseClient {
markClosed(e);
}
}
-
+
private synchronized void markClosed(IOException e) {
if (shouldCloseConnection.compareAndSet(false, true)) {
closeException = e;
notifyAll();
}
}
-
+
/** Close the connection. */
private synchronized void close() {
if (!shouldCloseConnection.get()) {
@@ -583,14 +583,14 @@ public class HBaseClient {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
}
-
+
/* Cleanup all calls and mark them as done */
private void cleanupCalls() {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
while (itor.hasNext()) {
- Call c = itor.next().getValue();
+ Call c = itor.next().getValue();
c.setException(closeException); // local exception
- itor.remove();
+ itor.remove();
}
}
}
@@ -599,7 +599,7 @@ public class HBaseClient {
private class ParallelCall extends Call {
private final ParallelResults results;
protected final int index;
-
+
public ParallelCall(Writable param, ParallelResults results, int index) {
super(param);
this.results = results;
@@ -643,10 +643,10 @@ public class HBaseClient {
* @param conf configuration
* @param factory socket factory
*/
- public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
+ public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
SocketFactory factory) {
this.valueClass = valueClass;
- this.maxIdleTime =
+ this.maxIdleTime =
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.failureSleep = conf.getInt("hbase.client.pause", 2000);
@@ -668,7 +668,7 @@ public class HBaseClient {
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
-
+
/** Return the socket factory of this client
*
* @return this client's socket factory
@@ -687,14 +687,14 @@ public class HBaseClient {
if (!running.compareAndSet(true, false)) {
return;
}
-
+
// wake up all connections
synchronized (connections) {
for (Connection conn : connections.values()) {
conn.interrupt();
}
}
-
+
// wait until all connections are closed
while (!connections.isEmpty()) {
try {
@@ -706,7 +706,7 @@ public class HBaseClient {
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
- * network problems or if the remote code threw an exception.
+ * network problems or if the remote code threw an exception.
* @param param writable parameter
* @param address network address
* @return Writable
@@ -716,9 +716,9 @@ public class HBaseClient {
throws IOException {
return call(param, address, null);
}
-
- public Writable call(Writable param, InetSocketAddress addr,
- UserGroupInformation ticket)
+
+ public Writable call(Writable param, InetSocketAddress addr,
+ UserGroupInformation ticket)
throws IOException {
Call call = new Call(param);
Connection connection = getConnection(addr, ticket, call);
@@ -755,11 +755,11 @@ public class HBaseClient {
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
- * The new exception provides the stack trace of the place where
+ * The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information.
- * If the exception is ConnectException or SocketTimeoutException,
+ * If the exception is ConnectException or SocketTimeoutException,
* return a new one of the same type; Otherwise return an IOException.
- *
+ *
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
@@ -787,7 +787,7 @@ public class HBaseClient {
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
- * contains nulls for calls that timed out or errored.
+ * contains nulls for calls that timed out or errored.
* @param params writable parameters
* @param addresses socket addresses
* @return Writable[]
@@ -808,7 +808,7 @@ public class HBaseClient {
connection.sendParam(call); // send each parameter
} catch (IOException e) {
// log errors
- LOG.info("Calling "+addresses[i]+" caught: " +
+ LOG.info("Calling "+addresses[i]+" caught: " +
e.getMessage(),e);
results.size--; // wait for one fewer result
}
@@ -825,7 +825,7 @@ public class HBaseClient {
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
- private Connection getConnection(InetSocketAddress addr,
+ private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
Call call)
throws IOException {
@@ -834,7 +834,7 @@ public class HBaseClient {
throw new IOException("The client is stopped");
}
Connection connection;
- /* we could avoid this allocation for each RPC by having a
+ /* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
@@ -848,7 +848,7 @@ public class HBaseClient {
}
}
} while (!connection.addCall(call));
-
+
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
@@ -864,19 +864,19 @@ public class HBaseClient {
private static class ConnectionId {
final InetSocketAddress address;
final UserGroupInformation ticket;
-
+
ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
this.address = address;
this.ticket = ticket;
}
-
+
InetSocketAddress getAddress() {
return address;
}
UserGroupInformation getTicket() {
return ticket;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
@@ -886,10 +886,10 @@ public class HBaseClient {
}
return false;
}
-
+
@Override
public int hashCode() {
return address.hashCode() ^ System.identityHashCode(ticket);
}
- }
+ }
}
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri May 7 19:26:45 2010
@@ -53,7 +53,7 @@ import java.util.Map;
* optimizations like using our own version of ObjectWritable. Class has been
* renamed to avoid confusing it w/ hadoop versions.
* <p>
- *
+ *
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
* be one of:
@@ -167,9 +167,9 @@ public class HBaseRPC {
protected ClientCache() {}
/**
- * Construct & cache an IPC client with the user-provided SocketFactory
+ * Construct & cache an IPC client with the user-provided SocketFactory
* if no cached client exists.
- *
+ *
* @param conf Configuration
* @param factory socket factory
* @return an IPC client
@@ -193,9 +193,9 @@ public class HBaseRPC {
}
/**
- * Construct & cache an IPC client with the default SocketFactory
+ * Construct & cache an IPC client with the default SocketFactory
* if no cached client exists.
- *
+ *
* @param conf Configuration
* @return an IPC client
*/
@@ -204,7 +204,7 @@ public class HBaseRPC {
}
/**
- * Stop a RPC client connection
+ * Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
* @param client client to stop
*/
@@ -222,7 +222,7 @@ public class HBaseRPC {
}
protected final static ClientCache CLIENTS = new ClientCache();
-
+
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
@@ -235,7 +235,7 @@ public class HBaseRPC {
* @param conf configuration
* @param factory socket factory
*/
- public Invoker(InetSocketAddress address, UserGroupInformation ticket,
+ public Invoker(InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory) {
this.address = address;
this.ticket = ticket;
@@ -257,8 +257,8 @@ public class HBaseRPC {
}
return value.get();
}
-
- /* close the IPC client that's responsible for this invoker's RPCs */
+
+ /* close the IPC client that's responsible for this invoker's RPCs */
synchronized protected void close() {
if (!isClosed) {
isClosed = true;
@@ -275,7 +275,7 @@ public class HBaseRPC {
private String interfaceName;
private long clientVersion;
private long serverVersion;
-
+
/**
* Create a version mismatch exception
* @param interfaceName the name of the protocol mismatch
@@ -290,23 +290,23 @@ public class HBaseRPC {
this.clientVersion = clientVersion;
this.serverVersion = serverVersion;
}
-
+
/**
* Get the interface name
- * @return the java class name
+ * @return the java class name
* (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
*/
public String getInterfaceName() {
return interfaceName;
}
-
+
/**
* @return the client's preferred version
*/
public long getClientVersion() {
return clientVersion;
}
-
+
/**
* @return the server's agreed to version.
*/
@@ -314,7 +314,7 @@ public class HBaseRPC {
return serverVersion;
}
}
-
+
/**
* @param protocol protocol interface
* @param clientVersion which client version we expect
@@ -383,7 +383,7 @@ public class HBaseRPC {
SocketFactory factory) throws IOException {
return getProxy(protocol, clientVersion, addr, null, conf, factory);
}
-
+
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
@@ -400,23 +400,23 @@ public class HBaseRPC {
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
- throws IOException {
+ throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
return proxy;
}
- throw new VersionMismatch(protocol.getName(), clientVersion,
+ throw new VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
}
/**
* Construct a client-side proxy object with the default SocketFactory
- *
+ *
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
@@ -462,7 +462,7 @@ public class HBaseRPC {
HBaseClient client = CLIENTS.getClient(conf);
try {
Writable[] wrappedValues = client.call(invocations, addrs);
-
+
if (method.getReturnType() == Void.TYPE) {
return null;
}
@@ -472,7 +472,7 @@ public class HBaseRPC {
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-
+
return values;
} finally {
CLIENTS.stopClient(client);
@@ -490,7 +490,7 @@ public class HBaseRPC {
* @return Server
* @throws IOException e
*/
- public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
+ public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
throws IOException {
return getServer(instance, bindAddress, port, 1, false, conf);
}
@@ -510,7 +510,7 @@ public class HBaseRPC {
*/
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
- final boolean verbose, Configuration conf)
+ final boolean verbose, Configuration conf)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
}
@@ -529,11 +529,11 @@ public class HBaseRPC {
* @param port the port to listen for connections on
* @throws IOException e
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port)
+ public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
this(instance, conf, bindAddress, port, 1, false);
}
-
+
private static String classNameBase(String className) {
String[] names = className.split("\\.", -1);
if (names == null || names.length == 0) {
@@ -541,7 +541,7 @@ public class HBaseRPC {
}
return names[names.length-1];
}
-
+
/** Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Fri May 7 19:26:45 2010
@@ -29,7 +29,7 @@ import org.apache.hadoop.ipc.VersionedPr
public interface HBaseRPCProtocolVersion extends VersionedProtocol {
/**
* Interface version.
- *
+ *
* HMasterInterface version history:
* <ul>
* <li>Version was incremented to 2 when we brought the hadoop RPC local to
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCStatistics.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCStatistics.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCStatistics.java Fri May 7 19:26:45 2010
@@ -38,7 +38,7 @@ public class HBaseRPCStatistics extends
String hostName, String port) {
super(registry, "HBaseRPCStatistics");
- String name = String.format("RPCStatistics-%s",
+ String name = String.format("RPCStatistics-%s",
(port != null ? port : "unknown"));
mbeanName = MBeanUtil.registerMBean("HBase", name, this);
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=942186&r1=942185&r2=942186&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Fri May 7 19:26:45 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.metrics.util.Me
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
- *
+ *
* This class is for maintaining the various RPC statistics
* and publishing them through the metrics interfaces.
* This also registers the JMX MBean for RPC.
@@ -45,22 +45,22 @@ public class HBaseRpcMetrics implements
private MetricsRecord metricsRecord;
private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
private final HBaseRPCStatistics rpcStatistics;
-
+
public HBaseRpcMetrics(String hostName, String port) {
MetricsContext context = MetricsUtil.getContext("rpc");
metricsRecord = MetricsUtil.createRecord(context, "metrics");
metricsRecord.setTag("port", port);
- LOG.info("Initializing RPC Metrics with hostName="
+ LOG.info("Initializing RPC Metrics with hostName="
+ hostName + ", port=" + port);
context.registerUpdater(this);
-
+
rpcStatistics = new HBaseRPCStatistics(this.registry, hostName, port);
}
-
-
+
+
/**
* The metrics variables are public:
* - they can be set directly by calling their set/inc methods