You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@accumulo.apache.org by ct...@apache.org on 2012/11/20 18:00:22 UTC
svn commit: r1411745 - in /accumulo/trunk: ./
core/src/main/java/org/apache/accumulo/core/file/rfile/
core/src/test/java/org/apache/accumulo/core/file/rfile/
Author: ctubbsii
Date: Tue Nov 20 17:00:21 2012
New Revision: 1411745
URL: http://svn.apache.org/viewvc?rev=1411745&view=rev
Log:
ACCUMULO-790 Added prefix compression for relative key files, per key component
Modified:
accumulo/trunk/.gitignore
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
Modified: accumulo/trunk/.gitignore
URL: http://svn.apache.org/viewvc/accumulo/trunk/.gitignore?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/.gitignore (original)
+++ accumulo/trunk/.gitignore Tue Nov 20 17:00:21 2012
@@ -83,6 +83,7 @@
/core/.classpath
# /examples/
+/examples/lib
/examples/target
/examples/.settings
/examples/.classpath
@@ -125,6 +126,12 @@
/trace/.project
/trace/.settings
+# /test/
+/test/target
+/test/.classpath
+/test/.project
+/test/.settings
+
# /test/system/auto/
/test/system/auto/*.pyc
/test/system/auto/fake_disk_failure.so
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java Tue Nov 20 17:00:21 2012
@@ -58,13 +58,15 @@ public class BlockIndex {
public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
private Key key;
+ private Key prevKey;
private int entriesLeft;
private int pos;
- public BlockIndexEntry(int pos, int entriesLeft, Key key) {
+ public BlockIndexEntry(int pos, int entriesLeft, Key key, Key prevKey) {
this.pos = pos;
this.entriesLeft = entriesLeft;
this.key = key;
+ this.prevKey = prevKey;
}
/**
@@ -87,14 +89,18 @@ public class BlockIndex {
return key.compareTo(o.key);
}
+ @Override
public String toString() {
return key + " " + entriesLeft + " " + pos;
}
+
+ public Key getPrevKey() {
+ return prevKey;
+ }
}
public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
-
// get a local ref to the index, another thread could change it
BlockIndexEntry[] blockIndex = this.blockIndex;
@@ -150,12 +156,13 @@ public class BlockIndex {
while (count < (indexEntry.getNumEntries() - interval + 1)) {
+ Key myPrevKey = rk.getKey();
int pos = cacheBlock.getPosition();
rk.readFields(cacheBlock);
val.readFields(cacheBlock);
if (count > 0 && count % interval == 0) {
- index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey()));
+ index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey(), myPrevKey));
}
count++;
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java Tue Nov 20 17:00:21 2012
@@ -252,7 +252,7 @@ public class MultiLevelIndex {
public void readFields(DataInput in, int version) throws IOException {
- if (version == RFile.RINDEX_VER_6) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
level = in.readInt();
offset = in.readInt();
hasNext = in.readBoolean();
@@ -723,7 +723,7 @@ public class MultiLevelIndex {
size = 0;
- if (version == RFile.RINDEX_VER_6) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
size = in.readInt();
}
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Tue Nov 20 17:00:21 2012
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfi
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -78,7 +79,9 @@ public class RFile {
private RFile() {}
private static final int RINDEX_MAGIC = 0x20637474;
+ static final int RINDEX_VER_7 = 7;
static final int RINDEX_VER_6 = 6;
+ // static final int RINDEX_VER_5 = 5; // unreleased
static final int RINDEX_VER_4 = 4;
static final int RINDEX_VER_3 = 3;
@@ -327,6 +330,7 @@ public class RFile {
previousColumnFamilies = new HashSet<ByteSequence>();
}
+ @Override
public synchronized void close() throws IOException {
if (closed) {
@@ -338,7 +342,7 @@ public class RFile {
ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
mba.writeInt(RINDEX_MAGIC);
- mba.writeInt(RINDEX_VER_6);
+ mba.writeInt(RINDEX_VER_7);
if (currentLocalityGroup != null)
localityGroups.add(currentLocalityGroup);
@@ -369,6 +373,7 @@ public class RFile {
}
}
+ @Override
public void append(Key key, Value value) throws IOException {
if (dataClosed) {
@@ -685,14 +690,12 @@ public class RFile {
// and speed up others.
MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
- RelativeKey tmpRk = new RelativeKey();
- Key pKey = new Key(getTopKey());
- int fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, pKey, getTopKey());
- if (fastSkipped > 0) {
- entriesLeft -= fastSkipped;
+ SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, getTopKey());
+ if (skippr.skipped > 0) {
+ entriesLeft -= skippr.skipped;
val = new Value(valbs.toArray());
- prevKey = pKey;
- rk = tmpRk;
+ prevKey = skippr.prevKey;
+ rk = skippr.rk;
}
reseek = false;
@@ -705,8 +708,6 @@ public class RFile {
}
}
- int fastSkipped = -1;
-
if (reseek) {
iiter = index.lookup(startKey);
@@ -735,7 +736,6 @@ public class RFile {
if (!checkRange)
hasTop = true;
- RelativeKey tmpRk = new RelativeKey();
MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
Key currKey = null;
@@ -747,7 +747,8 @@ public class RFile {
if (bie != null) {
// we are seeked to the current position of the key in the index
// need to prime the read process and read this key from the block
- tmpRk.setPrevKey(bie.getKey());
+ RelativeKey tmpRk = new RelativeKey();
+ tmpRk.setPrevKey(bie.getPrevKey());
tmpRk.readFields(currBlock);
val = new Value();
@@ -756,18 +757,19 @@ public class RFile {
// just consumed one key from the input stream, so subtract one from entries left
entriesLeft = bie.getEntriesLeft() - 1;
- prevKey = new Key(bie.getKey());
+ prevKey = new Key(bie.getPrevKey());
currKey = bie.getKey();
}
}
}
- fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
- entriesLeft -= fastSkipped;
+ SkippR skippr = RelativeKey.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
+ prevKey = skippr.prevKey;
+ entriesLeft -= skippr.skipped;
val = new Value(valbs.toArray());
// set rk when everything above is successful, if exception
// occurs rk will not be set
- rk = tmpRk;
+ rk = skippr.rk;
}
}
@@ -842,7 +844,7 @@ public class RFile {
if (magic != RINDEX_MAGIC)
throw new IOException("Did not see expected magic number, saw " + magic);
- if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+ if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
throw new IOException("Did not see expected version, saw " + ver);
int size = mb.readInt();
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java Tue Nov 20 17:00:21 2012
@@ -16,94 +16,115 @@
*/
package org.apache.accumulo.core.file.rfile;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.zip.GZIPOutputStream;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-public class RelativeKey implements WritableComparable<RelativeKey> {
+public class RelativeKey implements Writable {
- private Key key;
-
- private byte fieldsSame;
+ private static final byte BIT = 0x01;
+ private Key key;
private Key prevKey;
- private static final byte ROW_SAME = 0x01;
- private static final byte CF_SAME = 0x02;
- private static final byte CQ_SAME = 0x04;
- private static final byte CV_SAME = 0x08;
- private static final byte TS_SAME = 0x10;
- private static final byte DELETED = 0x20;
-
- private static HashMap<Text,Integer> colFams = new HashMap<Text,Integer>();
-
- private static long bytesWritten = 0;
+ private byte fieldsSame;
+ private byte fieldsPrefixed;
- public static void printStats() throws Exception {
- System.out.println("colFams.size() : " + colFams.size());
- Set<Entry<Text,Integer>> es = colFams.entrySet();
-
- int sum = 0;
-
- for (Entry<Text,Integer> entry : es) {
- sum += entry.getKey().getLength();
- }
-
- System.out.println("Total Column name bytes : " + sum);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(baos));
- for (Entry<Text,Integer> entry : es) {
- entry.getKey().write(dos);
- dos.writeInt(entry.getValue());
- }
-
- dos.close();
-
- System.out.println("Compressed column map size : " + baos.toByteArray().length);
- System.out.printf("Bytes written : %,d%n", bytesWritten);
-
- }
+ // Exact match compression options (first byte), plus a flag indicating that a prefix compression byte (second byte) follows
+ private static final byte ROW_SAME = BIT << 0;
+ private static final byte CF_SAME = BIT << 1;
+ private static final byte CQ_SAME = BIT << 2;
+ private static final byte CV_SAME = BIT << 3;
+ private static final byte TS_SAME = BIT << 4;
+ private static final byte DELETED = BIT << 5;
+ // private static final byte UNUSED_1_6 = BIT << 6;
+ private static final byte PREFIX_COMPRESSION_ENABLED = (byte) (BIT << 7);
+
+ // Prefix compression (second byte)
+ private static final byte ROW_COMMON_PREFIX = BIT << 0;
+ private static final byte CF_COMMON_PREFIX = BIT << 1;
+ private static final byte CQ_COMMON_PREFIX = BIT << 2;
+ private static final byte CV_COMMON_PREFIX = BIT << 3;
+ private static final byte TS_DIFF = BIT << 4;
+
+ // private static final byte UNUSED_2_5 = BIT << 5;
+ // private static final byte UNUSED_2_6 = BIT << 6;
+ // private static final byte UNUSED_2_7 = (byte) (BIT << 7);
+
+ // Values for prefix compression
+ int rowCommonPrefixLen;
+ int cfCommonPrefixLen;
+ int cqCommonPrefixLen;
+ int cvCommonPrefixLen;
+ long tsDiff;
+ /**
+ * This constructor is used when one needs to read from an input stream
+ */
public RelativeKey() {
}
+ /**
+ * This constructor is used when constructing a key for writing to an output stream
+ */
public RelativeKey(Key prevKey, Key key) {
this.key = key;
fieldsSame = 0;
+ fieldsPrefixed = 0;
+
+ ByteSequence prevKeyScratch;
+ ByteSequence keyScratch;
if (prevKey != null) {
- if (prevKey.getRowData().equals(key.getRowData()))
+
+ prevKeyScratch = prevKey.getRowData();
+ keyScratch = key.getRowData();
+ rowCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+ if (rowCommonPrefixLen == -1)
fieldsSame |= ROW_SAME;
+ else if (rowCommonPrefixLen > 1)
+ fieldsPrefixed |= ROW_COMMON_PREFIX;
- if (prevKey.getColumnFamilyData().equals(key.getColumnFamilyData()))
+ prevKeyScratch = prevKey.getColumnFamilyData();
+ keyScratch = key.getColumnFamilyData();
+ cfCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+ if (cfCommonPrefixLen == -1)
fieldsSame |= CF_SAME;
+ else if (cfCommonPrefixLen > 1)
+ fieldsPrefixed |= CF_COMMON_PREFIX;
- if (prevKey.getColumnQualifierData().equals(key.getColumnQualifierData()))
+ prevKeyScratch = prevKey.getColumnQualifierData();
+ keyScratch = key.getColumnQualifierData();
+ cqCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+ if (cqCommonPrefixLen == -1)
fieldsSame |= CQ_SAME;
+ else if (cqCommonPrefixLen > 1)
+ fieldsPrefixed |= CQ_COMMON_PREFIX;
- if (prevKey.getColumnVisibilityData().equals(key.getColumnVisibilityData()))
+ prevKeyScratch = prevKey.getColumnVisibilityData();
+ keyScratch = key.getColumnVisibilityData();
+ cvCommonPrefixLen = getCommonPrefix(prevKeyScratch, keyScratch);
+ if (cvCommonPrefixLen == -1)
fieldsSame |= CV_SAME;
+ else if (cvCommonPrefixLen > 1)
+ fieldsPrefixed |= CV_COMMON_PREFIX;
- if (prevKey.getTimestamp() == key.getTimestamp())
+ tsDiff = key.getTimestamp() - prevKey.getTimestamp();
+ if (tsDiff == 0)
fieldsSame |= TS_SAME;
+ else
+ fieldsPrefixed |= TS_DIFF;
+ fieldsSame |= fieldsPrefixed == 0 ? 0 : PREFIX_COMPRESSION_ENABLED;
}
// stored deleted information in bit vector instead of its own byte
@@ -111,6 +132,31 @@ public class RelativeKey implements Writ
fieldsSame |= DELETED;
}
+ /**
+ *
+ * @return -1 (exact match) or the number of bytes in common
+ */
+ static int getCommonPrefix(ByteSequence prev, ByteSequence cur) {
+ if (prev == cur)
+ return -1; // infinite... exact match
+
+ int prevLen = prev.length();
+ int curLen = cur.length();
+ int maxChecks = Math.min(prevLen, curLen);
+ int common = 0;
+ while (common < maxChecks) {
+ int a = prev.byteAt(common) & 0xff;
+ int b = cur.byteAt(common) & 0xff;
+ if (a != b)
+ return common;
+ common++;
+ }
+ // no differences found
+ // one sequence is a prefix of the other: if they are the same length, they are an exact match;
+ // otherwise their common prefix is the entire shorter sequence (maxChecks bytes)
+ return prevLen == curLen ? -1 : maxChecks;
+ }
+
public void setPrevKey(Key pk) {
this.prevKey = pk;
}
@@ -118,42 +164,56 @@ public class RelativeKey implements Writ
@Override
public void readFields(DataInput in) throws IOException {
fieldsSame = in.readByte();
+ if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) {
+ fieldsPrefixed = in.readByte();
+ } else {
+ fieldsPrefixed = 0;
+ }
byte[] row, cf, cq, cv;
long ts;
- if ((fieldsSame & ROW_SAME) == 0) {
- row = read(in);
- } else {
+ if ((fieldsSame & ROW_SAME) == ROW_SAME) {
row = prevKey.getRowData().toArray();
+ } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
+ row = readPrefix(in, prevKey.getRowData());
+ } else {
+ row = read(in);
}
- if ((fieldsSame & CF_SAME) == 0) {
- cf = read(in);
- } else {
+ if ((fieldsSame & CF_SAME) == CF_SAME) {
cf = prevKey.getColumnFamilyData().toArray();
+ } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
+ cf = readPrefix(in, prevKey.getColumnFamilyData());
+ } else {
+ cf = read(in);
}
- if ((fieldsSame & CQ_SAME) == 0) {
- cq = read(in);
- } else {
+ if ((fieldsSame & CQ_SAME) == CQ_SAME) {
cq = prevKey.getColumnQualifierData().toArray();
+ } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
+ cq = readPrefix(in, prevKey.getColumnQualifierData());
+ } else {
+ cq = read(in);
}
- if ((fieldsSame & CV_SAME) == 0) {
- cv = read(in);
- } else {
+ if ((fieldsSame & CV_SAME) == CV_SAME) {
cv = prevKey.getColumnVisibilityData().toArray();
+ } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
+ cv = readPrefix(in, prevKey.getColumnVisibilityData());
+ } else {
+ cv = read(in);
}
- if ((fieldsSame & TS_SAME) == 0) {
- ts = WritableUtils.readVLong(in);
- } else {
+ if ((fieldsSame & TS_SAME) == TS_SAME) {
ts = prevKey.getTimestamp();
+ } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) {
+ ts = WritableUtils.readVLong(in) + prevKey.getTimestamp();
+ } else {
+ ts = WritableUtils.readVLong(in);
}
- this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) != 0, false);
-
+ this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) == DELETED, false);
this.prevKey = this.key;
}
@@ -182,10 +242,22 @@ public class RelativeKey implements Writ
}
}
- int fastSkip(DataInput in, Key seekKey, MByteSequence value, Key pkey, Key currKey) throws IOException {
+ public static class SkippR {
+ RelativeKey rk;
+ int skipped;
+ Key prevKey;
+
+ SkippR(RelativeKey rk, int skipped, Key prevKey) {
+ this.rk = rk;
+ this.skipped = skipped;
+ this.prevKey = prevKey;
+ }
+ }
+
+ public static SkippR fastSkip(DataInput in, Key seekKey, MByteSequence value, Key prevKey, Key currKey) throws IOException {
// this method assumes that fast skip is being called on a compressed block where the last key
- // in the compressed block is >= seekKey... therefore this method should go passed the end of the
- // compressed block... if it does, there is probably an error in the callers logic
+ // in the compressed block is >= seekKey... therefore this method shouldn't go past the end of the
+ // compressed block... if it does, there is probably an error in the caller's logic
// this method mostly avoids object allocation and only does compares when the row changes
@@ -204,11 +276,11 @@ public class RelativeKey implements Writ
if (currKey != null) {
- prow = new MByteSequence(pkey.getRowData());
- pcf = new MByteSequence(pkey.getColumnFamilyData());
- pcq = new MByteSequence(pkey.getColumnQualifierData());
- pcv = new MByteSequence(pkey.getColumnVisibilityData());
- pts = pkey.getTimestamp();
+ prow = new MByteSequence(currKey.getRowData());
+ pcf = new MByteSequence(currKey.getColumnFamilyData());
+ pcq = new MByteSequence(currKey.getColumnQualifierData());
+ pcv = new MByteSequence(currKey.getColumnVisibilityData());
+ pts = currKey.getTimestamp();
row = new MByteSequence(currKey.getRowData());
cf = new MByteSequence(currKey.getColumnFamilyData());
@@ -221,15 +293,24 @@ public class RelativeKey implements Writ
cqCmp = cq.compareTo(stopCQ);
if (rowCmp >= 0) {
- if (rowCmp > 0)
- return 0;
+ if (rowCmp > 0) {
+ RelativeKey rk = new RelativeKey();
+ rk.key = rk.prevKey = new Key(currKey);
+ return new SkippR(rk, 0, prevKey);
+ }
if (cfCmp >= 0) {
- if (cfCmp > 0)
- return 0;
+ if (cfCmp > 0) {
+ RelativeKey rk = new RelativeKey();
+ rk.key = rk.prevKey = new Key(currKey);
+ return new SkippR(rk, 0, prevKey);
+ }
- if (cqCmp >= 0)
- return 0;
+ if (cqCmp >= 0) {
+ RelativeKey rk = new RelativeKey();
+ rk.key = rk.prevKey = new Key(currKey);
+ return new SkippR(rk, 0, prevKey);
+ }
}
}
@@ -246,22 +327,31 @@ public class RelativeKey implements Writ
}
byte fieldsSame = -1;
+ byte fieldsPrefixed = 0;
int count = 0;
+ Key newPrevKey = null;
while (true) {
- pdel = (fieldsSame & DELETED) != 0;
+ pdel = (fieldsSame & DELETED) == DELETED;
fieldsSame = in.readByte();
+ if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED)
+ fieldsPrefixed = in.readByte();
+ else
+ fieldsPrefixed = 0;
boolean changed = false;
- if ((fieldsSame & ROW_SAME) == 0) {
+ if ((fieldsSame & ROW_SAME) != ROW_SAME) {
MByteSequence tmp = prow;
prow = row;
row = tmp;
+ if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX)
+ readPrefix(in, row, prow);
+ else
read(in, row);
// read a new row, so need to compare...
@@ -269,41 +359,54 @@ public class RelativeKey implements Writ
changed = true;
}// else the row is the same as the last, so no need to compare
- if ((fieldsSame & CF_SAME) == 0) {
+ if ((fieldsSame & CF_SAME) != CF_SAME) {
MByteSequence tmp = pcf;
pcf = cf;
cf = tmp;
+ if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX)
+ readPrefix(in, cf, pcf);
+ else
read(in, cf);
cfCmp = cf.compareTo(stopCF);
changed = true;
}
- if ((fieldsSame & CQ_SAME) == 0) {
+ if ((fieldsSame & CQ_SAME) != CQ_SAME) {
MByteSequence tmp = pcq;
pcq = cq;
cq = tmp;
+ if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX)
+ readPrefix(in, cq, pcq);
+ else
read(in, cq);
cqCmp = cq.compareTo(stopCQ);
changed = true;
}
- if ((fieldsSame & CV_SAME) == 0) {
+ if ((fieldsSame & CV_SAME) != CV_SAME) {
MByteSequence tmp = pcv;
pcv = cv;
cv = tmp;
+ if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX)
+ readPrefix(in, cv, pcv);
+ else
read(in, cv);
}
- if ((fieldsSame & TS_SAME) == 0) {
+ if ((fieldsSame & TS_SAME) != TS_SAME) {
pts = ts;
+
+ if ((fieldsPrefixed & TS_DIFF) == TS_DIFF)
+ ts = WritableUtils.readVLong(in) + pts;
+ else
ts = WritableUtils.readVLong(in);
}
@@ -332,33 +435,39 @@ public class RelativeKey implements Writ
// when the current keys field is same as the last, then
// set the prev keys field the same as the current key
- trow = (fieldsSame & ROW_SAME) == 0 ? prow : row;
- tcf = (fieldsSame & CF_SAME) == 0 ? pcf : cf;
- tcq = (fieldsSame & CQ_SAME) == 0 ? pcq : cq;
- tcv = (fieldsSame & CV_SAME) == 0 ? pcv : cv;
- tts = (fieldsSame & TS_SAME) == 0 ? pts : ts;
+ trow = (fieldsSame & ROW_SAME) == ROW_SAME ? row : prow;
+ tcf = (fieldsSame & CF_SAME) == CF_SAME ? cf : pcf;
+ tcq = (fieldsSame & CQ_SAME) == CQ_SAME ? cq : pcq;
+ tcv = (fieldsSame & CV_SAME) == CV_SAME ? cv : pcv;
+ tts = (fieldsSame & TS_SAME) == TS_SAME ? ts : pts;
- Key tmp = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(),
+ newPrevKey = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(),
tcq.offset(), tcq.length(), tcv.getBackingArray(), tcv.offset(), tcv.length(), tts);
- tmp.setDeleted(pdel);
- pkey.set(tmp);
+ newPrevKey.setDeleted(pdel);
+ } else if (count == 1) {
+ if (currKey != null)
+ newPrevKey = currKey;
+ else
+ newPrevKey = prevKey;
+ } else {
+ throw new IllegalStateException();
}
- this.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(),
+ RelativeKey result = new RelativeKey();
+ result.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(),
cq.length(), cv.getBackingArray(), cv.offset(), cv.length(), ts);
- this.key.setDeleted((fieldsSame & DELETED) != 0);
+ result.key.setDeleted((fieldsSame & DELETED) != 0);
+ result.prevKey = result.key;
- this.prevKey = this.key;
-
- return count;
+ return new SkippR(result, count, newPrevKey);
}
- private void read(DataInput in, MByteSequence mbseq) throws IOException {
+ private static void read(DataInput in, MByteSequence mbseq) throws IOException {
int len = WritableUtils.readVInt(in);
read(in, mbseq, len);
}
- private void readValue(DataInput in, MByteSequence mbseq) throws IOException {
+ private static void readValue(DataInput in, MByteSequence mbseq) throws IOException {
int len = in.readInt();
read(in, mbseq, len);
}
@@ -391,16 +500,49 @@ public class RelativeKey implements Writ
return ret;
}
- private void read(DataInput in, MByteSequence mbseq, int len) throws IOException {
- if (mbseq.getBackingArray().length < len) {
- mbseq.setArray(new byte[nextArraySize(len)]);
+ private static void read(DataInput in, MByteSequence mbseqDestination, int len) throws IOException {
+ if (mbseqDestination.getBackingArray().length < len) {
+ mbseqDestination.setArray(new byte[nextArraySize(len)]);
+ }
+
+ in.readFully(mbseqDestination.getBackingArray(), 0, len);
+ mbseqDestination.setLength(len);
+ }
+
+ private static byte[] readPrefix(DataInput in, ByteSequence prefixSource) throws IOException {
+ int prefixLen = WritableUtils.readVInt(in);
+ int remainingLen = WritableUtils.readVInt(in);
+ byte[] data = new byte[prefixLen + remainingLen];
+ if (prefixSource.isBackedByArray()) {
+ System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), data, 0, prefixLen);
+ } else {
+ byte[] prefixArray = prefixSource.toArray();
+ System.arraycopy(prefixArray, 0, data, 0, prefixLen);
+ }
+ // read remaining
+ in.readFully(data, prefixLen, remainingLen);
+ return data;
}
- in.readFully(mbseq.getBackingArray(), 0, len);
- mbseq.setLength(len);
+ private static void readPrefix(DataInput in, MByteSequence dest, ByteSequence prefixSource) throws IOException {
+ int prefixLen = WritableUtils.readVInt(in);
+ int remainingLen = WritableUtils.readVInt(in);
+ int len = prefixLen + remainingLen;
+ if (dest.getBackingArray().length < len) {
+ dest.setArray(new byte[nextArraySize(len)]);
+ }
+ if (prefixSource.isBackedByArray()) {
+ System.arraycopy(prefixSource.getBackingArray(), prefixSource.offset(), dest.getBackingArray(), 0, prefixLen);
+ } else {
+ byte[] prefixArray = prefixSource.toArray();
+ System.arraycopy(prefixArray, 0, dest.getBackingArray(), 0, prefixLen);
+ }
+ // read remaining
+ in.readFully(dest.getBackingArray(), prefixLen, remainingLen);
+ dest.setLength(len);
}
- private byte[] read(DataInput in) throws IOException {
+ private static byte[] read(DataInput in) throws IOException {
int len = WritableUtils.readVInt(in);
byte[] data = new byte[len];
in.readFully(data);
@@ -411,52 +553,75 @@ public class RelativeKey implements Writ
return key;
}
- private void write(DataOutput out, ByteSequence bs) throws IOException {
+ private static void write(DataOutput out, ByteSequence bs) throws IOException {
WritableUtils.writeVInt(out, bs.length());
out.write(bs.getBackingArray(), bs.offset(), bs.length());
}
+ private static void writePrefix(DataOutput out, ByteSequence bs, int commonPrefixLength) throws IOException {
+ WritableUtils.writeVInt(out, commonPrefixLength);
+ WritableUtils.writeVInt(out, bs.length() - commonPrefixLength);
+ out.write(bs.getBackingArray(), bs.offset() + commonPrefixLength, bs.length() - commonPrefixLength);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(fieldsSame);
- // System.out.printf("wrote fs %x%n", fieldsSame);
-
- bytesWritten += 1;
+ if ((fieldsSame & PREFIX_COMPRESSION_ENABLED) == PREFIX_COMPRESSION_ENABLED) {
+ out.write(fieldsPrefixed);
+ }
- if ((fieldsSame & ROW_SAME) == 0) {
+ if ((fieldsSame & ROW_SAME) == ROW_SAME) {
+ // same, write nothing
+ } else if ((fieldsPrefixed & ROW_COMMON_PREFIX) == ROW_COMMON_PREFIX) {
+ // similar, write what's common
+ writePrefix(out, key.getRowData(), rowCommonPrefixLen);
+ } else {
+ // write it all
write(out, key.getRowData());
}
- if ((fieldsSame & CF_SAME) == 0) {
+ if ((fieldsSame & CF_SAME) == CF_SAME) {
+ // same, write nothing
+ } else if ((fieldsPrefixed & CF_COMMON_PREFIX) == CF_COMMON_PREFIX) {
+ // similar, write what's common
+ writePrefix(out, key.getColumnFamilyData(), cfCommonPrefixLen);
+ } else {
+ // write it all
write(out, key.getColumnFamilyData());
}
- if ((fieldsSame & CQ_SAME) == 0) {
-
+ if ((fieldsSame & CQ_SAME) == CQ_SAME) {
+ // same, write nothing
+ } else if ((fieldsPrefixed & CQ_COMMON_PREFIX) == CQ_COMMON_PREFIX) {
+ // similar, write what's common
+ writePrefix(out, key.getColumnQualifierData(), cqCommonPrefixLen);
+ } else {
+ // write it all
write(out, key.getColumnQualifierData());
-
- /*
- * Integer id = colFams.get(key.getColumnQualifier()); if(id == null){ id = nextId++; colFams.put(key.getColumnQualifier(), id); }
- *
- * WritableUtils.writeVInt(out, id); bytesWritten += 1;
- */
-
}
- if ((fieldsSame & CV_SAME) == 0) {
+ if ((fieldsSame & CV_SAME) == CV_SAME) {
+ // same, write nothing
+ } else if ((fieldsPrefixed & CV_COMMON_PREFIX) == CV_COMMON_PREFIX) {
+ // similar, write what's common
+ writePrefix(out, key.getColumnVisibilityData(), cvCommonPrefixLen);
+ } else {
+ // write it all
write(out, key.getColumnVisibilityData());
}
- if ((fieldsSame & TS_SAME) == 0) {
+ if ((fieldsSame & TS_SAME) == TS_SAME) {
+ // same, write nothing
+ } else if ((fieldsPrefixed & TS_DIFF) == TS_DIFF) {
+ // similar, write what's common
+ WritableUtils.writeVLong(out, tsDiff);
+ } else {
+ // write it all
WritableUtils.writeVLong(out, key.getTimestamp());
}
}
- @Override
- public int compareTo(RelativeKey o) {
- throw new UnsupportedOperationException();
- }
-
}
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java Tue Nov 20 17:00:21 2012
@@ -75,7 +75,7 @@ public class MultiLevelIndexTest extends
FSDataInputStream in = new FSDataInputStream(bais);
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance());
- Reader reader = new Reader(_cbr, RFile.RINDEX_VER_6);
+ Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7);
BlockRead rootIn = _cbr.getMetaBlock("root");
reader.readFields(rootIn);
rootIn.close();
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java Tue Nov 20 17:00:21 2012
@@ -16,6 +16,10 @@
*/
package org.apache.accumulo.core.file.rfile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -29,8 +33,6 @@ import java.util.Iterator;
import java.util.Random;
import java.util.Set;
-import junit.framework.TestCase;
-
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -53,8 +55,9 @@ import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.junit.Test;
-public class RFileTest extends TestCase {
+public class RFileTest {
private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
@@ -200,18 +203,19 @@ public class RFileTest extends TestCase
}
}
- private Key nk(String row, String cf, String cq, String cv, long ts) {
+ /** Builds a Key from string components, each encoded with String.getBytes() (platform default charset); package-private so RelativeKeyTest can reuse it. */
+ static Key nk(String row, String cf, String cq, String cv, long ts) {
return new Key(row.getBytes(), cf.getBytes(), cq.getBytes(), cv.getBytes(), ts);
}
- private Value nv(String val) {
+ /** Builds a Value from a String encoded with String.getBytes() (platform default charset); package-private so RelativeKeyTest can reuse it. */
+ static Value nv(String val) {
return new Value(val.getBytes());
}
- private String nf(String prefix, int i) {
- return String.format(prefix + "%06d", i);
+ /**
+ * Returns prefix followed by i zero-padded to six digits, e.g. nf("r_", 3) is "r_000003". The prefix is appended
+ * verbatim instead of being spliced into the format string, so a '%' in it cannot be misread as a format specifier.
+ * Package-private so RelativeKeyTest can reuse it.
+ */
+ static String nf(String prefix, int i) {
+ return prefix + String.format("%06d", i);
}
+ @Test
public void test1() throws IOException {
// test an emprt file
@@ -230,6 +234,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test2() throws IOException {
// test an rfile with one entry
@@ -266,6 +271,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test3() throws IOException {
// test an rfile with multiple rows having multiple columns
@@ -423,6 +429,7 @@ public class RFileTest extends TestCase
assertFalse(evi.hasNext());
}
+ @Test
public void test4() throws IOException {
TestRFile trf = new TestRFile();
@@ -465,6 +472,7 @@ public class RFileTest extends TestCase
}
}
+ @Test
public void test5() throws IOException {
TestRFile trf = new TestRFile();
@@ -493,6 +501,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test6() throws IOException {
TestRFile trf = new TestRFile();
@@ -525,6 +534,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test7() throws IOException {
// these test excercise setting the end key of a range
@@ -576,6 +586,7 @@ public class RFileTest extends TestCase
trf.reader.close();
}
+ @Test
public void test8() throws IOException {
TestRFile trf = new TestRFile();
@@ -692,6 +703,7 @@ public class RFileTest extends TestCase
return cfs;
}
+ @Test
public void test9() throws IOException {
TestRFile trf = new TestRFile();
@@ -833,6 +845,7 @@ public class RFileTest extends TestCase
}
+ @Test
public void test10() throws IOException {
// test empty locality groups
@@ -961,6 +974,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test11() throws IOException {
// test locality groups with more than two entries
@@ -1065,6 +1079,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test12() throws IOException {
// test inserting column fams not in locality groups
@@ -1096,6 +1111,7 @@ public class RFileTest extends TestCase
}
+ @Test
public void test13() throws IOException {
// test inserting column fam in default loc group that was in
// previous locality group
@@ -1137,6 +1153,7 @@ public class RFileTest extends TestCase
}
+ @Test
public void test14() throws IOException {
// test starting locality group after default locality group was started
@@ -1162,6 +1179,7 @@ public class RFileTest extends TestCase
trf.writer.close();
}
+ @Test
public void test16() throws IOException {
TestRFile trf = new TestRFile();
@@ -1180,6 +1198,7 @@ public class RFileTest extends TestCase
trf.closeWriter();
}
+ @Test
public void test17() throws IOException {
// add alot of the same keys to rfile that cover multiple blocks...
// this should cause the keys in the index to be exactly the same...
@@ -1318,6 +1337,7 @@ public class RFileTest extends TestCase
assertEquals(nonExcluded, colFamsSeen);
}
+ @Test
public void test18() throws IOException {
// test writing more column families to default LG than it will track
@@ -1369,6 +1389,7 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ @Test
public void test19() throws IOException {
// test RFile metastore
TestRFile trf = new TestRFile();
@@ -1421,9 +1442,16 @@ public class RFileTest extends TestCase
trf.closeReader();
}
+ // Version 5 appears never to have been released; testOldVersions covers 3, 4 and 6 but skips it.
+ // NOTE(review): runVersionTest is expected to throw a NullPointerException for this version, presumably because no
+ // test data exists for it -- confirm in runVersionTest. Asserting on the specific missing resource would be less
+ // brittle than expecting a bare NPE, which any unrelated null dereference would also satisfy.
+ @Test(expected = NullPointerException.class)
+ public void testMissingUnreleasedVersions() throws Exception {
+ runVersionTest(5);
+ }
+
+ // Runs runVersionTest for each older RFile version that should still be supported. Version 6 is now an old version
+ // because the current index version is RFile.RINDEX_VER_7; version 5 is covered by testMissingUnreleasedVersions.
+ @Test
public void testOldVersions() throws Exception {
runVersionTest(3);
runVersionTest(4);
+ runVersionTest(6);
}
private void runVersionTest(int version) throws IOException {
Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java?rev=1411745&r1=1411744&r2=1411745&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RelativeKeyTest.java Tue Nov 20 17:00:21 2012
@@ -16,13 +16,29 @@
*/
package org.apache.accumulo.core.file.rfile;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
-/**
- *
- */
-public class RelativeKeyTest extends TestCase {
- public void test1() {
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RelativeKeyTest {
+
+ @Test
+ public void testBasicRelativeKey() {
assertEquals(1, RelativeKey.nextArraySize(0));
assertEquals(1, RelativeKey.nextArraySize(1));
assertEquals(2, RelativeKey.nextArraySize(2));
@@ -44,4 +60,203 @@ public class RelativeKeyTest extends Tes
assertEquals(Integer.MAX_VALUE, RelativeKey.nextArraySize(Integer.MAX_VALUE));
}
+ /**
+ * Checks RelativeKey.getCommonPrefix: -1 when both sequences are identical, otherwise the length of the prefix they
+ * share (0 when the first bytes already differ or one side is empty).
+ */
+ @Test
+ public void testCommonPrefix() {
+ // exact matches
+ ArrayByteSequence exact = new ArrayByteSequence("abc");
+ assertEquals(-1, RelativeKey.getCommonPrefix(exact, exact));
+ assertEquals(-1, commonPrefixHelper("", ""));
+ assertEquals(-1, commonPrefixHelper("a", "a"));
+ assertEquals(-1, commonPrefixHelper("aa", "aa"));
+ assertEquals(-1, commonPrefixHelper("aaa", "aaa"));
+ assertEquals(-1, commonPrefixHelper("abab", "abab"));
+ assertEquals(-1, commonPrefixHelper(new String("aaa"), new ArrayByteSequence("aaa").toString()));
+ assertEquals(-1, commonPrefixHelper("abababababab".substring(3, 6), "ccababababcc".substring(3, 6)));
+
+ // no common prefix
+ assertEquals(0, commonPrefixHelper("", "a"));
+ assertEquals(0, commonPrefixHelper("a", ""));
+ assertEquals(0, commonPrefixHelper("a", "b"));
+ assertEquals(0, commonPrefixHelper("aaaa", "bbbb"));
+
+ // some common prefix
+ assertEquals(1, commonPrefixHelper("a", "ab"));
+ assertEquals(1, commonPrefixHelper("ab", "ac"));
+ assertEquals(2, commonPrefixHelper("abc", "abd"));
+ assertEquals(2, commonPrefixHelper("aa", "aaaa"));
+ assertEquals(4, commonPrefixHelper("aaaaa", "aaaab"));
+ }
+
+ /** Wraps both strings in ArrayByteSequences and returns what RelativeKey.getCommonPrefix reports for them. */
+ private int commonPrefixHelper(String a, String b) {
+ ArrayByteSequence left = new ArrayByteSequence(a);
+ ArrayByteSequence right = new ArrayByteSequence(b);
+ return RelativeKey.getCommonPrefix(left, right);
+ }
+
+ /** A RelativeKey written against prevKey must deserialize to the same key when the reader is given the same prevKey. */
+ @Test
+ public void testReadWritePrefix() throws IOException {
+ Key prevKey = new Key("row1", "columnfamily1", "columnqualifier1", "columnvisibility1", 1000);
+ Key newKey = new Key("row2", "columnfamily2", "columnqualifier2", "columnvisibility2", 3000);
+ RelativeKey written = new RelativeKey(prevKey, newKey);
+
+ // serialize
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ written.write(new DataOutputStream(buffer));
+
+ // deserialize relative to the same previous key the writer used
+ RelativeKey read = new RelativeKey();
+ read.setPrevKey(prevKey);
+ DataInputStream input = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+ read.readFields(input);
+
+ assertEquals(written.getKey(), read.getKey());
+ }
+
+ private static ArrayList<Key> expectedKeys;
+ private static ArrayList<Value> expectedValues;
+ private static ArrayList<Integer> expectedPositions;
+ private static ByteArrayOutputStream baos;
+
+ /**
+ * Serializes a fixed grid of entries (4 rows x 4 families x 4 qualifiers x 4 visibilities x 4 timestamps, each
+ * coordinate written twice: first as a delete marker, then as a live key, both with the same value) into baos as
+ * RelativeKey/Value pairs, recording every key, value and starting byte offset in the expected* lists.
+ */
+ @BeforeClass
+ public static void initSource() throws IOException {
+ int initialListSize = 10000;
+ expectedKeys = new ArrayList<Key>(initialListSize);
+ expectedValues = new ArrayList<Value>(initialListSize);
+ expectedPositions = new ArrayList<Integer>(initialListSize);
+
+ baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+
+ Key prev = null;
+ int val = 0;
+ for (int row = 0; row < 4; row++) {
+ String rowS = RFileTest.nf("r_", row);
+ for (int cf = 0; cf < 4; cf++) {
+ String cfS = RFileTest.nf("cf_", cf);
+ for (int cq = 0; cq < 4; cq++) {
+ String cqS = RFileTest.nf("cq_", cq);
+ for (char cv = 'A'; cv < 'A' + 4; cv++) {
+ String cvS = String.valueOf(cv);
+ for (int ts = 4; ts > 0; ts--) {
+ // delete marker first, then the live key with identical coordinates
+ Key deleteKey = RFileTest.nk(rowS, cfS, cqS, cvS, ts);
+ deleteKey.setDeleted(true);
+ prev = appendEntry(out, prev, deleteKey, RFileTest.nv("" + val));
+ prev = appendEntry(out, prev, RFileTest.nk(rowS, cfS, cqS, cvS, ts), RFileTest.nv("" + val));
+ val++;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /** Writes k (encoded relative to prev) followed by v, records k, v and k's start offset, and returns k as the next prev. */
+ private static Key appendEntry(DataOutputStream out, Key prev, Key k, Value v) throws IOException {
+ expectedPositions.add(out.size());
+ new RelativeKey(prev, k).write(out);
+ v.write(out);
+ expectedKeys.add(k);
+ expectedValues.add(v);
+ return k;
+ }
+
+ private DataInputStream in;
+
+ /**
+ * Opens a fresh stream over the serialized entries before each test. Tests rewind with in.reset(), so the mark is set
+ * with a read limit covering the whole buffer: the InputStream.mark contract allows a mark to be invalidated once more
+ * than readlimit bytes are read, and mark(0) only worked because ByteArrayInputStream ignores the limit.
+ */
+ @Before
+ public void setupDataInputStream() {
+ byte[] data = baos.toByteArray();
+ in = new DataInputStream(new ByteArrayInputStream(data));
+ in.mark(data.length);
+ }
+
+ /** Seeking to keys that sort before every serialized entry must land on the first entry after one step. */
+ @Test
+ public void testSeekBeforeEverything() throws IOException {
+ Key prevKey = new Key();
+ Key currKey = null;
+ MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+
+ // the empty key
+ Key seekKey = new Key();
+ assertLandedOnFirstEntry(RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey), value);
+
+ in.reset();
+
+ // a deleted key whose row "a" sorts before the first generated row "r_000000"
+ seekKey = new Key("a", "b", "c", "d", 1);
+ seekKey.setDeleted(true);
+ assertLandedOnFirstEntry(RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey), value);
+ }
+
+ /**
+ * Checks that a fastSkip result points at the first serialized entry, then reads the next key from in to ensure the
+ * stream can still be advanced after the skip.
+ */
+ private void assertLandedOnFirstEntry(RelativeKey.SkippR skippr, MByteSequence value) throws IOException {
+ assertEquals(1, skippr.skipped);
+ assertEquals(new Key(), skippr.prevKey);
+ assertEquals(expectedKeys.get(0), skippr.rk.getKey());
+ assertEquals(expectedValues.get(0).toString(), value.toString());
+
+ skippr.rk.readFields(in);
+ assertEquals(expectedKeys.get(1), skippr.rk.getKey());
+ }
+
+ /** Row "s" sorts after every generated row ("r_..."), so fastSkip must run off the end of the stream. */
+ @Test(expected = EOFException.class)
+ public void testSeekAfterEverything() throws IOException {
+ Key pastLastKey = new Key("s", "t", "u", "v", 1);
+ MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+ Key currKey = null;
+ RelativeKey.fastSkip(in, pastLastKey, value, new Key(), currKey);
+ }
+
+ /**
+ * Exercises fastSkip into the middle of the stream: to an existing key, to a key absent from the stream, to the
+ * current position, and forward from the middle of the stream rather than from its start.
+ */
+ @Test
+ public void testSeekMiddle() throws IOException {
+ int seekIndex = expectedKeys.size() / 2;
+ Key seekKey = expectedKeys.get(seekIndex);
+ Key prevKey = new Key();
+ Key currKey = null;
+ MByteSequence value = new MByteSequence(new byte[64], 0, 0);
+
+ RelativeKey.SkippR skippr = RelativeKey.fastSkip(in, seekKey, value, prevKey, currKey);
+
+ assertEquals(seekIndex + 1, skippr.skipped);
+ assertEquals(expectedKeys.get(seekIndex - 1), skippr.prevKey);
+ assertEquals(expectedKeys.get(seekIndex), skippr.rk.getKey());
+ assertEquals(expectedValues.get(seekIndex).toString(), value.toString());
+
+ // ensure we can advance after fastskip; readFields(in) only re-reads the key (value is not passed to it), so assert
+ // on the key -- value still holds entry seekIndex's value, which equals the next entry's only by coincidence
+ skippr.rk.readFields(in);
+ assertEquals(expectedKeys.get(seekIndex + 1), skippr.rk.getKey());
+
+ // try fast skipping to a key that does not exist; it should land on the first key sorting at or after it
+ in.reset();
+ Key fKey = expectedKeys.get(seekIndex).followingKey(PartialKey.ROW_COLFAM_COLQUAL);
+ int i;
+ for (i = seekIndex; expectedKeys.get(i).compareTo(fKey) < 0; i++) {}
+
+ skippr = RelativeKey.fastSkip(in, fKey, value, prevKey, currKey);
+ assertEquals(i + 1, skippr.skipped);
+ assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
+ assertEquals(expectedKeys.get(i), skippr.rk.getKey());
+ assertEquals(expectedValues.get(i).toString(), value.toString());
+
+ // try fast skipping to our current location
+ skippr = RelativeKey.fastSkip(in, expectedKeys.get(i), value, expectedKeys.get(i - 1), expectedKeys.get(i));
+ assertEquals(0, skippr.skipped);
+ assertEquals(expectedKeys.get(i - 1), skippr.prevKey);
+ assertEquals(expectedKeys.get(i), skippr.rk.getKey());
+ assertEquals(expectedValues.get(i).toString(), value.toString());
+
+ // try fast skipping 1 column family ahead from our current location, testing fastskip from middle of block as opposed to starting at beginning of block
+ fKey = expectedKeys.get(i).followingKey(PartialKey.ROW_COLFAM);
+ int j;
+ for (j = i; expectedKeys.get(j).compareTo(fKey) < 0; j++) {}
+ skippr = RelativeKey.fastSkip(in, fKey, value, expectedKeys.get(i - 1), expectedKeys.get(i));
+ assertEquals(j - i, skippr.skipped);
+ assertEquals(expectedKeys.get(j - 1), skippr.prevKey);
+ assertEquals(expectedKeys.get(j), skippr.rk.getKey());
+ assertEquals(expectedValues.get(j).toString(), value.toString());
+ }
}