You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/11/20 22:06:05 UTC
svn commit: r1543947 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc: InStream.java RecordReaderImpl.java
Author: hashutosh
Date: Wed Nov 20 21:06:05 2013
New Revision: 1543947
URL: http://svn.apache.org/r1543947
Log:
HIVE-5663 : Refactor ORC RecordReader to operate on direct & wrapped ByteBuffers (Gopal V via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Nov 20 21:06:05 2013
@@ -29,10 +29,8 @@ abstract class InStream extends InputStr
private final long[] offsets;
private final long length;
private long currentOffset;
- private byte[] range;
+ private ByteBuffer range;
private int currentRange;
- private int offsetInRange;
- private int limitInRange;
public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
long length) {
@@ -41,42 +39,39 @@ abstract class InStream extends InputStr
this.offsets = offsets;
this.length = length;
currentRange = 0;
- offsetInRange = 0;
- limitInRange = 0;
currentOffset = 0;
}
@Override
public int read() {
- if (offsetInRange >= limitInRange) {
+ if (range == null || range.remaining() == 0) {
if (currentOffset == length) {
return -1;
}
seek(currentOffset);
}
currentOffset += 1;
- return 0xff & range[offsetInRange++];
+ return 0xff & range.get();
}
@Override
public int read(byte[] data, int offset, int length) {
- if (offsetInRange >= limitInRange) {
+ if (range == null || range.remaining() == 0) {
if (currentOffset == this.length) {
return -1;
}
seek(currentOffset);
}
- int actualLength = Math.min(length, limitInRange - offsetInRange);
- System.arraycopy(range, offsetInRange, data, offset, actualLength);
- offsetInRange += actualLength;
+ int actualLength = Math.min(length, range.remaining());
+ range.get(data, offset, actualLength);
currentOffset += actualLength;
return actualLength;
}
@Override
public int available() {
- if (offsetInRange < limitInRange) {
- return limitInRange - offsetInRange;
+ if (range != null && range.remaining() > 0) {
+ return range.remaining();
}
return (int) (length - currentOffset);
}
@@ -85,6 +80,10 @@ abstract class InStream extends InputStr
public void close() {
currentRange = bytes.length;
currentOffset = length;
+ // explicit de-ref of bytes[]
+ for(int i = 0; i < bytes.length; i++) {
+ bytes[i] = null;
+ }
}
@Override
@@ -98,10 +97,10 @@ abstract class InStream extends InputStr
desired - offsets[i] < bytes[i].remaining()) {
currentOffset = desired;
currentRange = i;
- this.range = bytes[i].array();
- offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
- limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
- offsetInRange += desired - offsets[i];
+ this.range = bytes[i].duplicate();
+ int pos = range.position();
+ pos += (int)(desired - offsets[i]); // this is why we duplicate
+ this.range.position(pos);
return;
}
}
@@ -113,7 +112,7 @@ abstract class InStream extends InputStr
public String toString() {
return "uncompressed stream " + name + " position: " + currentOffset +
" length: " + length + " range: " + currentRange +
- " offset: " + offsetInRange + " limit: " + limitInRange;
+ " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
}
}
@@ -123,14 +122,13 @@ abstract class InStream extends InputStr
private final long[] offsets;
private final int bufferSize;
private final long length;
- private ByteBuffer uncompressed = null;
+ private ByteBuffer uncompressed;
private final CompressionCodec codec;
- private byte[] compressed = null;
+ private ByteBuffer compressed;
private long currentOffset;
private int currentRange;
- private int offsetInCompressed;
- private int limitInCompressed;
private boolean isUncompressedOriginal;
+ private boolean isDirect = false;
public CompressedStream(String name, ByteBuffer[] input,
long[] offsets, long length,
@@ -140,49 +138,59 @@ abstract class InStream extends InputStr
this.name = name;
this.codec = codec;
this.length = length;
+ if(this.length > 0) {
+ isDirect = this.bytes[0].isDirect();
+ }
this.offsets = offsets;
this.bufferSize = bufferSize;
currentOffset = 0;
currentRange = 0;
- offsetInCompressed = 0;
- limitInCompressed = 0;
+ }
+
+ private ByteBuffer allocateBuffer(int size) {
+ // TODO: use the same pool as the ORC readers
+ if(isDirect == true) {
+ return ByteBuffer.allocateDirect(size);
+ } else {
+ return ByteBuffer.allocate(size);
+ }
}
private void readHeader() throws IOException {
- if (compressed == null || offsetInCompressed >= limitInCompressed) {
+ if (compressed == null || compressed.remaining() <= 0) {
seek(currentOffset);
}
- if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
- int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
- ((0xff & compressed[offsetInCompressed + 1]) << 7) |
- ((0xff & compressed[offsetInCompressed]) >> 1);
+ if (compressed.remaining() > OutStream.HEADER_SIZE) {
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ boolean isOriginal = (b0 & 0x01) == 1;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
}
- boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
- offsetInCompressed += OutStream.HEADER_SIZE;
+ // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+ assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+ currentOffset += OutStream.HEADER_SIZE;
+
+ ByteBuffer slice = this.slice(chunkLength);
+
if (isOriginal) {
+ uncompressed = slice;
isUncompressedOriginal = true;
- uncompressed = bytes[currentRange].duplicate();
- uncompressed.position(offsetInCompressed -
- bytes[currentRange].arrayOffset());
- uncompressed.limit(offsetInCompressed + chunkLength);
} else {
if (isUncompressedOriginal) {
- uncompressed = ByteBuffer.allocate(bufferSize);
+ uncompressed = allocateBuffer(bufferSize);
isUncompressedOriginal = false;
} else if (uncompressed == null) {
- uncompressed = ByteBuffer.allocate(bufferSize);
+ uncompressed = allocateBuffer(bufferSize);
} else {
uncompressed.clear();
}
- codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
- chunkLength),
- uncompressed);
+ codec.decompress(slice, uncompressed);
}
- offsetInCompressed += chunkLength;
- currentOffset += chunkLength + OutStream.HEADER_SIZE;
} else {
throw new IllegalStateException("Can't read header at " + this);
}
@@ -208,10 +216,7 @@ abstract class InStream extends InputStr
readHeader();
}
int actualLength = Math.min(length, uncompressed.remaining());
- System.arraycopy(uncompressed.array(),
- uncompressed.arrayOffset() + uncompressed.position(), data,
- offset, actualLength);
- uncompressed.position(uncompressed.position() + actualLength);
+ uncompressed.get(data, offset, actualLength);
return actualLength;
}
@@ -229,10 +234,12 @@ abstract class InStream extends InputStr
@Override
public void close() {
uncompressed = null;
+ compressed = null;
currentRange = bytes.length;
- offsetInCompressed = 0;
- limitInCompressed = 0;
currentOffset = length;
+ for(int i = 0; i < bytes.length; i++) {
+ bytes[i] = null;
+ }
}
@Override
@@ -249,16 +256,62 @@ abstract class InStream extends InputStr
}
}
+ /* slices a read only contigous buffer of chunkLength */
+ private ByteBuffer slice(int chunkLength) throws IOException {
+ int len = chunkLength;
+ final long oldOffset = currentOffset;
+ ByteBuffer slice;
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ // simple case
+ slice.limit(len);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return slice;
+ } else if (currentRange >= (bytes.length - 1)) {
+ // nothing has been modified yet
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
+ // we need to consolidate 2 or more buffers into 1
+ // first clear out compressed buffers
+ ByteBuffer copy = allocateBuffer(chunkLength);
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+
+ while (len > 0 && (++currentRange) < bytes.length) {
+ compressed = bytes[currentRange].duplicate();
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ slice.limit(len);
+ copy.put(slice);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return copy;
+ }
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+ }
+
+ // restore offsets for exception clarity
+ seek(oldOffset);
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
private void seek(long desired) throws IOException {
for(int i = 0; i < bytes.length; ++i) {
if (offsets[i] <= desired &&
desired - offsets[i] < bytes[i].remaining()) {
currentRange = i;
- compressed = bytes[i].array();
- offsetInCompressed = (int) (bytes[i].arrayOffset() +
- bytes[i].position() + (desired - offsets[i]));
+ compressed = bytes[i].duplicate();
+ int pos = compressed.position();
+ pos += (int)(desired - offsets[i]);
+ compressed.position(pos);
currentOffset = desired;
- limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
return;
}
}
@@ -267,11 +320,9 @@ abstract class InStream extends InputStr
if (segments != 0 &&
desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
currentRange = segments - 1;
- compressed = bytes[currentRange].array();
- offsetInCompressed = bytes[currentRange].arrayOffset() +
- bytes[currentRange].limit();
+ compressed = bytes[currentRange].duplicate();
+ compressed.position(compressed.limit());
currentOffset = desired;
- limitInCompressed = offsetInCompressed;
return;
}
throw new IOException("Seek outside of data in " + this + " to " +
@@ -294,7 +345,7 @@ abstract class InStream extends InputStr
public String toString() {
return "compressed stream " + name + " position: " + currentOffset +
" length: " + length + " range: " + currentRange +
- " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+ " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
rangeString() +
(uncompressed == null ? "" :
" uncompressed: " + uncompressed.position() + " to " +
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Nov 20 21:06:05 2013
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -75,6 +76,7 @@ class RecordReaderImpl implements Record
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
+ List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
private final TreeReader reader;
private final OrcProto.RowIndex[] indexes;
private final SearchArgument sarg;
@@ -125,6 +127,7 @@ class RecordReaderImpl implements Record
rows += stripe.getNumberOfRows();
}
}
+
firstRow = skippedRows;
totalRowCount = rows;
reader = createTreeReader(path, 0, types, included);
@@ -2176,6 +2179,17 @@ class RecordReaderImpl implements Record
return null;
}
+ private void clearStreams() throws IOException {
+ // explicit close of all streams to de-ref ByteBuffers
+ for(InStream is: streams.values()) {
+ is.close();
+ }
+ if(bufferChunks != null) {
+ bufferChunks.clear();
+ }
+ streams.clear();
+ }
+
/**
* Read the current stripe into memory.
* @throws IOException
@@ -2183,7 +2197,7 @@ class RecordReaderImpl implements Record
private void readStripe() throws IOException {
StripeInformation stripe = stripes.get(currentStripe);
stripeFooter = readStripeFooter(stripe);
- streams.clear();
+ clearStreams();
// setup the position in the stripe
rowCountInStripe = stripe.getNumberOfRows();
rowInStripe = 0;
@@ -2223,28 +2237,17 @@ class RecordReaderImpl implements Record
private void readAllDataStreams(StripeInformation stripe
) throws IOException {
- byte[] buffer =
- new byte[(int) (stripe.getDataLength())];
- file.seek(stripe.getOffset() + stripe.getIndexLength());
- file.readFully(buffer, 0, buffer.length);
- int sectionOffset = 0;
- for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
- if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
- int sectionLength = (int) section.getLength();
- ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
- sectionLength);
- StreamName name = new StreamName(section.getColumn(),
- section.getKind());
- streams.put(name,
- InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
- new long[]{0}, sectionLength, codec, bufferSize));
- sectionOffset += sectionLength;
- }
- }
+ long start = stripe.getIndexLength();
+ long end = start + stripe.getDataLength();
+ // explicitly trigger 1 big read
+ DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
+ bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+ List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+ createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
}
/**
- * The secionts of stripe that we need to read.
+ * The sections of stripe that we need to read.
*/
static class DiskRange {
/** the first address we need to read. */
@@ -2275,6 +2278,30 @@ class RecordReaderImpl implements Record
}
}
+ /**
+ * The sections of stripe that we have read.
+ * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
+ */
+ static class BufferChunk {
+ final ByteBuffer chunk;
+ /** the first address we need to read. */
+ final long offset;
+ /** end of the buffer **/
+ final long end;
+
+ BufferChunk(ByteBuffer chunk, long offset) {
+ this.offset = offset;
+ this.chunk = chunk;
+ end = offset + chunk.remaining();
+ }
+
+ @Override
+ public final String toString() {
+ return "range start: " + offset + " size: " + chunk.remaining() + " type: "
+ + (chunk.isDirect() ? "direct" : "array-backed");
+ }
+ }
+
private static final int BYTE_STREAM_POSITIONS = 1;
private static final int RUN_LENGTH_BYTE_POSITIONS =
BYTE_STREAM_POSITIONS + 1;
@@ -2460,17 +2487,17 @@ class RecordReaderImpl implements Record
* ranges
* @throws IOException
*/
- static byte[][] readDiskRanges(FSDataInputStream file,
+ List<BufferChunk> readDiskRanges(FSDataInputStream file,
long base,
List<DiskRange> ranges) throws IOException {
- byte[][] result = new byte[ranges.size()][];
- int i = 0;
+ ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
for(DiskRange range: ranges) {
int len = (int) (range.end - range.offset);
- result[i] = new byte[len];
- file.seek(base + range.offset);
- file.readFully(result[i]);
- i += 1;
+ long off = range.offset;
+ file.seek(base + off);
+ byte[] buffer = new byte[len];
+ file.readFully(buffer, 0, buffer.length);
+ result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
}
return result;
}
@@ -2509,8 +2536,7 @@ class RecordReaderImpl implements Record
}
static void createStreams(List<OrcProto.Stream> streamDescriptions,
- List<DiskRange> ranges,
- byte[][] bytes,
+ List<BufferChunk> ranges,
boolean[] includeColumn,
CompressionCodec codec,
int bufferSize,
@@ -2519,13 +2545,13 @@ class RecordReaderImpl implements Record
long offset = 0;
for(OrcProto.Stream streamDesc: streamDescriptions) {
int column = streamDesc.getColumn();
- if (includeColumn[column] &&
+ if ((includeColumn == null || includeColumn[column]) &&
StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
long length = streamDesc.getLength();
int first = -1;
int last = -2;
- for(int i=0; i < bytes.length; ++i) {
- DiskRange range = ranges.get(i);
+ for(int i=0; i < ranges.size(); ++i) {
+ BufferChunk range = ranges.get(i);
if (overlap(offset, offset+length, range.offset, range.end)) {
if (first == -1) {
first = i;
@@ -2536,12 +2562,24 @@ class RecordReaderImpl implements Record
ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
long[] offsets = new long[last - first + 1];
for(int i=0; i < buffers.length; ++i) {
- DiskRange range = ranges.get(i + first);
+ BufferChunk range = ranges.get(i + first);
long start = Math.max(range.offset, offset);
long end = Math.min(range.end, offset+length);
- buffers[i] = ByteBuffer.wrap(bytes[first + i],
- Math.max(0, (int) (offset - range.offset)), (int) (end - start));
- offsets[i] = Math.max(0, range.offset - offset);
+ buffers[i] = range.chunk.slice();
+ assert range.chunk.position() == 0; // otherwise we'll mix up positions
+ /*
+ * buffers are positioned in-wards if the offset > range.offset
+ * offsets[i] == range.offset - offset, except if offset > range.offset
+ */
+ if(offset > range.offset) {
+ buffers[i].position((int)(offset - range.offset));
+ buffers[i].limit((int)(end - range.offset));
+ offsets[i] = 0;
+ } else {
+ buffers[i].position(0);
+ buffers[i].limit((int)(end - range.offset));
+ offsets[i] = (range.offset - offset);
+ }
}
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(name.toString(), buffers, offsets,
@@ -2565,8 +2603,8 @@ class RecordReaderImpl implements Record
if (LOG.isDebugEnabled()) {
LOG.debug("merge = " + stringifyDiskRanges(chunks));
}
- byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
- createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+ bufferChunks = readDiskRanges(file, stripe.getOffset(), chunks);
+ createStreams(streamList, bufferChunks, included, codec, bufferSize,
streams);
}
@@ -2666,6 +2704,7 @@ class RecordReaderImpl implements Record
@Override
public void close() throws IOException {
+ clearStreams();
file.close();
}