You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:40 UTC
[09/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
index 9b9cd51,0000000..4b3e41c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/TFileDumper.java
@@@ -1,259 -1,0 +1,258 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.BlockRegion;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry;
+import org.apache.accumulo.core.file.rfile.bcfile.TFile.TFileIndexEntry;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Dumping the information of a TFile.
+ */
+class TFileDumper {
+ static final Log LOG = LogFactory.getLog(TFileDumper.class);
+
+ private TFileDumper() {
+ // namespace object not constructable.
+ }
+
+ private enum Align {
+ LEFT, CENTER, RIGHT, ZERO_PADDED;
+ static String format(String s, int width, Align align) {
+ if (s.length() >= width)
+ return s;
+ int room = width - s.length();
+ Align alignAdjusted = align;
+ if (room == 1) {
+ alignAdjusted = LEFT;
+ }
+ if (alignAdjusted == LEFT) {
+ return s + String.format("%" + room + "s", "");
+ }
+ if (alignAdjusted == RIGHT) {
+ return String.format("%" + room + "s", "") + s;
+ }
+ if (alignAdjusted == CENTER) {
+ int half = room / 2;
+ return String.format("%" + half + "s", "") + s + String.format("%" + (room - half) + "s", "");
+ }
+ throw new IllegalArgumentException("Unsupported alignment");
+ }
+
+ static String format(long l, int width, Align align) {
+ if (align == ZERO_PADDED) {
+ return String.format("%0" + width + "d", l);
+ }
+ return format(Long.toString(l), width, align);
+ }
+
+ static int calculateWidth(String caption, long max) {
+ return Math.max(caption.length(), Long.toString(max).length());
+ }
+ }
+
+ /**
+ * Dump information about TFile.
+ *
+ * @param file
+ * Path string of the TFile
+ * @param out
+ * PrintStream to output the information.
+ * @param conf
+ * The configuration object.
- * @throws IOException
+ */
+ static public void dumpInfo(String file, PrintStream out, Configuration conf) throws IOException {
+ // Number of leading key bytes shown in the End-Key column.
+ final int maxKeySampleLen = 16;
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(conf);
+ long length = fs.getFileStatus(path).getLen();
+ FSDataInputStream fsdis = fs.open(path);
+ TFile.Reader reader = null;
+ try {
+ // Build the reader inside the try block so that fsdis is still closed by the finally clause if the constructor throws.
+ reader = new TFile.Reader(fsdis, length, conf);
+ LinkedHashMap<String,String> properties = new LinkedHashMap<String,String>();
+ int blockCnt = reader.readerBCF.getBlockCount();
+ int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
+ properties.put("BCFile Version", reader.readerBCF.version.toString());
+ properties.put("TFile Version", reader.tfileMeta.version.toString());
+ properties.put("File Length", Long.toString(length));
+ properties.put("Data Compression", reader.readerBCF.getDefaultCompressionName());
+ properties.put("Record Count", Long.toString(reader.getEntryCount()));
+ properties.put("Sorted", Boolean.toString(reader.isSorted()));
+ if (reader.isSorted()) {
+ properties.put("Comparator", reader.getComparatorName());
+ }
+ properties.put("Data Block Count", Integer.toString(blockCnt));
+ long dataSize = 0, dataSizeUncompressed = 0;
+ if (blockCnt > 0) {
+ for (int i = 0; i < blockCnt; ++i) {
+ BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+ dataSize += region.getCompressedSize();
+ dataSizeUncompressed += region.getRawSize();
+ }
+ properties.put("Data Block Bytes", Long.toString(dataSize));
+ // Compare the compression name by value; reference comparison only works by accident of string interning.
+ if (!"none".equals(reader.readerBCF.getDefaultCompressionName())) {
+ properties.put("Data Block Uncompressed Bytes", Long.toString(dataSizeUncompressed));
+ properties.put("Data Block Compression Ratio", String.format("1:%.1f", (double) dataSizeUncompressed / dataSize));
+ }
+ }
+
+ properties.put("Meta Block Count", Integer.toString(metaBlkCnt));
+ long metaSize = 0, metaSizeUncompressed = 0;
+ if (metaBlkCnt > 0) {
+ Collection<MetaIndexEntry> metaBlks = reader.readerBCF.metaIndex.index.values();
+ // Only report uncompressed size and ratio when at least one meta block is actually compressed.
+ boolean calculateCompression = false;
+ for (Iterator<MetaIndexEntry> it = metaBlks.iterator(); it.hasNext();) {
+ MetaIndexEntry e = it.next();
+ metaSize += e.getRegion().getCompressedSize();
+ metaSizeUncompressed += e.getRegion().getRawSize();
+ if (e.getCompressionAlgorithm() != Compression.Algorithm.NONE) {
+ calculateCompression = true;
+ }
+ }
+ properties.put("Meta Block Bytes", Long.toString(metaSize));
+ if (calculateCompression) {
+ properties.put("Meta Block Uncompressed Bytes", Long.toString(metaSizeUncompressed));
+ properties.put("Meta Block Compression Ratio", String.format("1:%.1f", (double) metaSizeUncompressed / metaSize));
+ }
+ }
+ properties.put("Meta-Data Size Ratio", String.format("1:%.1f", (double) dataSize / metaSize));
+ // Whatever is neither data nor meta block is split into the fixed-size trailer pieces and the meta block index.
+ long leftOverBytes = length - dataSize - metaSize;
+ long miscSize = BCFile.Magic.size() * 2 + Long.SIZE / Byte.SIZE + Version.size();
+ long metaIndexSize = leftOverBytes - miscSize;
+ properties.put("Meta Block Index Bytes", Long.toString(metaIndexSize));
+ properties.put("Headers Etc Bytes", Long.toString(miscSize));
+ // Now output the properties table.
+ int maxKeyLength = 0;
+ Set<Map.Entry<String,String>> entrySet = properties.entrySet();
+ for (Iterator<Map.Entry<String,String>> it = entrySet.iterator(); it.hasNext();) {
+ Map.Entry<String,String> e = it.next();
+ if (e.getKey().length() > maxKeyLength) {
+ maxKeyLength = e.getKey().length();
+ }
+ }
+ for (Iterator<Map.Entry<String,String>> it = entrySet.iterator(); it.hasNext();) {
+ Map.Entry<String,String> e = it.next();
+ out.printf("%s : %s%n", Align.format(e.getKey(), maxKeyLength, Align.LEFT), e.getValue());
+ }
+ out.println();
+ reader.checkTFileDataIndex();
+ if (blockCnt > 0) {
+ // Column widths are the larger of the caption and an estimate of the widest value (ten times the per-block average).
+ String blkID = "Data-Block";
+ int blkIDWidth = Align.calculateWidth(blkID, blockCnt);
+ int blkIDWidth2 = Align.calculateWidth("", blockCnt);
+ String offset = "Offset";
+ int offsetWidth = Align.calculateWidth(offset, length);
+ String blkLen = "Length";
+ int blkLenWidth = Align.calculateWidth(blkLen, dataSize / blockCnt * 10);
+ String rawSize = "Raw-Size";
+ int rawSizeWidth = Align.calculateWidth(rawSize, dataSizeUncompressed / blockCnt * 10);
+ String records = "Records";
+ int recordsWidth = Align.calculateWidth(records, reader.getEntryCount() / blockCnt * 10);
+ String endKey = "End-Key";
+ // Worst case is a hex sample: "0X" + two digits per sampled byte + "...".
+ int endKeyWidth = Math.max(endKey.length(), maxKeySampleLen * 2 + 5);
+
+ out.printf("%s %s %s %s %s %s%n", Align.format(blkID, blkIDWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+ Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
+ Align.format(records, recordsWidth, Align.CENTER), Align.format(endKey, endKeyWidth, Align.LEFT));
+
+ for (int i = 0; i < blockCnt; ++i) {
+ BlockRegion region = reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+ TFileIndexEntry indexEntry = reader.tfileIndex.getEntry(i);
+ out.printf("%s %s %s %s %s ", Align.format(Align.format(i, blkIDWidth2, Align.ZERO_PADDED), blkIDWidth, Align.LEFT),
+ Align.format(region.getOffset(), offsetWidth, Align.LEFT), Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT),
+ Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT), Align.format(indexEntry.kvEntries, recordsWidth, Align.LEFT));
+ byte[] key = indexEntry.key;
+ boolean asAscii = true;
+ int sampleLen = Math.min(maxKeySampleLen, key.length);
+ // Fall back to hex if the sample holds a control character other than TAB, DEL, or any byte >= 0x80 (negative as a Java byte).
+ for (int j = 0; j < sampleLen; ++j) {
+ byte b = key[j];
+ if ((b < 32 && b != 9) || (b == 127)) {
+ asAscii = false;
+ }
+ }
+ if (!asAscii) {
+ out.print("0X");
+ for (int j = 0; j < sampleLen; ++j) {
+ // Index with the sample position j (not the block index i) and zero-pad so every byte is exactly two hex digits.
+ byte b = key[j];
+ out.printf("%02X", b);
+ }
+ } else {
+ out.print(new String(key, 0, sampleLen, Constants.UTF8));
+ }
+ if (sampleLen < key.length) {
+ out.print("...");
+ }
+ out.println();
+ }
+ }
+
+ out.println();
+ if (metaBlkCnt > 0) {
+ String name = "Meta-Block";
+ int maxNameLen = 0;
+ Set<Map.Entry<String,MetaIndexEntry>> metaBlkEntrySet = reader.readerBCF.metaIndex.index.entrySet();
+ for (Iterator<Map.Entry<String,MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
+ Map.Entry<String,MetaIndexEntry> e = it.next();
+ if (e.getKey().length() > maxNameLen) {
+ maxNameLen = e.getKey().length();
+ }
+ }
+ int nameWidth = Math.max(name.length(), maxNameLen);
+ String offset = "Offset";
+ int offsetWidth = Align.calculateWidth(offset, length);
+ String blkLen = "Length";
+ int blkLenWidth = Align.calculateWidth(blkLen, metaSize / metaBlkCnt * 10);
+ String rawSize = "Raw-Size";
+ int rawSizeWidth = Align.calculateWidth(rawSize, metaSizeUncompressed / metaBlkCnt * 10);
+ String compression = "Compression";
+ int compressionWidth = compression.length();
+ out.printf("%s %s %s %s %s%n", Align.format(name, nameWidth, Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+ Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(rawSize, rawSizeWidth, Align.CENTER),
+ Align.format(compression, compressionWidth, Align.LEFT));
+
+ for (Iterator<Map.Entry<String,MetaIndexEntry>> it = metaBlkEntrySet.iterator(); it.hasNext();) {
+ Map.Entry<String,MetaIndexEntry> e = it.next();
+ String blkName = e.getValue().getMetaName();
+ BlockRegion region = e.getValue().getRegion();
+ String blkCompression = e.getValue().getCompressionAlgorithm().getName();
+ out.printf("%s %s %s %s %s%n", Align.format(blkName, nameWidth, Align.LEFT), Align.format(region.getOffset(), offsetWidth, Align.LEFT),
+ Align.format(region.getCompressedSize(), blkLenWidth, Align.LEFT), Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT),
+ Align.format(blkCompression, compressionWidth, Align.LEFT));
+ }
+ }
+ } finally {
+ // reader is null here if its constructor threw; NOTE(review): relies on IOUtils.cleanup skipping null closeables - confirm against the Hadoop version in use.
+ IOUtils.cleanup(LOG, reader, fsdis);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
index 45a59f6,0000000..9131d30
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Utils.java
@@@ -1,485 -1,0 +1,474 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Supporting Utility classes used by TFile, and shared by users of TFile.
+ */
+public final class Utils {
+
+ /**
+ * Prevent the instantiation of Utils.
+ */
+ private Utils() {
+ // nothing
+ }
+
+ /**
+ * Encoding an integer into a variable-length encoding format. Synonymous to <code>Utils#writeVLong(out, n)</code>.
+ *
+ * @param out
+ * output stream
+ * @param n
+ * The integer to be encoded
- * @throws IOException
+ * @see Utils#writeVLong(DataOutput, long)
+ */
+ public static void writeVInt(DataOutput out, int n) throws IOException {
+ writeVLong(out, n);
+ }
+
+ /**
+ * Encoding a Long integer into a variable-length encoding format.
+ * <ul>
+ * <li>if n in [-32, 128): encode in one byte with the actual value. Otherwise,
+ * <li>if n in [-20*2^8, 20*2^8): encode in two bytes: byte[0] = n/256 - 52; byte[1]=n&0xff. Otherwise,
+ * <li>if n in [-16*2^16, 16*2^16): encode in three bytes: byte[0]=n/2^16 - 88; byte[1]=(n>>8)&0xff; byte[2]=n&0xff. Otherwise,
+ * <li>if n in [-8*2^24, 8*2^24): encode in four bytes: byte[0]=n/2^24 - 112; byte[1] = (n>>16)&0xff; byte[2] = (n>>8)&0xff; byte[3]=n&0xff. Otherwise:
+ * <li>if n in [-2^31, 2^31): encode in five bytes: byte[0]=-125; byte[1] = (n>>24)&0xff; byte[2]=(n>>16)&0xff; byte[3]=(n>>8)&0xff; byte[4]=n&0xff;
+ * <li>if n in [-2^39, 2^39): encode in six bytes: byte[0]=-124; byte[1] = (n>>32)&0xff; byte[2]=(n>>24)&0xff; byte[3]=(n>>16)&0xff; byte[4]=(n>>8)&0xff;
+ * byte[5]=n&0xff
+ * <li>if n in [-2^47, 2^47): encode in seven bytes: byte[0]=-123; byte[1] = (n>>40)&0xff; byte[2]=(n>>32)&0xff; byte[3]=(n>>24)&0xff; byte[4]=(n>>16)&0xff;
+ * byte[5]=(n>>8)&0xff; byte[6]=n&0xff;
+ * <li>if n in [-2^55, 2^55): encode in eight bytes: byte[0]=-122; byte[1] = (n>>48)&0xff; byte[2] = (n>>40)&0xff; byte[3]=(n>>32)&0xff; byte[4]=(n>>24)&0xff;
+ * byte[5]=(n>>16)&0xff; byte[6]=(n>>8)&0xff; byte[7]=n&0xff;
+ * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] = (n>>56)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff;
+ * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff; byte[7]=(n>>8)&0xff; byte[8]=n&0xff;
+ * </ul>
+ *
+ * @param out
+ * output stream
+ * @param n
+ * the integer number
- * @throws IOException
+ */
+ @SuppressWarnings("fallthrough")
+ public static void writeVLong(DataOutput out, long n) throws IOException {
+ // Values in [-32, 128) are stored verbatim in a single byte.
+ if ((n < 128) && (n >= -32)) {
+ out.writeByte((int) n);
+ return;
+ }
+
+ // Map negative values onto their bitwise complement so the leading-zero count below measures magnitude for either sign.
+ long un = (n < 0) ? ~n : n;
+ // how many bytes do we need to represent the number with sign bit?
+ int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1;
+ int firstByte = (int) (n >> ((len - 1) * 8));
+ // Cases 1-4 deliberately fall through: when the top byte does not fit the compact form's range, the next longer form is tried.
+ switch (len) {
+ case 1:
+ // fall it through to firstByte==-1, len=2.
+ firstByte >>= 8;
+ case 2:
+ if ((firstByte < 20) && (firstByte >= -20)) {
+ out.writeByte(firstByte - 52);
+ out.writeByte((int) n);
+ return;
+ }
+ // fall it through to firstByte==0/-1, len=3.
+ firstByte >>= 8;
+ case 3:
+ if ((firstByte < 16) && (firstByte >= -16)) {
+ out.writeByte(firstByte - 88);
+ out.writeShort((int) n);
+ return;
+ }
+ // fall it through to firstByte==0/-1, len=4.
+ firstByte >>= 8;
+ case 4:
+ if ((firstByte < 8) && (firstByte >= -8)) {
+ out.writeByte(firstByte - 112);
+ out.writeShort(((int) n) >>> 8);
+ out.writeByte((int) n);
+ return;
+ }
+ // From here on the first byte is a pure length marker: len - 129 is -125 for len 4 up to -121 for len 8.
+ out.writeByte(len - 129);
+ out.writeInt((int) n);
+ return;
+ case 5:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 8));
+ out.writeByte((int) n);
+ return;
+ case 6:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 16));
+ out.writeShort((int) n);
+ return;
+ case 7:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 24));
+ out.writeShort((int) (n >>> 8));
+ out.writeByte((int) n);
+ return;
+ case 8:
+ out.writeByte(len - 129);
+ out.writeLong(n);
+ return;
+ default:
+ throw new RuntimeException("Internel error");
+ }
+ }
+
+ /**
+ * Decoding the variable-length integer. Synonymous to <code>(int)Utils#readVLong(in)</code>.
+ *
+ * @param in
+ * input stream
+ * @return the decoded integer
- * @throws IOException
+ *
+ * @see Utils#readVLong(DataInput)
+ */
+ public static int readVInt(DataInput in) throws IOException {
+ long ret = readVLong(in);
+ if ((ret > Integer.MAX_VALUE) || (ret < Integer.MIN_VALUE)) {
+ throw new RuntimeException("Number too large to be represented as Integer");
+ }
+ return (int) ret;
+ }
+
+ /**
+ * Decoding the variable-length integer. Suppose the value of the first byte is FB, and the following bytes are NB[*].
+ * <ul>
+ * <li>if (FB >= -32), return (long)FB;
+ * <li>if (FB in [-72, -33]), return (FB+52)<<8 + NB[0]&0xff;
+ * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 + NB[1]&0xff;
+ * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 + (NB[1]&0xff)<<8 + NB[2]&0xff;
+ * <li>if (FB in [-128, -121]), interpret the following FB+129 bytes as a signed big-endian integer and return it.
+ * </ul>
+ *
+ * @param in
+ * input stream
+ * @return the decoded long integer.
- * @throws IOException
+ */
+
+ public static long readVLong(DataInput in) throws IOException {
+ int firstByte = in.readByte();
+ if (firstByte >= -32) {
+ return firstByte;
+ }
+
+ // firstByte is now in [-128, -33]; (firstByte + 128) / 8 buckets it into 0..11:
+ // 7-11 is [-72, -33] (two-byte form), 3-6 is [-104, -73] (three-byte form), 1-2 is [-120, -105] (four-byte form), 0 is [-128, -121] (length marker).
+ switch ((firstByte + 128) / 8) {
+ case 11:
+ case 10:
+ case 9:
+ case 8:
+ case 7:
+ return ((firstByte + 52) << 8) | in.readUnsignedByte();
+ case 6:
+ case 5:
+ case 4:
+ case 3:
+ return ((firstByte + 88) << 16) | in.readUnsignedShort();
+ case 2:
+ case 1:
+ return ((firstByte + 112) << 24) | (in.readUnsignedShort() << 8) | in.readUnsignedByte();
+ case 0:
+ // The marker encodes the payload length; only 4 to 8 payload bytes are accepted here.
+ int len = firstByte + 129;
+ switch (len) {
+ case 4:
+ return in.readInt();
+ case 5:
+ return ((long) in.readInt()) << 8 | in.readUnsignedByte();
+ case 6:
+ return ((long) in.readInt()) << 16 | in.readUnsignedShort();
+ case 7:
+ return ((long) in.readInt()) << 24 | (in.readUnsignedShort() << 8) | in.readUnsignedByte();
+ case 8:
+ return in.readLong();
+ default:
+ throw new IOException("Corrupted VLong encoding");
+ }
+ default:
+ throw new RuntimeException("Internal error");
+ }
+ }
+
+ /**
+ * Write a String as a VInt n, followed by n Bytes as in Text format.
- *
- * @param out
- * @param s
- * @throws IOException
+ */
+ public static void writeString(DataOutput out, String s) throws IOException {
+ if (s != null) {
+ Text text = new Text(s);
+ byte[] buffer = text.getBytes();
+ int len = text.getLength();
+ writeVInt(out, len);
+ out.write(buffer, 0, len);
+ } else {
+ writeVInt(out, -1);
+ }
+ }
+
+ /**
+ * Read a String as a VInt n, followed by n Bytes in Text format.
+ *
+ * @param in
+ * The input stream.
+ * @return The string
- * @throws IOException
+ */
+ public static String readString(DataInput in) throws IOException {
+ int length = readVInt(in);
+ // -1 is the marker writeString emits for a null String.
+ if (length == -1)
+ return null;
+ // Any other negative length cannot come from writeString; report corrupt input as an IOException instead of failing with NegativeArraySizeException.
+ if (length < 0)
+ throw new IOException("Corrupted String encoding: negative length " + length);
+ byte[] buffer = new byte[length];
+ in.readFully(buffer);
+ return Text.decode(buffer);
+ }
+
+ /**
+ * A generic Version class. We suggest applications built on top of TFile use this class to maintain version information in their meta blocks.
+ *
+ * A version number consists of a major version and a minor version. The suggested usage of major and minor version number is to increment major version
+ * number when the new storage format is not backward compatible, and increment the minor version otherwise.
+ */
+ public static final class Version implements Comparable<Version> {
+ private final short major;
+ private final short minor;
+
+ /**
+ * Construct the Version object by reading from the input stream.
+ *
+ * @param in
+ * input stream
- * @throws IOException
+ */
+ public Version(DataInput in) throws IOException {
+ major = in.readShort();
+ minor = in.readShort();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param major
+ * major version.
+ * @param minor
+ * minor version.
+ */
+ public Version(short major, short minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ /**
+ * Write the object to a DataOutput. The serialized format of the Version is major version followed by minor version, both as big-endian short integers.
+ *
+ * @param out
+ * The DataOutput object.
- * @throws IOException
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(major);
+ out.writeShort(minor);
+ }
+
+ /**
+ * Get the major version.
+ *
+ * @return Major version.
+ */
+ public int getMajor() {
+ return major;
+ }
+
+ /**
+ * Get the minor version.
+ *
+ * @return The minor version.
+ */
+ public int getMinor() {
+ return minor;
+ }
+
+ /**
+ * Get the size of the serialized Version object.
+ *
+ * @return serialized size of the version object.
+ */
+ public static int size() {
+ return (Short.SIZE + Short.SIZE) / Byte.SIZE;
+ }
+
+ /**
+ * Return a string representation of the version.
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder("v").append(major).append(".").append(minor).toString();
+ }
+
+ /**
+ * Test compatibility.
+ *
+ * @param other
+ * The Version object to test compatibility with.
+ * @return true if both versions have the same major version number; false otherwise.
+ */
+ public boolean compatibleWith(Version other) {
+ return major == other.major;
+ }
+
+ /**
+ * Compare this version with another version.
+ */
+ @Override
+ public int compareTo(Version that) {
+ if (major != that.major) {
+ return major - that.major;
+ }
+ return minor - that.minor;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other)
+ return true;
+ if (!(other instanceof Version))
+ return false;
+ return compareTo((Version) other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ // Put major in the high 16 bits and add minor. The parentheses are required: '+' binds tighter than '<<',
+ // so without them the expression would shift major by (16 + minor) bits.
+ return (major << 16) + minor;
+ }
+ }
+
+ /**
+ * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @param cmp
+ * Comparator for the key.
+ * @return The index to the desired element if it exists; or list.size() otherwise.
+ */
+ public static <T> int lowerBound(List<? extends T> list, T key, Comparator<? super T> cmp) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ T midVal = list.get(mid);
+ int ret = cmp.compare(midVal, key);
+ if (ret < 0)
+ low = mid + 1;
+ else
+ high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @param cmp
+ * Comparator for the key.
+ * @return The index to the desired element if it exists; or list.size() otherwise.
+ */
+ public static <T> int upperBound(List<? extends T> list, T key, Comparator<? super T> cmp) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ T midVal = list.get(mid);
+ int ret = cmp.compare(midVal, key);
+ if (ret <= 0)
+ low = mid + 1;
+ else
+ high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Lower bound binary search. Find the index to the first element in the list that compares greater than or equal to key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @return The index to the desired element if it exists; or list.size() otherwise.
+ */
+ public static <T> int lowerBound(List<? extends Comparable<? super T>> list, T key) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ Comparable<? super T> midVal = list.get(mid);
+ int ret = midVal.compareTo(key);
+ if (ret < 0)
+ low = mid + 1;
+ else
+ high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Upper bound binary search. Find the index to the first element in the list that compares greater than the input key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @return The index to the desired element if it exists; or list.size() otherwise.
+ */
+ public static <T> int upperBound(List<? extends Comparable<? super T>> list, T key) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ Comparable<? super T> midVal = list.get(mid);
+ int ret = midVal.compareTo(key);
+ if (ret <= 0)
+ low = mid + 1;
+ else
+ high = mid;
+ }
+ return low;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
index 54a1333,0000000..5b7b05c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/TypedValueCombiner.java
@@@ -1,249 -1,0 +1,243 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+/**
+ * A Combiner that decodes each Value to type V before reducing, then encodes the result of typedReduce back to Value.
+ *
+ * Subclasses must implement a typedReduce method: public V typedReduce(Key key, Iterator<V> iter);
+ *
+ * This typedReduce method will be passed the most recent Key and an iterator over the Values (translated to Vs) for all non-deleted versions of that Key.
+ *
+ * Subclasses may implement a switch on the "type" variable to choose an Encoder in their init method.
+ */
+public abstract class TypedValueCombiner<V> extends Combiner {
+ private Encoder<V> encoder = null;
+ private boolean lossy = false;
+
+ protected static final String LOSSY = "lossy";
+
+ /**
+ * A Java Iterator that translates an Iterator<Value> to an Iterator<V> using the decode method of an Encoder.
+ */
+ private static class VIterator<V> implements Iterator<V> {
+ private Iterator<Value> source;
+ private Encoder<V> encoder;
+ private boolean lossy;
+
+ /**
+ * Constructs an Iterator<V> from an Iterator<Value>
+ *
+ * @param iter
+ * The source iterator
+ *
+ * @param encoder
+ * The Encoder whose decode method is used to translate from Value to V
+ *
+ * @param lossy
+ * Determines whether to error on failure to decode or ignore and move on
+ */
+ VIterator(Iterator<Value> iter, Encoder<V> encoder, boolean lossy) {
+ this.source = iter;
+ this.encoder = encoder;
+ this.lossy = lossy;
+ }
+
+ V next = null;
+ boolean hasNext = false;
+
+ @Override
+ public boolean hasNext() {
+ if (hasNext)
+ return true;
+
+ while (true) {
+ if (!source.hasNext())
+ return false;
+ try {
+ next = encoder.decode(source.next().get());
+ return hasNext = true;
+ } catch (ValueFormatException vfe) {
+ if (!lossy)
+ throw vfe;
+ }
+ }
+ }
+
+ @Override
+ public V next() {
+ if (!hasNext && !hasNext())
+ throw new NoSuchElementException();
+ V toRet = next;
+ next = null;
+ hasNext = false;
+ return toRet;
+ }
+
+ @Override
+ public void remove() {
+ source.remove();
+ }
+ }
+
+ /**
+ * An interface for translating from byte[] to V and back.
+ */
+ public static interface Encoder<V> {
+ public byte[] encode(V v);
+
+ public V decode(byte[] b) throws ValueFormatException;
+ }
+
+ /**
+ * Sets the Encoder<V> used to translate Values to V and back.
- *
- * @param encoder
+ */
+ protected void setEncoder(Encoder<V> encoder) {
+ this.encoder = encoder;
+ }
+
+ /**
+ * Instantiates and sets the Encoder<V> used to translate Values to V and back.
+ *
- * @param encoderClass
+ * @throws IllegalArgumentException
+ * if ClassNotFoundException, InstantiationException, or IllegalAccessException occurs
+ */
+ protected void setEncoder(String encoderClass) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends Encoder<V>> clazz = (Class<? extends Encoder<V>>) AccumuloVFSClassLoader.loadClass(encoderClass, Encoder.class);
+ encoder = clazz.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Tests whether v remains the same when encoded and decoded with the current encoder.
+ *
- * @param v
+ * @throws IllegalStateException
+ * if an encoder has not been set.
+ * @throws IllegalArgumentException
+ * if the test fails.
+ */
+ protected void testEncoder(V v) {
+ if (encoder == null)
+ throw new IllegalStateException("encoder has not been initialized");
+ testEncoder(encoder, v);
+ }
+
+ /**
+ * Tests whether v remains the same when encoded and decoded with the given encoder.
+ *
- * @param encoder
- * @param v
+ * @throws IllegalArgumentException
+ * if the test fails.
+ */
+ public static <V> void testEncoder(Encoder<V> encoder, V v) {
+ try {
+ // Round-trip v through the encoder; the decoded result must equal the input.
+ if (!v.equals(encoder.decode(encoder.encode(v))))
+ throw new IllegalArgumentException("something wrong with " + encoder.getClass().getName() + " -- doesn't encode and decode " + v + " properly");
+ } catch (ClassCastException e) {
+ // Keep the ClassCastException as the cause so the failing cast stays visible in the stack trace.
+ throw new IllegalArgumentException(encoder.getClass().getName() + " doesn't encode " + v.getClass().getName(), e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ TypedValueCombiner<V> newInstance = (TypedValueCombiner<V>) super.deepCopy(env);
+ newInstance.setEncoder(encoder);
+ return newInstance;
+ }
+
+ @Override
+ public Value reduce(Key key, Iterator<Value> iter) {
+ return new Value(encoder.encode(typedReduce(key, new VIterator<V>(iter, encoder, lossy))));
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ setLossyness(options);
+ }
+
+ private void setLossyness(Map<String,String> options) {
+ String loss = options.get(LOSSY);
+ if (loss == null)
+ lossy = false;
+ else
+ lossy = Boolean.parseBoolean(loss);
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ IteratorOptions io = super.describeOptions();
+ io.addNamedOption(LOSSY, "if true, failed decodes are ignored. Otherwise combiner will error on failed decodes (default false): <TRUE|FALSE>");
+ return io;
+ }
+
+ @Override
+ public boolean validateOptions(Map<String,String> options) {
+ if (super.validateOptions(options) == false)
+ return false;
+ try {
+ setLossyness(options);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("bad boolean " + LOSSY + ":" + options.get(LOSSY));
+ }
+ return true;
+ }
+
+ /**
+ * A convenience method to set the "lossy" option on a TypedValueCombiner. If true, the combiner will ignore any values which fail to decode. Otherwise, the
+ * combiner will throw an error which will interrupt the action (and prevent potential data loss). False is the default behavior.
+ *
+ * @param is
+ * iterator settings object to configure
+ * @param lossy
+ * if true the combiner will ignored values which fail to decode; otherwise error.
+ */
+ public static void setLossyness(IteratorSetting is, boolean lossy) {
+ is.addOption(LOSSY, Boolean.toString(lossy));
+ }
+
+ /**
+ * Reduces a list of V into a single V.
+ *
+ * @param key
+ * The most recent version of the Key being reduced.
+ *
+ * @param iter
+ * An iterator over the V for different versions of the key.
+ *
+ * @return The combined V.
+ */
+ public abstract V typedReduce(Key key, Iterator<V> iter);
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
index 7bb2228,0000000..7ede7fe
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/ValueFormatException.java
@@@ -1,40 -1,0 +1,34 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+/**
+ * Exception used for TypedValueCombiner and its Encoders' decode() function. It extends {@link IllegalArgumentException} and is therefore unchecked.
+ */
+public class ValueFormatException extends IllegalArgumentException {
+
- /**
- * @param string
- */
+ /**
+ * Creates the exception with a detail message.
+ *
+ * @param string
+ *          detail message handed to the superclass constructor
+ */
+ public ValueFormatException(String string) {
+ super(string);
+ }
+
- /**
- * @param nfe
- */
+ /**
+ * Creates the exception wrapping another exception.
+ *
+ * @param nfe
+ *          exception handed to the superclass constructor
+ */
+ public ValueFormatException(Exception nfe) {
+ super(nfe);
+ }
+
+ private static final long serialVersionUID = 4170291568272971821L;
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
index e5fe62a,0000000..37a234c
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/MapFileIterator.java
@@@ -1,162 -1,0 +1,156 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.system;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.file.map.MapFileUtil;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile.Reader;
+import org.apache.log4j.Logger;
+
+public class MapFileIterator implements FileSKVIterator {
+ private static final Logger log = Logger.getLogger(MapFileIterator.class);
+
+ private Reader reader;
+ private Value topValue;
+ private Key topKey;
+ private AtomicBoolean interruptFlag;
+ private int interruptCheckCount = 0;
+ private FileSystem fs;
+ private String dirName;
+
- /**
- * @param acuconf
- * @param fs
- * @param dir
- * @param conf
- * @throws IOException
- */
+ public MapFileIterator(AccumuloConfiguration acuconf, FileSystem fs, String dir, Configuration conf) throws IOException {
+ this.reader = MapFileUtil.openMapFile(acuconf, fs, dir, conf);
+ this.fs = fs;
+ this.dirName = dir;
+ }
+
+ @Override
+ public void setInterruptFlag(AtomicBoolean flag) {
+ this.interruptFlag = flag;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasTop() {
+ return topKey != null;
+ }
+
+ @Override
+ public void next() throws IOException {
+ if (interruptFlag != null && interruptCheckCount++ % 100 == 0 && interruptFlag.get())
+ throw new IterationInterruptedException();
+
+ reader.next(topKey, topValue);
+ }
+
++ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ if (columnFamilies.size() != 0 || inclusive) {
+ throw new IllegalArgumentException("I do not know how to filter column families");
+ }
+
+ if (range == null)
+ throw new IllegalArgumentException("Cannot seek to null range");
+
+ if (interruptFlag != null && interruptFlag.get())
+ throw new IterationInterruptedException();
+
+ Key key = range.getStartKey();
+ if (key == null) {
+ key = new Key();
+ }
+
+ reader.seek(key);
+
+ while (hasTop() && range.beforeStartKey(getTopKey())) {
+ next();
+ }
+ }
+
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ @Override
+ public Value getTopValue() {
+ return topValue;
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ try {
+ SortedKeyValueIterator<Key,Value> other = env.reserveMapFileReader(dirName);
+ ((InterruptibleIterator) other).setInterruptFlag(interruptFlag);
+ log.debug("deep copying MapFile: " + this + " -> " + other);
+ return other;
+ } catch (IOException e) {
+ log.error("failed to clone map file reader", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Key getFirstKey() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Key getLastKey() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataInputStream getMetaStore(String name) throws IOException {
+ Path path = new Path(this.dirName, name);
+ if (!fs.exists(path))
+ throw new NoSuchMetaStoreException("name = " + name);
+ return fs.open(path);
+ }
+
+ @Override
+ public void closeDeepCopies() throws IOException {
+ // nothing to do, deep copies are externally managed/closed
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
index 4f8207c,0000000..86798dd
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/GrepIterator.java
@@@ -1,104 -1,0 +1,101 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * This iterator provides exact string matching. It searches both the Key and Value for the string. The string to match is specified by the "term" option.
+ */
+public class GrepIterator extends Filter {
+
+ // UTF-8 bytes of the "term" option; assigned in init() and copied in deepCopy()
+ private byte term[];
+
+ /**
+ * Accepts an entry when the term occurs in the value, row, column family or column qualifier.
+ */
+ @Override
+ public boolean accept(Key k, Value v) {
+   return match(v.get()) || match(k.getRowData()) || match(k.getColumnFamilyData()) || match(k.getColumnQualifierData());
+ }
+
+ // Searches only the window of the backing array that the sequence covers.
+ private boolean match(ByteSequence bs) {
+   return indexOf(bs.getBackingArray(), bs.offset(), bs.length(), term) >= 0;
+ }
+
+ private boolean match(byte[] ba) {
+   return indexOf(ba, 0, ba.length, term) >= 0;
+ }
+
+ // copied code below from java string and modified
+
+ /**
+ * Finds the first occurrence of target within source[sourceOffset, sourceOffset + sourceCount).
+ *
+ * @return the index of the match relative to sourceOffset, or -1 if there is none. An empty target matches at index 0, as String.indexOf does.
+ */
+ private static int indexOf(byte[] source, int sourceOffset, int sourceCount, byte[] target) {
+   int targetCount = target.length;
+   // guard before reading target[0]: an empty term would otherwise throw ArrayIndexOutOfBoundsException
+   if (targetCount == 0) {
+     return 0;
+   }
+   byte first = target[0];
+   int max = sourceOffset + (sourceCount - targetCount);
+
+   for (int i = sourceOffset; i <= max; i++) {
+     /* Look for first character. */
+     if (source[i] != first) {
+       while (++i <= max && source[i] != first)
+         continue;
+     }
+
+     /* Found first character, now look at the rest of v2 */
+     if (i <= max) {
+       int j = i + 1;
+       int end = j + targetCount - 1;
+       for (int k = 1; j < end && source[j] == target[k]; j++, k++)
+         continue;
+
+       if (j == end) {
+         /* Found whole string. */
+         return i - sourceOffset;
+       }
+     }
+   }
+   return -1;
+ }
+
+ // Gives the copy its own term array so the two iterators do not share mutable state.
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+   GrepIterator copy = (GrepIterator) super.deepCopy(env);
+   copy.term = Arrays.copyOf(term, term.length);
+   return copy;
+ }
+
+ /**
+ * Initializes the filter and reads the search term from the "term" option.
+ *
+ * @throws IllegalArgumentException
+ *           if the "term" option is missing
+ */
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+   super.init(source, options, env);
+   String termOption = options.get("term");
+   if (termOption == null) {
+     // fail with a descriptive message instead of a bare NullPointerException
+     throw new IllegalArgumentException("GrepIterator requires the \"term\" option");
+   }
+   term = termOption.getBytes(Constants.UTF8);
+ }
+
+ /**
+ * Encode the grep term as an option for a ScanIterator
- *
- * @param cfg
- * @param term
+ */
+ public static void setTerm(IteratorSetting cfg, String term) {
+   cfg.addOption("term", term);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
index 447200b,0000000..39cba6d
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java
@@@ -1,558 -1,0 +1,548 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of
+ * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index.
+ *
+ * The table structure should have the following form:
+ *
+ * row: shardID, colfam: term, colqual: docID
+ *
+ * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The
+ * result will have an empty column family, as follows:
+ *
+ * row: shardID, colfam: (empty), colqual: docID
+ *
+ * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs.
+ *
+ * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes
+ * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method.
+ *
+ * README.shard in docs/examples shows an example of using the IntersectingIterator.
+ */
+public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> {
+
+ protected Text nullText = new Text();
+
+ /**
+ * Returns the partition of a key; this implementation uses the key's row.
+ */
+ protected Text getPartition(Key key) {
+ return key.getRow();
+ }
+
+ /**
+ * Returns the term of a key; this implementation uses the key's column family.
+ */
+ protected Text getTerm(Key key) {
+ return key.getColumnFamily();
+ }
+
+ /**
+ * Returns the document ID of a key; this implementation uses the key's column qualifier.
+ */
+ protected Text getDocID(Key key) {
+ return key.getColumnQualifier();
+ }
+
+ /**
+ * Builds a key from a partition (row) and a term (column family); a null term is replaced by the nullText field.
+ */
+ protected Key buildKey(Text partition, Text term) {
+   Text family = term;
+   if (family == null) {
+     family = nullText;
+   }
+   return new Key(partition, family);
+ }
+
+ /**
+ * Builds a key from a partition (row), a term (column family) and a document ID (column qualifier); a null term is replaced by the nullText field.
+ */
+ protected Key buildKey(Text partition, Text term, Text docID) {
+   Text family = term;
+   if (family == null) {
+     family = nullText;
+   }
+   return new Key(partition, family, docID);
+ }
+
+ /**
+ * Returns the key that follows the given key at row granularity, used to seek to the next partition.
+ */
+ protected Key buildFollowingPartitionKey(Key key) {
+ return key.followingKey(PartialKey.ROW);
+ }
+
+ protected static final Logger log = Logger.getLogger(IntersectingIterator.class);
+
+ public static class TermSource {
+ public SortedKeyValueIterator<Key,Value> iter;
+ public Text term;
+ public Collection<ByteSequence> seekColfams;
+ public boolean notFlag;
+
+ public TermSource(TermSource other) {
+ this.iter = other.iter;
+ this.term = other.term;
+ this.notFlag = other.notFlag;
+ this.seekColfams = other.seekColfams;
+ }
+
+ public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) {
+ this(iter, term, false);
+ }
+
+ public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) {
+ this.iter = iter;
+ this.term = term;
+ this.notFlag = notFlag;
+ // The desired column families for this source is the term itself
+ this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength()));
+ }
+
+ public String getTermString() {
+ return (this.term == null) ? "Iterator" : this.term.toString();
+ }
+ }
+
+ protected TermSource[] sources;
+ int sourcesCount = 0;
+
+ Range overallRange;
+
+ // query-time settings
+ protected Text currentPartition = null;
+ protected Text currentDocID = new Text(emptyByteArray);
+ static final byte[] emptyByteArray = new byte[0];
+
+ protected Key topKey = null;
+ protected Value value = new Value(emptyByteArray);
+
+ /**
+ * Creates an unconfigured iterator; sources stays null until init() or addSource() populates it.
+ */
+ public IntersectingIterator() {}
+
+ /**
+ * Creates a copy of this iterator through the private copy constructor, which deep-copies each source iterator in the given environment.
+ */
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ return new IntersectingIterator(this, env);
+ }
+
+ /**
+ * Copy constructor behind deepCopy(): deep-copies every source iterator of other and keeps each source's term, NOT flag and seek column families.
+ *
+ * @param other
+ *          iterator to copy the sources from; nothing is copied if its sources are null
+ * @param env
+ *          environment passed to each source iterator's deepCopy
+ */
+ private IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) {
+   if (other.sources != null) {
+     sourcesCount = other.sourcesCount;
+     sources = new TermSource[sourcesCount];
+     for (int i = 0; i < sourcesCount; i++) {
+       TermSource original = other.sources[i];
+       // carry the NOT flag over; the two-argument TermSource constructor would silently reset it to false
+       TermSource copy = new TermSource(original.iter.deepCopy(env), original.term, original.notFlag);
+       // seekColfams is a public field that may have been replaced after construction, so keep the original's
+       copy.seekColfams = original.seekColfams;
+       sources[i] = copy;
+     }
+   }
+ }
+
+ /**
+ * Returns the current intersection key as last built by advanceToIntersection(), or null when there is none.
+ */
+ @Override
+ public Key getTopKey() {
+ return topKey;
+ }
+
+ /**
+ * Returns the value field, which is initialized to an empty byte array.
+ */
+ @Override
+ public Value getTopValue() {
+ // we don't really care about values
+ return value;
+ }
+
+ /**
+ * Reports whether the iterator has a current entry; seekOneSource() sets currentPartition to null once the sources are exhausted or past the range.
+ */
+ @Override
+ public boolean hasTop() {
+ return currentPartition != null;
+ }
+
+ // Positions sources[sourceID] relative to the cursor held in the fields currentPartition and currentDocID; the comments below call these
+ // "currentRow" and "currentCQ". Returns true when the cursor was moved, when currentPartition was set to null, or when a negated source
+ // matched the cursor and sources[0] was advanced; returns false when the source is already consistent with the cursor.
+ // precondition: currentRow is not null
+ private boolean seekOneSource(int sourceID) throws IOException {
+ // find the next key in the appropriate column family that is at or beyond the cursor (currentRow, currentCQ)
+ // advance the cursor if this source goes beyond it
+ // return whether we advanced the cursor
+
+ // within this loop progress must be made in one of the following forms:
+ // - currentRow or currentCQ must be increased
+ // - the given source must advance its iterator
+ // this loop will end when any of the following criteria are met
+ // - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentCQ)
+ // - the given source is out of data and currentRow is set to null
+ // - the given source has advanced beyond the endRow and currentRow is set to null
+ boolean advancedCursor = false;
+
+ // negated source: it never moves the cursor itself, it can only veto the current docID
+ if (sources[sourceID].notFlag) {
+ while (true) {
+ if (sources[sourceID].iter.hasTop() == false) {
+ // an empty column that you are negating is a valid condition
+ break;
+ }
+ // check if we're past the end key
+ int endCompare = -1;
+ // we should compare the row to the end of the range
+ if (overallRange.getEndKey() != null) {
+ endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
+ if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+ // an empty column that you are negating is a valid condition
+ break;
+ }
+ }
+ int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
+ // check if this source is already at or beyond currentRow
+ // if not, then seek to at least the current row
+
+ if (partitionCompare > 0) {
+ // seek to at least the currentRow
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // check if this source has gone beyond currentRow
+ // if so, this is a valid condition for negation
+ if (partitionCompare < 0) {
+ break;
+ }
+ // we have verified that the current source is positioned in currentRow
+ // now we must make sure we're in the right columnFamily in the current row
+ // Note: Iterators are auto-magically set to the correct columnFamily
+ if (sources[sourceID].term != null) {
+ int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
+ // check if this source is already on the right columnFamily
+ // if not, then seek forwards to the right columnFamily
+ if (termCompare > 0) {
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // check if this source is beyond the right columnFamily
+ // if so, then this is a valid condition for negating
+ if (termCompare < 0) {
+ break;
+ }
+ }
+
+ // we have verified that we are in currentRow and the correct column family
+ // make sure we are at or beyond columnQualifier
+ Text docID = getDocID(sources[sourceID].iter.getTopKey());
+ int docIDCompare = currentDocID.compareTo(docID);
+ // If we are past the target, this is a valid result
+ if (docIDCompare < 0) {
+ break;
+ }
+ // if this source is not yet at the currentCQ then advance in this source
+ if (docIDCompare > 0) {
+ // seek forwards
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // if we are equal to the target, this is an invalid result.
+ // Force the entire process to go to the next row.
+ // We are advancing column 0 because we forced that column to not contain a !
+ // when we did the init()
+ if (docIDCompare == 0) {
+ sources[0].iter.next();
+ advancedCursor = true;
+ break;
+ }
+ }
+ } else {
+ // required source: it either lines up with the cursor, pushes the cursor forward, or ends the iteration
+ while (true) {
+ if (sources[sourceID].iter.hasTop() == false) {
+ currentPartition = null;
+ // setting currentRow to null counts as advancing the cursor
+ return true;
+ }
+ // check if we're past the end key
+ int endCompare = -1;
+ // we should compare the row to the end of the range
+
+ if (overallRange.getEndKey() != null) {
+ endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
+ if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) {
+ currentPartition = null;
+ // setting currentRow to null counts as advancing the cursor
+ return true;
+ }
+ }
+ int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
+ // check if this source is already at or beyond currentRow
+ // if not, then seek to at least the current row
+ if (partitionCompare > 0) {
+ // seek to at least the currentRow
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // check if this source has gone beyond currentRow
+ // if so, advance currentRow
+ if (partitionCompare < 0) {
+ currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
+ currentDocID.set(emptyByteArray);
+ advancedCursor = true;
+ continue;
+ }
+ // we have verified that the current source is positioned in currentRow
+ // now we must make sure we're in the right columnFamily in the current row
+ // Note: Iterators are auto-magically set to the correct columnFamily
+
+ if (sources[sourceID].term != null) {
+ int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
+ // check if this source is already on the right columnFamily
+ // if not, then seek forwards to the right columnFamily
+ if (termCompare > 0) {
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // check if this source is beyond the right columnFamily
+ // if so, then seek to the next row
+ if (termCompare < 0) {
+ // we're out of entries in the current row, so seek to the next one
+ // byte[] currentRowBytes = currentRow.getBytes();
+ // byte[] nextRow = new byte[currentRowBytes.length + 1];
+ // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
+ // nextRow[currentRowBytes.length] = (byte)0;
+ // // we should reuse text objects here
+ // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
+ if (endCompare == 0) {
+ // we're done
+ currentPartition = null;
+ // setting currentRow to null counts as advancing the cursor
+ return true;
+ }
+ Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ }
+ // we have verified that we are in currentRow and the correct column family
+ // make sure we are at or beyond columnQualifier
+ Text docID = getDocID(sources[sourceID].iter.getTopKey());
+ int docIDCompare = currentDocID.compareTo(docID);
+ // if this source has advanced beyond the current column qualifier then advance currentCQ and return true
+ if (docIDCompare < 0) {
+ currentDocID.set(docID);
+ advancedCursor = true;
+ break;
+ }
+ // if this source is not yet at the currentCQ then seek in this source
+ if (docIDCompare > 0) {
+ // seek forwards
+ Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
+ sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true);
+ continue;
+ }
+ // this source is at the current row, in its column family, and at currentCQ
+ break;
+ }
+ }
+ return advancedCursor;
+ }
+
+ /**
+ * Moves to the next intersection by stepping the first source forward and re-aligning all sources. Does nothing once the iterator is exhausted.
+ */
+ @Override
+ public void next() throws IOException {
+   // precondition when not exhausted: the current row is set up and the sources all have the same column qualifier
+   if (currentPartition != null) {
+     sources[0].iter.next();
+     advanceToIntersection();
+   }
+ }
+
+ /**
+ * Repeatedly aligns the sources until a full pass over them leaves the cursor unchanged, then publishes the cursor as topKey. If currentPartition becomes
+ * null along the way, topKey is cleared instead.
+ */
+ protected void advanceToIntersection() throws IOException {
+   boolean cursorMoved;
+   do {
+     // restart from the first source whenever any source moved the cursor
+     cursorMoved = false;
+     for (int i = 0; i < sourcesCount && !cursorMoved; i++) {
+       if (currentPartition == null) {
+         topKey = null;
+         return;
+       }
+       cursorMoved = seekOneSource(i);
+     }
+   } while (cursorMoved);
+   topKey = buildKey(currentPartition, nullText, currentDocID);
+ }
+
+ /**
+ * Returns the string form of the iterator's top key, or an empty string when it has no top.
+ */
+ public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+   return iter.hasTop() ? iter.getTopKey().toString() : "";
+ }
+
+ private static final String columnFamiliesOptionName = "columnFamilies";
+ private static final String notFlagOptionName = "notFlag";
+
+ /**
- * @param columns
+ * @return encoded columns
+ */
+ protected static String encodeColumns(Text[] columns) {
+   // one Base64-encoded column per line, each line terminated by '\n'
+   StringBuilder encoded = new StringBuilder();
+   for (Text column : columns) {
+     byte[] base64 = Base64.encodeBase64(TextUtil.getBytes(column));
+     encoded.append(new String(base64, Constants.UTF8));
+     encoded.append('\n');
+   }
+   return encoded.toString();
+ }
+
+ /**
- * @param flags
+ * @return encoded flags
+ */
+ protected static String encodeBooleans(boolean[] flags) {
+   // one byte per flag (1 = true, 0 = false), Base64-encoded as a whole
+   byte[] bytes = new byte[flags.length];
+   for (int i = 0; i < flags.length; i++) {
+     bytes[i] = (byte) (flags[i] ? 1 : 0);
+   }
+   return new String(Base64.encodeBase64(bytes), Constants.UTF8);
+ }
+
+ // Inverse of encodeColumns: splits on '\n' and Base64-decodes each line into a Text.
+ protected static Text[] decodeColumns(String columns) {
+   String[] encodedColumns = columns.split("\n");
+   Text[] decoded = new Text[encodedColumns.length];
+   int idx = 0;
+   for (String encoded : encodedColumns) {
+     decoded[idx++] = new Text(Base64.decodeBase64(encoded.getBytes(Constants.UTF8)));
+   }
+   return decoded;
+ }
+
+ /**
- * @param flags
+ * @return decoded flags
+ */
+ protected static boolean[] decodeBooleans(String flags) {
+   // return null if there were no flags
+   if (flags == null) {
+     return null;
+   }
+
+   byte[] bytes = Base64.decodeBase64(flags.getBytes(Constants.UTF8));
+   boolean[] bFlags = new boolean[bytes.length];
+   for (int i = 0; i < bytes.length; i++) {
+     // only the byte value 1 decodes to true
+     bFlags[i] = (bytes[i] == 1);
+   }
+   return bFlags;
+ }
+
+ /**
+ * Builds the term sources from the encoded "columnFamilies" option and the optional "notFlag" option. The first source uses the given iterator directly;
+ * every further source uses a deep copy of it.
+ *
+ * @throws IllegalArgumentException
+ *           if the column families option is missing, fewer than two terms are given, there are fewer NOT flags than terms, or every term is negated
+ */
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+   String encodedTerms = options.get(columnFamiliesOptionName);
+   if (encodedTerms == null) {
+     // fail with a descriptive message instead of a NullPointerException inside decodeColumns
+     throw new IllegalArgumentException("IntersectionIterator requires the " + columnFamiliesOptionName + " option");
+   }
+   Text[] terms = decodeColumns(encodedTerms);
+   boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName));
+
+   if (terms.length < 2) {
+     throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
+   }
+
+   // Scan the not flags.
+   // There must be at least one term that isn't negated
+   // And we are going to re-order such that the first term is not a ! term
+   if (notFlag == null) {
+     // a new boolean array is all false, i.e. no term is negated
+     notFlag = new boolean[terms.length];
+   }
+   if (notFlag.length < terms.length) {
+     // previously surfaced as an ArrayIndexOutOfBoundsException further down
+     throw new IllegalArgumentException("IntersectionIterator requires a not flag for each column family");
+   }
+   if (notFlag[0]) {
+     for (int i = 1; i < notFlag.length; i++) {
+       if (notFlag[i] == false) {
+         // swap the first non-negated term into position 0
+         Text swapFamily = new Text(terms[0]);
+         terms[0].set(terms[i]);
+         terms[i].set(swapFamily);
+         notFlag[0] = false;
+         notFlag[i] = true;
+         break;
+       }
+     }
+     if (notFlag[0]) {
+       throw new IllegalArgumentException("IntersectionIterator requires at least one column family without not");
+     }
+   }
+
+   sources = new TermSource[terms.length];
+   sources[0] = new TermSource(source, terms[0]);
+   for (int i = 1; i < terms.length; i++) {
+     sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]);
+   }
+   sourcesCount = terms.length;
+ }
+
+ /**
+ * Resets the cursor, seeks every source to its own term at the start of the range and advances to the first intersection. The seekColumnFamilies and
+ * inclusive arguments are not used; each source is seeked with its own seekColfams.
+ */
+ @Override
+ public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+   overallRange = new Range(range);
+   currentPartition = new Text();
+   currentDocID.set(emptyByteArray);
+
+   final Key startKey = range.getStartKey();
+   // seek each of the sources to the right column family within the row given by key
+   for (int i = 0; i < sourcesCount; i++) {
+     TermSource termSource = sources[i];
+     if (startKey == null) {
+       // Seek only to the term for this source as a column family
+       termSource.iter.seek(range, termSource.seekColfams, true);
+       continue;
+     }
+     Key sourceKey;
+     if (startKey.getColumnQualifier() != null) {
+       sourceKey = buildKey(getPartition(startKey), termSource.term, startKey.getColumnQualifier());
+     } else {
+       sourceKey = buildKey(getPartition(startKey), termSource.term);
+     }
+     // Seek only to the term for this source as a column family
+     termSource.iter.seek(new Range(sourceKey, true, null, false), termSource.seekColfams, true);
+   }
+   advanceToIntersection();
+ }
+
+ /**
+ * Appends a term source backed by a deep copy of the given iterator, growing the sources array by one slot.
+ */
+ public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, Text term, boolean notFlag) {
+   // Check if we have space for the added Source
+   if (sources == null) {
+     sources = new TermSource[1];
+   } else {
+     // allocate space for node, and copy current tree.
+     // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309
+     TermSource[] grown = new TermSource[sources.length + 1];
+     for (int i = 0; i < sources.length; i++) {
+       // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309
+       grown[i] = new TermSource(sources[i]);
+     }
+     sources = grown;
+   }
+   sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag);
+   sourcesCount++;
+ }
+
+ /**
+ * Encode the columns to be used when iterating.
- *
- * @param cfg
- * @param columns
+ */
+ public static void setColumnFamilies(IteratorSetting cfg, Text[] columns) {
+ if (columns.length < 2)
+ throw new IllegalArgumentException("Must supply at least two terms to intersect");
+ cfg.addOption(IntersectingIterator.columnFamiliesOptionName, IntersectingIterator.encodeColumns(columns));
+ }
+
+ /**
+ * Encode columns and NOT flags indicating which columns should be negated (docIDs will be excluded if matching negated columns, instead of included).
- *
- * @param cfg
- * @param columns
- * @param notFlags
+ */
+ public static void setColumnFamilies(IteratorSetting cfg, Text[] columns, boolean[] notFlags) {
+ if (columns.length < 2)
+ throw new IllegalArgumentException("Must supply at least two terms to intersect");
+ if (columns.length != notFlags.length)
+ throw new IllegalArgumentException("columns and notFlags arrays must be the same length");
+ setColumnFamilies(cfg, columns);
+ cfg.addOption(IntersectingIterator.notFlagOptionName, IntersectingIterator.encodeBooleans(notFlags));
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
index a232796,0000000..2d2fa74
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowFilter.java
@@@ -1,165 -1,0 +1,164 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This iterator makes it easy to select rows that meet given criteria. It is an alternative to the {@link WholeRowIterator}. There are a few things to consider
+ * when deciding which one to use.
+ *
+ * First, the WholeRowIterator requires that the row fit in memory and that the entire row is read before a decision is made. This iterator has neither
+ * requirement; it allows seeking within a row to avoid reading the entire row to make a decision. So even if your rows fit into memory, extending this
+ * iterator may be a better choice because you can seek.
+ *
+ * Second the WholeRowIterator is currently the only way to achieve row isolation with the {@link BatchScanner}. With the normal {@link Scanner} row isolation
+ * can be enabled and this Iterator may be used.
+ *
+ * Third, the row acceptance test will be executed every time this Iterator is seeked. If the row is large, then the row will be fetched in batches of key/values.
+ * As each batch is fetched the test may be re-executed because the iterator stack is reseeked for each batch. The batch size may be increased to reduce the
+ * number of times the test is executed. With the normal Scanner, if isolation is enabled then it will read an entire row w/o seeking this iterator.
+ *
+ */
+public abstract class RowFilter extends WrappingIterator {
+
+ private RowIterator decisionIterator; // row-confined wrapper around a deep copy of the source (built in init); passed to acceptRow
+ private Collection<ByteSequence> columnFamilies; // column families from the most recent seek
+ Text currentRow; // row most recently accepted by acceptRow; null after a seek or after a row is rejected
+ private boolean inclusive; // inclusive flag from the most recent seek
+ private Range range; // range from the most recent seek
+ private boolean hasTop; // set to true by seek; cleared in skipRows when nothing of the seek range remains after a rejected row
+
+ private static class RowIterator extends WrappingIterator { // wrapper whose seeks are clipped to rowRange
+ private Range rowRange; // range this iterator is confined to; assigned through setRow
+ private boolean hasTop; // false when the last seek's range clipped to null
+
+ RowIterator(SortedKeyValueIterator<Key,Value> source) {
+ super.setSource(source);
+ }
+
+ void setRow(Range row) {
+ this.rowRange = row;
+ }
+
+ @Override
+ public boolean hasTop() {
+ return hasTop && super.hasTop();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+
+ range = rowRange.clip(range, true); // confine the request to rowRange; presumably `true` makes clip return null when disjoint - confirm against Range.clip
+ if (range == null) {
+ hasTop = false; // the wrapped source is not seeked in this case
+ } else {
+ hasTop = true;
+ super.seek(range, columnFamilies, inclusive);
+ }
+ }
+ }
+
+ private void skipRows() throws IOException { // moves the source forward until it sits in an accepted row or has no top
+ SortedKeyValueIterator<Key,Value> source = getSource();
+ while (source.hasTop()) {
+ Text row = source.getTopKey().getRow();
+
+ if (currentRow != null && currentRow.equals(row)) // still inside the row that was already accepted; no new decision needed
+ break;
+
+ Range rowRange = new Range(row); // presumably a range covering exactly this row - confirm against Range(Text)
+ decisionIterator.setRow(rowRange);
+ decisionIterator.seek(rowRange, columnFamilies, inclusive); // position the decision iterator on this row with the columns of the last seek
+
+ if (acceptRow(decisionIterator)) {
+ currentRow = row;
+ break;
+ } else {
+ currentRow = null;
+ int count = 0; // first try to leave the rejected row with at most 10 next() calls
+ while (source.hasTop() && count < 10 && source.getTopKey().getRow().equals(row)) {
+ count++;
+ source.next();
+ }
+
+ if (source.hasTop() && source.getTopKey().getRow().equals(row)) { // still in the rejected row after 10 steps: seek instead
+ Range nextRow = new Range(row, false, null, false); // presumably everything strictly after this row (exclusive start, null end) - confirm against Range
+ nextRow = range.clip(nextRow, true); // restrict to the range of the last seek
+ if (nextRow == null)
+ hasTop = false; // NOTE(review): the loop is not exited here; it keeps running while source.hasTop()
+ else
+ source.seek(nextRow, columnFamilies, inclusive);
+ }
+ }
+ }
+ }
+
+ /**
+ * Implementation should return false to suppress a row.
+ *
+ *
+ * @param rowIterator
+ * - An iterator over the row. This iterator is confined to the row. Seeking past the end of the row will return no data. Seeking before the row will
+ * always set top to the first column in the current row. By default this iterator will only see the columns the parent was seeked with. To see more
+ * columns reseek this iterator with those columns.
+ * @return false if a row should be suppressed, otherwise true.
- * @throws IOException
+ */
+ public abstract boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) throws IOException;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.decisionIterator = new RowIterator(source.deepCopy(env)); // row decisions are made on a deep copy of the source, not on the source itself
+ }
+
+ @Override
+ public boolean hasTop() {
+ return hasTop && super.hasTop();
+ }
+
+ @Override
+ public void next() throws IOException {
+ super.next();
+ skipRows(); // after advancing, pass over any rows that acceptRow rejects
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+ super.seek(range, columnFamilies, inclusive);
+ this.columnFamilies = columnFamilies; // remembered so skipRows can reuse the same seek parameters
+ this.inclusive = inclusive;
+ this.range = range;
+ currentRow = null; // forget any previously accepted row so the first row found is tested again
+ hasTop = true;
+ skipRows();
+
+ }
+}