You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/22 22:45:01 UTC
[03/15] git commit: ACCUMULO-652 initial mods to RFile to keep track
of extra block statistics
ACCUMULO-652 initial mods to RFile to keep track of extra block statistics
git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-652@1354475 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fcd07de
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fcd07de
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fcd07de
Branch: refs/heads/ACCUMULO-652
Commit: 3fcd07de50699ae829ddbe892579126837306ab4
Parents: fd77a56
Author: Adam Fuchs <af...@apache.org>
Authored: Wed Jun 27 12:48:16 2012 +0000
Committer: Adam Fuchs <af...@apache.org>
Committed: Wed Jun 27 12:48:16 2012 +0000
----------------------------------------------------------------------
.../core/file/rfile/MultiLevelIndex.java | 457 ++++++++++---------
.../apache/accumulo/core/file/rfile/RFile.java | 183 +++++++-
.../accumulo/core/iterators/Filterer.java | 24 +
.../accumulo/core/iterators/Predicate.java | 24 +
.../predicates/TimestampRangePredicate.java | 54 +++
.../core/iterators/system/HeapIterator.java | 14 +-
.../core/iterators/system/VisibilityFilter.java | 21 +-
.../core/security/ColumnVisibility.java | 342 +++++++++-----
.../core/security/VisibilityConstraint.java | 9 +-
.../core/file/rfile/MultiLevelIndexTest.java | 16 +-
.../accumulo/core/file/rfile/RFileTest.java | 45 +-
.../core/file/rfile/TimestampFilterTest.java | 98 ++++
.../iterators/user/IndexedDocIteratorTest.java | 14 +-
.../core/security/ColumnVisibilityTest.java | 60 ++-
.../core/security/VisibilityEvaluatorTest.java | 20 +-
.../examples/wikisearch/parser/EventFields.java | 6 +-
16 files changed, 983 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index b973cc3..e2b4b15 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -27,10 +27,11 @@ import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.RandomAccess;
+import java.util.Stack;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -38,37 +39,58 @@ import org.apache.accumulo.core.file.blockfile.ABlockWriter;
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.rfile.bcfile.Utils;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.WritableComparable;
public class MultiLevelIndex {
public static class IndexEntry implements WritableComparable<IndexEntry> {
private Key key;
+ private long minTimestamp;
+ private long maxTimestamp;
+ private ColumnVisibility minimumVisibility = null;
private int entries;
private long offset;
private long compressedSize;
private long rawSize;
- private boolean newFormat;
+ private int format;
- IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
+ IndexEntry(Key k, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int e, long offset, long compressedSize, long rawSize, int version) {
this.key = k;
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ this.minimumVisibility = minimumVisibility;
this.entries = e;
this.offset = offset;
this.compressedSize = compressedSize;
this.rawSize = rawSize;
- newFormat = true;
+ format = version;
}
- public IndexEntry(boolean newFormat) {
- this.newFormat = newFormat;
+ public IndexEntry(int format) {
+ this.format = format;
}
@Override
public void readFields(DataInput in) throws IOException {
key = new Key();
key.readFields(in);
+ if(format == RFile.RINDEX_VER_7)
+ {
+ minTimestamp = in.readLong();
+ maxTimestamp = in.readLong();
+ byte[] visibility = new byte[in.readInt()];
+ in.readFully(visibility);
+ minimumVisibility = new ColumnVisibility(visibility);
+ }
+ else
+ {
+ minTimestamp = Long.MIN_VALUE;
+ maxTimestamp = Long.MAX_VALUE;
+ }
entries = in.readInt();
- if (newFormat) {
+ if (format == RFile.RINDEX_VER_6 || format == RFile.RINDEX_VER_7) {
offset = Utils.readVLong(in);
compressedSize = Utils.readVLong(in);
rawSize = Utils.readVLong(in);
@@ -82,8 +104,16 @@ public class MultiLevelIndex {
@Override
public void write(DataOutput out) throws IOException {
key.write(out);
+ if(format == RFile.RINDEX_VER_7)
+ {
+ out.writeLong(minTimestamp);
+ out.writeLong(maxTimestamp);
+ byte[] visibility = minimumVisibility.getExpression();
+ out.writeInt(visibility.length);
+ out.write(visibility);
+ }
out.writeInt(entries);
- if (newFormat) {
+ if (format == RFile.RINDEX_VER_6 || format == RFile.RINDEX_VER_7) {
Utils.writeVLong(out, offset);
Utils.writeVLong(out, compressedSize);
Utils.writeVLong(out, rawSize);
@@ -121,12 +151,12 @@ public class MultiLevelIndex {
private int[] offsets;
private byte[] data;
- private boolean newFormat;
+ private int format;
- SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+ SerializedIndex(int[] offsets, byte[] data, int format) {
this.offsets = offsets;
this.data = data;
- this.newFormat = newFormat;
+ this.format = format;
}
@Override
@@ -140,7 +170,7 @@ public class MultiLevelIndex {
ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
DataInputStream dis = new DataInputStream(bais);
- IndexEntry ie = new IndexEntry(newFormat);
+ IndexEntry ie = new IndexEntry(format);
try {
ie.readFields(dis);
} catch (IOException e) {
@@ -203,6 +233,10 @@ public class MultiLevelIndex {
private ByteArrayOutputStream indexBytes;
private DataOutputStream indexOut;
+ private long minTimestamp = Long.MAX_VALUE;
+ private long maxTimestamp = Long.MIN_VALUE;
+ private ColumnVisibility minimumVisibility = null;
+
private ArrayList<Integer> offsets;
private int level;
private int offset;
@@ -212,8 +246,6 @@ public class MultiLevelIndex {
private boolean hasNext;
public IndexBlock(int level, int totalAdded) {
- // System.out.println("IndexBlock("+level+","+levelCount+","+totalAdded+")");
-
this.level = level;
this.offset = totalAdded;
@@ -224,9 +256,17 @@ public class MultiLevelIndex {
public IndexBlock() {}
- public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
+ public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int value, long offset, long compressedSize, long rawSize, int version) throws IOException {
offsets.add(indexOut.size());
- new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
+ if (this.minTimestamp > minTimestamp)
+ this.minTimestamp = minTimestamp;
+ if (this.maxTimestamp < maxTimestamp)
+ this.maxTimestamp = maxTimestamp;
+ if(this.minimumVisibility == null)
+ this.minimumVisibility = minimumVisibility;
+ else
+ this.minimumVisibility = this.minimumVisibility.or(minimumVisibility);
+ new IndexEntry(key, minTimestamp, maxTimestamp, minimumVisibility, value, offset, compressedSize, rawSize, version).write(indexOut);
}
int getSize() {
@@ -252,7 +292,7 @@ public class MultiLevelIndex {
public void readFields(DataInput in, int version) throws IOException {
- if (version == RFile.RINDEX_VER_6) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
level = in.readInt();
offset = in.readInt();
hasNext = in.readBoolean();
@@ -267,7 +307,7 @@ public class MultiLevelIndex {
byte[] serializedIndex = new byte[indexSize];
in.readFully(serializedIndex);
- index = new SerializedIndex(offsets, serializedIndex, true);
+ index = new SerializedIndex(offsets, serializedIndex, version);
keyIndex = new KeyIndex(offsets, serializedIndex);
} else if (version == RFile.RINDEX_VER_3) {
level = 0;
@@ -281,7 +321,7 @@ public class MultiLevelIndex {
ArrayList<Integer> oal = new ArrayList<Integer>();
for (int i = 0; i < size; i++) {
- IndexEntry ie = new IndexEntry(false);
+ IndexEntry ie = new IndexEntry(version);
oal.add(dos.size());
ie.readFields(in);
ie.write(dos);
@@ -295,7 +335,7 @@ public class MultiLevelIndex {
}
byte[] serializedIndex = baos.toByteArray();
- index = new SerializedIndex(oia, serializedIndex, false);
+ index = new SerializedIndex(oia, serializedIndex, version);
keyIndex = new KeyIndex(oia, serializedIndex);
} else if (version == RFile.RINDEX_VER_4) {
level = 0;
@@ -312,7 +352,7 @@ public class MultiLevelIndex {
byte[] indexData = new byte[size];
in.readFully(indexData);
- index = new SerializedIndex(offsets, indexData, false);
+ index = new SerializedIndex(offsets, indexData, version);
keyIndex = new KeyIndex(offsets, indexData);
} else {
throw new RuntimeException("Unexpected version " + version);
@@ -356,12 +396,14 @@ public class MultiLevelIndex {
private DataOutputStream buffer;
private int buffered;
private ByteArrayOutputStream baos;
+ private final int version;
public BufferedWriter(Writer writer) {
this.writer = writer;
baos = new ByteArrayOutputStream(1 << 20);
buffer = new DataOutputStream(baos);
buffered = 0;
+ version = RFile.RINDEX_VER_7;
}
private void flush() throws IOException {
@@ -369,10 +411,10 @@ public class MultiLevelIndex {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
- IndexEntry ie = new IndexEntry(true);
+ IndexEntry ie = new IndexEntry(version);
for (int i = 0; i < buffered; i++) {
ie.readFields(dis);
- writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
+ writer.add(ie.getKey(), ie.minTimestamp, ie.maxTimestamp, ie.minimumVisibility, ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize(), ie.format);
}
buffered = 0;
@@ -381,18 +423,18 @@ public class MultiLevelIndex {
}
- public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+ public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
if (buffer.size() > (10 * 1 << 20)) {
flush();
}
- new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
+ new IndexEntry(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version).write(buffer);
buffered++;
}
- public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+ public void addLast(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
flush();
- writer.addLast(key, data, offset, compressedSize, rawSize);
+ writer.addLast(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version);
}
public void close(DataOutput out) throws IOException {
@@ -417,30 +459,26 @@ public class MultiLevelIndex {
levels = new ArrayList<IndexBlock>();
}
- private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+ private void add(int level, Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, boolean last, int version)
+ throws IOException {
if (level == levels.size()) {
levels.add(new IndexBlock(level, 0));
}
IndexBlock iblock = levels.get(level);
- iblock.add(key, data, offset, compressedSize, rawSize);
- }
-
- private void flush(int level, Key lastKey, boolean last) throws IOException {
+ iblock.add(key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, version);
if (last && level == levels.size() - 1)
return;
- IndexBlock iblock = levels.get(level);
if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
ABlockWriter out = blockFileWriter.prepareDataBlock();
iblock.setHasNext(!last);
iblock.write(out);
out.close();
- add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
- flush(level + 1, lastKey, last);
+ add(level + 1, key, iblock.minTimestamp, iblock.maxTimestamp, iblock.minimumVisibility, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize(), last, version);
if (last)
levels.set(level, null);
@@ -449,19 +487,17 @@ public class MultiLevelIndex {
}
}
- public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+ public void add(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
totalAdded++;
- add(0, key, data, offset, compressedSize, rawSize);
- flush(0, key, false);
+ add(0, key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, false, version);
}
- public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+ public void addLast(Key key, long minTimestamp, long maxTimestamp, ColumnVisibility minimumVisibility, int data, long offset, long compressedSize, long rawSize, int version) throws IOException {
if (addedLast)
throw new IllegalStateException("already added last");
totalAdded++;
- add(0, key, data, offset, compressedSize, rawSize);
- flush(0, key, true);
+ add(0, key, minTimestamp, maxTimestamp, minimumVisibility, data, offset, compressedSize, rawSize, true, version);
addedLast = true;
}
@@ -487,215 +523,196 @@ public class MultiLevelIndex {
private int version;
private int size;
- public class Node {
-
- private Node parent;
- private IndexBlock indexBlock;
- private int currentPos;
-
- Node(Node parent, IndexBlock iBlock) {
- this.parent = parent;
- this.indexBlock = iBlock;
- }
+ class StackEntry {
+ public final IndexBlock block;
+ public int offset;
- Node(IndexBlock rootInfo) {
- this.parent = null;
- this.indexBlock = rootInfo;
- }
-
- private Node lookup(Key key) throws IOException {
- int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
- @Override
- public int compare(Key o1, Key o2) {
- return o1.compareTo(o2);
- }
- });
-
- if (pos < 0)
- pos = (pos * -1) - 1;
-
- if (pos == indexBlock.getIndex().size()) {
- if (parent != null)
- throw new IllegalStateException();
- this.currentPos = pos;
- return this;
- }
-
- this.currentPos = pos;
-
- if (indexBlock.getLevel() == 0) {
- return this;
- }
-
- IndexEntry ie = indexBlock.getIndex().get(pos);
- Node child = new Node(this, getIndexBlock(ie));
- return child.lookup(key);
- }
-
- private Node getLast() throws IOException {
- currentPos = indexBlock.getIndex().size() - 1;
- if (indexBlock.getLevel() == 0)
- return this;
-
- IndexEntry ie = indexBlock.getIndex().get(currentPos);
- Node child = new Node(this, getIndexBlock(ie));
- return child.getLast();
- }
-
- private Node getFirst() throws IOException {
- currentPos = 0;
- if (indexBlock.getLevel() == 0)
- return this;
-
- IndexEntry ie = indexBlock.getIndex().get(currentPos);
- Node child = new Node(this, getIndexBlock(ie));
- return child.getFirst();
- }
-
- private Node getPrevious() throws IOException {
- if (currentPos == 0)
- return parent.getPrevious();
-
- currentPos--;
-
- IndexEntry ie = indexBlock.getIndex().get(currentPos);
- Node child = new Node(this, getIndexBlock(ie));
- return child.getLast();
-
- }
-
- private Node getNext() throws IOException {
- if (currentPos == indexBlock.getIndex().size() - 1)
- return parent.getNext();
-
- currentPos++;
-
- IndexEntry ie = indexBlock.getIndex().get(currentPos);
- Node child = new Node(this, getIndexBlock(ie));
- return child.getFirst();
-
- }
-
- Node getNextNode() throws IOException {
- return parent.getNext();
- }
-
- Node getPreviousNode() throws IOException {
- return parent.getPrevious();
+ public StackEntry(IndexBlock block, int offset) {
+ this.block = block;
+ this.offset = offset;
}
}
- public class IndexIterator implements ListIterator<IndexEntry> {
-
- private Node node;
- private ListIterator<IndexEntry> liter;
+ class IndexIterator implements Iterator<IndexEntry> {
+ private Stack<StackEntry> position = new Stack<StackEntry>();
+ private final TimestampRangePredicate timestampFilter;
- private Node getPrevNode() {
+ private IndexIterator(TimestampRangePredicate timestampFilter, Key lookupKey) {
+ this.timestampFilter = timestampFilter;
try {
- return node.getPreviousNode();
+ seek(lookupKey);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- private Node getNextNode() {
- try {
- return node.getNextNode();
- } catch (IOException e) {
- throw new RuntimeException(e);
+      // A block is skipped only when its [minTimestamp, maxTimestamp] span lies entirely outside the filter's range; with no filter every block passes.
+      private final boolean checkFilterIndexEntry(IndexEntry ie) {
+        if (timestampFilter != null && (ie.maxTimestamp < timestampFilter.startTimestamp || ie.minTimestamp > timestampFilter.endTimestamp)) {
+          return false;
+        }
+        return true;
+      }
+
+      // Leaves the stack positioned one slot before the first entry at or after lookupKey that passes the filter, so the next goToNext() lands on it.
+      private void seek(Key lookupKey) throws IOException {
+        StackEntry top = new StackEntry(rootBlock, -1);
+        position.add(top);
+        while (true) {
+          top = position.peek();
+          // go down the tree
+          int pos = Collections.binarySearch(top.block.getKeyIndex(), lookupKey, new Comparator<Key>() {
+            @Override
+            public int compare(Key o1, Key o2) {
+              return o1.compareTo(o2);
+            }
+          });
+
+          if (pos < 0) {
+            pos = (pos * -1) - 1;
+          } else if (pos < top.block.getKeyIndex().size()) {
+            // the exact key was found, so we want to go back to the first identical match
+            while (pos > 0 && top.block.getKeyIndex().get(pos - 1).equals(lookupKey)) {
+              pos--;
+            }
+          }
+
+          IndexEntry ie = null;
+          List<IndexEntry> index = top.block.getIndex();
+          if (pos > 0) {
+            // remember the entry just before the seek position as the initial previousEntry (it is recorded regardless of the timestamp filter)
+            previousEntry = index.get(pos - 1);
+            // TODO: find the offset for this block
+            previousIndex = Integer.MIN_VALUE;
+          }
+
+          while (pos < index.size()) {
+            ie = index.get(pos);
+            // filter on timestampRange by skipping forward until a block passes the predicate
+            if (checkFilterIndexEntry(ie))
+              break;
+            pos++;
+          }
+
+          if (pos == index.size()) {
+            position.pop();
+            goToNext();
+            // goToNext() stops ON the entry it found, but prepNext() advances once more before reading; step back so that entry is not skipped
+            if (!position.isEmpty()) {
+              position.peek().offset--;
+            }
+            return;
+          } else {
+            if (top.block.level == 0) {
+              // found a matching index entry; park one slot before it because goToNext() increments first
+              top.offset = pos - 1;
+              return;
+            } else {
+              top.offset = pos;
+              position.add(new StackEntry(getIndexBlock(ie), 0));
+            }
+          }
         }
       }
- public IndexIterator() {
- node = null;
- }
-
- public IndexIterator(Node node) {
- this.node = node;
- liter = node.indexBlock.getIndex().listIterator(node.currentPos);
- }
-
- @Override
- public boolean hasNext() {
- if (node == null)
- return false;
-
- if (!liter.hasNext()) {
- return node.indexBlock.hasNext();
- } else {
- return true;
+ private void goToNext() throws IOException {
+ int numSkippedBlocks = 0;
+ // traverse the index tree forwards
+ while (position.isEmpty() == false) {
+ StackEntry top = position.peek();
+ top.offset++;
+ List<IndexEntry> index = top.block.getIndex();
+ while (top.offset < index.size()) {
+ if (checkFilterIndexEntry(index.get(top.offset)))
+ break;
+ numSkippedBlocks++;
+ top.offset++;
+ }
+ if (top.offset == index.size()) {
+ // go up
+ position.pop();
+ } else {
+ if (top.block.level == 0) {
+ // success!
+ return;
+ }
+ // go down
+ position.add(new StackEntry(getIndexBlock(index.get(top.offset)), -1));
+ }
}
-
}
- public IndexEntry peekPrevious() {
- IndexEntry ret = previous();
- next();
- return ret;
- }
+ IndexEntry nextEntry = null;
+ IndexEntry previousEntry = null;
+ int nextIndex = -1;
+ int previousIndex = -1;
- public IndexEntry peek() {
- IndexEntry ret = next();
- previous();
- return ret;
+ private void prepNext() {
+ if (nextEntry == null) {
+ try {
+ goToNext();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (position.isEmpty())
+ return;
+ StackEntry e = position.peek();
+ nextEntry = e.block.getIndex().get(e.offset);
+ nextIndex = e.block.getOffset() + e.offset;
+ }
}
- @Override
- public IndexEntry next() {
- if (!liter.hasNext()) {
- node = getNextNode();
- liter = node.indexBlock.getIndex().listIterator();
- }
+ public boolean hasNext() {
+ if (nextEntry == null)
+ prepNext();
+ return nextEntry != null;
- return liter.next();
}
- @Override
+ // initially, previous key is last key of the previous block
public boolean hasPrevious() {
- if (node == null)
- return false;
-
- if (!liter.hasPrevious()) {
- return node.indexBlock.getOffset() > 0;
- } else {
- return true;
- }
+ return previousEntry != null;
}
- @Override
- public IndexEntry previous() {
- if (!liter.hasPrevious()) {
- node = getPrevNode();
- liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
- }
-
- return liter.previous();
+ public int nextIndex() {
+ if (nextEntry == null)
+ prepNext();
+ return nextIndex;
}
- @Override
- public int nextIndex() {
- return node.indexBlock.getOffset() + liter.nextIndex();
+ public IndexEntry peek() {
+ if (nextEntry == null)
+ prepNext();
+ return nextEntry;
}
- @Override
- public int previousIndex() {
- return node.indexBlock.getOffset() + liter.previousIndex();
+ private int blocksReturned = 0;
+
+ public IndexEntry next() {
+ prepNext();
+ previousEntry = nextEntry;
+ nextEntry = null;
+ previousIndex = nextIndex;
+ nextIndex = -1;
+ return previousEntry;
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ public IndexEntry peekPrevious() {
+ return previousEntry;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Iterator#remove()
+ */
@Override
- public void set(IndexEntry e) {
+ public void remove() {
throw new UnsupportedOperationException();
-
}
- @Override
- public void add(IndexEntry e) {
- throw new UnsupportedOperationException();
+ public int previousIndex() {
+ return previousIndex;
}
}
@@ -714,16 +731,15 @@ public class MultiLevelIndex {
return iblock;
}
- public IndexIterator lookup(Key key) throws IOException {
- Node node = new Node(rootBlock);
- return new IndexIterator(node.lookup(key));
+ IndexIterator lookup(Key key) throws IOException {
+ return new IndexIterator(timestampRange, key);
}
public void readFields(DataInput in) throws IOException {
size = 0;
- if (version == RFile.RINDEX_VER_6) {
+ if (version == RFile.RINDEX_VER_6 || version == RFile.RINDEX_VER_7) {
size = in.readInt();
}
@@ -769,6 +785,15 @@ public class MultiLevelIndex {
public Key getLastKey() {
return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
}
+
+ TimestampRangePredicate timestampRange;
+
+ /**
+ * @param r
+ */
+ public void setTimestampRange(TimestampRangePredicate r) {
+ this.timestampRange = r;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c2eac1d..06000f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -56,10 +56,14 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.iterators.Filterer;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.Predicate;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -77,6 +81,7 @@ public class RFile {
private RFile() {}
private static final int RINDEX_MAGIC = 0x20637474;
+ static final int RINDEX_VER_7 = 7;
static final int RINDEX_VER_6 = 6;
static final int RINDEX_VER_4 = 4;
static final int RINDEX_VER_3 = 3;
@@ -301,6 +306,11 @@ public class RFile {
private int indexBlockSize;
private int entries = 0;
+ // some aggregate stats to keep on a per-block basis
+ private long minTimestamp = Long.MAX_VALUE;
+ private long maxTimestamp = Long.MIN_VALUE;
+ private ColumnVisibility minimumVisibility = null;
+
private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
private LocalityGroupMetadata currentLocalityGroup = null;
private int nextBlock = 0;
@@ -337,7 +347,7 @@ public class RFile {
ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
mba.writeInt(RINDEX_MAGIC);
- mba.writeInt(RINDEX_VER_6);
+ mba.writeInt(RINDEX_VER_7);
if (currentLocalityGroup != null)
localityGroups.add(currentLocalityGroup);
@@ -368,8 +378,28 @@ public class RFile {
}
}
+ private void updateBlockStats(Key key, Value value)
+ {
+ if(minTimestamp > key.getTimestamp())
+ minTimestamp = key.getTimestamp();
+ if(maxTimestamp < key.getTimestamp())
+ maxTimestamp = key.getTimestamp();
+ if(minimumVisibility == null)
+ minimumVisibility = new ColumnVisibility(key.getColumnVisibility());
+ else
+ minimumVisibility = minimumVisibility.or(new ColumnVisibility(key.getColumnVisibility()));
+ entries++;
+ }
+
+ private void clearBlockStats()
+ {
+ minTimestamp = Long.MAX_VALUE;
+ maxTimestamp = Long.MIN_VALUE;
+ minimumVisibility = null;
+ entries = 0;
+ }
+
public void append(Key key, Value value) throws IOException {
-
if (dataClosed) {
throw new IllegalStateException("Cannont append, data closed");
}
@@ -395,7 +425,8 @@ public class RFile {
rk.write(blockWriter);
value.write(blockWriter);
- entries++;
+ updateBlockStats(key,value);
+
prevKey = new Key(key);
lastKeyInBlock = prevKey;
@@ -406,13 +437,13 @@ public class RFile {
blockWriter.close();
if (lastBlock)
- currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+ currentLocalityGroup.indexWriter.addLast(key, minTimestamp, maxTimestamp, minimumVisibility, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize(), RINDEX_VER_7);
else
- currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+ currentLocalityGroup.indexWriter.add(key, minTimestamp, maxTimestamp, minimumVisibility, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize(), RINDEX_VER_7);
+ clearBlockStats();
blockWriter = null;
lastKeyInBlock = null;
- entries = 0;
nextBlock++;
}
@@ -475,7 +506,7 @@ public class RFile {
}
}
- private static class LocalityGroupReader implements FileSKVIterator {
+ private static class LocalityGroupReader implements FileSKVIterator, Filterer<Key,Value> {
private BlockFileReader reader;
private MultiLevelIndex.Reader index;
@@ -578,7 +609,7 @@ public class RFile {
return;
}
}
-
+
prevKey = rk.getKey();
rk.readFields(currBlock);
val.readFields(currBlock);
@@ -650,14 +681,15 @@ public class RFile {
boolean reseek = true;
if (range.afterEndKey(firstKey)) {
- // range is before first key in rfile, so there is nothing to do
+ // range is before first key in this locality group, so there is nothing to do
reset();
reseek = false;
}
- if (rk != null) {
+ // always reseek if the filter changed since the last seek
+ if (filterChanged == false && rk != null) {
if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
- // range is between the two keys in the file where the last range seeked to stopped, so there is
+ // range is between the two keys in the locality group where the last range seeked to stopped, so there is
// nothing to do
reseek = false;
}
@@ -702,12 +734,6 @@ public class RFile {
// past the last key
} else {
- // if the index contains the same key multiple times, then go to the
- // earliest index entry containing the key
- while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
- iiter.previous();
- }
-
if (iiter.hasPrevious())
prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block
else
@@ -771,9 +797,35 @@ public class RFile {
public void setInterruptFlag(AtomicBoolean flag) {
this.interruptFlag = flag;
}
+
+ private TimestampRangePredicate timestampRange;
+ private boolean filterChanged = false;
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
+ */
+ @Override
+ public void applyFilter(Predicate<Key,Value> filter) {
+ // TODO support general filters
+ if(filter instanceof TimestampRangePredicate)
+ {
+ filterChanged = true;
+ TimestampRangePredicate p = (TimestampRangePredicate)filter;
+ // intersect with previous timestampRange
+ if(timestampRange != null)
+ timestampRange = new TimestampRangePredicate(Math.max(p.startTimestamp, timestampRange.startTimestamp), Math.min(p.endTimestamp, timestampRange.endTimestamp));
+ else
+ timestampRange = p;
+ index.setTimestampRange(timestampRange);
+ }
+ else
+ {
+ throw new RuntimeException("yikes, not yet implemented");
+ }
+ }
}
- public static class Reader extends HeapIterator implements FileSKVIterator {
+ public static class Reader extends HeapIterator implements FileSKVIterator, Filterer<Key,Value> {
private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
@@ -799,7 +851,7 @@ public class RFile {
if (magic != RINDEX_MAGIC)
throw new IOException("Did not see expected magic number, saw " + magic);
- if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+ if (ver != RINDEX_VER_7 && ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
throw new IOException("Did not see expected version, saw " + ver);
int size = mb.readInt();
@@ -947,6 +999,9 @@ public class RFile {
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ topKey = null;
+ topValue = null;
+
clear();
numLGSeeked = 0;
@@ -1001,6 +1056,8 @@ public class RFile {
}
if (include) {
+ if(timestampFilter != null)
+ lgr.applyFilter(timestampFilter);
lgr.seek(range, EMPTY_CF_SET, false);
addSource(lgr);
numLGSeeked++;
@@ -1047,6 +1104,94 @@ public class RFile {
lgr.setInterruptFlag(interruptFlag);
}
}
+
+ ArrayList<Predicate<Key,Value>> filters = new ArrayList<Predicate<Key,Value>>();
+
+ TimestampRangePredicate timestampFilter = null;
+
+ Key topKey;
+ Value topValue;
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.system.HeapIterator#hasTop()
+ */
+ @Override
+ public boolean hasTop() {
+ if(topKey == null)
+ {
+ while(super.hasTop())
+ {
+ topKey = super.getTopKey();
+ topValue = super.getTopValue();
+ // check all the filters to see if we found a valid key/value pair
+ boolean keep = true;
+ for(Predicate<Key,Value> filter: filters)
+ {
+ if(!filter.evaluate(topKey, topValue))
+ {
+ keep = false;
+ try {
+ super.next();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ break;
+ }
+ }
+ if(keep == true)
+ return true;
+ }
+ // ran out of key/value pairs
+ topKey = null;
+ topValue = null;
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.system.HeapIterator#next()
+ */
+ @Override
+ public void next() throws IOException {
+ topKey = null;
+ topValue = null;
+ super.next();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.system.HeapIterator#getTopKey()
+ */
+ @Override
+ public Key getTopKey() {
+ if(topKey == null)
+ hasTop();
+ return topKey;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.system.HeapIterator#getTopValue()
+ */
+ @Override
+ public Value getTopValue() {
+ if(topValue == null)
+ hasTop();
+ return topValue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.Filterer#applyFilter(org.apache.accumulo.core.iterators.Predicate)
+ */
+ @Override
+ public void applyFilter(Predicate<Key,Value> filter) {
+ filters.add(filter);
+ // a timestamp filter is also handed to each LocalityGroupReader right before its seek() call, so remember the most recent one
+ if(filter instanceof TimestampRangePredicate)
+ this.timestampFilter = (TimestampRangePredicate)filter;
+ }
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
new file mode 100644
index 0000000..bda3665
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Filterer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
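+ * Filterer<K,V> supports a single method that is used to hand a Predicate<K,V> to the implementing object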
+ */
+public interface Filterer<K,V> {
+ public void applyFilter(Predicate<K,V> filter);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java b/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
new file mode 100644
index 0000000..99a6e8b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/Predicate.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
+ * Predicate<K,V> supports a single method that is used to evaluate an input (K,V) pair as true or false
+ */
+public interface Predicate<K,V> {
+ public boolean evaluate(K k, V v);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
new file mode 100644
index 0000000..eb5080b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/predicates/TimestampRangePredicate.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.predicates;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Predicate;
+
+/**
+ * TimestampRangePredicate is used to determine whether a Key/Value pair falls within an inclusive timestamp range
+ */
+public class TimestampRangePredicate implements Predicate<Key,Value> {
+
+ public final long startTimestamp;
+ public final long endTimestamp;
+
+
+ /**
+ * @param startTimestamp - inclusive first allowable timestamp
+ * @param endTimestamp - inclusive last allowable timestamp
+ */
+ public TimestampRangePredicate(long startTimestamp, long endTimestamp) {
+ super();
+ this.startTimestamp = startTimestamp;
+ this.endTimestamp = endTimestamp;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.accumulo.core.iterators.Predicate#evaluate(java.lang.Object, java.lang.Object)
+ */
+ /**
+ * return true IFF the key falls within the timestamp range
+ */
+ @Override
+ public boolean evaluate(Key k, Value v) {
+ long timestamp = k.getTimestamp();
+ return timestamp >= startTimestamp && timestamp <= endTimestamp;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
index e54f37c..72aa3e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
@@ -17,9 +17,12 @@
package org.apache.accumulo.core.iterators.system;
import java.io.IOException;
+import java.util.ArrayList;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filterer;
+import org.apache.accumulo.core.iterators.Predicate;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.commons.collections.buffer.PriorityBuffer;
@@ -55,28 +58,29 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
}
@Override
- final public Key getTopKey() {
+ public Key getTopKey() {
return currentIter.getTopKey();
}
@Override
- final public Value getTopValue() {
+ public Value getTopValue() {
return currentIter.getTopValue();
}
@Override
- final public boolean hasTop() {
+ public boolean hasTop() {
return heap.size() > 0;
}
@Override
- final public void next() throws IOException {
+ public void next() throws IOException {
switch (heap.size()) {
case 0:
throw new IllegalStateException("Called next() when there is no top");
case 1:
// optimization for case when heap contains one entry,
// avoids remove and add
+ // TODO apply the filters
currentIter.next();
if (!currentIter.hasTop()) {
heap.remove();
@@ -85,6 +89,7 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
break;
default:
Index idx = (Index) heap.remove();
+ // TODO apply the filters
idx.iter.next();
if (idx.iter.hasTop()) {
heap.add(idx);
@@ -111,5 +116,4 @@ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value>
else
currentIter = null;
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
index a41f7be..a4391c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java
@@ -23,26 +23,22 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.VisibilityEvaluator;
-import org.apache.accumulo.core.security.VisibilityParseException;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
public class VisibilityFilter extends Filter {
- private VisibilityEvaluator ve;
+ private Authorizations auths;
private Text defaultVisibility;
private LRUMap cache;
private Text tmpVis;
private static final Logger log = Logger.getLogger(VisibilityFilter.class);
- public VisibilityFilter() {}
-
public VisibilityFilter(SortedKeyValueIterator<Key,Value> iterator, Authorizations authorizations, byte[] defaultVisibility) {
setSource(iterator);
- this.ve = new VisibilityEvaluator(authorizations);
+ this.auths = authorizations;
this.defaultVisibility = new Text(defaultVisibility);
this.cache = new LRUMap(1000);
this.tmpVis = new Text();
@@ -50,7 +46,7 @@ public class VisibilityFilter extends Filter {
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- return new VisibilityFilter(getSource().deepCopy(env), ve.getAuthorizations(), TextUtil.getBytes(defaultVisibility));
+ return new VisibilityFilter(getSource().deepCopy(env), auths, TextUtil.getBytes(defaultVisibility));
}
@Override
@@ -66,13 +62,8 @@ public class VisibilityFilter extends Filter {
if (b != null)
return b;
- try {
- Boolean bb = ve.evaluate(new ColumnVisibility(testVis));
- cache.put(new Text(testVis), bb);
- return bb;
- } catch (VisibilityParseException e) {
- log.error("Parse Error", e);
- return false;
- }
+ Boolean bb = new ColumnVisibility(testVis).evaluate(auths);
+ cache.put(new Text(testVis), bb);
+ return bb;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java b/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
index 5a825f2..1b72b33 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/ColumnVisibility.java
@@ -16,16 +16,16 @@
*/
package org.apache.accumulo.core.security;
-import java.util.ArrayList;
+import java.io.ByteArrayOutputStream;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
+import java.util.Iterator;
+import java.util.TreeSet;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparator;
/**
* Validate the column visibility is a valid expression and set the visibility for a Mutation. See {@link ColumnVisibility#ColumnVisibility(byte[])} for the
@@ -33,117 +33,190 @@ import org.apache.hadoop.io.WritableComparator;
*/
public class ColumnVisibility {
- Node node = null;
- private byte[] expression;
-
- /**
- * Accessor for the underlying byte string.
- *
- * @return byte array representation of a visibility expression
- */
- public byte[] getExpression() {
- return expression;
- }
+ private Node node = null;
public static enum NodeType {
TERM, OR, AND,
}
-
- public static class Node {
- public final static List<Node> EMPTY = Collections.emptyList();
- NodeType type;
- int start = 0;
- int end = 0;
- List<Node> children = EMPTY;
+
+ private static abstract class Node implements Comparable<Node> {
+ protected final NodeType type;
- public Node(NodeType type) {
+ public Node(NodeType type)
+ {
this.type = type;
}
-
- public Node(int start, int end) {
- this.type = NodeType.TERM;
- this.start = start;
- this.end = end;
+
+ public byte[] generate() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ generate(baos,false);
+ return baos.toByteArray();
}
- public void add(Node child) {
- if (children == EMPTY)
- children = new ArrayList<Node>();
-
- children.add(child);
- }
+ public abstract boolean evaluate(Authorizations auths);
+
+ protected abstract void generate(ByteArrayOutputStream baos, boolean parens);
+ }
+
+ private static class TermNode extends Node {
+
+ final ByteSequence bs;
- public NodeType getType() {
- return type;
+ public TermNode(final ByteSequence bs) {
+ super(NodeType.TERM);
+ this.bs = bs;
}
- public List<Node> getChildren() {
- return children;
+ public boolean evaluate(Authorizations auths)
+ {
+ return auths.contains(bs);
+ }
+
+
+ protected void generate(ByteArrayOutputStream baos, boolean parens)
+ {
+ baos.write(bs.getBackingArray(), bs.offset(), bs.length());
}
- public int getTermStart() {
- return start;
+ @Override
+ public boolean equals(Object other) {
+ if(other instanceof TermNode)
+ {
+ return bs.compareTo(((TermNode)other).bs) == 0;
+ }
+ return false;
}
- public int getTermEnd() {
- return end;
+ @Override
+ public int compareTo(Node o) {
+ if(o.type == NodeType.TERM)
+ {
+ return bs.compareTo(((TermNode)o).bs);
+ }
+ return type.ordinal() - o.type.ordinal();
}
}
- public static class NodeComparator implements Comparator<Node> {
+ private abstract static class AggregateNode extends Node {
+
+ /**
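+ * @param type the NodeType of this aggregate (AND or OR in the subclasses), passed through to the Node constructor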
+ */
+ public AggregateNode(NodeType type) {
+ super(type);
+ }
- byte[] text;
+ protected TreeSet<Node> children = new TreeSet<Node>();
- NodeComparator(byte[] text) {
- this.text = text;
+ protected abstract byte getOperator();
+
+ @Override
+ protected void generate(ByteArrayOutputStream baos, boolean parens) {
+ if(parens)
+ baos.write('(');
+ boolean first = true;
+ for(Node child:children)
+ {
+ if(!first)
+ baos.write(getOperator());
+ child.generate(baos, true);
+ first = false;
+ }
+ if(parens)
+ baos.write(')');
}
@Override
- public int compare(Node a, Node b) {
- int diff = a.type.ordinal() - b.type.ordinal();
- if (diff != 0)
- return diff;
- switch (a.type) {
- case TERM:
- return WritableComparator.compareBytes(text, a.start, a.end - a.start, text, b.start, b.end - b.start);
- case OR:
- case AND:
- diff = a.children.size() - b.children.size();
- if (diff != 0)
- return diff;
- for (int i = 0; i < a.children.size(); i++) {
- diff = compare(a.children.get(i), b.children.get(i));
- if (diff != 0)
- return diff;
- }
+ public int compareTo(Node o) {
+ int ordinalDiff = type.ordinal() - o.type.ordinal();
+ if(ordinalDiff != 0)
+ return ordinalDiff;
+ AggregateNode other = (AggregateNode)o;
+ int childCountDifference = children.size() - other.children.size();
+ if(childCountDifference != 0)
+ return childCountDifference;
+ Iterator<Node> otherChildren = other.children.iterator();
+ for(Node n1:children)
+ {
+ int comp = n1.compareTo(otherChildren.next());
+ if(comp != 0)
+ return comp;
}
return 0;
}
+
}
- static private void flatten(Node root, byte[] expression, StringBuilder out) {
- if (root.type == NodeType.TERM)
- out.append(new String(expression, root.start, root.end - root.start));
- else {
- String sep = "";
- Collections.sort(root.children, new NodeComparator(expression));
- for (Node c : root.children) {
- out.append(sep);
- boolean parens = (c.type != NodeType.TERM && root.type != c.type);
- if (parens)
- out.append("(");
- flatten(c, expression, out);
- if (parens)
- out.append(")");
- sep = root.type == NodeType.AND ? "&" : "|";
- }
+ private static class OrNode extends AggregateNode {
+
+ public OrNode() {
+ super(NodeType.OR);
+ }
+
+ @Override
+ public boolean evaluate(Authorizations auths) {
+ for(Node child:children)
+ if(child.evaluate(auths))
+ return true;
+ return false;
+ }
+
+ @Override
+ protected byte getOperator() {
+ return '|';
}
+
+ }
+
+ private static class AndNode extends AggregateNode {
+
+ public AndNode()
+ {
+ super(NodeType.AND);
+ }
+
+ @Override
+ public boolean evaluate(Authorizations auths) {
+ for(Node child:children)
+ if(!child.evaluate(auths))
+ return false;
+ return true;
+ }
+
+ @Override
+ protected byte getOperator() {
+ return '&';
+ }
+
}
+
+ private byte[] expression = null;
+ /**
+ * @deprecated use {@link #getExpression()} instead
+ * @see org.apache.accumulo.core.security.ColumnVisibility#getExpression()
+ */
public byte[] flatten() {
- StringBuilder builder = new StringBuilder();
- flatten(node, expression, builder);
- return builder.toString().getBytes();
+ return getExpression();
+ }
+
+ /**
+ * Generate the byte[] that represents this ColumnVisibility.
+ * @return a byte[] representation of this visibility
+ */
+ public byte[] getExpression(){
+ if(expression != null)
+ return expression;
+ expression = _flatten();
+ return expression;
+ }
+
+ private static final byte[] emptyExpression = new byte[0];
+
+ private byte[] _flatten() {
+ if(node == null)
+ return emptyExpression;
+ return node.generate();
}
private static class ColumnVisibilityParser {
@@ -170,7 +243,7 @@ public class ColumnVisibility {
if (start != end) {
if (expr != null)
throw new BadArgumentException("expression needs | or &", new String(expression), start);
- return new Node(start, end);
+ return new TermNode(new ArrayByteSequence(expression, start, end - start));
}
if (expr == null)
throw new BadArgumentException("empty term", new String(expression), start);
@@ -189,9 +262,9 @@ public class ColumnVisibility {
if (!result.type.equals(NodeType.AND))
throw new BadArgumentException("cannot mix & and |", new String(expression), index - 1);
} else {
- result = new Node(NodeType.AND);
+ result = new AndNode();
}
- result.add(expr);
+ ((AggregateNode)result).children.add(expr);
expr = null;
termStart = index;
break;
@@ -202,9 +275,9 @@ public class ColumnVisibility {
if (!result.type.equals(NodeType.OR))
throw new BadArgumentException("cannot mix | and &", new String(expression), index - 1);
} else {
- result = new Node(NodeType.OR);
+ result = new OrNode();
}
- result.add(expr);
+ ((AggregateNode)result).children.add(expr);
expr = null;
termStart = index;
break;
@@ -225,11 +298,21 @@ public class ColumnVisibility {
if (result == null)
return child;
if (result.type == child.type)
- for (Node c : child.children)
- result.add(c);
+ {
+ AggregateNode parenNode = (AggregateNode)child;
+ for (Node c : parenNode.children)
+ ((AggregateNode)result).children.add(c);
+ }
else
- result.add(child);
- result.end = index - 1;
+ ((AggregateNode)result).children.add(child);
+ if (result.type != NodeType.TERM)
+ {
+ AggregateNode resultNode = (AggregateNode)result;
+ if (resultNode.children.size() == 1)
+ return resultNode.children.first();
+ if (resultNode.children.size() < 2)
+ throw new BadArgumentException("missing term", new String(expression), index);
+ }
return result;
}
default: {
@@ -241,12 +324,24 @@ public class ColumnVisibility {
}
Node child = processTerm(termStart, index, expr, expression);
if (result != null)
- result.add(child);
+ {
+ if(result.type == child.type)
+ {
+ ((AggregateNode)result).children.addAll(((AggregateNode)child).children);
+ }
+ else
+ ((AggregateNode)result).children.add(child);
+ }
else
result = child;
if (result.type != NodeType.TERM)
- if (result.children.size() < 2)
+ {
+ AggregateNode resultNode = (AggregateNode)result;
+ if (resultNode.children.size() == 1)
+ return resultNode.children.first();
+ if (resultNode.children.size() < 2)
throw new BadArgumentException("missing term", new String(expression), index);
+ }
return result;
}
}
@@ -256,14 +351,12 @@ public class ColumnVisibility {
ColumnVisibilityParser p = new ColumnVisibilityParser();
node = p.parse(expression);
}
- this.expression = expression;
}
/**
* Empty visibility. Normally, elements with empty visibility can be seen by everyone. Though, one could change this behavior with filters.
*/
public ColumnVisibility() {
- expression = new byte[0];
}
/**
@@ -279,6 +372,10 @@ public class ColumnVisibility {
this(TextUtil.getBytes(expression));
}
+ private ColumnVisibility(Node node) {
+ this.node = node;
+ }
+
/**
* Set the column visibility for a Mutation.
*
@@ -313,7 +410,7 @@ public class ColumnVisibility {
@Override
public String toString() {
- return "[" + new String(expression) + "]";
+ return "[" + new String(this.getExpression()) + "]";
}
/**
@@ -329,16 +426,55 @@ public class ColumnVisibility {
/**
* Compares two ColumnVisibilities for string equivalence, not as a meaningful comparison of terms and conditions.
*/
- public boolean equals(ColumnVisibility otherLe) {
- return Arrays.equals(expression, otherLe.expression);
- }
+// public boolean equals(ColumnVisibility otherLe) {
+// return Arrays.equals(expression, otherLe.expression);
+// }
@Override
public int hashCode() {
- return Arrays.hashCode(expression);
+ return Arrays.hashCode(getExpression());
}
- public Node getParseTree() {
- return node;
+ public boolean evaluate(Authorizations auths) {
+ if(node == null)
+ return true;
+ return node.evaluate(auths);
}
+
+ public ColumnVisibility or(ColumnVisibility other)
+ {
+ if(node == null)
+ return this;
+ if(other.node == null)
+ return other;
+ OrNode orNode = new OrNode();
+ if(other.node instanceof OrNode)
+ orNode.children.addAll(((OrNode)other.node).children);
+ else
+ orNode.children.add(other.node);
+ if(node instanceof OrNode)
+ orNode.children.addAll(((OrNode)node).children);
+ else
+ orNode.children.add(node);
+ return new ColumnVisibility(orNode);
+ }
+
+ public ColumnVisibility and(ColumnVisibility other)
+ {
+ if(node == null)
+ return other;
+ if(other.node == null)
+ return this;
+ AndNode andNode = new AndNode();
+ if(other.node instanceof AndNode)
+ andNode.children.addAll(((AndNode)other.node).children);
+ else
+ andNode.children.add(other.node);
+ if(node instanceof AndNode)
+ andNode.children.addAll(((AndNode)node).children);
+ else
+ andNode.children.add(node);
+ return new ColumnVisibility(andNode);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java b/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
index c8b33ba..1df543f 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/VisibilityConstraint.java
@@ -47,7 +47,7 @@ public class VisibilityConstraint implements Constraint {
if (updates.size() > 1)
ok = new HashSet<String>();
- VisibilityEvaluator ve = null;
+ Authorizations auths = env.getAuthorizations();
for (ColumnUpdate update : updates) {
@@ -59,16 +59,11 @@ public class VisibilityConstraint implements Constraint {
try {
- if (ve == null)
- ve = new VisibilityEvaluator(env.getAuthorizations());
-
- if (!ve.evaluate(new ColumnVisibility(cv)))
+ if (!new ColumnVisibility(cv).evaluate(auths))
return Collections.singletonList(new Short((short) 2));
} catch (BadArgumentException bae) {
return Collections.singletonList(new Short((short) 1));
- } catch (VisibilityParseException e) {
- return Collections.singletonList(new Short((short) 1));
}
if (ok != null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index c5e2501..3da616d 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -58,9 +59,9 @@ public class MultiLevelIndexTest extends TestCase {
BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
for (int i = 0; i < num; i++)
- mliw.add(new Key(String.format("%05d000", i)), i, 0, 0, 0);
+ mliw.add(new Key(String.format("%05d000", i)), 0l, 0l, new ColumnVisibility(), i, 0, 0, 0, RFile.RINDEX_VER_7);
- mliw.addLast(new Key(String.format("%05d000", num)), num, 0, 0, 0);
+ mliw.addLast(new Key(String.format("%05d000", num)), 0l, 0l, new ColumnVisibility(), num, 0, 0, 0, RFile.RINDEX_VER_7);
ABlockWriter root = _cbw.prepareMetaBlock("root");
mliw.close(root);
@@ -75,7 +76,7 @@ public class MultiLevelIndexTest extends TestCase {
FSDataInputStream in = new FSDataInputStream(bais);
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance());
- Reader reader = new Reader(_cbr, RFile.RINDEX_VER_6);
+ Reader reader = new Reader(_cbr, RFile.RINDEX_VER_7);
BlockRead rootIn = _cbr.getMetaBlock("root");
reader.readFields(rootIn);
rootIn.close();
@@ -90,15 +91,6 @@ public class MultiLevelIndexTest extends TestCase {
assertEquals(num + 1, count);
- while (liter.hasPrevious()) {
- count--;
- assertEquals(count, liter.previousIndex());
- assertEquals(count, liter.peekPrevious().getNumEntries());
- assertEquals(count, liter.previous().getNumEntries());
- }
-
- assertEquals(0, count);
-
// go past the end
liter = reader.lookup(new Key(String.format("%05d000", num + 1)));
assertFalse(liter.hasNext());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index ed7cf7b..71f5c6c 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -24,8 +24,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Random;
import java.util.Set;
import junit.framework.TestCase;
@@ -51,8 +53,11 @@ import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.junit.Test;
-public class RFileTest extends TestCase {
+import static org.junit.Assert.*;
+
+public class RFileTest {
private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
@@ -206,6 +211,7 @@ public class RFileTest extends TestCase {
return String.format(prefix + "%06d", i);
}
+ @Test
public void test1() throws IOException {
// test an emprt file
@@ -224,6 +230,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test2() throws IOException {
// test an rfile with one entry
@@ -260,6 +267,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test3() throws IOException {
// test an rfile with multiple rows having multiple columns
@@ -403,6 +411,7 @@ public class RFileTest extends TestCase {
assertFalse(evi.hasNext());
}
+ @Test
public void test4() throws IOException {
TestRFile trf = new TestRFile();
@@ -445,6 +454,7 @@ public class RFileTest extends TestCase {
}
}
+ @Test
public void test5() throws IOException {
TestRFile trf = new TestRFile();
@@ -473,6 +483,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test6() throws IOException {
TestRFile trf = new TestRFile();
@@ -505,6 +516,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test7() throws IOException {
// these test excercise setting the end key of a range
@@ -556,6 +568,7 @@ public class RFileTest extends TestCase {
trf.reader.close();
}
+ @Test
public void test8() throws IOException {
TestRFile trf = new TestRFile();
@@ -672,6 +685,7 @@ public class RFileTest extends TestCase {
return cfs;
}
+ @Test
public void test9() throws IOException {
TestRFile trf = new TestRFile();
@@ -813,6 +827,7 @@ public class RFileTest extends TestCase {
}
+ @Test
public void test10() throws IOException {
// test empty locality groups
@@ -941,6 +956,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test11() throws IOException {
// test locality groups with more than two entries
@@ -1045,6 +1061,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test12() throws IOException {
// test inserting column fams not in locality groups
@@ -1076,6 +1093,7 @@ public class RFileTest extends TestCase {
}
+ @Test
public void test13() throws IOException {
// test inserting column fam in default loc group that was in
// previous locality group
@@ -1117,6 +1135,7 @@ public class RFileTest extends TestCase {
}
+ @Test
public void test14() throws IOException {
// test starting locality group after default locality group was started
@@ -1142,6 +1161,7 @@ public class RFileTest extends TestCase {
trf.writer.close();
}
+ @Test
public void test16() throws IOException {
TestRFile trf = new TestRFile();
@@ -1160,6 +1180,7 @@ public class RFileTest extends TestCase {
trf.closeWriter();
}
+ @Test
public void test17() throws IOException {
// add alot of the same keys to rfile that cover multiple blocks...
// this should cause the keys in the index to be exactly the same...
@@ -1298,6 +1319,7 @@ public class RFileTest extends TestCase {
assertEquals(nonExcluded, colFamsSeen);
}
+ @Test
public void test18() throws IOException {
// test writing more column families to default LG than it will track
@@ -1349,6 +1371,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void test19() throws IOException {
// test RFile metastore
TestRFile trf = new TestRFile();
@@ -1401,6 +1424,7 @@ public class RFileTest extends TestCase {
trf.closeReader();
}
+ @Test
public void testOldVersions() throws Exception {
runVersionTest(3);
runVersionTest(4);
@@ -1459,4 +1483,23 @@ public class RFileTest extends TestCase {
reader.close();
}
+
+  /**
+   * Smoke test for writing large values: appends two entries to each of two locality groups,
+   * every entry carrying the same 2000-byte random value, and closes the writer. Nothing is read
+   * back; the test passes when the writer accepts the entries and closes without throwing.
+   */
+  @Test
+  public void testSingleKeyBlocks() throws IOException {
+    // Fixed seed so the payload is the same on every run and a failure can be reproduced.
+    byte[] bytes = new byte[2000];
+    Random r = new Random(652);
+    r.nextBytes(bytes);
+    Value vBig = new Value(bytes);
+
+    TestRFile trf = new TestRFile();
+    trf.openWriter(false);
+
+    // NOTE(review): judging by the test name, each value is presumably large enough to fill a
+    // data block on its own -- confirm against the block size TestRFile configures.
+    trf.writer.startNewLocalityGroup("one", Collections.singleton((ByteSequence) (new ArrayByteSequence("one"))));
+    trf.writer.append(new Key("r1", "one"), vBig);
+    trf.writer.append(new Key("r2", "one"), vBig);
+
+    trf.writer.startNewLocalityGroup("two", Collections.singleton((ByteSequence) (new ArrayByteSequence("two"))));
+    trf.writer.append(new Key("r1", "two"), vBig);
+    trf.writer.append(new Key("r2", "two"), vBig);
+
+    trf.writer.close();
+  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
new file mode 100644
index 0000000..c58f924
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/TimestampFilterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.iterators.Predicate;
+import org.apache.accumulo.core.iterators.predicates.TimestampRangePredicate;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+/**
+ * Checks that a filter applied to an {@link RFile.Reader} only returns entries that the filter's
+ * predicate accepts.
+ */
+public class TimestampFilterTest {
+
+  /**
+   * Writes 100000 entries with random row, column family and column qualifier and with timestamps
+   * 0..99999 into an in-memory RFile, applies a {@link TimestampRangePredicate} to a reader over
+   * that data, and scans everything. The scan must return exactly as many entries as the predicate
+   * accepted while the data was generated, and the predicate must accept each returned entry.
+   */
+  @Test
+  public void testRFileTimestampFiltering() throws Exception {
+    Predicate<Key,Value> timeRange = new TimestampRangePredicate(100, 110);
+    int expected = 0;
+    // Fixed seed: the generated keys are identical on every run, so a failure can be reproduced.
+    Random r = new Random(652);
+    Configuration conf = new Configuration();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf);
+    RFile.Writer writer = new RFile.Writer(_cbw, 1000, 1000);
+    writer.startDefaultLocalityGroup();
+    byte[] row = new byte[10];
+    byte[] colFam = new byte[10];
+    byte[] colQual = new byte[10];
+    Value value = new Value(new byte[0]);
+    byte[] colVis = new byte[0];
+
+    // Timestamps increase with i while the key contents are random. Buffering in a TreeMap
+    // means the entries are appended in sorted key order, with timestamps scattered across it.
+    TreeMap<Key,Value> inputBuffer = new TreeMap<Key,Value>();
+    for (int i = 0; i < 100000; i++) {
+      r.nextBytes(row);
+      r.nextBytes(colFam);
+      r.nextBytes(colQual);
+      Key k = new Key(row, colFam, colQual, colVis, (long) i);
+      if (timeRange.evaluate(k, value)) {
+        expected++;
+      }
+      inputBuffer.put(k, value);
+    }
+    for (Entry<Key,Value> e : inputBuffer.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+
+    // scan the RFile to bring back keys in a given timestamp range
+    byte[] data = baos.toByteArray();
+    ByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
+    FSDataInputStream in = new FSDataInputStream(bais);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+    RFile.Reader reader = new RFile.Reader(_cbr);
+    try {
+      int count = 0;
+      reader.applyFilter(timeRange);
+      reader.seek(new Range(), Collections.EMPTY_SET, false);
+      while (reader.hasTop()) {
+        count++;
+        assertTrue(timeRange.evaluate(reader.getTopKey(), reader.getTopValue()));
+        reader.next();
+      }
+      assertEquals(expected, count);
+    } finally {
+      // release the reader even when an assertion above fails
+      reader.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
index 5508a4d..4681a61 100644
--- a/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/iterators/user/IndexedDocIteratorTest.java
@@ -24,8 +24,6 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.TreeMap;
-import junit.framework.TestCase;
-
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -40,8 +38,12 @@ import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
-public class IndexedDocIteratorTest extends TestCase {
+public class IndexedDocIteratorTest {
private static final Logger log = Logger.getLogger(IndexedDocIteratorTest.class);
@@ -171,7 +173,7 @@ public class IndexedDocIteratorTest extends TestCase {
public void testNull() {}
- @Override
+ @Before
public void setUp() {
Logger.getRootLogger().setLevel(Level.ERROR);
}
@@ -179,6 +181,7 @@ public class IndexedDocIteratorTest extends TestCase {
private static final int NUM_ROWS = 5;
private static final int NUM_DOCIDS = 200;
+ @Test
public void test1() throws IOException {
columnFamilies = new Text[2];
columnFamilies[0] = new Text("CC");
@@ -216,6 +219,7 @@ public class IndexedDocIteratorTest extends TestCase {
cleanup();
}
+ @Test
public void test2() throws IOException {
columnFamilies = new Text[3];
columnFamilies[0] = new Text("A");
@@ -250,6 +254,7 @@ public class IndexedDocIteratorTest extends TestCase {
cleanup();
}
+ @Test
public void test3() throws IOException {
columnFamilies = new Text[6];
columnFamilies[0] = new Text("C");
@@ -292,6 +297,7 @@ public class IndexedDocIteratorTest extends TestCase {
cleanup();
}
+ @Test
public void test4() throws IOException {
columnFamilies = new Text[3];
boolean[] notFlags = new boolean[3];
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java b/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
index df1863a..d463f42 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/ColumnVisibilityTest.java
@@ -16,8 +16,7 @@
*/
package org.apache.accumulo.core.security;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import org.junit.Test;
@@ -64,13 +63,6 @@ public class ColumnVisibilityTest {
shouldThrow("a*b");
}
- public void normalized(String... values) {
- for (int i = 0; i < values.length; i += 2) {
- ColumnVisibility cv = new ColumnVisibility(values[i].getBytes());
- assertArrayEquals(cv.flatten(), values[i + 1].getBytes());
- }
- }
-
@Test
public void testComplexCompound() {
shouldNotThrow("(a|b)&(x|y)");
@@ -79,11 +71,61 @@ public class ColumnVisibilityTest {
shouldNotThrow("(one&two)|(foo&bar)", "(one|foo)&three", "one|foo|bar", "(one|foo)|bar", "((one|foo)|bar)&two");
}
+  /**
+   * Asserts, pair by pair, that parsing an expression yields the expected expression bytes.
+   *
+   * @param values
+   *          alternating entries: an input expression followed by the bytes expected from
+   *          {@link ColumnVisibility#getExpression()} for that input
+   */
+  public void normalized(String... values) {
+    for (int i = 0; i < values.length; i += 2) {
+      ColumnVisibility cv = new ColumnVisibility(values[i].getBytes());
+      // JUnit's argument order is (expected, actual); keeping it makes failure messages read correctly
+      assertArrayEquals(values[i + 1].getBytes(), cv.getExpression());
+    }
+  }
+
@Test
public void testNormalization() {
normalized("a", "a", "(a)", "a", "b|a", "a|b", "(b)|a", "a|b", "(b|(a|c))&x", "x&(a|b|c)", "(((a)))", "a");
+ // Arguments are (input, expected) pairs. The cases below expect repeated terms to collapse:
+ // "a|a" becomes "a", and nested repetitions of the same term reduce to a single occurrence.
+ normalized("a|a", "a", "a|(a&a)", "a", "(a&b)|(b&a)", "a&b");
+ normalized("a|(a|(a|b))","a|b");
+ normalized("a|(a|(a|a))","a");
+ }
+
+  /**
+   * Asserts that {@code a.or(b)} produces the expression {@code c} and leaves both operands'
+   * expressions as they were.
+   *
+   * @param a
+   *          expression of the left operand
+   * @param b
+   *          expression of the right operand
+   * @param c
+   *          expression expected from the result of {@link ColumnVisibility#or}
+   */
+  public void aOrBEqualC(String a, String b, String c) {
+    ColumnVisibility cvA = new ColumnVisibility(a.getBytes());
+    ColumnVisibility cvB = new ColumnVisibility(b.getBytes());
+    ColumnVisibility cvC = cvA.or(cvB);
+    // JUnit's argument order is (expected, actual)
+    assertArrayEquals(c.getBytes(), cvC.getExpression());
+    // check that we didn't disturb the original ColumnVisibilities
+    assertArrayEquals(a.getBytes(), cvA.getExpression());
+    assertArrayEquals(b.getBytes(), cvB.getExpression());
+  }
+
+ // Each call passes (left operand, right operand, expression expected from left.or(right)).
+ @Test
+ public void testDisjunction() {
+ aOrBEqualC("a", "b", "a|b");
+ aOrBEqualC("c|(a&b)", "b", "b|c|(a&b)");
+ aOrBEqualC("c|(a&b)", "a|c","a|c|(a&b)");
+ aOrBEqualC("a&b","c&d","(a&b)|(c&d)");
+ // or-ing with the empty expression is expected to yield the empty expression
+ aOrBEqualC("a","","");
}
+  /**
+   * Asserts that {@code a.and(b)} produces the expression {@code c} and leaves both operands'
+   * expressions as they were.
+   *
+   * @param a
+   *          expression of the left operand
+   * @param b
+   *          expression of the right operand
+   * @param c
+   *          expression expected from the result of {@link ColumnVisibility#and}
+   */
+  public void aAndBEqualC(String a, String b, String c) {
+    ColumnVisibility cvA = new ColumnVisibility(a.getBytes());
+    ColumnVisibility cvB = new ColumnVisibility(b.getBytes());
+    ColumnVisibility cvC = cvA.and(cvB);
+    // JUnit's argument order is (expected, actual)
+    assertArrayEquals(c.getBytes(), cvC.getExpression());
+    // check that we didn't disturb the original ColumnVisibilities
+    assertArrayEquals(a.getBytes(), cvA.getExpression());
+    assertArrayEquals(b.getBytes(), cvB.getExpression());
+  }
+
+ // Each call passes (left operand, right operand, expression expected from left.and(right)).
+ @Test
+ public void testConjunction() {
+ aAndBEqualC("a", "b", "a&b");
+ aAndBEqualC("a&b", "c", "a&b&c");
+ // the sub-expression shared by both operands is expected to appear only once in the result
+ aAndBEqualC("a&(b|(c&d))", "e&(b|(c&d))","a&e&(b|(c&d))");
+ aAndBEqualC("a|b","c|d","(a|b)&(c|d)");
+ // and-ing with the empty expression is expected to leave the other operand unchanged
+ aAndBEqualC("a","","a");
+ }
+
@Test
public void testDanglingOperators() {
shouldThrow("a|b&");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fcd07de/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java b/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
index b5c2455..7612e15 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/VisibilityEvaluatorTest.java
@@ -26,33 +26,33 @@ import org.junit.Test;
public class VisibilityEvaluatorTest {
@Test
- public void testVisibilityEvaluator() throws VisibilityParseException {
- VisibilityEvaluator ct = new VisibilityEvaluator(ByteArraySet.fromStrings("one", "two", "three", "four"));
+ public void testVisibilityEvaluator() {
+ Authorizations auths = new Authorizations(ByteArraySet.fromStrings("one", "two", "three", "four"));
// test for and
- assertTrue("'and' test", ct.evaluate(new ColumnVisibility("one&two")));
+ assertTrue("'and' test", new ColumnVisibility("one&two").evaluate(auths));
// test for or
- assertTrue("'or' test", ct.evaluate(new ColumnVisibility("foor|four")));
+ assertTrue("'or' test", new ColumnVisibility("foor|four").evaluate(auths));
// test for and and or
- assertTrue("'and' and 'or' test", ct.evaluate(new ColumnVisibility("(one&two)|(foo&bar)")));
+ assertTrue("'and' and 'or' test", new ColumnVisibility("(one&two)|(foo&bar)").evaluate(auths));
// test for false negatives
for (String marking : new String[] {"one", "one|five", "five|one", "(one)", "(one&two)|(foo&bar)", "(one|foo)&three", "one|foo|bar", "(one|foo)|bar",
"((one|foo)|bar)&two"}) {
- assertTrue(marking, ct.evaluate(new ColumnVisibility(marking)));
+ assertTrue(marking, new ColumnVisibility(marking).evaluate(auths));
}
// test for false positives
for (String marking : new String[] {"five", "one&five", "five&one", "((one|foo)|bar)&goober"}) {
- assertFalse(marking, ct.evaluate(new ColumnVisibility(marking)));
+ assertFalse(marking, new ColumnVisibility(marking).evaluate(auths));
}
// test missing separators; these should throw an exception
for (String marking : new String[] {"one(five)", "(five)one", "(one)(two)", "a|(b(c))"}) {
try {
- ct.evaluate(new ColumnVisibility(marking));
+ new ColumnVisibility(marking).evaluate(auths);
fail(marking + " failed to throw");
} catch (Throwable e) {
// all is good
@@ -62,7 +62,7 @@ public class VisibilityEvaluatorTest {
// test unexpected separator
for (String marking : new String[] {"&(five)", "|(five)", "(five)&", "five|", "a|(b)&", "(&five)", "(five|)"}) {
try {
- ct.evaluate(new ColumnVisibility(marking));
+ new ColumnVisibility(marking).evaluate(auths);
fail(marking + " failed to throw");
} catch (Throwable e) {
// all is good
@@ -72,7 +72,7 @@ public class VisibilityEvaluatorTest {
// test mismatched parentheses
for (String marking : new String[] {"(", ")", "(a&b", "b|a)"}) {
try {
- ct.evaluate(new ColumnVisibility(marking));
+ new ColumnVisibility(marking).evaluate(auths);
fail(marking + " failed to throw");
} catch (Throwable e) {
// all is good