Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/04/09 19:57:42 UTC
[11/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/data/Range.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/data/Range.java
index 65873c3,0000000..122436b
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Range.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Range.java
@@@ -1,906 -1,0 +1,899 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
- import java.io.InvalidObjectException;
+import java.io.IOException;
++import java.io.InvalidObjectException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class is used to specify a range of Accumulo Keys.
+ *
+ */
+
+public class Range implements WritableComparable<Range> {
+
+ private Key start;
+ private Key stop;
+ private boolean startKeyInclusive;
+ private boolean stopKeyInclusive;
+ private boolean infiniteStartKey;
+ private boolean infiniteStopKey;
+
+ /**
+ * Creates a range that goes from negative to positive infinity
+ */
+
+ public Range() {
+ this((Key) null, true, (Key) null, true);
+ }
+
+ /**
+ * Creates a range from startKey inclusive to endKey inclusive
+ *
+ * @param startKey
+ * set this to null when negative infinity is needed
+ * @param endKey
+ * set this to null when positive infinity is needed
+ */
+ public Range(Key startKey, Key endKey) {
+ this(startKey, true, endKey, true);
+ }
+
+ /**
+ * Creates a range that covers an entire row
+ *
+ * @param row
+ * set this to null to cover all rows
+ */
+ public Range(CharSequence row) {
+ this(row, true, row, true);
+ }
+
+ /**
+ * Creates a range that covers an entire row
+ *
+ * @param row
+ * set this to null to cover all rows
+ */
+ public Range(Text row) {
+ this(row, true, row, true);
+ }
+
+ /**
+ * Creates a range from startRow inclusive to endRow inclusive
+ *
+ * @param startRow
+ * set this to null when negative infinity is needed
+ * @param endRow
+ * set this to null when positive infinity is needed
+ */
+ public Range(Text startRow, Text endRow) {
+ this(startRow, true, endRow, true);
+ }
+
+ /**
+ * Creates a range from startRow inclusive to endRow inclusive
+ *
+ * @param startRow
+ * set this to null when negative infinity is needed
+ * @param endRow
+ * set this to null when positive infinity is needed
+ */
+ public Range(CharSequence startRow, CharSequence endRow) {
+ this(startRow, true, endRow, true);
+ }
+
+ /**
+ * Creates a range from startRow to endRow
+ *
+ * @param startRow
+ * set this to null when negative infinity is needed
+ * @param startRowInclusive
+ * determines if the start row is included
+ * @param endRow
+ * set this to null when positive infinity is needed
+ * @param endRowInclusive
+ * determines if the endRow is included
+ */
+
+ public Range(Text startRow, boolean startRowInclusive, Text endRow, boolean endRowInclusive) {
+ this((startRow == null ? null : (startRowInclusive ? new Key(startRow) : new Key(startRow).followingKey(PartialKey.ROW))), true, (endRow == null ? null
+ : (endRowInclusive ? new Key(endRow).followingKey(PartialKey.ROW) : new Key(endRow))), false);
+ }
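
A minimal usage sketch of the row-based constructor above, showing how row inclusivity is translated into key bounds; the wrapper class and row values are illustrative, not part of this diff:

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class RowRangeExample {
  public static void main(String[] args) {
    // (row1, row3] : start row excluded, end row included
    Range r = new Range(new Text("row1"), false, new Text("row3"), true);

    System.out.println(r);                                       // key bounds derived from the rows
    System.out.println(r.contains(new Key(new Text("row2"))));   // true, row2 is inside
    System.out.println(r.contains(new Key(new Text("row1"))));   // false, row1 was excluded
  }
}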
+
+ /**
+ * Creates a range from startRow to endRow
+ *
+ * @param startRow
+ * set this to null when negative infinity is needed
+ * @param startRowInclusive
+ * determines if the start row is included
+ * @param endRow
+ * set this to null when positive infinity is needed
+ * @param endRowInclusive
+ * determines if the endRow is included
+ */
+
+ public Range(CharSequence startRow, boolean startRowInclusive, CharSequence endRow, boolean endRowInclusive) {
+ this(startRow == null ? null : new Text(startRow.toString()), startRowInclusive, endRow == null ? null : new Text(endRow.toString()), endRowInclusive);
+ }
+
+ /**
+ * Creates a range from startKey to endKey
+ *
+ * @param startKey
+ * set this to null when negative infinity is needed
+ * @param startKeyInclusive
+ * determines if the range includes the start key
+ * @param endKey
+ * set this to null when positive infinity is needed
+ * @param endKeyInclusive
+ * determines if the range includes the end key
+ */
+ public Range(Key startKey, boolean startKeyInclusive, Key endKey, boolean endKeyInclusive) {
+ this.start = startKey;
+ this.startKeyInclusive = startKeyInclusive;
+ this.infiniteStartKey = startKey == null;
+ this.stop = endKey;
+ this.stopKeyInclusive = endKeyInclusive;
+ this.infiniteStopKey = stop == null;
+
+ if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(endKey)) {
+ throw new IllegalArgumentException("Start key must be less than end key in range (" + startKey + ", " + endKey + ")");
+ }
+ }
+
+ /**
+ * Copies a range
+ */
+ public Range(Range range) {
+ this(range.start, range.startKeyInclusive, range.infiniteStartKey, range.stop, range.stopKeyInclusive, range.infiniteStopKey);
+ }
+
+ /**
+ * Creates a range from start to stop.
+ *
+ * @param start
+ * set this to null when negative infinity is needed
+ * @param stop
+ * set this to null when positive infinity is needed
+ * @param startKeyInclusive
+ * determines if the range includes the start key
+ * @param stopKeyInclusive
+ * determines if the range includes the end key
+ * @param infiniteStartKey
+ * true if start key is negative infinity (null)
+ * @param infiniteStopKey
+ * true if stop key is positive infinity (null)
+ * @throws IllegalArgumentException if stop is before start, or infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not
+ * null
+ */
+ public Range(Key start, Key stop, boolean startKeyInclusive, boolean stopKeyInclusive, boolean infiniteStartKey, boolean infiniteStopKey) {
+ this(start, startKeyInclusive, infiniteStartKey, stop, stopKeyInclusive, infiniteStopKey);
+ if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
+ throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")");
+ }
+ }
+
+ /**
+ * Creates a range from start to stop. Unlike the public six-argument method,
+ * this one does not assure that stop is after start, which helps performance
+ * in cases where that assurance is already in place.
+ *
+ * @param start
+ * set this to null when negative infinity is needed
+ * @param startKeyInclusive
+ * determines if the range includes the start key
+ * @param infiniteStartKey
+ * true if start key is negative infinity (null)
+ * @param stop
+ * set this to null when positive infinity is needed
+ * @param stopKeyInclusive
+ * determines if the range includes the end key
+ * @param infiniteStopKey
+ * true if stop key is positive infinity (null)
+ * @throws IllegalArgumentException if infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not null
+ */
+ protected Range(Key start, boolean startKeyInclusive, boolean infiniteStartKey, Key stop, boolean stopKeyInclusive, boolean infiniteStopKey) {
+ if (infiniteStartKey && start != null)
+ throw new IllegalArgumentException();
+
+ if (infiniteStopKey && stop != null)
+ throw new IllegalArgumentException();
+
+ this.start = start;
+ this.stop = stop;
+ this.startKeyInclusive = startKeyInclusive;
+ this.stopKeyInclusive = stopKeyInclusive;
+ this.infiniteStartKey = infiniteStartKey;
+ this.infiniteStopKey = infiniteStopKey;
+ }
+
+ public Range(TRange trange) {
+ this(trange.start == null ? null : new Key(trange.start), trange.startKeyInclusive, trange.infiniteStartKey,
+ trange.stop == null ? null : new Key(trange.stop), trange.stopKeyInclusive, trange.infiniteStopKey);
+ if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
+ throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")");
+ }
+ }
+
+ /**
+ * @return the first key in the range, null if the key is infinite
+ */
+ public Key getStartKey() {
+ if (infiniteStartKey) {
+ return null;
+ }
+ return start;
+ }
+
+ /**
+ *
- * @param key
+ * @return true if the given key is before the range, otherwise false
+ */
+ public boolean beforeStartKey(Key key) {
+ if (infiniteStartKey) {
+ return false;
+ }
+
+ if (startKeyInclusive)
+ return key.compareTo(start) < 0;
+ return key.compareTo(start) <= 0;
+ }
+
+ /**
+ * @return the last key in the range, null if the end key is infinite
+ */
+
+ public Key getEndKey() {
+ if (infiniteStopKey) {
+ return null;
+ }
+ return stop;
+ }
+
+ /**
- * @param key
+ * @return true if the given key is after the range, otherwise false
+ */
+
+ public boolean afterEndKey(Key key) {
+ if (infiniteStopKey)
+ return false;
+
+ if (stopKeyInclusive)
+ return stop.compareTo(key) < 0;
+ return stop.compareTo(key) <= 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int startHash = infiniteStartKey ? 0 : start.hashCode() + (startKeyInclusive ? 1 : 0);
+ int stopHash = infiniteStopKey ? 0 : stop.hashCode() + (stopKeyInclusive ? 1 : 0);
+
+ return startHash + stopHash;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Range)
+ return equals((Range) o);
+ return false;
+ }
+
+ public boolean equals(Range otherRange) {
+
+ return compareTo(otherRange) == 0;
+ }
+
+ /**
+ * Compares this range to another range. Compares in the order start key, inclusiveness of start key, end key, inclusiveness of end key. Infinite keys sort
+ * first, and non-infinite keys are compared with {@link Key#compareTo(Key)}. Inclusive sorts before non-inclusive.
+ */
++ @Override
+ public int compareTo(Range o) {
+ int comp;
+
+ if (infiniteStartKey)
+ if (o.infiniteStartKey)
+ comp = 0;
+ else
+ comp = -1;
+ else if (o.infiniteStartKey)
+ comp = 1;
+ else {
+ comp = start.compareTo(o.start);
+ if (comp == 0)
+ if (startKeyInclusive && !o.startKeyInclusive)
+ comp = -1;
+ else if (!startKeyInclusive && o.startKeyInclusive)
+ comp = 1;
+
+ }
+
+ if (comp == 0)
+ if (infiniteStopKey)
+ if (o.infiniteStopKey)
+ comp = 0;
+ else
+ comp = 1;
+ else if (o.infiniteStopKey)
+ comp = -1;
+ else {
+ comp = stop.compareTo(o.stop);
+ if (comp == 0)
+ if (stopKeyInclusive && !o.stopKeyInclusive)
+ comp = 1;
+ else if (!stopKeyInclusive && o.stopKeyInclusive)
+ comp = -1;
+ }
+
+ return comp;
+ }
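
A quick illustrative check of the ordering described above (infinite start keys first, then start key, then end key); the rows and wrapper class are made up:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.accumulo.core.data.Range;

public class RangeSortExample {
  public static void main(String[] args) {
    Range all = new Range();              // (-inf, +inf)
    Range fromB = new Range("b", null);   // [b, +inf)
    Range aToC = new Range("a", "c");     // [a, c]

    List<Range> ranges = Arrays.asList(fromB, aToC, all);
    Collections.sort(ranges);
    System.out.println(ranges);           // infinite start key first, then [a,c], then [b,+inf)
  }
}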
+
+ /**
+ *
- * @param key
+ * @return true if the given key falls within the range
+ */
+ public boolean contains(Key key) {
+ return !beforeStartKey(key) && !afterEndKey(key);
+ }
+
+ /**
+ * Takes a collection of ranges and merges overlapping and adjacent ranges. For example, given the following input
+ *
+ * <pre>
+ * [a,c], (c, d], (g,m), (j,t]
+ * </pre>
+ *
+ * the following ranges would be returned
+ *
+ * <pre>
+ * [a,d], (g,t]
+ * </pre>
+ *
- * @param ranges
+ * @return list of merged ranges
+ */
+
+ public static List<Range> mergeOverlapping(Collection<Range> ranges) {
+ if (ranges.size() == 0)
+ return Collections.emptyList();
+
+ List<Range> ral = new ArrayList<Range>(ranges);
+ Collections.sort(ral);
+
+ ArrayList<Range> ret = new ArrayList<Range>(ranges.size());
+
+ Range currentRange = ral.get(0);
+ boolean currentStartKeyInclusive = ral.get(0).startKeyInclusive;
+
+ for (int i = 1; i < ral.size(); i++) {
+ // because of inclusive switch, equal keys may not be seen
+
+ if (currentRange.infiniteStopKey) {
+ // this range has the minimal start key and
+ // an infinite end key so it will contain all
+ // other ranges
+ break;
+ }
+
+ Range range = ral.get(i);
+
+ boolean startKeysEqual;
+ if (range.infiniteStartKey) {
+ // previous start key must be infinite because it is sorted
+ assert (currentRange.infiniteStartKey);
+ startKeysEqual = true;
+ } else if (currentRange.infiniteStartKey) {
+ startKeysEqual = false;
+ } else if (currentRange.start.equals(range.start)) {
+ startKeysEqual = true;
+ } else {
+ startKeysEqual = false;
+ }
+
+ if (startKeysEqual || currentRange.contains(range.start)
+ || (!currentRange.stopKeyInclusive && range.startKeyInclusive && range.start.equals(currentRange.stop))) {
+ int cmp;
+
+ if (range.infiniteStopKey || (cmp = range.stop.compareTo(currentRange.stop)) > 0 || (cmp == 0 && range.stopKeyInclusive)) {
+ currentRange = new Range(currentRange.getStartKey(), currentStartKeyInclusive, range.getEndKey(), range.stopKeyInclusive);
+ }/* else currentRange contains ral.get(i) */
+ } else {
+ ret.add(currentRange);
+ currentRange = range;
+ currentStartKeyInclusive = range.startKeyInclusive;
+ }
+ }
+
+ ret.add(currentRange);
+
+ return ret;
+ }
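
The Javadoc example above expressed as row ranges; the bracket notation is mapped to inclusive/exclusive row bounds, and the wrapper class is illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.accumulo.core.data.Range;

public class MergeOverlappingExample {
  public static void main(String[] args) {
    // [a,c], (c,d], (g,m), (j,t]
    List<Range> input = Arrays.asList(
        new Range("a", true, "c", true),
        new Range("c", false, "d", true),
        new Range("g", false, "m", false),
        new Range("j", false, "t", true));

    // overlapping and adjacent ranges collapse to [a,d] and (g,t]
    List<Range> merged = Range.mergeOverlapping(input);
    System.out.println(merged.size());   // 2
    System.out.println(merged);
  }
}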
+
+ /**
+ * Creates a range which represents the intersection of this range and the passed in range. The following example will print true.
+ *
+ * <pre>
+ * Range range1 = new Range("a", "f");
+ * Range range2 = new Range("c", "n");
+ * Range range3 = range1.clip(range2);
+ * System.out.println(range3.equals(new Range("c", "f")));
+ * </pre>
+ *
- * @param range
+ * @return the intersection
+ * @throws IllegalArgumentException
+ * if ranges does not overlap
+ */
+
+ public Range clip(Range range) {
+ return clip(range, false);
+ }
+
+ /**
+ * Same as the other clip function, except it gives the option to return null if the ranges do not overlap, instead of throwing an exception.
+ *
+ * @see Range#clip(Range)
- * @param range
+ * @param returnNullIfDisjoint
+ * If the ranges do not overlap and true is passed, then null is returned; otherwise an exception is thrown.
+ * @return the intersection
+ */
+
+ public Range clip(Range range, boolean returnNullIfDisjoint) {
+
+ Key sk = range.getStartKey();
+ boolean ski = range.isStartKeyInclusive();
+
+ Key ek = range.getEndKey();
+ boolean eki = range.isEndKeyInclusive();
+
+ if (range.getStartKey() == null) {
+ if (getStartKey() != null) {
+ sk = getStartKey();
+ ski = isStartKeyInclusive();
+ }
+ } else if (afterEndKey(range.getStartKey())
+ || (getEndKey() != null && range.getStartKey().equals(getEndKey()) && !(range.isStartKeyInclusive() && isEndKeyInclusive()))) {
+ if (returnNullIfDisjoint)
+ return null;
+ throw new IllegalArgumentException("Range " + range + " does not overlap " + this);
+ } else if (beforeStartKey(range.getStartKey())) {
+ sk = getStartKey();
+ ski = isStartKeyInclusive();
+ }
+
+ if (range.getEndKey() == null) {
+ if (getEndKey() != null) {
+ ek = getEndKey();
+ eki = isEndKeyInclusive();
+ }
+ } else if (beforeStartKey(range.getEndKey())
+ || (getStartKey() != null && range.getEndKey().equals(getStartKey()) && !(range.isEndKeyInclusive() && isStartKeyInclusive()))) {
+ if (returnNullIfDisjoint)
+ return null;
+ throw new IllegalArgumentException("Range " + range + " does not overlap " + this);
+ } else if (afterEndKey(range.getEndKey())) {
+ ek = getEndKey();
+ eki = isEndKeyInclusive();
+ }
+
+ return new Range(sk, ski, ek, eki);
+ }
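
The Javadoc example for clip, plus the null-returning variant for disjoint ranges; the rows and wrapper class are illustrative:

import org.apache.accumulo.core.data.Range;

public class ClipExample {
  public static void main(String[] args) {
    Range range1 = new Range("a", "f");
    Range range2 = new Range("c", "n");

    // intersection of [a,f] and [c,n] is [c,f]
    System.out.println(range1.clip(range2).equals(new Range("c", "f")));   // true

    // disjoint ranges: ask for null instead of an IllegalArgumentException
    Range range3 = new Range("x", "z");
    System.out.println(range1.clip(range3, true));                         // null
  }
}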
+
+ /**
+ * Creates a new range that is bounded by the columns passed in. The start key in the returned range will have a column >= the minimum column. The end key
+ * in the returned range will have a column <= the max column.
+ *
- *
- * @param min
- * @param max
+ * @return a column bounded range
+ * @throws IllegalArgumentException
+ * if min > max
+ */
+
+ public Range bound(Column min, Column max) {
+
+ if (min.compareTo(max) > 0) {
+ throw new IllegalArgumentException("min column > max column " + min + " " + max);
+ }
+
+ Key sk = getStartKey();
+ boolean ski = isStartKeyInclusive();
+
+ if (sk != null) {
+
+ ByteSequence cf = sk.getColumnFamilyData();
+ ByteSequence cq = sk.getColumnQualifierData();
+
+ ByteSequence mincf = new ArrayByteSequence(min.columnFamily);
+ ByteSequence mincq;
+
+ if (min.columnQualifier != null)
+ mincq = new ArrayByteSequence(min.columnQualifier);
+ else
+ mincq = new ArrayByteSequence(new byte[0]);
+
+ int cmp = cf.compareTo(mincf);
+
+ if (cmp < 0 || (cmp == 0 && cq.compareTo(mincq) < 0)) {
+ ski = true;
+ sk = new Key(sk.getRowData().toArray(), mincf.toArray(), mincq.toArray(), new byte[0], Long.MAX_VALUE, true);
+ }
+ }
+
+ Key ek = getEndKey();
+ boolean eki = isEndKeyInclusive();
+
+ if (ek != null) {
+ ByteSequence row = ek.getRowData();
+ ByteSequence cf = ek.getColumnFamilyData();
+ ByteSequence cq = ek.getColumnQualifierData();
+ ByteSequence cv = ek.getColumnVisibilityData();
+
+ ByteSequence maxcf = new ArrayByteSequence(max.columnFamily);
+ ByteSequence maxcq = null;
+ if (max.columnQualifier != null)
+ maxcq = new ArrayByteSequence(max.columnQualifier);
+
+ boolean set = false;
+
+ int comp = cf.compareTo(maxcf);
+
+ if (comp > 0) {
+ set = true;
+ } else if (comp == 0 && maxcq != null && cq.compareTo(maxcq) > 0) {
+ set = true;
+ } else if (!eki && row.length() > 0 && row.byteAt(row.length() - 1) == 0 && cf.length() == 0 && cq.length() == 0 && cv.length() == 0
+ && ek.getTimestamp() == Long.MAX_VALUE) {
+ row = row.subSequence(0, row.length() - 1);
+ set = true;
+ }
+
+ if (set) {
+ eki = false;
+ if (maxcq == null)
+ ek = new Key(row.toArray(), maxcf.toArray(), new byte[0], new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM);
+ else
+ ek = new Key(row.toArray(), maxcf.toArray(), maxcq.toArray(), new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM_COLQUAL);
+ }
+ }
+
+ return new Range(sk, ski, ek, eki);
+ }
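
A sketch of bound, assuming Column's byte[]-based constructor with a null qualifier and visibility; the row and family names are illustrative:

import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Range;

public class BoundExample {
  public static void main(String[] args) {
    Range rowRange = new Range("row1");

    // keep only columns between families cf1 and cf2 within the row range
    Column min = new Column("cf1".getBytes(), null, null);
    Column max = new Column("cf2".getBytes(), null, null);

    Range bounded = rowRange.bound(min, max);
    System.out.println(bounded);
  }
}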
+
++ @Override
+ public String toString() {
+ return ((startKeyInclusive && start != null) ? "[" : "(") + (start == null ? "-inf" : start) + "," + (stop == null ? "+inf" : stop)
+ + ((stopKeyInclusive && stop != null) ? "]" : ")");
+ }
+
++ @Override
+ public void readFields(DataInput in) throws IOException {
+ infiniteStartKey = in.readBoolean();
+ infiniteStopKey = in.readBoolean();
+ if (!infiniteStartKey) {
+ start = new Key();
+ start.readFields(in);
+ } else {
+ start = null;
+ }
+
+ if (!infiniteStopKey) {
+ stop = new Key();
+ stop.readFields(in);
+ } else {
+ stop = null;
+ }
+
+ startKeyInclusive = in.readBoolean();
+ stopKeyInclusive = in.readBoolean();
+
+ if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) {
+ throw new InvalidObjectException("Start key must be less than end key in range (" + start + ", " + stop + ")");
+ }
+ }
+
++ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(infiniteStartKey);
+ out.writeBoolean(infiniteStopKey);
+ if (!infiniteStartKey)
+ start.write(out);
+ if (!infiniteStopKey)
+ stop.write(out);
+ out.writeBoolean(startKeyInclusive);
+ out.writeBoolean(stopKeyInclusive);
+ }
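
Range is a WritableComparable, so it round-trips through Hadoop's Data streams with the write/readFields pair above; a minimal sketch with illustrative rows:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.accumulo.core.data.Range;

public class RangeSerializationExample {
  public static void main(String[] args) throws IOException {
    Range original = new Range("a", "m");

    // serialize with write(DataOutput)
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    // deserialize into a fresh Range with readFields(DataInput)
    Range copy = new Range();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

    System.out.println(original.equals(copy));   // true
  }
}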
+
+ public boolean isStartKeyInclusive() {
+ return startKeyInclusive;
+ }
+
+ public boolean isEndKeyInclusive() {
+ return stopKeyInclusive;
+ }
+
+ public TRange toThrift() {
+ return new TRange(start == null ? null : start.toThrift(), stop == null ? null : stop.toThrift(), startKeyInclusive, stopKeyInclusive, infiniteStartKey,
+ infiniteStopKey);
+ }
+
+ public boolean isInfiniteStartKey() {
+ return infiniteStartKey;
+ }
+
+ public boolean isInfiniteStopKey() {
+ return infiniteStopKey;
+ }
+
+ /**
+ * Creates a range that covers an exact row. Returns the same Range as new Range(row)
+ *
+ * @param row
+ * all keys in the range will have this row
+ */
+ public static Range exact(Text row) {
+ return new Range(row);
+ }
+
+ /**
+ * Creates a range that covers an exact row and column family
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ */
+ public static Range exact(Text row, Text cf) {
+ Key startKey = new Key(row, cf);
+ return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false);
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, and column qualifier
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ *
+ * @param cq
+ * all keys in the range will have this column qualifier
+ */
+ public static Range exact(Text row, Text cf, Text cq) {
+ Key startKey = new Key(row, cf, cq);
+ return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false);
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, column qualifier, and visibility
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ *
+ * @param cq
+ * all keys in the range will have this column qualifier
+ *
+ * @param cv
+ * all keys in the range will have this column visibility
+ */
+ public static Range exact(Text row, Text cf, Text cq, Text cv) {
+ Key startKey = new Key(row, cf, cq, cv);
+ return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS), false);
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ *
+ * @param cq
+ * all keys in the range will have this column qualifier
+ *
+ * @param cv
+ * all keys in the range will have this column visibility
+ *
+ * @param ts
+ * all keys in the range will have this timestamp
+ */
+ public static Range exact(Text row, Text cf, Text cq, Text cv, long ts) {
+ Key startKey = new Key(row, cf, cq, cv, ts);
+ return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME), false);
+ }
+
+ /**
+ * Returns a Text that sorts just after all Texts beginning with a prefix
- *
- * @param prefix
+ */
+ public static Text followingPrefix(Text prefix) {
+ byte[] prefixBytes = prefix.getBytes();
+
+ // find the last byte in the array that is not 0xff
+ int changeIndex = prefix.getLength() - 1;
+ while (changeIndex >= 0 && prefixBytes[changeIndex] == (byte) 0xff)
+ changeIndex--;
+ if (changeIndex < 0)
+ return null;
+
+ // copy prefix bytes into new array
+ byte[] newBytes = new byte[changeIndex + 1];
+ System.arraycopy(prefixBytes, 0, newBytes, 0, changeIndex + 1);
+
+ // increment the selected byte
+ newBytes[changeIndex]++;
+ return new Text(newBytes);
+ }
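
An illustration of the byte handling above; the prefixes are made up:

import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class FollowingPrefixExample {
  public static void main(String[] args) {
    // the last byte is incremented
    System.out.println(Range.followingPrefix(new Text("abc")));                          // abd

    // trailing 0xff bytes are dropped before incrementing
    System.out.println(Range.followingPrefix(new Text(new byte[] {'a', (byte) 0xff})));  // b

    // a prefix made entirely of 0xff bytes has no following prefix
    System.out.println(Range.followingPrefix(new Text(new byte[] {(byte) 0xff})));       // null
  }
}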
+
+ /**
+ * Returns a Range that covers all rows beginning with a prefix
+ *
+ * @param rowPrefix
+ * all keys in the range will have rows that begin with this prefix
+ */
+ public static Range prefix(Text rowPrefix) {
+ Text fp = followingPrefix(rowPrefix);
+ return new Range(new Key(rowPrefix), true, fp == null ? null : new Key(fp), false);
+ }
+
+ /**
+ * Returns a Range that covers all column families beginning with a prefix within a given row
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cfPrefix
+ * all keys in the range will have column families that begin with this prefix
+ */
+ public static Range prefix(Text row, Text cfPrefix) {
+ Text fp = followingPrefix(cfPrefix);
+ return new Range(new Key(row, cfPrefix), true, fp == null ? new Key(row).followingKey(PartialKey.ROW) : new Key(row, fp), false);
+ }
+
+ /**
+ * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ *
+ * @param cqPrefix
+ * all keys in the range will have column qualifiers that begin with this prefix
+ */
+ public static Range prefix(Text row, Text cf, Text cqPrefix) {
+ Text fp = followingPrefix(cqPrefix);
+ return new Range(new Key(row, cf, cqPrefix), true, fp == null ? new Key(row, cf).followingKey(PartialKey.ROW_COLFAM) : new Key(row, cf, fp), false);
+ }
+
+ /**
+ * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier
+ *
+ * @param row
+ * all keys in the range will have this row
+ *
+ * @param cf
+ * all keys in the range will have this column family
+ *
+ * @param cq
+ * all keys in the range will have this column qualifier
+ *
+ * @param cvPrefix
+ * all keys in the range will have column visibilities that begin with this prefix
+ */
+ public static Range prefix(Text row, Text cf, Text cq, Text cvPrefix) {
+ Text fp = followingPrefix(cvPrefix);
+ return new Range(new Key(row, cf, cq, cvPrefix), true, fp == null ? new Key(row, cf, cq).followingKey(PartialKey.ROW_COLFAM_COLQUAL) : new Key(row, cf, cq,
+ fp), false);
+ }
+
+ /**
+ * Creates a range that covers an exact row
+ *
+ * @see Range#exact(Text)
+ */
+ public static Range exact(CharSequence row) {
+ return Range.exact(new Text(row.toString()));
+ }
+
+ /**
+ * Creates a range that covers an exact row and column family
+ *
+ * @see Range#exact(Text, Text)
+ */
+ public static Range exact(CharSequence row, CharSequence cf) {
+ return Range.exact(new Text(row.toString()), new Text(cf.toString()));
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, and column qualifier
+ *
+ * @see Range#exact(Text, Text, Text)
+ */
+ public static Range exact(CharSequence row, CharSequence cf, CharSequence cq) {
+ return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()));
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, column qualifier, and visibility
+ *
+ * @see Range#exact(Text, Text, Text, Text)
+ */
+ public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv) {
+ return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()));
+ }
+
+ /**
+ * Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp
+ *
+ * @see Range#exact(Text, Text, Text, Text, long)
+ */
+ public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv, long ts) {
+ return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()), ts);
+ }
+
+ /**
+ * Returns a Range that covers all rows beginning with a prefix
+ *
+ * @see Range#prefix(Text)
+ */
+ public static Range prefix(CharSequence rowPrefix) {
+ return Range.prefix(new Text(rowPrefix.toString()));
+ }
+
+ /**
+ * Returns a Range that covers all column families beginning with a prefix within a given row
+ *
+ * @see Range#prefix(Text, Text)
+ */
+ public static Range prefix(CharSequence row, CharSequence cfPrefix) {
+ return Range.prefix(new Text(row.toString()), new Text(cfPrefix.toString()));
+ }
+
+ /**
+ * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family
+ *
+ * @see Range#prefix(Text, Text, Text)
+ */
+ public static Range prefix(CharSequence row, CharSequence cf, CharSequence cqPrefix) {
+ return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cqPrefix.toString()));
+ }
+
+ /**
+ * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier
+ *
+ * @see Range#prefix(Text, Text, Text, Text)
+ */
+ public static Range prefix(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cvPrefix) {
+ return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cvPrefix.toString()));
+ }
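
A short sketch of the exact and prefix factory methods above; the rows, families, and qualifiers are illustrative:

import org.apache.accumulo.core.data.Range;

public class FactoryMethodsExample {
  public static void main(String[] args) {
    // all keys in row "row1" with column family "cf1"
    Range exactCf = Range.exact("row1", "cf1");

    // all keys whose row begins with "2014-04"
    Range rowPrefix = Range.prefix("2014-04");

    // all column qualifiers starting with "addr" in row "row1", family "cf1"
    Range cqPrefix = Range.prefix("row1", "cf1", "addr");

    System.out.println(exactCf);
    System.out.println(rowPrefix);
    System.out.println(cqPrefix);
  }
}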
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index 2b3cdf5,0000000..0ac5308
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@@ -1,181 -1,0 +1,176 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+
+/**
+ *
+ */
+public class BlockIndex {
+
+ public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+
+ BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+
+ int accessCount = blockIndex.accessCount.incrementAndGet();
+
+ // 1 is a power of two, but we do not care about it
+ if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
+ blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+ }
+
+ if (blockIndex.blockIndex != null)
+ return blockIndex;
+
+ return null;
+ }
+
+ private static boolean isPowerOfTwo(int x) {
+ return ((x > 0) && (x & (x - 1)) == 0);
+ }
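
The check above relies on a power of two having exactly one set bit, which is why the index is only rebuilt when the access count reaches 2, 4, 8, and so on; a standalone illustration:

public class PowerOfTwoCheck {
  static boolean isPowerOfTwo(int x) {
    // a power of two has a single set bit, so x & (x - 1) clears it to zero
    return (x > 0) && (x & (x - 1)) == 0;
  }

  public static void main(String[] args) {
    for (int i : new int[] {1, 2, 3, 4, 6, 8, 1024}) {
      System.out.println(i + " -> " + isPowerOfTwo(i));
    }
  }
}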
+
+ private AtomicInteger accessCount = new AtomicInteger(0);
+ private volatile BlockIndexEntry[] blockIndex = null;
+
+ public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
+
+ private Key prevKey;
+ private int entriesLeft;
+ private int pos;
+
+ public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) {
+ this.pos = pos;
+ this.entriesLeft = entriesLeft;
+ this.prevKey = prevKey;
+ }
+
- /**
- * @param key
- */
+ public BlockIndexEntry(Key key) {
+ this.prevKey = key;
+ }
-
-
+
+ public int getEntriesLeft() {
+ return entriesLeft;
+ }
+
+ @Override
+ public int compareTo(BlockIndexEntry o) {
+ return prevKey.compareTo(o.prevKey);
+ }
+
+ @Override
+ public String toString() {
+ return prevKey + " " + entriesLeft + " " + pos;
+ }
+
+ public Key getPrevKey() {
+ return prevKey;
+ }
+ }
+
+ public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
+
+ // get a local ref to the index, another thread could change it
+ BlockIndexEntry[] blockIndex = this.blockIndex;
+
+ int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
+
+ int index;
+
+ if (pos < 0) {
+ if (pos == -1)
+ return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning
+ // of block
+ index = (pos * -1) - 2;
+ } else {
+ // found exact key in index
+ index = pos;
+ while (index > 0) {
+ if (blockIndex[index].getPrevKey().equals(startKey))
+ index--;
+ else
+ break;
+ }
+ }
+
+ // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
+ while (index - 1 > 0) {
+ if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey()))
+ index--;
+ else
+ break;
+
+ }
+
+ if (index == 0 && blockIndex[index].getPrevKey().equals(startKey))
+ return null;
+
+ BlockIndexEntry bie = blockIndex[index];
+ cacheBlock.seek(bie.pos);
+ return bie;
+ }
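
Arrays.binarySearch returns -(insertion point) - 1 on a miss, so the (pos * -1) - 2 above selects the index entry just before the search key; a small standalone check with made-up values:

import java.util.Arrays;

public class InsertionPointExample {
  public static void main(String[] args) {
    Integer[] sorted = {10, 20, 30, 40};

    int pos = Arrays.binarySearch(sorted, 25);   // 25 is absent
    System.out.println(pos);                     // -3, i.e. -(insertion point 2) - 1

    int indexBefore = (pos * -1) - 2;            // entry just before 25
    System.out.println(sorted[indexBefore]);     // 20
  }
}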
+
+ private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+ cacheBlock.seek(0);
+
+ RelativeKey rk = new RelativeKey();
+ Value val = new Value();
+
+ int interval = indexEntry.getNumEntries() / indexEntries;
+
+ if (interval <= 32)
+ return;
+
+ // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
+ if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
+ return;
+
+ int count = 0;
+
+ ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
+
+ while (count < (indexEntry.getNumEntries() - interval + 1)) {
+
+ Key myPrevKey = rk.getKey();
+ int pos = cacheBlock.getPosition();
+ rk.readFields(cacheBlock);
+ val.readFields(cacheBlock);
+
+ if (count > 0 && count % interval == 0) {
+ index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey));
+ }
+
+ count++;
+ }
+
+ this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
+
+ cacheBlock.seek(0);
+ }
+
+ BlockIndexEntry[] getIndexEntries() {
+ return blockIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 7277c65,0000000..7d15851
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@@ -1,971 -1,0 +1,965 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarComparator;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarLong;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Block Compressed file, the underlying physical storage layer for TFile. BCFile provides the basic block-level compression for the data and meta blocks.
+ * It is separated from TFile as it may be used for other block-compressed file implementations.
+ */
+public final class BCFile {
+ // the current version of the BCFile impl; increment it (major or minor) when
+ // enough changes are made
+ static final Version API_VERSION = new Version((short) 1, (short) 0);
+ static final Log LOG = LogFactory.getLog(BCFile.class);
+
+ /**
+ * Prevent the instantiation of BCFile objects.
+ */
+ private BCFile() {
+ // nothing
+ }
+
+ /**
+ * BCFile writer, the entry point for creating a new BCFile.
+ */
+ static public class Writer implements Closeable {
+ private final FSDataOutputStream out;
+ private final Configuration conf;
+ // the single meta block containing index of compressed data blocks
+ final DataIndex dataIndex;
+ // index for meta blocks
+ final MetaIndex metaIndex;
+ boolean blkInProgress = false;
+ private boolean metaBlkSeen = false;
+ private boolean closed = false;
+ long errorCount = 0;
+ // reusable buffers.
+ private BytesWritable fsOutputBuffer;
+
+ /**
+ * Call-back interface to register a block after a block is closed.
+ */
+ private static interface BlockRegister {
+ /**
+ * Register a block that is fully closed.
+ *
+ * @param raw
+ * The size of block in terms of uncompressed bytes.
+ * @param offsetStart
+ * The start offset of the block.
+ * @param offsetEnd
+ * One byte after the end of the block. Compressed block size is offsetEnd - offsetStart.
+ */
+ public void register(long raw, long offsetStart, long offsetEnd);
+ }
+
+ /**
+ * Intermediate class that maintains the state of a Writable Compression Block.
+ */
+ private static final class WBlockState {
+ private final Algorithm compressAlgo;
+ private Compressor compressor; // !null only if using native
+ // Hadoop compression
+ private final FSDataOutputStream fsOut;
+ private final long posStart;
+ private final SimpleBufferedOutputStream fsBufferedOutput;
+ private OutputStream out;
+
+ /**
+ * @param compressionAlgo
+ * The compression algorithm to be used for compression.
- * @throws IOException
+ */
+ public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
+ this.compressAlgo = compressionAlgo;
+ this.fsOut = fsOut;
+ this.posStart = fsOut.getPos();
+
+ fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));
+
+ this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
+ this.compressor = compressAlgo.getCompressor();
+
+ try {
+ this.out = compressionAlgo.createCompressionStream(fsBufferedOutput, compressor, 0);
+ } catch (IOException e) {
+ compressAlgo.returnCompressor(compressor);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the output stream for BlockAppender's consumption.
+ *
+ * @return the output stream suitable for writing block data.
+ */
+ OutputStream getOutputStream() {
+ return out;
+ }
+
+ /**
+ * Get the current position in file.
+ *
+ * @return The current byte offset in underlying file.
+ * @throws IOException
+ */
+ long getCurrentPos() throws IOException {
+ return fsOut.getPos() + fsBufferedOutput.size();
+ }
+
+ long getStartPos() {
+ return posStart;
+ }
+
+ /**
+ * Current size of compressed data.
+ *
+ * @throws IOException
+ */
+ long getCompressedSize() throws IOException {
+ long ret = getCurrentPos() - posStart;
+ return ret;
+ }
+
+ /**
+ * Finishing up the current block.
+ */
+ public void finish() throws IOException {
+ try {
+ if (out != null) {
+ out.flush();
+ out = null;
+ }
+ } finally {
+ compressAlgo.returnCompressor(compressor);
+ compressor = null;
+ }
+ }
+ }
+
+ /**
+ * Access point to stuff data into a block.
+ *
+ */
+ public class BlockAppender extends DataOutputStream {
+ private final BlockRegister blockRegister;
+ private final WBlockState wBlkState;
+ private boolean closed = false;
+
+ /**
+ * Constructor
+ *
+ * @param register
+ * the block register, which is called when the block is closed.
+ * @param wbs
+ * The writable compression block state.
+ */
+ BlockAppender(BlockRegister register, WBlockState wbs) {
+ super(wbs.getOutputStream());
+ this.blockRegister = register;
+ this.wBlkState = wbs;
+ }
+
+ /**
+ * Get the raw size of the block.
+ *
+ * @return the number of uncompressed bytes written through the BlockAppender so far.
- * @throws IOException
+ */
+ public long getRawSize() throws IOException {
+ /**
+ * Expecting the size() of a block not exceeding 4GB. Assuming the size() will wrap to negative integer if it exceeds 2GB.
+ */
+ return size() & 0x00000000ffffffffL;
+ }
+
+ /**
+ * Get the compressed size of the block in progress.
+ *
+ * @return the number of compressed bytes written to the underlying FS file. The size may be smaller than what is actually needed to compress all the data
+ * written, due to internal buffering inside the compressor.
- * @throws IOException
+ */
+ public long getCompressedSize() throws IOException {
+ return wBlkState.getCompressedSize();
+ }
+
+ public long getStartPos() {
+ return wBlkState.getStartPos();
+ }
+
+ @Override
+ public void flush() {
+ // The down stream is a special kind of stream that finishes a
+ // compression block upon flush. So we disable flush() here.
+ }
+
+ /**
+ * Signaling the end of write to the block. The block register will be called for registering the finished block.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+ try {
+ ++errorCount;
+ wBlkState.finish();
+ blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
+ --errorCount;
+ } finally {
+ closed = true;
+ blkInProgress = false;
+ }
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param fout
+ * FS output stream.
+ * @param compressionName
+ * Name of the compression algorithm, which will be used for all data blocks.
- * @throws IOException
+ * @see Compression#getSupportedAlgorithms
+ */
+ public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks) throws IOException {
+ if (fout.getPos() != 0) {
+ throw new IOException("Output file not at zero offset.");
+ }
+
+ this.out = fout;
+ this.conf = conf;
+ dataIndex = new DataIndex(compressionName, trackDataBlocks);
+ metaIndex = new MetaIndex();
+ fsOutputBuffer = new BytesWritable();
+ Magic.write(fout);
+ }
+
+ /**
+ * Close the BCFile Writer. Attempting to use the Writer after calling <code>close</code> is not allowed and may lead to undetermined results.
+ */
++ @Override
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+
+ try {
+ if (errorCount == 0) {
+ if (blkInProgress == true) {
+ throw new IllegalStateException("Close() called with active block appender.");
+ }
+
+ // add metaBCFileIndex to metaIndex as the last meta block
+ BlockAppender appender = prepareMetaBlock(DataIndex.BLOCK_NAME, getDefaultCompressionAlgorithm());
+ try {
+ dataIndex.write(appender);
+ } finally {
+ appender.close();
+ }
+
+ long offsetIndexMeta = out.getPos();
+ metaIndex.write(out);
+
+ // Meta Index and the trailing section are written out directly.
+ out.writeLong(offsetIndexMeta);
+
+ API_VERSION.write(out);
+ Magic.write(out);
+ out.flush();
+ }
+ } finally {
+ closed = true;
+ }
+ }
+
+ private Algorithm getDefaultCompressionAlgorithm() {
+ return dataIndex.getDefaultCompressionAlgorithm();
+ }
+
+ private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo) throws IOException, MetaBlockAlreadyExists {
+ if (blkInProgress == true) {
+ throw new IllegalStateException("Cannot create Meta Block until previous block is closed.");
+ }
+
+ if (metaIndex.getMetaByName(name) != null) {
+ throw new MetaBlockAlreadyExists("name=" + name);
+ }
+
+ MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
+ WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
+ BlockAppender ba = new BlockAppender(mbr, wbs);
+ blkInProgress = true;
+ metaBlkSeen = true;
+ return ba;
+ }
+
+ /**
+ * Create a Meta Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Regular
+ * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
+ *
+ * @param name
+ * The name of the Meta Block. The name must not conflict with existing Meta Blocks.
+ * @param compressionName
+ * The name of the compression algorithm to be used.
+ * @return The BlockAppender stream
- * @throws IOException
+ * @throws MetaBlockAlreadyExists
+ * If the meta block with the name already exists.
+ */
+ public BlockAppender prepareMetaBlock(String name, String compressionName) throws IOException, MetaBlockAlreadyExists {
+ return prepareMetaBlock(name, Compression.getCompressionAlgorithmByName(compressionName));
+ }
+
+ /**
+ * Create a Meta Block and obtain an output stream for adding data into the block. The Meta Block will be compressed with the same compression algorithm as
+ * data blocks. There can only be one BlockAppender stream active at any time. Regular Blocks may not be created after the first Meta Blocks. The caller
+ * must call BlockAppender.close() to conclude the block creation.
+ *
+ * @param name
+ * The name of the Meta Block. The name must not conflict with existing Meta Blocks.
+ * @return The BlockAppender stream
+ * @throws MetaBlockAlreadyExists
+ * If the meta block with the name already exists.
- * @throws IOException
+ */
+ public BlockAppender prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists {
+ return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
+ }
+
+ /**
+ * Create a Data Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Data
+ * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
+ *
+ * @return The BlockAppender stream
- * @throws IOException
+ */
+ public BlockAppender prepareDataBlock() throws IOException {
+ if (blkInProgress == true) {
+ throw new IllegalStateException("Cannot create Data Block until previous block is closed.");
+ }
+
+ if (metaBlkSeen == true) {
+ throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
+ }
+
+ DataBlockRegister dbr = new DataBlockRegister();
+
+ WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf);
+ BlockAppender ba = new BlockAppender(dbr, wbs);
+ blkInProgress = true;
+ return ba;
+ }
+
+ /**
+ * Callback to make sure a meta block is added to the internal list when its stream is closed.
+ */
+ private class MetaBlockRegister implements BlockRegister {
+ private final String name;
+ private final Algorithm compressAlgo;
+
+ MetaBlockRegister(String name, Algorithm compressAlgo) {
+ this.name = name;
+ this.compressAlgo = compressAlgo;
+ }
+
++ @Override
+ public void register(long raw, long begin, long end) {
+ metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
+ }
+ }
+
+ /**
+ * Callback to make sure a data block is added to the internal list when it's being closed.
+ *
+ */
+ private class DataBlockRegister implements BlockRegister {
+ DataBlockRegister() {
+ // do nothing
+ }
+
++ @Override
+ public void register(long raw, long begin, long end) {
+ dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
+ }
+ }
+ }
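
A hedged sketch of driving the Writer above: data blocks first, then meta blocks, then close. The local path, the "gz" compression name, the meta block name, and the payloads are illustrative:

import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BCFileWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    FSDataOutputStream fout = fs.create(new Path("/tmp/example.bcfile"));

    BCFile.Writer writer = new BCFile.Writer(fout, "gz", conf, false);

    // data blocks must be written before the first meta block
    BCFile.Writer.BlockAppender data = writer.prepareDataBlock();
    data.writeUTF("some block data");
    data.close();

    // meta blocks are looked up by name, which must be unique
    BCFile.Writer.BlockAppender meta = writer.prepareMetaBlock("example.meta");
    meta.writeUTF("some meta data");
    meta.close();

    writer.close();   // appends the data index, meta index, version, and magic
    fout.close();
  }
}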
+
+ /**
+ * BCFile Reader, interface to read the file's data and meta blocks.
+ */
+ static public class Reader implements Closeable {
+ private static final String META_NAME = "BCFile.metaindex";
+ private final FSDataInputStream in;
+ private final Configuration conf;
+ final DataIndex dataIndex;
+ // Index for meta blocks
+ final MetaIndex metaIndex;
+ final Version version;
+
+ /**
+ * Intermediate class that maintains the state of a Readable Compression Block.
+ */
+ static private final class RBlockState {
+ private final Algorithm compressAlgo;
+ private Decompressor decompressor;
+ private final BlockRegion region;
+ private final InputStream in;
+
+ public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf) throws IOException {
+ this.compressAlgo = compressionAlgo;
+ this.region = region;
+ this.decompressor = compressionAlgo.getDecompressor();
+
+ try {
+ this.in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, this.region.getOffset(), this.region.getCompressedSize()),
+ decompressor, TFile.getFSInputBufferSize(conf));
+ } catch (IOException e) {
+ compressAlgo.returnDecompressor(decompressor);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the input stream for BlockReader's consumption.
+ *
+ * @return the input stream suitable for reading block data.
+ */
+ public InputStream getInputStream() {
+ return in;
+ }
+
+ public String getCompressionName() {
+ return compressAlgo.getName();
+ }
+
+ public BlockRegion getBlockRegion() {
+ return region;
+ }
+
+ public void finish() throws IOException {
+ try {
+ in.close();
+ } finally {
+ compressAlgo.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+
+ /**
+ * Access point to read a block.
+ */
+ public static class BlockReader extends DataInputStream {
+ private final RBlockState rBlkState;
+ private boolean closed = false;
+
+ BlockReader(RBlockState rbs) {
+ super(rbs.getInputStream());
+ rBlkState = rbs;
+ }
+
+ /**
+ * Finishing reading the block. Release all resources.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed == true) {
+ return;
+ }
+ try {
+ // Do not set rBlkState to null. People may access stats after calling
+ // close().
+ rBlkState.finish();
+ } finally {
+ closed = true;
+ }
+ }
+
+ /**
+ * Get the name of the compression algorithm used to compress the block.
+ *
+ * @return name of the compression algorithm.
+ */
+ public String getCompressionName() {
+ return rBlkState.getCompressionName();
+ }
+
+ /**
+ * Get the uncompressed size of the block.
+ *
+ * @return uncompressed size of the block.
+ */
+ public long getRawSize() {
+ return rBlkState.getBlockRegion().getRawSize();
+ }
+
+ /**
+ * Get the compressed size of the block.
+ *
+ * @return compressed size of the block.
+ */
+ public long getCompressedSize() {
+ return rBlkState.getBlockRegion().getCompressedSize();
+ }
+
+ /**
+ * Get the starting position of the block in the file.
+ *
+ * @return the starting position of the block in the file.
+ */
+ public long getStartPos() {
+ return rBlkState.getBlockRegion().getOffset();
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param fin
+ * FS input stream.
+ * @param fileLength
+ * Length of the corresponding file
- * @throws IOException
+ */
+ public Reader(FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
+ this.in = fin;
+ this.conf = conf;
+
+ // move the cursor to the beginning of the tail, containing: offset to the
+ // meta block index, version and magic
+ fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
+ long offsetIndexMeta = fin.readLong();
+ version = new Version(fin);
+ Magic.readAndVerify(fin);
+
+ if (!version.compatibleWith(BCFile.API_VERSION)) {
+ throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+ }
+
+ // read meta index
+ fin.seek(offsetIndexMeta);
+ metaIndex = new MetaIndex(fin);
+
+ // read data:BCFile.index, the data block index
+ BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+ try {
+ dataIndex = new DataIndex(blockR);
+ } finally {
+ blockR.close();
+ }
+ }
+
+ public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
+ this.in = fin;
+ this.conf = conf;
+
+ BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
+ BlockRead cachedDataIndex = cache.getCachedMetaBlock(DataIndex.BLOCK_NAME);
+
+ if (cachedMetaIndex == null || cachedDataIndex == null) {
+ // move the cursor to the beginning of the tail, containing: offset to the
+ // meta block index, version and magic
+ fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
+ long offsetIndexMeta = fin.readLong();
+ version = new Version(fin);
+ Magic.readAndVerify(fin);
+
+ if (!version.compatibleWith(BCFile.API_VERSION)) {
+ throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+ }
+
+ // read meta index
+ fin.seek(offsetIndexMeta);
+ metaIndex = new MetaIndex(fin);
+ if (cachedMetaIndex == null) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ metaIndex.write(dos);
+ dos.close();
+ cache.cacheMetaBlock(META_NAME, baos.toByteArray());
+ }
+
+ // read data:BCFile.index, the data block index
+ if (cachedDataIndex == null) {
+ BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+ cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR);
+ }
+
+ dataIndex = new DataIndex(cachedDataIndex);
+ cachedDataIndex.close();
+
+ } else {
+ // Logger.getLogger(Reader.class).debug("Read bcfile !METADATA from cache");
+ version = null;
+ metaIndex = new MetaIndex(cachedMetaIndex);
+ dataIndex = new DataIndex(cachedDataIndex);
+ }
+ }
+
+ /**
+ * Get the name of the default compression algorithm.
+ *
+ * @return the name of the default compression algorithm.
+ */
+ public String getDefaultCompressionName() {
+ return dataIndex.getDefaultCompressionAlgorithm().getName();
+ }
+
+ /**
+ * Get version of BCFile file being read.
+ *
+ * @return version of BCFile file being read.
+ */
+ public Version getBCFileVersion() {
+ return version;
+ }
+
+ /**
+ * Get version of BCFile API.
+ *
+ * @return version of BCFile API.
+ */
+ public Version getAPIVersion() {
+ return API_VERSION;
+ }
+
+ /**
+ * Finishing reading the BCFile. Release all resources.
+ */
++ @Override
+ public void close() {
+ // nothing to be done now
+ }
+
+ /**
+ * Get the number of data blocks.
+ *
+ * @return the number of data blocks.
+ */
+ public int getBlockCount() {
+ return dataIndex.getBlockRegionList().size();
+ }
+
+ /**
+ * Stream access to a Meta Block.
+ *
+ * @param name
+ * meta block name
+ * @return BlockReader input stream for reading the meta block.
- * @throws IOException
+ * @throws MetaBlockDoesNotExist
+ * The Meta Block with the given name does not exist.
+ */
+ public BlockReader getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist {
+ MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+ if (imeBCIndex == null) {
+ throw new MetaBlockDoesNotExist("name=" + name);
+ }
+
+ BlockRegion region = imeBCIndex.getRegion();
+ return createReader(imeBCIndex.getCompressionAlgorithm(), region);
+ }
+
+ /**
+ * Stream access to a Data Block.
+ *
+ * @param blockIndex
+ * 0-based data block index.
+ * @return BlockReader input stream for reading the data block.
- * @throws IOException
+ */
+ public BlockReader getDataBlock(int blockIndex) throws IOException {
+ if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+ throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+ }
+
+ BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
+ return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+ }
+
+ public BlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
+ BlockRegion region = new BlockRegion(offset, compressedSize, rawSize);
+ return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+ }
+
+ private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException {
+ RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
+ return new BlockReader(rbs);
+ }
+
+ /**
+ * Find the smallest Block index whose starting offset is greater than or equal to the specified offset.
+ *
+ * @param offset
+ * User-specified offset.
+ * @return the index to the data Block if such block exists; or -1 otherwise.
+ */
+ public int getBlockIndexNear(long offset) {
+ ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
+ int idx = Utils.lowerBound(list, new ScalarLong(offset), new ScalarComparator());
+
+ if (idx == list.size()) {
+ return -1;
+ }
+
+ return idx;
+ }
+ }
+
+ /**
+ * Index for all Meta blocks.
+ */
+ static class MetaIndex {
+ // use a tree map for getting a meta block entry by name
+ final Map<String,MetaIndexEntry> index;
+
+ // for write
+ public MetaIndex() {
+ index = new TreeMap<String,MetaIndexEntry>();
+ }
+
+ // for read, construct the map from the file
+ public MetaIndex(DataInput in) throws IOException {
+ int count = Utils.readVInt(in);
+ index = new TreeMap<String,MetaIndexEntry>();
+
+ for (int nx = 0; nx < count; nx++) {
+ MetaIndexEntry indexEntry = new MetaIndexEntry(in);
+ index.put(indexEntry.getMetaName(), indexEntry);
+ }
+ }
+
+ public void addEntry(MetaIndexEntry indexEntry) {
+ index.put(indexEntry.getMetaName(), indexEntry);
+ }
+
+ public MetaIndexEntry getMetaByName(String name) {
+ return index.get(name);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVInt(out, index.size());
+
+ for (MetaIndexEntry indexEntry : index.values()) {
+ indexEntry.write(out);
+ }
+ }
+ }
+
+ /**
+ * An entry describing a meta block in the MetaIndex.
+ */
+ static final class MetaIndexEntry {
+ private final String metaName;
+ private final Algorithm compressionAlgorithm;
+ private final static String defaultPrefix = "data:";
+
+ private final BlockRegion region;
+
+ public MetaIndexEntry(DataInput in) throws IOException {
+ String fullMetaName = Utils.readString(in);
+ if (fullMetaName.startsWith(defaultPrefix)) {
+ metaName = fullMetaName.substring(defaultPrefix.length(), fullMetaName.length());
+ } else {
+ throw new IOException("Corrupted Meta region Index");
+ }
+
+ compressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
+ region = new BlockRegion(in);
+ }
+
+ public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) {
+ this.metaName = metaName;
+ this.compressionAlgorithm = compressionAlgorithm;
+ this.region = region;
+ }
+
+ public String getMetaName() {
+ return metaName;
+ }
+
+ public Algorithm getCompressionAlgorithm() {
+ return compressionAlgorithm;
+ }
+
+ public BlockRegion getRegion() {
+ return region;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeString(out, defaultPrefix + metaName);
+ Utils.writeString(out, compressionAlgorithm.getName());
+
+ region.write(out);
+ }
+ }
+
+ /**
+ * Index of all compressed data blocks.
+ */
+ static class DataIndex {
+ final static String BLOCK_NAME = "BCFile.index";
+
+ private final Algorithm defaultCompressionAlgorithm;
+
+ // for data blocks, each entry specifies a block's offset, compressed size
+ // and raw size
+ private final ArrayList<BlockRegion> listRegions;
+
+ private boolean trackBlocks;
+
+ // for read, deserialized from a file
+ public DataIndex(DataInput in) throws IOException {
+ defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
+
+ int n = Utils.readVInt(in);
+ listRegions = new ArrayList<BlockRegion>(n);
+
+ for (int i = 0; i < n; i++) {
+ BlockRegion region = new BlockRegion(in);
+ listRegions.add(region);
+ }
+ }
+
+ // for write
+ public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
+ this.trackBlocks = trackBlocks;
+ this.defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
+ listRegions = new ArrayList<BlockRegion>();
+ }
+
+ public Algorithm getDefaultCompressionAlgorithm() {
+ return defaultCompressionAlgorithm;
+ }
+
+ public ArrayList<BlockRegion> getBlockRegionList() {
+ return listRegions;
+ }
+
+ public void addBlockRegion(BlockRegion region) {
+ if (trackBlocks)
+ listRegions.add(region);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeString(out, defaultCompressionAlgorithm.getName());
+
+ Utils.writeVInt(out, listRegions.size());
+
+ for (BlockRegion region : listRegions) {
+ region.write(out);
+ }
+ }
+ }
+
+ /**
+ * Magic number uniquely identifying a BCFile in the header/footer.
+ */
+ static final class Magic {
+ private final static byte[] AB_MAGIC_BCFILE = {
+ // ... total of 16 bytes
+ (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91, (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf, (byte) 0x41,
+ (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1, (byte) 0x50};
+
+ public static void readAndVerify(DataInput in) throws IOException {
+ byte[] abMagic = new byte[size()];
+ in.readFully(abMagic);
+
+ // check against AB_MAGIC_BCFILE; if it does not match, throw an exception
+ if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
+ throw new IOException("Not a valid BCFile.");
+ }
+ }
+
+ public static void write(DataOutput out) throws IOException {
+ out.write(AB_MAGIC_BCFILE);
+ }
+
+ public static int size() {
+ return AB_MAGIC_BCFILE.length;
+ }
+ }
+
+ /**
+ * Block region.
+ */
+ static final class BlockRegion implements Scalar {
+ private final long offset;
+ private final long compressedSize;
+ private final long rawSize;
+
+ public BlockRegion(DataInput in) throws IOException {
+ offset = Utils.readVLong(in);
+ compressedSize = Utils.readVLong(in);
+ rawSize = Utils.readVLong(in);
+ }
+
+ public BlockRegion(long offset, long compressedSize, long rawSize) {
+ this.offset = offset;
+ this.compressedSize = compressedSize;
+ this.rawSize = rawSize;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVLong(out, offset);
+ Utils.writeVLong(out, compressedSize);
+ Utils.writeVLong(out, rawSize);
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getCompressedSize() {
+ return compressedSize;
+ }
+
+ public long getRawSize() {
+ return rawSize;
+ }
+
+ @Override
+ public long magnitude() {
+ return offset;
+ }
+ }
+}
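As a quick orientation to the Reader surface above, here is a minimal sketch of walking a BCFile's data blocks. It is illustrative only: it assumes a BCFile.Reader has already been opened elsewhere (its constructor is not in this hunk) and that BlockReader can be consumed as a plain InputStream; the class name, method name, and buffer size are made up for the example.

    import java.io.IOException;
    import org.apache.accumulo.core.file.rfile.bcfile.BCFile;

    class BlockDumpSketch {
      // Print the default codec and the decompressed size of every data block.
      static void dumpBlockSizes(BCFile.Reader reader) throws IOException {
        System.out.println("default codec: " + reader.getDefaultCompressionName());
        for (int i = 0; i < reader.getBlockCount(); i++) {
          BCFile.Reader.BlockReader block = reader.getDataBlock(i); // stream over one block
          try {
            byte[] buf = new byte[4096];
            long total = 0;
            for (int n = block.read(buf); n > 0; n = block.read(buf)) {
              total += n;
            }
            System.out.println("block " + i + ": " + total + " raw bytes");
          } finally {
            block.close();
          }
        }
      }
    }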
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
index 2b57638,0000000..d7734a2
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
@@@ -1,91 -1,0 +1,89 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Adaptor class to wrap byte-array-backed objects (including Java byte arrays) as RawComparable objects.
+ */
+public final class ByteArray implements RawComparable {
+ private final byte[] buffer;
+ private final int offset;
+ private final int len;
+
+ /**
+ * Construct a ByteArray from a {@link BytesWritable}.
- *
- * @param other
+ */
+ public ByteArray(BytesWritable other) {
+ this(other.getBytes(), 0, other.getLength());
+ }
+
+ /**
+ * Wrap a whole byte array as a RawComparable.
+ *
+ * @param buffer
+ * the byte array buffer.
+ */
+ public ByteArray(byte[] buffer) {
+ this(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Wrap a partial byte array as a RawComparable.
+ *
+ * @param buffer
+ * the byte array buffer.
+ * @param offset
+ * the starting offset
+ * @param len
+ * the length of the consecutive bytes to be wrapped.
+ */
+ public ByteArray(byte[] buffer, int offset, int len) {
+ if ((offset | len | (buffer.length - offset - len)) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ this.buffer = buffer;
+ this.offset = offset;
+ this.len = len;
+ }
+
+ /**
+ * @return the underlying buffer.
+ */
+ @Override
+ public byte[] buffer() {
+ return buffer;
+ }
+
+ /**
+ * @return the offset in the buffer.
+ */
+ @Override
+ public int offset() {
+ return offset;
+ }
+
+ /**
+ * @return the size of the byte array.
+ */
+ @Override
+ public int size() {
+ return len;
+ }
+}
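For illustration, a small sketch of the wrapping semantics above: nothing is copied, the ByteArray just records the buffer, offset, and length it was given. The class and variable names here are made up.

    import java.nio.charset.StandardCharsets;
    import org.apache.accumulo.core.file.rfile.bcfile.ByteArray;

    class ByteArraySketch {
      static void wrapViews() {
        byte[] data = "rowA:colB".getBytes(StandardCharsets.UTF_8);

        ByteArray whole = new ByteArray(data);         // view over data[0 .. data.length)
        ByteArray partial = new ByteArray(data, 5, 4); // view over the trailing "colB", no copy

        assert whole.size() == data.length;
        assert partial.buffer() == data;               // same backing array
        assert partial.offset() == 5 && partial.size() == 4;
      }
    }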
http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
index a075d87,0000000..345d406
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
@@@ -1,418 -1,0 +1,416 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Several related classes to support chunk-encoded sub-streams on top of a regular stream.
+ */
+final class Chunk {
+
+ /**
+ * Prevent instantiation of this class.
+ */
+ private Chunk() {
+ // nothing
+ }
+
+ /**
+ * Decoder for a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder.
+ */
+ static public class ChunkDecoder extends InputStream {
+ private DataInputStream in = null;
+ private boolean lastChunk;
+ private int remain = 0;
+ private boolean closed;
+
+ public ChunkDecoder() {
+ lastChunk = true;
+ closed = true;
+ }
+
+ public void reset(DataInputStream downStream) {
+ // no need to wind forward the old input.
+ in = downStream;
+ lastChunk = false;
+ remain = 0;
+ closed = false;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param in
+ * The source input stream which contains chunk-encoded data stream.
+ */
+ public ChunkDecoder(DataInputStream in) {
+ this.in = in;
+ lastChunk = false;
+ closed = false;
+ }
+
+ /**
+ * Have we reached the last chunk?
+ *
+ * @return true if we have reached the last chunk.
- * @throws java.io.IOException
+ */
+ public boolean isLastChunk() throws IOException {
+ checkEOF();
+ return lastChunk;
+ }
+
+ /**
+ * How many bytes remain in the current chunk?
+ *
+ * @return remaining bytes left in the current chunk.
- * @throws java.io.IOException
+ */
+ public int getRemain() throws IOException {
+ checkEOF();
+ return remain;
+ }
+
+ /**
+ * Read the length of the next chunk.
+ *
+ * @throws java.io.IOException
+ * when no more data is available.
+ */
+ private void readLength() throws IOException {
+ remain = Utils.readVInt(in);
+ if (remain >= 0) {
+ lastChunk = true;
+ } else {
+ remain = -remain;
+ }
+ }
+
+ /**
+ * Check whether we have reached the end of the stream.
+ *
+ * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise.
+ * @throws java.io.IOException
+ * on I/O errors.
+ */
+ private boolean checkEOF() throws IOException {
+ if (isClosed())
+ return true;
+ while (true) {
+ if (remain > 0)
+ return false;
+ if (lastChunk)
+ return true;
+ readLength();
+ }
+ }
+
+ @Override
+ /*
+ * This method never blocks the caller. Returning 0 does not mean we reach the end of the stream.
+ */
+ public int available() {
+ return remain;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (checkEOF())
+ return -1;
+ int ret = in.read();
+ if (ret < 0)
+ throw new IOException("Corrupted chunk encoding stream");
+ --remain;
+ return ret;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ if (!checkEOF()) {
+ int n = Math.min(remain, len);
+ int ret = in.read(b, off, n);
+ if (ret < 0)
+ throw new IOException("Corrupted chunk encoding stream");
+ remain -= ret;
+ return ret;
+ }
+ return -1;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (!checkEOF()) {
+ long ret = in.skip(Math.min(remain, n));
+ remain -= ret;
+ return ret;
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ try {
+ while (!checkEOF()) {
+ skip(Integer.MAX_VALUE);
+ }
+ } finally {
+ closed = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Chunk Encoder. Encodes the output data into a chain of chunks in the following sequence: -len1, byte[len1], -len2, byte[len2], ..., len_n, byte[len_n],
+ * where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated and cannot have length 0.
+ * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format.
+ */
+ static public class ChunkEncoder extends OutputStream {
+ /**
+ * The data output stream it connects to.
+ */
+ private DataOutputStream out;
+
+ /**
+ * The internal buffer that is only used when we do not know the advertised size.
+ */
+ private byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt> through
+ * <tt>buf[count-1]</tt> contain valid byte data.
+ */
+ private int count;
+
+ /**
+ * Constructor.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param buf
+ * user-supplied buffer. The buffer would be used exclusively by the ChunkEncoder during its life cycle.
+ */
+ public ChunkEncoder(DataOutputStream out, byte[] buf) {
+ this.out = out;
+ this.buf = buf;
+ this.count = 0;
+ }
+
+ /**
+ * Write out a chunk.
+ *
+ * @param chunk
+ * The chunk buffer.
+ * @param offset
+ * Offset into the chunk buffer for the beginning of the chunk.
+ * @param len
+ * Length of the chunk in bytes.
+ * @param last
+ * Is this the last call to flushBuffer?
+ */
+ private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException {
+ if (last) { // always write out the length for the last chunk.
+ Utils.writeVInt(out, len);
+ if (len > 0) {
+ out.write(chunk, offset, len);
+ }
+ } else {
+ if (len > 0) {
+ Utils.writeVInt(out, -len);
+ out.write(chunk, offset, len);
+ }
+ }
+ }
+
+ /**
+ * Write out a chunk that is a concatenation of the internal buffer plus user-supplied data. This will never be the last block.
+ *
+ * @param data
+ * User-supplied data buffer.
+ * @param offset
+ * Offset to user data buffer.
+ * @param len
+ * User data buffer size.
+ */
+ private void writeBufData(byte[] data, int offset, int len) throws IOException {
+ if (count + len > 0) {
+ Utils.writeVInt(out, -(count + len));
+ out.write(buf, 0, count);
+ count = 0;
+ out.write(data, offset, len);
+ }
+ }
+
+ /**
+ * Flush the internal buffer.
+ *
+ * @throws java.io.IOException
+ */
+ private void flushBuffer() throws IOException {
+ if (count > 0) {
+ writeChunk(buf, 0, count, false);
+ count = 0;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (count >= buf.length) {
+ flushBuffer();
+ }
+ buf[count++] = (byte) b;
+ }
+
+ @Override
+ public void write(byte b[]) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if ((len + count) >= buf.length) {
+ /*
+ * If the input data does not fit in the buffer, flush the output buffer and then write the data directly. In this way buffered streams will cascade
+ * harmlessly.
+ */
+ writeBufData(b, off, len);
+ return;
+ }
+
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (buf != null) {
+ try {
+ writeChunk(buf, 0, count, true);
+ } finally {
+ buf = null;
+ out = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Encode the whole stream as a single chunk. The size of the chunk is expected to be known up front.
+ */
+ static public class SingleChunkEncoder extends OutputStream {
+ /**
+ * The data output stream it connects to.
+ */
+ private final DataOutputStream out;
+
+ /**
+ * The remaining bytes to be written.
+ */
+ private int remain;
+ private boolean closed = false;
+
+ /**
+ * Constructor.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param size
+ * The total # of bytes to be written as a single chunk.
+ * @throws java.io.IOException
+ * if an I/O error occurs.
+ */
+ public SingleChunkEncoder(DataOutputStream out, int size) throws IOException {
+ this.out = out;
+ this.remain = size;
+ Utils.writeVInt(out, size);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (remain > 0) {
+ out.write(b);
+ --remain;
+ } else {
+ throw new IOException("Writing more bytes than advertised size.");
+ }
+ }
+
+ @Override
+ public void write(byte b[]) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if (remain >= len) {
+ out.write(b, off, len);
+ remain -= len;
+ } else {
+ throw new IOException("Writing more bytes than advertised size.");
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ if (remain > 0) {
+ throw new IOException("Writing fewer bytes than advertised size.");
+ }
+ } finally {
+ closed = true;
+ }
+ }
+ }
+}
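To make the chunk framing concrete, here is a hedged round-trip sketch through ChunkEncoder and ChunkDecoder. Since Chunk is package-private, code like this could only live in the org.apache.accumulo.core.file.rfile.bcfile package; the class name, buffer size, and payload are made up. With the 11-byte payload below fitting entirely in the encoder's internal buffer, close() emits a single terminal chunk, VInt(11) followed by the 11 payload bytes; a longer stream would instead emit negated lengths for every non-terminal chunk, as the class javadoc describes.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    class ChunkRoundTripSketch {
      static void roundTrip() throws IOException {
        byte[] payload = "hello world".getBytes(StandardCharsets.UTF_8);

        // Encode: buffer the payload, then let close() write the terminal chunk.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Chunk.ChunkEncoder enc = new Chunk.ChunkEncoder(new DataOutputStream(bytes), new byte[4096]);
        enc.write(payload);
        enc.close();

        // Decode the chunk chain back into a byte array.
        Chunk.ChunkDecoder dec = new Chunk.ChunkDecoder(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        byte[] out = new byte[payload.length];
        int n = 0;
        while (n < out.length) {
          int r = dec.read(out, n, out.length - n);
          if (r < 0)
            break;
          n += r;
        }
        dec.close();
        // new String(out, 0, n, StandardCharsets.UTF_8) is "hello world" again.
      }
    }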