You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/08/19 21:07:34 UTC
svn commit: r805934 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
Author: zshao
Date: Wed Aug 19 19:07:31 2009
New Revision: 805934
URL: http://svn.apache.org/viewvc?rev=805934&view=rev
Log:
HIVE-756. Performance improvement for RCFile and ColumnarSerDe. (Ning Zhang via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Aug 19 19:07:31 2009
@@ -26,6 +26,9 @@
HIVE-759. Add "hive.intermediate.compression.codec/type" option.
(Yongqiang He via zshao)
+ HIVE-756. Performance improvement for RCFile and ColumnarSerDe.
+ (Ning Zhang via zshao)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Aug 19 19:07:31 2009
@@ -224,8 +224,14 @@
String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
String[] list = StringUtils.split(skips);
ArrayList<Integer> result = new ArrayList<Integer>(list.length);
- for (int i = 0; i < list.length; i++) {
- result.add(Integer.parseInt(list[i]));
+ if (list.length == 1 && list[0] == "" ) {
+ // this is the case when the conf value is specified with an empty string "";
+ // this selects all columns by default
+ result.add(new Integer(-1));
+ } else {
+ for (int i = 0; i < list.length; i++) {
+ result.add(Integer.parseInt(list[i]));
+ }
}
return result;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Aug 19 19:07:31 2009
@@ -863,6 +863,8 @@
private Decompressor keyDecompressor;
NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
+
+ int[] prjColIDs = null; // selected column IDs
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, Configuration conf)
@@ -891,17 +893,29 @@
java.util.ArrayList<Integer> notSkipIDs = HiveFileFormatUtils.getReadColumnIDs(conf);
skippedColIDs = new boolean[columnNumber];
+
if (notSkipIDs.size() > 0) {
+
+ boolean to_skip; // default value for skipping a column
+
+ if (notSkipIDs.size() == 1 && notSkipIDs.get(0).intValue() == -1 ) {
+ // this is the case to get all columns
+ to_skip = false;
+ } else {
+ to_skip = true;
+ }
+
for (int i = 0; i < skippedColIDs.length; i++) {
- skippedColIDs[i] = true;
+ skippedColIDs[i] = to_skip;
}
for (int read : notSkipIDs) {
- if (read < columnNumber)
+ if (read < columnNumber)
skippedColIDs[read] = false;
}
} else {
+ // no column name is specified, e.g., in "select count(1) from tt"; skip all columns
for (int i = 0; i < skippedColIDs.length; i++) {
- skippedColIDs[i] = false;
+ skippedColIDs[i] = true;
}
}
@@ -913,6 +927,14 @@
}
}
+ // get list of selected column IDs
+ prjColIDs = new int[loadColumnNum];
+ for ( int i = 0, j = 0; i < columnNumber; ++i ) {
+ if (!skippedColIDs[i]) {
+ prjColIDs[j++] = i;
+ }
+ }
+
colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber];
columnRowReadIndex = new int[columnNumber];
for (int i = 0; i < columnNumber; i++) {
@@ -1128,9 +1150,8 @@
readRowsIndexInBuffer = 0;
recordsNumInValBuffer = currentKey.numberRows;
- for (int i = 0; i < columnNumber; i++) {
- if (skippedColIDs[i])
- continue;
+ for (int j = 0; j < prjColIDs.length; j++) {
+ int i = prjColIDs[j];
colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
.getData(), currentKey.allCellValLenBuffer[i].getLength());
columnRowReadIndex[i] = 0;
@@ -1258,29 +1279,23 @@
if (!currentValue.inited) {
currentValueBuffer();
+ ret.resetValid(columnNumber); // do this only when not initialized
}
// we do not use BytesWritable here to avoid the byte-copy from
// DataOutputStream to BytesWritable
- ret.resetValid(columnNumber);
+ for (int j = 0; j < prjColIDs.length; ++j) {
+ int i = prjColIDs[j];
- for (int i = 0, readIndex = 0; i < columnNumber; i++) {
BytesRefWritable ref = ret.unCheckedGet(i);
- if (skippedColIDs[i]) {
- if (ref != BytesRefWritable.ZeroBytesRefWritable)
- ret.set(i, BytesRefWritable.ZeroBytesRefWritable);
- continue;
- }
-
int columnCurrentRowStart = (int) columnRowReadIndex[i];
int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
columnRowReadIndex[i] = columnCurrentRowStart + length;
- ref.set(currentValue.loadedColumnsValueBuffer[readIndex].getData(),
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
columnCurrentRowStart, length);
- readIndex++;
}
rowFetched = true;
}
Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=805934&r1=805933&r2=805934&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Wed Aug 19 19:07:31 2009
@@ -29,6 +29,8 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* ColumnarStruct is different from LazyStruct in that ColumnarStruct's field
@@ -43,7 +45,12 @@
* The fields of the struct.
*/
LazyObject[] fields;
-
+
+ private static final Log LOG = LogFactory.getLog(ColumnarStruct.class);
+
+ boolean initialized = false; // has init() been called yet?
+ int[] prjColIDs = null; // list of projected column IDs
+
/**
* Construct a ColumnarStruct object with the TypeInfo. It creates the first
* level object at the first place
@@ -115,27 +122,59 @@
return fields[fieldID].getObject();
}
-
+
+ /* ============================ [PERF] ===================================
+ * This function is called for every row. The selected/projected columns
+ * are set up on the first call only; subsequent calls skip that work.
+ * Ideally this should be done in the constructor where we don't need to
+ * branch in the function for each row.
+ * =========================================================================
+ */
public void init(BytesRefArrayWritable cols) {
- int fieldIndex = 0;
- int min = cols.size() < fields.length ? cols.size() : fields.length;
-
- for (; fieldIndex < min; fieldIndex++) {
- BytesRefWritable passedInField = cols.get(fieldIndex);
- cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
- if (passedInField.length > 0) {
- // if (fields[fieldIndex] == null)
- // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
- // .get(fieldIndex));
- fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], passedInField
- .getStart(), passedInField.getLength());
- fieldIsNull[fieldIndex] = false;
- } else {
+ if (initialized) { // shortcut for calls after the first
+ for (int i = 0; i < prjColIDs.length; ++i ) {
+ int fieldIndex = prjColIDs[i];
+ BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+ cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+ fields[fieldIndex].init(cachedByteArrayRef[fieldIndex],
+ passedInField.getStart(),
+ passedInField.getLength());
+ }
+ } else { // first time call init()
+ int fieldIndex = 0;
+ int min = cols.size() < fields.length ? cols.size() : fields.length;
+
+ ArrayList<Integer> tmp_sel_cols = new ArrayList<Integer>();
+
+ for (; fieldIndex < min; fieldIndex++) {
+
+ // call the faster unCheckedGet()
+ // assert: min <= cols.size()
+ BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
+
+ if (passedInField.length > 0) {
+ // if (fields[fieldIndex] == null)
+ // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
+ // .get(fieldIndex));
+ tmp_sel_cols.add(fieldIndex);
+ cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
+ fields[fieldIndex].init(cachedByteArrayRef[fieldIndex],
+ passedInField.getStart(),
+ passedInField.getLength());
+ fieldIsNull[fieldIndex] = false;
+ } else
+ fieldIsNull[fieldIndex] = true;
+ }
+ for (; fieldIndex < fields.length; fieldIndex++)
fieldIsNull[fieldIndex] = true;
+
+ // maintain a list of non-NULL column IDs
+ prjColIDs = new int[tmp_sel_cols.size()];
+ for (int i = 0; i < prjColIDs.length; ++i ) {
+ prjColIDs[i] = tmp_sel_cols.get(i).intValue();
}
+ initialized = true;
}
- for (; fieldIndex < fields.length; fieldIndex++)
- fieldIsNull[fieldIndex] = true;
}
ArrayList<Object> cachedList;