You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/09 03:44:42 UTC
[38/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master branch (1.7.0-SNAPSHOT)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
index 69902a4..5c35f65 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
@@ -22,9 +22,9 @@ import org.apache.accumulo.core.data.Range;
public interface KeyFunctor {
/**
* Implementations should return null if a range can not be converted to a bloom key.
- *
+ *
*/
org.apache.hadoop.util.bloom.Key transform(Range range);
-
+
org.apache.hadoop.util.bloom.Key transform(Key key);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
index 20eb26c..b46f593 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
@@ -22,18 +22,18 @@ import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.util.bloom.Key;
public class RowFunctor implements KeyFunctor {
-
+
@Override
public Key transform(org.apache.accumulo.core.data.Key acuKey) {
byte keyData[];
-
+
ByteSequence row = acuKey.getRowData();
keyData = new byte[row.length()];
System.arraycopy(row.getBackingArray(), 0, keyData, 0, row.length());
-
+
return new Key(keyData, 1.0);
}
-
+
@Override
public Key transform(Range range) {
if (isRangeInBloomFilter(range, PartialKey.ROW)) {
@@ -41,16 +41,16 @@ public class RowFunctor implements KeyFunctor {
}
return null;
}
-
+
static boolean isRangeInBloomFilter(Range range, PartialKey keyDepth) {
-
+
if (range.getStartKey() == null || range.getEndKey() == null) {
return false;
}
-
+
if (range.getStartKey().equals(range.getEndKey(), keyDepth))
return true;
-
+
// include everything but the deleted flag in the comparison...
return range.getStartKey().followingKey(keyDepth).equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME) && !range.isEndKeyInclusive();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 267f805..fb2762f 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -43,66 +43,66 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
public class MapFileOperations extends FileOperations {
-
+
public static class RangeIterator implements FileSKVIterator {
-
+
SortedKeyValueIterator<Key,Value> reader;
private Range range;
private boolean hasTop;
-
+
public RangeIterator(SortedKeyValueIterator<Key,Value> reader) {
this.reader = reader;
}
-
+
@Override
public void close() throws IOException {
((FileSKVIterator) reader).close();
}
-
+
@Override
public Key getFirstKey() throws IOException {
return ((FileSKVIterator) reader).getFirstKey();
}
-
+
@Override
public Key getLastKey() throws IOException {
return ((FileSKVIterator) reader).getLastKey();
}
-
+
@Override
public DataInputStream getMetaStore(String name) throws IOException {
return ((FileSKVIterator) reader).getMetaStore(name);
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new RangeIterator(reader.deepCopy(env));
}
-
+
@Override
public Key getTopKey() {
if (!hasTop)
throw new IllegalStateException();
return reader.getTopKey();
}
-
+
@Override
public Value getTopValue() {
if (!hasTop)
throw new IllegalStateException();
return reader.getTopValue();
}
-
+
@Override
public boolean hasTop() {
return hasTop;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void next() throws IOException {
if (!hasTop)
@@ -110,87 +110,87 @@ public class MapFileOperations extends FileOperations {
reader.next();
hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey());
}
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
reader.seek(range, columnFamilies, inclusive);
this.range = range;
-
+
hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey());
-
+
while (hasTop() && range.beforeStartKey(getTopKey())) {
next();
}
}
-
+
@Override
public void closeDeepCopies() throws IOException {
((FileSKVIterator) reader).closeDeepCopies();
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
((FileSKVIterator) reader).setInterruptFlag(flag);
}
}
-
+
@Override
public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
-
+
if (seekToBeginning)
iter.seek(new Range(new Key(), null), new ArrayList<ByteSequence>(), false);
-
+
return iter;
}
-
+
@Override
public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-
+
throw new UnsupportedOperationException();
}
-
+
@Override
public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
return new SequenceFileIterator(MapFileUtil.openIndex(conf, fs, new Path(file)), false);
}
-
+
@Override
public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
return fs.getFileStatus(new Path(file + "/" + MapFile.DATA_FILE_NAME)).getLen();
}
-
+
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
AccumuloConfiguration tableConf) throws IOException {
MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
-
+
FileSKVIterator iter = new RangeIterator(mfIter);
-
+
iter.seek(range, columnFamilies, inclusive);
-
+
return iter;
}
-
+
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
+
return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf);
}
-
+
@Override
public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
BlockCache dataCache, BlockCache indexCache) throws IOException {
-
+
return openReader(file, seekToBeginning, fs, conf, acuconf);
}
-
+
@Override
public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
throws IOException {
-
+
return openIndex(file, fs, conf, acuconf);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
index 41b00d9..1373eac 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
@@ -36,7 +36,7 @@ public class MapFileUtil {
throw e;
}
}
-
+
@SuppressWarnings("deprecation")
public static SequenceFile.Reader openIndex(Configuration conf, FileSystem fs, Path mapFile) throws IOException {
Path indexPath = new Path(mapFile, MapFile.INDEX_FILE_NAME);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index 2156a67..1ed9aca 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@ -27,40 +27,40 @@ import org.apache.accumulo.core.file.blockfile.ABlockReader;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
/**
- *
+ *
*/
public class BlockIndex {
-
+
public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
-
+
BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
-
+
int accessCount = blockIndex.accessCount.incrementAndGet();
-
+
// 1 is a power of two, but do not care about it
if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
}
-
+
if (blockIndex.blockIndex != null)
return blockIndex;
return null;
}
-
+
private static boolean isPowerOfTwo(int x) {
return ((x > 0) && (x & (x - 1)) == 0);
}
-
+
private AtomicInteger accessCount = new AtomicInteger(0);
private volatile BlockIndexEntry[] blockIndex = null;
public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
-
+
private Key prevKey;
private int entriesLeft;
private int pos;
-
+
public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) {
this.pos = pos;
this.entriesLeft = entriesLeft;
@@ -70,7 +70,7 @@ public class BlockIndex {
public BlockIndexEntry(Key key) {
this.prevKey = key;
}
-
+
public int getEntriesLeft() {
return entriesLeft;
}
@@ -79,39 +79,39 @@ public class BlockIndex {
public int compareTo(BlockIndexEntry o) {
return prevKey.compareTo(o.prevKey);
}
-
+
@Override
public boolean equals(Object o) {
if (o instanceof BlockIndexEntry)
return compareTo((BlockIndexEntry) o) == 0;
return false;
}
-
+
@Override
public String toString() {
return prevKey + " " + entriesLeft + " " + pos;
}
-
+
public Key getPrevKey() {
return prevKey;
}
-
+
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}
-
+
public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
// get a local ref to the index, another thread could change it
BlockIndexEntry[] blockIndex = this.blockIndex;
-
+
int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
int index;
-
+
if (pos < 0) {
if (pos == -1)
return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning
@@ -127,7 +127,7 @@ public class BlockIndex {
break;
}
}
-
+
// handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
while (index - 1 > 0) {
if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey()))
@@ -136,7 +136,7 @@ public class BlockIndex {
break;
}
-
+
if (index == 0 && blockIndex[index].getPrevKey().equals(startKey))
return null;
@@ -144,24 +144,24 @@ public class BlockIndex {
cacheBlock.seek(bie.pos);
return bie;
}
-
+
private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
cacheBlock.seek(0);
-
+
RelativeKey rk = new RelativeKey();
Value val = new Value();
-
+
int interval = indexEntry.getNumEntries() / indexEntries;
-
+
if (interval <= 32)
return;
-
+
// multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
return;
int count = 0;
-
+
ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
while (count < (indexEntry.getNumEntries() - interval + 1)) {
@@ -174,7 +174,7 @@ public class BlockIndex {
if (count > 0 && count % interval == 0) {
index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey));
}
-
+
count++;
}
@@ -182,7 +182,7 @@ public class BlockIndex {
cacheBlock.seek(0);
}
-
+
BlockIndexEntry[] getIndexEntries() {
return blockIndex;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 9456331..cd6bff8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -61,7 +61,9 @@ public class CreateEmpty {
static class Opts extends Help {
@Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", validateWith = IsSupportedCompressionAlgorithm.class)
String codec = Compression.COMPRESSION_NONE;
- @Parameter(description = " <path> { <path> ... } Each path given is a URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", required = true, validateWith = NamedLikeRFile.class)
+ @Parameter(
+ description = " <path> { <path> ... } Each path given is a URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.",
+ required = true, validateWith = NamedLikeRFile.class)
List<String> files = new ArrayList<String>();
}
@@ -74,7 +76,8 @@ public class CreateEmpty {
for (String arg : opts.files) {
Path path = new Path(arg);
log.info("Writing to file '" + path + "'");
- FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
+ FileSKVWriter writer = (new RFileOperations())
+ .openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
writer.close();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
index f9c8686..b1dab36 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
@@ -30,10 +30,10 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
class IndexIterator implements SortedKeyValueIterator<Key,Value> {
-
+
private Key key;
private Iterator<IndexEntry> indexIter;
-
+
IndexIterator(Iterator<IndexEntry> indexIter) {
this.indexIter = indexIter;
if (indexIter.hasNext())
@@ -41,32 +41,32 @@ class IndexIterator implements SortedKeyValueIterator<Key,Value> {
else
key = null;
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
-
+
@Override
public Key getTopKey() {
return key;
}
-
+
@Override
public Value getTopValue() {
throw new UnsupportedOperationException();
}
-
+
@Override
public boolean hasTop() {
return key != null;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void next() throws IOException {
if (indexIter.hasNext())
@@ -74,10 +74,10 @@ class IndexIterator implements SortedKeyValueIterator<Key,Value> {
else
key = null;
}
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
throw new UnsupportedOperationException();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
index 5dade97..f220a58 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
@@ -35,62 +35,62 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.HeapIterator;
class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
-
+
private RFile.Reader source;
-
+
MultiIndexIterator(RFile.Reader source, List<Iterator<IndexEntry>> indexes) {
super(indexes.size());
-
+
this.source = source;
-
+
for (Iterator<IndexEntry> index : indexes) {
addSource(new IndexIterator(index));
}
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void close() throws IOException {
source.close();
}
-
+
@Override
public void closeDeepCopies() throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Key getFirstKey() throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public Key getLastKey() throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public DataInputStream getMetaStore(String name) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
throw new UnsupportedOperationException();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 632968e..2109478 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -41,7 +41,7 @@ import org.apache.accumulo.core.file.rfile.bcfile.Utils;
import org.apache.hadoop.io.WritableComparable;
public class MultiLevelIndex {
-
+
public static class IndexEntry implements WritableComparable<IndexEntry> {
private Key key;
private int entries;
@@ -49,7 +49,7 @@ public class MultiLevelIndex {
private long compressedSize;
private long rawSize;
private boolean newFormat;
-
+
IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
this.key = k;
this.entries = e;
@@ -58,11 +58,11 @@ public class MultiLevelIndex {
this.rawSize = rawSize;
newFormat = true;
}
-
+
public IndexEntry(boolean newFormat) {
this.newFormat = newFormat;
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
key = new Key();
@@ -78,7 +78,7 @@ public class MultiLevelIndex {
rawSize = -1;
}
}
-
+
@Override
public void write(DataOutput out) throws IOException {
key.write(out);
@@ -89,59 +89,59 @@ public class MultiLevelIndex {
Utils.writeVLong(out, rawSize);
}
}
-
+
public Key getKey() {
return key;
}
-
+
public int getNumEntries() {
return entries;
}
-
+
public long getOffset() {
return offset;
}
-
+
public long getCompressedSize() {
return compressedSize;
}
-
+
public long getRawSize() {
return rawSize;
}
-
+
@Override
public int compareTo(IndexEntry o) {
return key.compareTo(o.key);
}
-
+
@Override
public boolean equals(Object o) {
if (o instanceof IndexEntry)
- return compareTo((IndexEntry)o) == 0;
+ return compareTo((IndexEntry) o) == 0;
return false;
}
-
+
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}
-
+
// a list that deserializes index entries on demand
private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
-
+
private int[] offsets;
private byte[] data;
private boolean newFormat;
-
+
SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
this.offsets = offsets;
this.data = data;
this.newFormat = newFormat;
}
-
+
@Override
public IndexEntry get(int index) {
int len;
@@ -149,41 +149,41 @@ public class MultiLevelIndex {
len = data.length - offsets[index];
else
len = offsets[index + 1] - offsets[index];
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
DataInputStream dis = new DataInputStream(bais);
-
+
IndexEntry ie = new IndexEntry(newFormat);
try {
ie.readFields(dis);
} catch (IOException e) {
throw new RuntimeException(e);
}
-
+
return ie;
}
-
+
@Override
public int size() {
return offsets.length;
}
-
+
public long sizeInBytes() {
return data.length + 4 * offsets.length;
}
-
+
}
-
+
private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess {
-
+
private int[] offsets;
private byte[] data;
-
+
KeyIndex(int[] offsets, byte[] data) {
this.offsets = offsets;
this.data = data;
}
-
+
@Override
public Key get(int index) {
int len;
@@ -191,122 +191,122 @@ public class MultiLevelIndex {
len = data.length - offsets[index];
else
len = offsets[index + 1] - offsets[index];
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
DataInputStream dis = new DataInputStream(bais);
-
+
Key key = new Key();
try {
key.readFields(dis);
} catch (IOException e) {
throw new RuntimeException(e);
}
-
+
return key;
}
-
+
@Override
public int size() {
return offsets.length;
}
}
-
+
static class IndexBlock {
-
+
private ByteArrayOutputStream indexBytes;
private DataOutputStream indexOut;
-
+
private ArrayList<Integer> offsets;
private int level;
private int offset;
-
+
SerializedIndex index;
KeyIndex keyIndex;
private boolean hasNext;
-
+
public IndexBlock(int level, int totalAdded) {
// System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
-
+
this.level = level;
this.offset = totalAdded;
-
+
indexBytes = new ByteArrayOutputStream();
indexOut = new DataOutputStream(indexBytes);
offsets = new ArrayList<Integer>();
}
-
+
public IndexBlock() {}
-
+
public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
offsets.add(indexOut.size());
new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
}
-
+
int getSize() {
return indexOut.size() + 4 * offsets.size();
}
-
+
public void write(DataOutput out) throws IOException {
out.writeInt(level);
out.writeInt(offset);
out.writeBoolean(hasNext);
-
+
out.writeInt(offsets.size());
for (Integer offset : offsets) {
out.writeInt(offset);
}
-
+
indexOut.close();
byte[] indexData = indexBytes.toByteArray();
-
+
out.writeInt(indexData.length);
out.write(indexData);
}
-
+
public void readFields(DataInput in, int version) throws IOException {
-
+
if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
level = in.readInt();
offset = in.readInt();
hasNext = in.readBoolean();
-
+
int numOffsets = in.readInt();
int[] offsets = new int[numOffsets];
-
+
for (int i = 0; i < numOffsets; i++)
offsets[i] = in.readInt();
-
+
int indexSize = in.readInt();
byte[] serializedIndex = new byte[indexSize];
in.readFully(serializedIndex);
-
+
index = new SerializedIndex(offsets, serializedIndex, true);
keyIndex = new KeyIndex(offsets, serializedIndex);
} else if (version == RFile.RINDEX_VER_3) {
level = 0;
offset = 0;
hasNext = false;
-
+
int size = in.readInt();
-
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
ArrayList<Integer> oal = new ArrayList<Integer>();
-
+
for (int i = 0; i < size; i++) {
IndexEntry ie = new IndexEntry(false);
oal.add(dos.size());
ie.readFields(in);
ie.write(dos);
}
-
+
dos.close();
-
+
int[] oia = new int[oal.size()];
for (int i = 0; i < oal.size(); i++) {
oia[i] = oal.get(i);
}
-
+
byte[] serializedIndex = baos.toByteArray();
index = new SerializedIndex(oia, serializedIndex, false);
keyIndex = new KeyIndex(oia, serializedIndex);
@@ -314,100 +314,100 @@ public class MultiLevelIndex {
level = 0;
offset = 0;
hasNext = false;
-
+
int numIndexEntries = in.readInt();
int offsets[] = new int[numIndexEntries];
for (int i = 0; i < numIndexEntries; i++) {
offsets[i] = in.readInt();
}
-
+
int size = in.readInt();
byte[] indexData = new byte[size];
in.readFully(indexData);
-
+
index = new SerializedIndex(offsets, indexData, false);
keyIndex = new KeyIndex(offsets, indexData);
} else {
throw new RuntimeException("Unexpected version " + version);
}
-
+
}
-
+
List<IndexEntry> getIndex() {
return index;
}
-
+
public List<Key> getKeyIndex() {
return keyIndex;
}
-
+
int getLevel() {
return level;
}
-
+
int getOffset() {
return offset;
}
-
+
boolean hasNext() {
return hasNext;
}
-
+
void setHasNext(boolean b) {
this.hasNext = b;
}
-
+
}
-
+
/**
* this class buffers writes to the index so that chunks of index blocks are contiguous in the file instead of having index blocks sprinkled throughout the
* file making scans of the entire index slow.
*/
public static class BufferedWriter {
-
+
private Writer writer;
private DataOutputStream buffer;
private int buffered;
private ByteArrayOutputStream baos;
-
+
public BufferedWriter(Writer writer) {
this.writer = writer;
baos = new ByteArrayOutputStream(1 << 20);
buffer = new DataOutputStream(baos);
buffered = 0;
}
-
+
private void flush() throws IOException {
buffer.close();
-
+
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-
+
IndexEntry ie = new IndexEntry(true);
for (int i = 0; i < buffered; i++) {
ie.readFields(dis);
writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
}
-
+
buffered = 0;
baos = new ByteArrayOutputStream(1 << 20);
buffer = new DataOutputStream(baos);
-
+
}
-
+
public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
if (buffer.size() > (10 * 1 << 20)) {
flush();
}
-
+
new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
buffered++;
}
-
+
public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
flush();
writer.addLast(key, data, offset, compressedSize, rawSize);
}
-
+
public void close(DataOutput out) throws IOException {
writer.close(out);
}
@@ -415,74 +415,74 @@ public class MultiLevelIndex {
public static class Writer {
private int threshold;
-
+
private ArrayList<IndexBlock> levels;
-
+
private int totalAdded;
-
+
private boolean addedLast = false;
-
+
private BlockFileWriter blockFileWriter;
-
+
Writer(BlockFileWriter blockFileWriter, int maxBlockSize) {
this.blockFileWriter = blockFileWriter;
this.threshold = maxBlockSize;
levels = new ArrayList<IndexBlock>();
}
-
+
private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
if (level == levels.size()) {
levels.add(new IndexBlock(level, 0));
}
-
+
IndexBlock iblock = levels.get(level);
-
+
iblock.add(key, data, offset, compressedSize, rawSize);
}
-
+
private void flush(int level, Key lastKey, boolean last) throws IOException {
-
+
if (last && level == levels.size() - 1)
return;
-
+
IndexBlock iblock = levels.get(level);
if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
ABlockWriter out = blockFileWriter.prepareDataBlock();
iblock.setHasNext(!last);
iblock.write(out);
out.close();
-
+
add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
flush(level + 1, lastKey, last);
-
+
if (last)
levels.set(level, null);
else
levels.set(level, new IndexBlock(level, totalAdded));
}
}
-
+
public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
totalAdded++;
add(0, key, data, offset, compressedSize, rawSize);
flush(0, key, false);
}
-
+
public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
if (addedLast)
throw new IllegalStateException("already added last");
-
+
totalAdded++;
add(0, key, data, offset, compressedSize, rawSize);
flush(0, key, true);
addedLast = true;
-
+
}
-
+
public void close(DataOutput out) throws IOException {
if (totalAdded > 0 && !addedLast)
throw new IllegalStateException("did not call addLast");
-
+
out.writeInt(totalAdded);
// save root node
if (levels.size() > 0) {
@@ -490,32 +490,32 @@ public class MultiLevelIndex {
} else {
new IndexBlock(0, 0).write(out);
}
-
+
}
}
-
+
public static class Reader {
private IndexBlock rootBlock;
private BlockFileReader blockStore;
private int version;
private int size;
-
+
public class Node {
-
+
private Node parent;
private IndexBlock indexBlock;
private int currentPos;
-
+
Node(Node parent, IndexBlock iBlock) {
this.parent = parent;
this.indexBlock = iBlock;
}
-
+
Node(IndexBlock rootInfo) {
this.parent = null;
this.indexBlock = rootInfo;
}
-
+
private Node lookup(Key key) throws IOException {
int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
@Override
@@ -523,86 +523,86 @@ public class MultiLevelIndex {
return o1.compareTo(o2);
}
});
-
+
if (pos < 0)
pos = (pos * -1) - 1;
-
+
if (pos == indexBlock.getIndex().size()) {
if (parent != null)
throw new IllegalStateException();
this.currentPos = pos;
return this;
}
-
+
this.currentPos = pos;
-
+
if (indexBlock.getLevel() == 0) {
return this;
}
-
+
IndexEntry ie = indexBlock.getIndex().get(pos);
Node child = new Node(this, getIndexBlock(ie));
return child.lookup(key);
}
-
+
private Node getLast() throws IOException {
currentPos = indexBlock.getIndex().size() - 1;
if (indexBlock.getLevel() == 0)
return this;
-
+
IndexEntry ie = indexBlock.getIndex().get(currentPos);
Node child = new Node(this, getIndexBlock(ie));
return child.getLast();
}
-
+
private Node getFirst() throws IOException {
currentPos = 0;
if (indexBlock.getLevel() == 0)
return this;
-
+
IndexEntry ie = indexBlock.getIndex().get(currentPos);
Node child = new Node(this, getIndexBlock(ie));
return child.getFirst();
}
-
+
private Node getPrevious() throws IOException {
if (currentPos == 0)
return parent.getPrevious();
-
+
currentPos--;
-
+
IndexEntry ie = indexBlock.getIndex().get(currentPos);
Node child = new Node(this, getIndexBlock(ie));
return child.getLast();
-
+
}
-
+
private Node getNext() throws IOException {
if (currentPos == indexBlock.getIndex().size() - 1)
return parent.getNext();
-
+
currentPos++;
-
+
IndexEntry ie = indexBlock.getIndex().get(currentPos);
Node child = new Node(this, getIndexBlock(ie));
return child.getFirst();
-
+
}
-
+
Node getNextNode() throws IOException {
return parent.getNext();
}
-
+
Node getPreviousNode() throws IOException {
return parent.getPrevious();
}
}
-
+
static public class IndexIterator implements ListIterator<IndexEntry> {
-
+
private Node node;
private ListIterator<IndexEntry> liter;
-
+
private Node getPrevNode() {
try {
return node.getPreviousNode();
@@ -610,7 +610,7 @@ public class MultiLevelIndex {
throw new RuntimeException(e);
}
}
-
+
private Node getNextNode() {
try {
return node.getNextNode();
@@ -618,155 +618,155 @@ public class MultiLevelIndex {
throw new RuntimeException(e);
}
}
-
+
public IndexIterator() {
node = null;
}
-
+
public IndexIterator(Node node) {
this.node = node;
liter = node.indexBlock.getIndex().listIterator(node.currentPos);
}
-
+
@Override
public boolean hasNext() {
if (node == null)
return false;
-
+
if (!liter.hasNext()) {
return node.indexBlock.hasNext();
} else {
return true;
}
-
+
}
-
+
public IndexEntry peekPrevious() {
IndexEntry ret = previous();
next();
return ret;
}
-
+
public IndexEntry peek() {
IndexEntry ret = next();
previous();
return ret;
}
-
+
@Override
public IndexEntry next() {
if (!liter.hasNext()) {
node = getNextNode();
liter = node.indexBlock.getIndex().listIterator();
}
-
+
return liter.next();
}
-
+
@Override
public boolean hasPrevious() {
if (node == null)
return false;
-
+
if (!liter.hasPrevious()) {
return node.indexBlock.getOffset() > 0;
} else {
return true;
}
}
-
+
@Override
public IndexEntry previous() {
if (!liter.hasPrevious()) {
node = getPrevNode();
liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
}
-
+
return liter.previous();
}
-
+
@Override
public int nextIndex() {
return node.indexBlock.getOffset() + liter.nextIndex();
}
-
+
@Override
public int previousIndex() {
return node.indexBlock.getOffset() + liter.previousIndex();
}
-
+
@Override
public void remove() {
throw new UnsupportedOperationException();
}
-
+
@Override
public void set(IndexEntry e) {
throw new UnsupportedOperationException();
-
+
}
-
+
@Override
public void add(IndexEntry e) {
throw new UnsupportedOperationException();
}
-
+
}
-
+
public Reader(BlockFileReader blockStore, int version) {
this.version = version;
this.blockStore = blockStore;
}
-
+
private IndexBlock getIndexBlock(IndexEntry ie) throws IOException {
IndexBlock iblock = new IndexBlock();
ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
iblock.readFields(in, version);
in.close();
-
+
return iblock;
}
-
+
public IndexIterator lookup(Key key) throws IOException {
Node node = new Node(rootBlock);
return new IndexIterator(node.lookup(key));
}
-
+
public void readFields(DataInput in) throws IOException {
-
+
size = 0;
-
+
if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
size = in.readInt();
}
-
+
rootBlock = new IndexBlock();
rootBlock.readFields(in, version);
-
+
if (version == RFile.RINDEX_VER_3 || version == RFile.RINDEX_VER_4) {
size = rootBlock.getIndex().size();
}
}
-
+
public int size() {
return size;
}
-
+
private void getIndexInfo(IndexBlock ib, Map<Integer,Long> sizesByLevel, Map<Integer,Long> countsByLevel) throws IOException {
Long size = sizesByLevel.get(ib.getLevel());
if (size == null)
size = 0l;
-
+
Long count = countsByLevel.get(ib.getLevel());
if (count == null)
count = 0l;
-
+
size += ib.index.sizeInBytes();
count++;
-
+
sizesByLevel.put(ib.getLevel(), size);
countsByLevel.put(ib.getLevel(), count);
-
+
if (ib.getLevel() > 0) {
for (IndexEntry ie : ib.index) {
IndexBlock cib = getIndexBlock(ie);
@@ -774,14 +774,14 @@ public class MultiLevelIndex {
}
}
}
-
+
public void getIndexInfo(Map<Integer,Long> sizes, Map<Integer,Long> counts) throws IOException {
getIndexInfo(rootBlock, sizes, counts);
}
-
+
public Key getLastKey() {
return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
index 43586dd..f29efcc 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -39,7 +39,7 @@ import com.beust.jcommander.Parameter;
public class PrintInfo {
private static final Logger log = Logger.getLogger(PrintInfo.class);
-
+
static class Opts extends Help {
@Parameter(names = {"-d", "--dump"}, description = "dump the key/value pairs")
boolean dump = false;
@@ -48,29 +48,29 @@ public class PrintInfo {
@Parameter(description = " <file> { <file> ... }")
List<String> files = new ArrayList<String>();
}
-
+
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
AccumuloConfiguration aconf = SiteConfiguration.getInstance(DefaultConfiguration.getInstance());
// TODO ACCUMULO-2462 This will only work for RFiles (path only, not URI) in HDFS when the correct filesystem for the given file
- // is on Property.INSTANCE_DFS_DIR or, when INSTANCE_DFS_DIR is not defined, is on the default filesystem
+ // is on Property.INSTANCE_DFS_DIR or, when INSTANCE_DFS_DIR is not defined, is on the default filesystem
// defined in the Hadoop's core-site.xml
//
// A workaround is to always provide a URI to this class
FileSystem hadoopFs = VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem();
- FileSystem localFs = FileSystem.getLocal(conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
Opts opts = new Opts();
opts.parseArgs(PrintInfo.class.getName(), args);
if (opts.files.isEmpty()) {
System.err.println("No files were given");
System.exit(-1);
}
-
+
long countBuckets[] = new long[11];
long sizeBuckets[] = new long[countBuckets.length];
long totalSize = 0;
-
+
for (String arg : opts.files) {
Path path = new Path(arg);
FileSystem fs;
@@ -81,14 +81,14 @@ public class PrintInfo {
log.warn("Attempting to find file across filesystems. Consider providing URI instead of path");
fs = hadoopFs.exists(path) ? hadoopFs : localFs; // fall back to local
}
-
+
CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null, aconf);
Reader iter = new RFile.Reader(_rdr);
-
+
iter.printInfo();
System.out.println();
org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg});
-
+
if (opts.histogram || opts.dump) {
iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false);
while (iter.hasTop()) {
@@ -113,7 +113,7 @@ public class PrintInfo {
System.out.println(String.format("%11.0f : %10d %6.2f%%", Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize));
}
}
-
+
// If the output stream has closed, there is no reason to keep going.
if (System.out.checkError())
return;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 9dcb3a5..0b464d8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -68,47 +68,47 @@ import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
public class RFile {
-
+
public static final String EXTENSION = "rf";
-
+
private static final Logger log = Logger.getLogger(RFile.class);
-
+
private RFile() {}
-
+
private static final int RINDEX_MAGIC = 0x20637474;
static final int RINDEX_VER_7 = 7;
static final int RINDEX_VER_6 = 6;
// static final int RINDEX_VER_5 = 5; // unreleased
static final int RINDEX_VER_4 = 4;
static final int RINDEX_VER_3 = 3;
-
+
private static class LocalityGroupMetadata implements Writable {
-
+
private int startBlock;
private Key firstKey;
private Map<ByteSequence,MutableLong> columnFamilies;
-
+
private boolean isDefaultLG = false;
private String name;
private Set<ByteSequence> previousColumnFamilies;
-
+
private MultiLevelIndex.BufferedWriter indexWriter;
private MultiLevelIndex.Reader indexReader;
-
+
public LocalityGroupMetadata(int version, BlockFileReader br) {
columnFamilies = new HashMap<ByteSequence,MutableLong>();
indexReader = new MultiLevelIndex.Reader(br, version);
}
-
+
public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
this.startBlock = nextBlock;
isDefaultLG = true;
columnFamilies = new HashMap<ByteSequence,MutableLong>();
previousColumnFamilies = pcf;
-
+
indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
}
-
+
public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
this.startBlock = nextBlock;
this.name = name;
@@ -117,22 +117,22 @@ public class RFile {
for (ByteSequence cf : cfset) {
columnFamilies.put(cf, new MutableLong(0));
}
-
+
indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
}
-
+
private Key getFirstKey() {
return firstKey;
}
-
+
private void setFirstKey(Key key) {
if (firstKey != null)
throw new IllegalStateException();
this.firstKey = new Key(key);
}
-
+
public void updateColumnCount(Key key) {
-
+
if (isDefaultLG && columnFamilies == null) {
if (previousColumnFamilies.size() > 0) {
// only do this check when there are previous column families
@@ -141,23 +141,23 @@ public class RFile {
throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
}
}
-
+
// no longer keeping track of column families, so return
return;
}
-
+
ByteSequence cf = key.getColumnFamilyData();
MutableLong count = columnFamilies.get(cf);
-
+
if (count == null) {
if (!isDefaultLG) {
throw new IllegalArgumentException("invalid column family : " + cf);
}
-
+
if (previousColumnFamilies.contains(cf)) {
throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
}
-
+
if (columnFamilies.size() > Writer.MAX_CF_IN_DLG) {
// stop keeping track, there are too many
columnFamilies = null;
@@ -165,86 +165,86 @@ public class RFile {
}
count = new MutableLong(0);
columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
-
+
}
-
+
count.increment();
-
+
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
-
+
isDefaultLG = in.readBoolean();
if (!isDefaultLG) {
name = in.readUTF();
}
-
+
startBlock = in.readInt();
-
+
int size = in.readInt();
-
+
if (size == -1) {
if (!isDefaultLG)
throw new IllegalStateException("Non default LG " + name + " does not have column families");
-
+
columnFamilies = null;
} else {
if (columnFamilies == null)
columnFamilies = new HashMap<ByteSequence,MutableLong>();
else
columnFamilies.clear();
-
+
for (int i = 0; i < size; i++) {
int len = in.readInt();
byte cf[] = new byte[len];
in.readFully(cf);
long count = in.readLong();
-
+
columnFamilies.put(new ArrayByteSequence(cf), new MutableLong(count));
}
}
-
+
if (in.readBoolean()) {
firstKey = new Key();
firstKey.readFields(in);
} else {
firstKey = null;
}
-
+
indexReader.readFields(in);
}
-
+
@Override
public void write(DataOutput out) throws IOException {
-
+
out.writeBoolean(isDefaultLG);
if (!isDefaultLG) {
out.writeUTF(name);
}
-
+
out.writeInt(startBlock);
-
+
if (isDefaultLG && columnFamilies == null) {
// only expect null when default LG, otherwise let a NPE occur
out.writeInt(-1);
} else {
out.writeInt(columnFamilies.size());
-
+
for (Entry<ByteSequence,MutableLong> entry : columnFamilies.entrySet()) {
out.writeInt(entry.getKey().length());
out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
out.writeLong(entry.getValue().longValue());
}
}
-
+
out.writeBoolean(firstKey != null);
if (firstKey != null)
firstKey.write(out);
-
+
indexWriter.close(out);
}
-
+
public void printInfo() throws IOException {
PrintStream out = System.out;
out.println("Locality group : " + (isDefaultLG ? "<DEFAULT>" : name));
@@ -258,55 +258,55 @@ public class RFile {
+ String.format("%,d bytes %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
}
out.println("\tFirst key : " + firstKey);
-
+
Key lastKey = null;
if (indexReader.size() > 0) {
lastKey = indexReader.getLastKey();
}
-
+
out.println("\tLast key : " + lastKey);
-
+
long numKeys = 0;
IndexIterator countIter = indexReader.lookup(new Key());
while (countIter.hasNext()) {
numKeys += countIter.next().getNumEntries();
}
-
+
out.println("\tNum entries : " + String.format("%,d", numKeys));
out.println("\tColumn families : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
}
-
+
}
-
+
public static class Writer implements FileSKVWriter {
-
+
public static final int MAX_CF_IN_DLG = 1000;
-
+
private BlockFileWriter fileWriter;
private ABlockWriter blockWriter;
-
+
// private BlockAppender blockAppender;
private long blockSize = 100000;
private int indexBlockSize;
private int entries = 0;
-
+
private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
private LocalityGroupMetadata currentLocalityGroup = null;
private int nextBlock = 0;
-
+
private Key lastKeyInBlock = null;
-
+
private boolean dataClosed = false;
private boolean closed = false;
private Key prevKey = new Key();
private boolean startedDefaultLocalityGroup = false;
-
+
private HashSet<ByteSequence> previousColumnFamilies;
-
+
public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
}
-
+
public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
this.blockSize = blockSize;
this.indexBlockSize = indexBlockSize;
@@ -314,123 +314,123 @@ public class RFile {
this.blockWriter = null;
previousColumnFamilies = new HashSet<ByteSequence>();
}
-
+
@Override
public synchronized void close() throws IOException {
-
+
if (closed) {
return;
}
-
+
closeData();
-
+
ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
-
+
mba.writeInt(RINDEX_MAGIC);
mba.writeInt(RINDEX_VER_7);
-
+
if (currentLocalityGroup != null)
localityGroups.add(currentLocalityGroup);
-
+
mba.writeInt(localityGroups.size());
-
+
for (LocalityGroupMetadata lc : localityGroups) {
lc.write(mba);
}
-
+
mba.close();
-
+
fileWriter.close();
-
+
closed = true;
}
-
+
private void closeData() throws IOException {
-
+
if (dataClosed) {
return;
}
-
+
dataClosed = true;
-
+
if (blockWriter != null) {
closeBlock(lastKeyInBlock, true);
}
}
-
+
@Override
public void append(Key key, Value value) throws IOException {
-
+
if (dataClosed) {
throw new IllegalStateException("Cannont append, data closed");
}
-
+
if (key.compareTo(prevKey) < 0) {
throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
}
-
+
currentLocalityGroup.updateColumnCount(key);
-
+
if (currentLocalityGroup.getFirstKey() == null) {
currentLocalityGroup.setFirstKey(key);
}
-
+
if (blockWriter == null) {
blockWriter = fileWriter.prepareDataBlock();
} else if (blockWriter.getRawSize() > blockSize) {
closeBlock(prevKey, false);
blockWriter = fileWriter.prepareDataBlock();
}
-
+
RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
-
+
rk.write(blockWriter);
value.write(blockWriter);
entries++;
-
+
prevKey = new Key(key);
lastKeyInBlock = prevKey;
-
+
}
-
+
private void closeBlock(Key key, boolean lastBlock) throws IOException {
blockWriter.close();
-
+
if (lastBlock)
currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
else
currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
-
+
blockWriter = null;
lastKeyInBlock = null;
entries = 0;
nextBlock++;
}
-
+
@Override
public DataOutputStream createMetaStore(String name) throws IOException {
closeData();
-
+
return (DataOutputStream) fileWriter.prepareMetaBlock(name);
}
-
+
private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
if (dataClosed) {
throw new IllegalStateException("data closed");
}
-
+
if (startedDefaultLocalityGroup) {
throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
}
-
+
if (blockWriter != null) {
closeBlock(lastKeyInBlock, true);
}
-
+
if (currentLocalityGroup != null) {
localityGroups.add(currentLocalityGroup);
}
-
+
if (columnFamilies == null) {
startedDefaultLocalityGroup = true;
currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
@@ -443,31 +443,31 @@ public class RFile {
currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
previousColumnFamilies.addAll(columnFamilies);
}
-
+
prevKey = new Key();
}
-
+
@Override
public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
if (columnFamilies == null)
throw new NullPointerException();
-
+
_startNewLocalityGroup(name, columnFamilies);
}
-
+
@Override
public void startDefaultLocalityGroup() throws IOException {
_startNewLocalityGroup(null, null);
}
-
+
@Override
public boolean supportsLocalityGroups() {
return true;
}
}
-
+
private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator {
-
+
private BlockFileReader reader;
private MultiLevelIndex.Reader index;
private int blockCount;
@@ -476,7 +476,7 @@ public class RFile {
private boolean closed = false;
private int version;
private boolean checkRange = true;
-
+
private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
super(lgm.columnFamilies, lgm.isDefaultLG);
this.firstKey = lgm.firstKey;
@@ -484,11 +484,11 @@ public class RFile {
this.startBlock = lgm.startBlock;
blockCount = index.size();
this.version = version;
-
+
this.reader = reader;
-
+
}
-
+
public LocalityGroupReader(LocalityGroupReader lgr) {
super(lgr.columnFamilies, lgr.isDefaultLocalityGroup);
this.firstKey = lgr.firstKey;
@@ -498,20 +498,20 @@ public class RFile {
this.reader = lgr.reader;
this.version = lgr.version;
}
-
+
Iterator<IndexEntry> getIndex() throws IOException {
return index.lookup(new Key());
}
-
+
@Override
public void close() throws IOException {
closed = true;
hasTop = false;
if (currBlock != null)
currBlock.close();
-
+
}
-
+
private IndexIterator iiter;
private int entriesLeft;
private ABlockReader currBlock;
@@ -521,22 +521,22 @@ public class RFile {
private Range range = null;
private boolean hasTop = false;
private AtomicBoolean interruptFlag;
-
+
@Override
public Key getTopKey() {
return rk.getKey();
}
-
+
@Override
public Value getTopValue() {
return val;
}
-
+
@Override
public boolean hasTop() {
return hasTop;
}
-
+
@Override
public void next() throws IOException {
try {
@@ -546,20 +546,20 @@ public class RFile {
throw ioe;
}
}
-
+
private void _next() throws IOException {
-
+
if (!hasTop)
throw new IllegalStateException();
-
+
if (entriesLeft == 0) {
currBlock.close();
-
+
if (iiter.hasNext()) {
IndexEntry indexEntry = iiter.next();
entriesLeft = indexEntry.getNumEntries();
currBlock = getDataBlock(indexEntry);
-
+
checkRange = range.afterEndKey(indexEntry.getKey());
if (!checkRange)
hasTop = true;
@@ -571,7 +571,7 @@ public class RFile {
return;
}
}
-
+
prevKey = rk.getKey();
rk.readFields(currBlock);
val.readFields(currBlock);
@@ -579,30 +579,30 @@ public class RFile {
if (checkRange)
hasTop = !range.afterEndKey(rk.getKey());
}
-
+
private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException {
if (interruptFlag != null && interruptFlag.get())
throw new IterationInterruptedException();
-
+
if (version == RINDEX_VER_3 || version == RINDEX_VER_4)
return reader.getDataBlock(startBlock + iiter.previousIndex());
else
return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(), indexEntry.getRawSize());
-
+
}
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
-
+
if (closed)
throw new IllegalStateException("Locality group reader closed");
-
+
if (columnFamilies.size() != 0 || inclusive)
throw new IllegalArgumentException("I do not know how to filter column families");
-
+
if (interruptFlag != null && interruptFlag.get())
throw new IterationInterruptedException();
-
+
try {
_seek(range);
} catch (IOException ioe) {
@@ -610,7 +610,7 @@ public class RFile {
throw ioe;
}
}
-
+
private void reset() {
rk = null;
hasTop = false;
@@ -626,45 +626,45 @@ public class RFile {
}
}
}
-
+
private void _seek(Range range) throws IOException {
-
+
this.range = range;
this.checkRange = true;
-
+
if (blockCount == 0) {
// its an empty file
rk = null;
return;
}
-
+
Key startKey = range.getStartKey();
if (startKey == null)
startKey = new Key();
-
+
boolean reseek = true;
-
+
if (range.afterEndKey(firstKey)) {
// range is before first key in rfile, so there is nothing to do
reset();
reseek = false;
}
-
+
if (rk != null) {
if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
// range is between the two keys in the file where the last range seeked to stopped, so there is
// nothing to do
reseek = false;
}
-
+
if (startKey.compareTo(getTopKey()) <= 0 && startKey.compareTo(prevKey) > 0) {
// current location in file can satisfy this request, no need to seek
reseek = false;
}
-
+
if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
// start key is within the unconsumed portion of the current block
-
+
// this code intentionally does not use the index associated with a cached block
// because if only forward seeks are being done, then there is no benefit to building
// and index for the block... could consider using the index if it exist but not
@@ -679,37 +679,37 @@ public class RFile {
prevKey = skippr.prevKey;
rk = skippr.rk;
}
-
+
reseek = false;
}
-
+
if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) {
// seeking before the beginning of the file, and already positioned at the first key in the file
// so there is nothing to do
reseek = false;
}
}
-
+
if (reseek) {
iiter = index.lookup(startKey);
-
+
reset();
-
+
if (!iiter.hasNext()) {
// past the last key
} else {
-
+
// if the index contains the same key multiple times, then go to the
// earliest index entry containing the key
while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
iiter.previous();
}
-
+
if (iiter.hasPrevious())
prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block
else
prevKey = new Key(); // first block in the file, so set prev key to minimal key
-
+
IndexEntry indexEntry = iiter.next();
entriesLeft = indexEntry.getNumEntries();
currBlock = getDataBlock(indexEntry);
@@ -736,7 +736,7 @@ public class RFile {
val.readFields(currBlock);
valbs = new MutableByteSequence(val.get(), 0, val.getSize());
-
+
// just consumed one key from the input stream, so subtract one from entries left
entriesLeft = bie.getEntriesLeft() - 1;
prevKey = new Key(bie.getPrevKey());
@@ -754,76 +754,76 @@ public class RFile {
rk = skippr.rk;
}
}
-
+
hasTop = rk != null && !range.afterEndKey(rk.getKey());
-
+
while (hasTop() && range.beforeStartKey(getTopKey())) {
next();
}
}
-
+
@Override
public Key getFirstKey() throws IOException {
return firstKey;
}
-
+
@Override
public Key getLastKey() throws IOException {
if (index.size() == 0)
return null;
return index.getLastKey();
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
-
+
@Override
public void closeDeepCopies() throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public DataInputStream getMetaStore(String name) throws IOException {
throw new UnsupportedOperationException();
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
this.interruptFlag = flag;
}
-
+
@Override
public InterruptibleIterator getIterator() {
return this;
}
}
-
+
public static class Reader extends HeapIterator implements FileSKVIterator {
private BlockFileReader reader;
-
+
private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
-
+
private LocalityGroupReader lgReaders[];
private HashSet<ByteSequence> nonDefaultColumnFamilies;
-
+
private List<Reader> deepCopies;
private boolean deepCopy = false;
-
+
private AtomicBoolean interruptFlag;
-
+
public Reader(BlockFileReader rdr) throws IOException {
this.reader = rdr;
-
+
ABlockReader mb = reader.getMetaBlock("RFile.index");
- try{
+ try {
int magic = mb.readInt();
int ver = mb.readInt();
@@ -847,16 +847,16 @@ public class RFile {
} finally {
mb.close();
}
-
+
nonDefaultColumnFamilies = new HashSet<ByteSequence>();
for (LocalityGroupMetadata lgm : localityGroups) {
if (!lgm.isDefaultLG)
nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
}
-
+
createHeap(lgReaders.length);
}
-
+
private Reader(Reader r) {
super(r.lgReaders.length);
this.reader = r.reader;
@@ -869,7 +869,7 @@ public class RFile {
this.lgReaders[i].setInterruptFlag(r.interruptFlag);
}
}
-
+
private void closeLocalityGroupReaders() {
for (LocalityGroupReader lgr : lgReaders) {
try {
@@ -879,26 +879,26 @@ public class RFile {
}
}
}
-
+
@Override
public void closeDeepCopies() {
if (deepCopy)
throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
-
+
for (Reader deepCopy : deepCopies)
deepCopy.closeLocalityGroupReaders();
-
+
deepCopies.clear();
}
-
+
@Override
public void close() throws IOException {
if (deepCopy)
throw new RuntimeException("Calling close on a deep copy is not supported");
-
+
closeDeepCopies();
closeLocalityGroupReaders();
-
+
try {
reader.close();
} finally {
@@ -907,15 +907,15 @@ public class RFile {
*/
}
}
-
+
@Override
public Key getFirstKey() throws IOException {
if (lgReaders.length == 0) {
return null;
}
-
+
Key minKey = null;
-
+
for (int i = 0; i < lgReaders.length; i++) {
if (minKey == null) {
minKey = lgReaders[i].getFirstKey();
@@ -925,18 +925,18 @@ public class RFile {
minKey = firstKey;
}
}
-
+
return minKey;
}
-
+
@Override
public Key getLastKey() throws IOException {
if (lgReaders.length == 0) {
return null;
}
-
+
Key maxKey = null;
-
+
for (int i = 0; i < lgReaders.length; i++) {
if (maxKey == null) {
maxKey = lgReaders[i].getLastKey();
@@ -946,10 +946,10 @@ public class RFile {
maxKey = lastKey;
}
}
-
+
return maxKey;
}
-
+
@Override
public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
try {
@@ -958,7 +958,7 @@ public class RFile {
throw new NoSuchMetaStoreException("name = " + name, e);
}
}
-
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
Reader copy = new Reader(this);
@@ -966,53 +966,53 @@ public class RFile {
deepCopies.add(copy);
return copy;
}
-
+
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
throw new UnsupportedOperationException();
-
+
}
-
+
private int numLGSeeked = 0;
-
+
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
numLGSeeked = LocalityGroupIterator.seek(this, lgReaders, nonDefaultColumnFamilies, range, columnFamilies, inclusive);
}
-
+
int getNumLocalityGroupsSeeked() {
return numLGSeeked;
}
-
+
public FileSKVIterator getIndex() throws IOException {
-
+
ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
-
+
for (LocalityGroupReader lgr : lgReaders) {
indexes.add(lgr.getIndex());
}
-
+
return new MultiIndexIterator(this, indexes);
}
-
+
public void printInfo() throws IOException {
for (LocalityGroupMetadata lgm : localityGroups) {
lgm.printInfo();
}
-
+
}
-
+
@Override
public void setInterruptFlag(AtomicBoolean flag) {
if (deepCopy)
throw new RuntimeException("Calling setInterruptFlag on a deep copy is not supported");
-
+
if (deepCopies.size() != 0)
throw new RuntimeException("Setting interrupt flag after calling deep copy not supported");
-
+
setInterruptFlagInternal(flag);
}
-
+
private void setInterruptFlagInternal(AtomicBoolean flag) {
this.interruptFlag = flag;
for (LocalityGroupReader lgr : lgReaders) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 9fabe42..088abfe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -38,20 +38,20 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class RFileOperations extends FileOperations {
-
+
private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
-
+
@Override
public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
return fs.getFileStatus(new Path(file)).getLen();
}
-
+
@Override
public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-
+
return openIndex(file, fs, conf, acuconf, null, null);
}
-
+
@Override
public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache)
throws IOException {
@@ -61,30 +61,30 @@ public class RFileOperations extends FileOperations {
// Reader reader = new RFile.Reader(in, len , conf);
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
final Reader reader = new RFile.Reader(_cbr);
-
+
return reader.getIndex();
}
-
+
@Override
public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
}
-
+
@Override
public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
BlockCache dataCache, BlockCache indexCache) throws IOException {
Path path = new Path(file);
-
+
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
Reader iter = new RFile.Reader(_cbr);
-
+
if (seekToBeginning) {
iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
}
-
+
return iter;
}
-
+
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
AccumuloConfiguration tableConf) throws IOException {
@@ -92,7 +92,7 @@ public class RFileOperations extends FileOperations {
iter.seek(range, columnFamilies, inclusive);
return iter;
}
-
+
@Override
public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
@@ -100,7 +100,7 @@ public class RFileOperations extends FileOperations {
iter.seek(range, columnFamilies, inclusive);
return iter;
}
-
+
@Override
public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
@@ -119,10 +119,10 @@ public class RFileOperations extends FileOperations {
if (tblock > 0)
block = tblock;
int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-
+
long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
-
+
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
return writer;