You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2011/08/23 04:25:31 UTC
svn commit: r1160525 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Author: cws
Date: Tue Aug 23 02:25:30 2011
New Revision: 1160525
URL: http://svn.apache.org/viewvc?rev=1160525&view=rev
Log:
HIVE-2350. Improve RCFile Read Speed (Tim Armstrong via cws)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1160525&r1=1160524&r2=1160525&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Aug 23 02:25:30 2011
@@ -339,6 +339,7 @@ public class RCFile {
keyBuffer.eachColumnUncompressedValueLen[colIndex]);
loadedColumnsValueBuffer[index] = decompressedData;
decompressedFlag[index] = true;
+ numCompressed--;
return decompressedData.getData();
}
}
@@ -346,6 +347,7 @@ public class RCFile {
// used to load columns' value into memory
private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
private boolean[] decompressedFlag = null;
+ private int numCompressed;
private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
boolean inited = false;
@@ -409,7 +411,11 @@ public class RCFile {
deflatFilter = codec.createInputStream(decompressBuffer,
valDecompressor);
}
-
+ if (codec != null) {
+ numCompressed = decompressedFlag.length;
+ } else {
+ numCompressed = 0;
+ }
for (int k = 0, readIndex = 0; k < columnNumber; k++) {
if (skippedColIDs[k]) {
continue;
@@ -456,6 +462,9 @@ public class RCFile {
}
addIndex++;
}
+ if (codec != null) {
+ numCompressed = decompressedFlag.length;
+ }
if (skipTotal != 0) {
in.skipBytes(skipTotal);
@@ -964,7 +973,12 @@ public class RCFile {
*
*/
public static class Reader {
-
+ private static class SelectedColumn {
+ public int colIndex;
+ public int rowReadIndex;
+ public int runLength;
+ public int prvLength;
+ }
private final Path file;
private final FSDataInputStream in;
@@ -999,16 +1013,21 @@ public class RCFile {
private int passedRowsNum = 0;
- private int[] columnRowReadIndex = null;
- private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
- private final int[] columnRunLength;
- private final int[] columnPrvLength;
+
private boolean decompress = false;
private Decompressor keyDecompressor;
NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
- int[] prjColIDs = null; // selected column IDs
+ //Current state of each selected column - e.g. current run length, etc.
+ // The size of the array is equal to the number of selected columns
+ private final SelectedColumn[] selectedColumns;
+
+ // map of original column id -> index among selected columns
+ private final int[] revPrjColIDs;
+
+ // column value lengths for each of the selected columns
+ private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, Configuration conf) throws IOException {
@@ -1053,7 +1072,7 @@ public class RCFile {
java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
.getReadColumnIDs(conf);
- skippedColIDs = new boolean[columnNumber];
+ boolean[] skippedColIDs = new boolean[columnNumber];
if (notSkipIDs.size() > 0) {
for (int i = 0; i < skippedColIDs.length; i++) {
skippedColIDs[i] = true;
@@ -1081,31 +1100,31 @@ public class RCFile {
}
}
+
+ revPrjColIDs = new int[columnNumber];
// get list of selected column IDs
- prjColIDs = new int[loadColumnNum];
+ selectedColumns = new SelectedColumn[loadColumnNum];
+ colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
for (int i = 0, j = 0; i < columnNumber; ++i) {
if (!skippedColIDs[i]) {
- prjColIDs[j++] = i;
- }
- }
-
- colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
- columnRunLength = new int[columnNumber];
- columnPrvLength = new int[columnNumber];
- columnRowReadIndex = new int[columnNumber];
- for (int i = 0; i < columnNumber; i++) {
- columnRowReadIndex[i] = 0;
- if (!skippedColIDs[i]) {
- colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+ SelectedColumn col = new SelectedColumn();
+ col.colIndex = i;
+ col.runLength = 0;
+ col.prvLength = -1;
+ col.rowReadIndex = 0;
+ selectedColumns[j] = col;
+ colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
+ revPrjColIDs[i] = j;
+ j++;
+ } else {
+ revPrjColIDs[i] = -1;
}
- columnRunLength[i] = 0;
- columnPrvLength[i] = -1;
}
currentKey = createKeyBuffer();
currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
}
-
+
/**
* Override this method to specialize the type of
* {@link FSDataInputStream} returned.
@@ -1334,13 +1353,14 @@ public class RCFile {
readRowsIndexInBuffer = 0;
recordsNumInValBuffer = currentKey.numberRows;
- for (int prjColID : prjColIDs) {
- int i = prjColID;
- colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
- .getData(), currentKey.allCellValLenBuffer[i].getLength());
- columnRowReadIndex[i] = 0;
- columnRunLength[i] = 0;
- columnPrvLength[i] = -1;
+ for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
+ SelectedColumn col = selectedColumns[selIx];
+ int colIx = col.colIndex;
+ NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx];
+ colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength());
+ col.rowReadIndex = 0;
+ col.runLength = 0;
+ col.prvLength = -1;
}
return currentKeyLength;
@@ -1384,8 +1404,8 @@ public class RCFile {
*/
public BytesRefArrayWritable getColumn(int columnID,
BytesRefArrayWritable rest) throws IOException {
-
- if (skippedColIDs[columnID]) {
+ int selColIdx = revPrjColIDs[columnID];
+ if (selColIdx == -1) {
return null;
}
@@ -1402,16 +1422,26 @@ public class RCFile {
int columnNextRowStart = 0;
fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
.getData(), currentKey.allCellValLenBuffer[columnID].getLength());
+ SelectedColumn selCol = selectedColumns[selColIdx];
+ byte[] uncompData = null;
+ ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null;
+ boolean decompressed = currentValue.decompressedFlag[selColIdx];
+ if (decompressed) {
+ uncompData =
+ currentValue.loadedColumnsValueBuffer[selColIdx].getData();
+ } else {
+ decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx];
+ }
for (int i = 0; i < recordsNumInValBuffer; i++) {
- int length = getColumnNextValueLength(columnID);
+ colAdvanceRow(selColIdx, selCol);
+ int length = selCol.prvLength;
BytesRefWritable currentCell = rest.get(i);
- if (currentValue.decompressedFlag[columnID]) {
- currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
- .getData(), columnNextRowStart, length);
+
+ if (decompressed) {
+ currentCell.set(uncompData, columnNextRowStart, length);
} else {
- currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID],
- columnNextRowStart, length);
+ currentCell.set(decompCallBack, columnNextRowStart, length);
}
columnNextRowStart = columnNextRowStart + length;
}
@@ -1490,44 +1520,62 @@ public class RCFile {
// we do not use BytesWritable here to avoid the byte-copy from
// DataOutputStream to BytesWritable
-
- for (int j = 0; j < prjColIDs.length; ++j) {
- int i = prjColIDs[j];
-
- BytesRefWritable ref = ret.unCheckedGet(i);
-
- int columnCurrentRowStart = columnRowReadIndex[i];
- int length = getColumnNextValueLength(i);
- columnRowReadIndex[i] = columnCurrentRowStart + length;
-
- if (currentValue.decompressedFlag[j]) {
+ if (currentValue.numCompressed > 0) {
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+
+ BytesRefWritable ref = ret.unCheckedGet(i);
+
+ colAdvanceRow(j, col);
+
+ if (currentValue.decompressedFlag[j]) {
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ } else {
+ ref.set(currentValue.lazyDecompressCallbackObjs[j],
+ col.rowReadIndex, col.prvLength);
+ }
+ col.rowReadIndex += col.prvLength;
+ }
+ } else {
+ // This version of the loop eliminates a condition check and branch
+ // and is measurably faster (20% or so)
+ for (int j = 0; j < selectedColumns.length; ++j) {
+ SelectedColumn col = selectedColumns[j];
+ int i = col.colIndex;
+
+ BytesRefWritable ref = ret.unCheckedGet(i);
+
+ colAdvanceRow(j, col);
ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- columnCurrentRowStart, length);
- } else {
- ref.set(currentValue.lazyDecompressCallbackObjs[j],
- columnCurrentRowStart, length);
+ col.rowReadIndex, col.prvLength);
+ col.rowReadIndex += col.prvLength;
}
}
rowFetched = true;
}
- private int getColumnNextValueLength(int i) throws IOException {
- if (columnRunLength[i] > 0) {
- --columnRunLength[i];
- return columnPrvLength[i];
+ /**
+ * Advance column state to the next row: update the run length and value length
+ * @param selCol - index among selectedColumns
+ * @param col - column object to update the state of. prvLength will be
+ * set to the value length of the new row; rowReadIndex is left to the caller
+ * @throws IOException
+ */
+ private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException {
+ if (col.runLength > 0) {
+ --col.runLength;
} else {
- int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
+ int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]);
if (length < 0) {
// we reach a runlength here, use the previous length and reset
// runlength
- columnRunLength[i] = ~length;
- columnRunLength[i]--;
- length = columnPrvLength[i];
+ col.runLength = (~length) - 1;
} else {
- columnPrvLength[i] = length;
- columnRunLength[i] = 0;
+ col.prvLength = length;
+ col.runLength = 0;
}
- return length;
}
}