Posted to commits@hive.apache.org by zs...@apache.org on 2009/09/08 22:28:50 UTC
svn commit: r812678 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
Author: zshao
Date: Tue Sep 8 20:28:50 2009
New Revision: 812678
URL: http://svn.apache.org/viewvc?rev=812678&view=rev
Log:
HIVE-756. Performance improvement for RCFile and ColumnarSerDe in Hive. (Ning Zhang via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
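In outline, HIVE-756 replaces per-row "is this column skipped?" checks with a projection list computed once: the reader (and ColumnarStruct, below) records the IDs of the selected columns in an int[] prjColIDs and then iterates only that array for every row. A minimal standalone sketch of the idea, using the patch's names (skippedColIDs, prjColIDs) but outside any real Hive class:

  // Sketch only: build the projection list once, up front.
  // skippedColIDs[i] == true means column i was not selected.
  int loadColumnNum = 0;
  for (boolean skipped : skippedColIDs) {
    if (!skipped) {
      loadColumnNum++;
    }
  }
  int[] prjColIDs = new int[loadColumnNum];
  for (int i = 0, j = 0; i < skippedColIDs.length; i++) {
    if (!skippedColIDs[i]) {
      prjColIDs[j++] = i;
    }
  }
  // Per-row loops can now walk prjColIDs directly instead of testing
  // skippedColIDs[i] for all columnNumber columns on every row.

With wide tables and narrow projections, this turns an O(columnNumber) branchy scan per row into an O(#selected) loop.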
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Tue Sep 8 20:28:50 2009
@@ -56,6 +56,9 @@
HIVE-813. Show the actual exception thrown in UDF evaluation
(Zheng Shao via rmurthy)
+ HIVE-756. Performance improvement for RCFile and ColumnarSerDe in Hive.
+ (Ning Zhang via zshao)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Sep 8 20:28:50 2009
@@ -863,6 +863,8 @@
private Decompressor keyDecompressor;
NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
+ int[] prjColIDs = null; // selected column IDs
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, Configuration conf)
@@ -900,6 +902,9 @@
skippedColIDs[read] = false;
}
} else {
+ // TODO: if no column name is specified, e.g., in "select count(1) from tt;",
+ // skip all columns; this case should be distinguished from
+ // "select * from tt;".
for (int i = 0; i < skippedColIDs.length; i++) {
skippedColIDs[i] = false;
}
@@ -913,6 +918,14 @@
}
}
+ // get list of selected column IDs
+ prjColIDs = new int[loadColumnNum];
+ for ( int i = 0, j = 0; i < columnNumber; ++i ) {
+ if (!skippedColIDs[i]) {
+ prjColIDs[j++] = i;
+ }
+ }
+
colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
columnRowReadIndex = new int[columnNumber];
for (int i = 0; i < columnNumber; i++) {
@@ -1128,9 +1141,8 @@
readRowsIndexInBuffer = 0;
recordsNumInValBuffer = currentKey.numberRows;
- for (int i = 0; i < columnNumber; i++) {
- if (skippedColIDs[i])
- continue;
+ for (int j = 0; j < prjColIDs.length; j++) {
+ int i = prjColIDs[j];
colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
.getData(), currentKey.allCellValLenBuffer[i].getLength());
columnRowReadIndex[i] = 0;
@@ -1258,29 +1270,23 @@
if (!currentValue.inited) {
currentValueBuffer();
+ ret.resetValid(columnNumber); // do this only when not initialized
}
// we do not use BytesWritable here to avoid the byte-copy from
// DataOutputStream to BytesWritable
- ret.resetValid(columnNumber);
+ for (int j = 0; j < prjColIDs.length; ++j) {
+ int i = prjColIDs[j];
- for (int i = 0, readIndex = 0; i < columnNumber; i++) {
BytesRefWritable ref = ret.unCheckedGet(i);
- if (skippedColIDs[i]) {
- if (ref != BytesRefWritable.ZeroBytesRefWritable)
- ret.set(i, BytesRefWritable.ZeroBytesRefWritable);
- continue;
- }
-
int columnCurrentRowStart = (int) columnRowReadIndex[i];
int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
columnRowReadIndex[i] = columnCurrentRowStart + length;
- ref.set(currentValue.loadedColumnsValueBuffer[readIndex].getData(),
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
columnCurrentRowStart, length);
- readIndex++;
}
rowFetched = true;
}
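Two things change in the per-row path above: ret.resetValid(columnNumber) moves into the one-time currentValueBuffer() branch, and the fetch loop walks prjColIDs. Note that loadedColumnsValueBuffer holds only the projected columns, so it is indexed by the projection position j, while the per-column bookkeeping arrays stay indexed by the absolute column ID i. A condensed sketch of the new loop (reader fields as in the patch, error handling omitted):

  for (int j = 0; j < prjColIDs.length; ++j) {
    int i = prjColIDs[j];                        // absolute column ID
    BytesRefWritable ref = ret.unCheckedGet(i);
    int start = (int) columnRowReadIndex[i];
    int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
    columnRowReadIndex[i] = start + length;
    // loadedColumnsValueBuffer is dense over the projection: index j, not i.
    ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), start, length);
  }

This also drops the old readIndex counter, since j already plays that role.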
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Sep 8 20:28:50 2009
@@ -175,7 +175,7 @@
StructObjectInspector oi = (StructObjectInspector) serDe
.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- assertEquals(8, fieldRefs.size());
+ assertEquals("Field size should be 8", 8, fieldRefs.size());
for (int j = 0; j < fieldRefs.size(); j++) {
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j));
Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData,
@@ -301,7 +301,7 @@
StructObjectInspector oi = (StructObjectInspector) serDe
.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- assertEquals(8, fieldRefs.size());
+ assertEquals("Field size should be 8", 8, fieldRefs.size());
for (int i = 0; i < fieldRefs.size(); i++) {
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData,
@@ -309,7 +309,7 @@
assertEquals("Field " + i, standardWritableData, expectedFieldsData[i]);
}
// Serialize
- assertEquals(BytesRefArrayWritable.class, serDe.getSerializedClass());
+ assertEquals("Class of the serialized object should be BytesRefArrayWritable", BytesRefArrayWritable.class, serDe.getSerializedClass());
BytesRefArrayWritable serializedText = (BytesRefArrayWritable) serDe
.serialize(row, oi);
assertEquals("Serialized data", s, serializedText);
@@ -334,6 +334,7 @@
LongWritable rowID = new LongWritable();
BytesRefArrayWritable cols = new BytesRefArrayWritable();
+
while (reader.next(rowID)) {
reader.getCurrentRow(cols);
Object row = serDe.deserialize(cols);
@@ -341,16 +342,16 @@
StructObjectInspector oi = (StructObjectInspector) serDe
.getObjectInspector();
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
- assertEquals(8, fieldRefs.size());
+ assertEquals("Field size should be 8", 8, fieldRefs.size());
- for (int i = 0; i < fieldRefs.size(); i++) {
+ for (int i : readCols) {
Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
Object standardWritableData = ObjectInspectorUtils.copyToStandardObject(fieldData,
fieldRefs.get(i).getFieldObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
assertEquals("Field " + i, standardWritableData, expectedPartitalFieldsData[i]);
}
- assertEquals(BytesRefArrayWritable.class, serDe.getSerializedClass());
+ assertEquals("Class of the serialized object should be BytesRefArrayWritable", BytesRefArrayWritable.class, serDe.getSerializedClass());
BytesRefArrayWritable serializedBytes = (BytesRefArrayWritable) serDe
.serialize(row, oi);
assertEquals("Serialized data", patialS, serializedBytes);
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=812678&r1=812677&r2=812678&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Tue Sep 8 20:28:50 2009
@@ -29,6 +29,8 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* ColumnarStruct is different from LazyStruct in that ColumnarStruct's field
@@ -43,7 +45,12 @@
* The fields of the struct.
*/
LazyObject[] fields;
-
+
+ private static final Log LOG = LogFactory.getLog(ColumnarStruct.class);
+
+ boolean initialized = false; // whether init() has been called
+ int[] prjColIDs = null; // list of projected column IDs
+
/**
* Construct a ColumnarStruct object with the TypeInfo. It creates the first
* level object at the first place
@@ -115,27 +122,59 @@
return fields[fieldID].getObject();
}
-
+
+ /* ============================ [PERF] ===================================
+ * This function is called for every row. Set up the selected/projected
+ * columns on the first call, and skip that work on all subsequent calls.
+ * Ideally this would be done in the constructor, where we would not need
+ * to branch in this function for every row.
+ * =========================================================================
+ */
public void init(BytesRefArrayWritable cols) {
- int fieldIndex = 0;
- int min = cols.size() < fields.length ? cols.size() : fields.length;
-
- for (; fieldIndex < min; fieldIndex++) {
- BytesRefWritable passedInField = cols.get(fieldIndex);
- cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
- if (passedInField.length > 0) {
- // if (fields[fieldIndex] == null)
- // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
- // .get(fieldIndex));
- fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], passedInField
- .getStart(), passedInField.getLength());
- fieldIsNull[fieldIndex] = false;
- } else {
+ if (initialized) { // short cut for non-first calls
+ for (int i = 0; i < prjColIDs.length; ++i ) {
+ int fieldIndex = prjColIDs[i];
+ BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+ cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+ fields[fieldIndex].init(cachedByteArrayRef[fieldIndex],
+ passedInField.getStart(),
+ passedInField.getLength());
+ }
+ } else { // first time call init()
+ int fieldIndex = 0;
+ int min = cols.size() < fields.length ? cols.size() : fields.length;
+
+ ArrayList<Integer> tmp_sel_cols = new ArrayList<Integer>();
+
+ for (; fieldIndex < min; fieldIndex++) {
+
+ // call the faster unCheckedGet()
+ // assert: min <= cols.size()
+ BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+
+ if (passedInField.length > 0) {
+ // if (fields[fieldIndex] == null)
+ // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
+ // .get(fieldIndex));
+ tmp_sel_cols.add(fieldIndex);
+ cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+ fields[fieldIndex].init(cachedByteArrayRef[fieldIndex],
+ passedInField.getStart(),
+ passedInField.getLength());
+ fieldIsNull[fieldIndex] = false;
+ } else
+ fieldIsNull[fieldIndex] = true;
+ }
+ for (; fieldIndex < fields.length; fieldIndex++)
fieldIsNull[fieldIndex] = true;
+
+ // maintain a list of non-NULL column IDs
+ prjColIDs = new int[tmp_sel_cols.size()];
+ for (int i = 0; i < prjColIDs.length; ++i ) {
+ prjColIDs[i] = tmp_sel_cols.get(i).intValue();
}
+ initialized = true;
}
- for (; fieldIndex < fields.length; fieldIndex++)
- fieldIsNull[fieldIndex] = true;
}
ArrayList<Object> cachedList;
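The same compute-once pattern as in RCFile.Reader, applied lazily: the first init() call discovers which columns actually carry data (length > 0), caches their IDs in prjColIDs, and every later call runs a tight loop over just those IDs. Reduced to its essentials in a hypothetical class (not Hive code; field initialization is stubbed out):

  // Hypothetical sketch of the memoized-projection init() shape.
  class ProjectedInitSketch {
    private boolean initialized = false;
    private int[] prjColIDs; // filled on the first call

    void init(byte[][] cols) {
      if (initialized) { // fast path: only the projected columns
        for (int id : prjColIDs) {
          initField(id, cols[id]);
        }
        return;
      }
      java.util.ArrayList<Integer> sel = new java.util.ArrayList<Integer>();
      for (int i = 0; i < cols.length; i++) {
        if (cols[i].length > 0) { // non-empty => projected, as in the patch
          sel.add(i);
          initField(i, cols[i]);
        }
      }
      prjColIDs = new int[sel.size()];
      for (int i = 0; i < prjColIDs.length; i++) {
        prjColIDs[i] = sel.get(i).intValue();
      }
      initialized = true;
    }

    private void initField(int id, byte[] data) { /* elided */ }
  }

As the [PERF] comment notes, doing this in the constructor would remove the per-row branch entirely; init() is used instead, presumably because the projection is only observable from the first row's data.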