Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC
svn commit: r1522098 [17/30] - in /hive/branches/vectorization: ./
beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Thu Sep 12 01:21:10 2013
@@ -18,30 +18,46 @@
package org.apache.hadoop.hive.ql.exec.persistence;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
import java.util.List;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-public class MapJoinRowContainer<Row> extends AbstractRowContainer<Row> {
-
- private List<Row> list;
-
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinRowContainer extends AbstractRowContainer<List<Object>> {
+ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+
+ private final List<List<Object>> list;
private int index;
+ private byte aliasFilter = (byte) 0xff;
public MapJoinRowContainer() {
index = 0;
- list = new ArrayList<Row>(1);
- }
+ list = new ArrayList<List<Object>>(1);
+ }
@Override
- public void add(Row t) throws HiveException {
+ public void add(List<Object> t) {
list.add(t);
}
+ public void add(Object[] t) {
+ add(toList(t));
+ }
@Override
- public Row first() throws HiveException {
+ public List<Object> first() {
index = 0;
if (index < list.size()) {
return list.get(index);
@@ -50,13 +66,12 @@ public class MapJoinRowContainer<Row> ex
}
@Override
- public Row next() throws HiveException {
+ public List<Object> next() {
index++;
if (index < list.size()) {
return list.get(index);
}
return null;
-
}
/**
@@ -73,28 +88,88 @@ public class MapJoinRowContainer<Row> ex
* Remove all elements in the RowContainer.
*/
@Override
- public void clear() throws HiveException {
+ public void clear() {
list.clear();
index = 0;
}
-
- public List<Row> getList() {
- return list;
+
+ public byte getAliasFilter() {
+ return aliasFilter;
+ }
+
+ public MapJoinRowContainer copy() {
+ MapJoinRowContainer result = new MapJoinRowContainer();
+ for(List<Object> item : list) {
+ result.add(item);
+ }
+ return result;
}
-
- public void setList(List<Row> list) {
- this.list = list;
+
+ @SuppressWarnings({"unchecked"})
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+ throws IOException, SerDeException {
+ clear();
+ SerDe serde = context.getSerDe();
+ long numRows = in.readLong();
+ for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+ container.readFields(in);
+ List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+ serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+ if(value == null) {
+ add(toList(EMPTY_OBJECT_ARRAY));
+ } else {
+ Object[] valuesArray = value.toArray();
+ if (context.hasFilterTag()) {
+ aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
+ }
+ add(toList(valuesArray));
+ }
+ }
}
+
+ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+ throws IOException, SerDeException {
+ SerDe serde = context.getSerDe();
+ ObjectInspector valueObjectInspector = context.getStandardOI();
+ long numRows = size();
+ long numRowsWritten = 0L;
+ out.writeLong(numRows);
+ for (List<Object> row = first(); row != null; row = next()) {
+ serde.serialize(row.toArray(), valueObjectInspector).write(out);
+ ++numRowsWritten;
+ }
+ if(numRows != size()) {
+ throw new ConcurrentModificationException("Values were modified while persisting");
+ }
+ if(numRowsWritten != numRows) {
+ throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten);
+ }
+ }
+
+ private List<Object> toList(Object[] array) {
+ return new NoCopyingArrayList(array);
+ }
+ /**
+ * In this use case our objects will not be modified
+ * so we don't care about copying in and out.
+ */
+ private static class NoCopyingArrayList extends AbstractList<Object> {
+ private Object[] array;
+ public NoCopyingArrayList(Object[] array) {
+ this.array = array;
+ }
+ @Override
+ public Object get(int index) {
+ return array[index];
+ }
- public void reset(MapJoinRowContainer<Object[]> other) throws HiveException {
- list.clear();
- Object[] obj;
- for (obj = other.first(); obj != null; obj = other.next()) {
- ArrayList<Object> ele = new ArrayList(obj.length);
- for (int i = 0; i < obj.length; i++) {
- ele.add(obj[i]);
- }
- list.add((Row) ele);
+ @Override
+ public int size() {
+ return array.length;
}
+
+ public Object[] toArray() {
+ return array;
+ }
}
}
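
For context: the new read()/write() pair persists the container as a count-prefixed stream, a long row count followed by one SerDe-encoded record per row, with the trailing filter tag (when present) AND-ed into aliasFilter on the way back in. A self-contained sketch of just that framing, with plain Java serialization standing in for Hive's SerDe (an assumption made for brevity):

    import java.io.*;
    import java.util.*;

    public class RowStreamSketch {
      // Mirrors write(): the row count goes first, then one record per row.
      static void writeRows(List<List<Object>> rows, ObjectOutputStream out)
          throws IOException {
        out.writeLong(rows.size());
        for (List<Object> row : rows) {
          out.writeObject(row.toArray());
        }
      }

      // Mirrors read(): trust the count that write() recorded.
      static List<Object[]> readRows(ObjectInputStream in)
          throws IOException, ClassNotFoundException {
        long numRows = in.readLong();
        List<Object[]> rows = new ArrayList<Object[]>();
        for (long i = 0; i < numRows; i++) {
          rows.add((Object[]) in.readObject());
        }
        return rows;
      }
    }
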
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Thu Sep 12 01:21:10 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -71,18 +71,18 @@ import org.apache.hadoop.util.Reflection
* reading.
*
*/
-public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row> {
+public class RowContainer<ROW extends List<Object>> extends AbstractRowContainer<ROW> {
protected static Log LOG = LogFactory.getLog(RowContainer.class);
// max # of rows can be put into one block
private static final int BLOCKSIZE = 25000;
- private Row[] currentWriteBlock; // the last block that add() should append to
- private Row[] currentReadBlock; // the current block where the cursor is in
+ private ROW[] currentWriteBlock; // the last block that add() should append to
+ private ROW[] currentReadBlock; // the current block where the cursor is in
// since currentReadBlock may assigned to currentWriteBlock, we need to store
// original read block
- private Row[] firstReadBlockPointer;
+ private ROW[] firstReadBlockPointer;
private int blockSize; // number of objects in the block before it is spilled
// to disk
private int numFlushedBlocks; // total # of blocks
@@ -108,7 +108,7 @@ public class RowContainer<Row extends Li
RecordWriter rw = null;
InputFormat<WritableComparable, Writable> inputFormat = null;
InputSplit[] inputSplits = null;
- private Row dummyRow = null;
+ private ROW dummyRow = null;
private final Reporter reporter;
Writable val = null; // cached to use serialize data
@@ -130,7 +130,7 @@ public class RowContainer<Row extends Li
this.addCursor = 0;
this.numFlushedBlocks = 0;
this.tmpFile = null;
- this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ this.currentWriteBlock = (ROW[]) new ArrayList[blockSize];
this.currentReadBlock = this.currentWriteBlock;
this.firstReadBlockPointer = currentReadBlock;
this.serde = null;
@@ -142,7 +142,7 @@ public class RowContainer<Row extends Li
this.reporter = reporter;
}
}
-
+
private JobConf getLocalFSJobConfClone(Configuration jc) {
if (this.jobCloneUsingLocalFs == null) {
this.jobCloneUsingLocalFs = new JobConf(jc);
@@ -158,13 +158,13 @@ public class RowContainer<Row extends Li
}
@Override
- public void add(Row t) throws HiveException {
+ public void add(ROW t) throws HiveException {
if (this.tblDesc != null) {
- if (addCursor >= blockSize) { // spill the current block to tmp file
+ if (willSpill()) { // spill the current block to tmp file
spillBlock(currentWriteBlock, addCursor);
addCursor = 0;
if (numFlushedBlocks == 1) {
- currentWriteBlock = (Row[]) new ArrayList[blockSize];
+ currentWriteBlock = (ROW[]) new ArrayList[blockSize];
}
}
currentWriteBlock[addCursor++] = t;
@@ -178,7 +178,7 @@ public class RowContainer<Row extends Li
}
@Override
- public Row first() throws HiveException {
+ public ROW first() throws HiveException {
if (size == 0) {
return null;
}
@@ -221,10 +221,10 @@ public class RowContainer<Row extends Li
localJc, reporter);
currentSplitPointer++;
- nextBlock();
+ nextBlock(0);
}
// we are guaranteed that we can get data here (since 'size' is not zero)
- Row ret = currentReadBlock[itrCursor++];
+ ROW ret = currentReadBlock[itrCursor++];
removeKeys(ret);
return ret;
} catch (Exception e) {
@@ -234,7 +234,7 @@ public class RowContainer<Row extends Li
}
@Override
- public Row next() throws HiveException {
+ public ROW next() throws HiveException {
if (!firstCalled) {
throw new RuntimeException("Call first() then call next().");
@@ -252,19 +252,16 @@ public class RowContainer<Row extends Li
return null;
}
- Row ret;
+ ROW ret;
if (itrCursor < this.readBlockSize) {
ret = this.currentReadBlock[itrCursor++];
removeKeys(ret);
return ret;
} else {
- nextBlock();
+ nextBlock(0);
if (this.readBlockSize == 0) {
if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
- this.itrCursor = 0;
- this.readBlockSize = this.addCursor;
- this.firstReadBlockPointer = this.currentReadBlock;
- currentReadBlock = currentWriteBlock;
+ setWriteBlockAsReadBlock();
} else {
return null;
}
@@ -273,7 +270,7 @@ public class RowContainer<Row extends Li
}
}
- private void removeKeys(Row ret) {
+ private void removeKeys(ROW ret) {
if (this.keyObject != null && this.currentReadBlock != this.currentWriteBlock) {
int len = this.keyObject.size();
int rowSize = ((ArrayList) ret).size();
@@ -285,39 +282,10 @@ public class RowContainer<Row extends Li
private final ArrayList<Object> row = new ArrayList<Object>(2);
- private void spillBlock(Row[] block, int length) throws HiveException {
+ private void spillBlock(ROW[] block, int length) throws HiveException {
try {
if (tmpFile == null) {
-
- String suffix = ".tmp";
- if (this.keyObject != null) {
- suffix = "." + this.keyObject.toString() + suffix;
- }
-
- while (true) {
- parentFile = File.createTempFile("hive-rowcontainer", "");
- boolean success = parentFile.delete() && parentFile.mkdir();
- if (success) {
- break;
- }
- LOG.debug("retry creating tmp row-container directory...");
- }
-
- tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
- LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
- // Delete the temp file if the JVM terminate normally through Hadoop job
- // kill command.
- // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
- parentFile.deleteOnExit();
- tmpFile.deleteOnExit();
-
- // rFile = new RandomAccessFile(tmpFile, "rw");
- HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
- tempOutPath = new Path(tmpFile.toString());
- JobConf localJc = getLocalFSJobConfClone(jc);
- rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
- hiveOutputFormat, serde.getSerializedClass(), false,
- tblDesc.getProperties(), tempOutPath, reporter);
+ setupWriter();
} else if (rw == null) {
throw new HiveException("RowContainer has already been closed for writing.");
}
@@ -329,14 +297,14 @@ public class RowContainer<Row extends Li
if (this.keyObject != null) {
row.set(1, this.keyObject);
for (int i = 0; i < length; ++i) {
- Row currentValRow = block[i];
+ ROW currentValRow = block[i];
row.set(0, currentValRow);
Writable outVal = serde.serialize(row, standardOI);
rw.write(outVal);
}
} else {
for (int i = 0; i < length; ++i) {
- Row currentValRow = block[i];
+ ROW currentValRow = block[i];
Writable outVal = serde.serialize(currentValRow, standardOI);
rw.write(outVal);
}
@@ -350,6 +318,9 @@ public class RowContainer<Row extends Li
} catch (Exception e) {
clear();
LOG.error(e.toString(), e);
+ if ( e instanceof HiveException ) {
+ throw (HiveException) e;
+ }
throw new HiveException(e);
}
}
@@ -364,7 +335,7 @@ public class RowContainer<Row extends Li
return size;
}
- private boolean nextBlock() throws HiveException {
+ protected boolean nextBlock(int readIntoOffset) throws HiveException {
itrCursor = 0;
this.readBlockSize = 0;
if (this.numFlushedBlocks == 0) {
@@ -376,13 +347,13 @@ public class RowContainer<Row extends Li
val = serde.getSerializedClass().newInstance();
}
boolean nextSplit = true;
- int i = 0;
+ int i = readIntoOffset;
if (rr != null) {
Object key = rr.createKey();
while (i < this.currentReadBlock.length && rr.next(key, val)) {
nextSplit = false;
- this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
+ this.currentReadBlock[i++] = (ROW) ObjectInspectorUtils.copyToStandardObject(serde
.deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
}
}
@@ -393,7 +364,7 @@ public class RowContainer<Row extends Li
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
reporter);
currentSplitPointer++;
- return nextBlock();
+ return nextBlock(0);
}
this.readBlockSize = i;
@@ -504,4 +475,119 @@ public class RowContainer<Row extends Li
this.tblDesc = tblDesc;
}
+ protected int getAddCursor() {
+ return addCursor;
+ }
+
+ protected final boolean willSpill() {
+ return addCursor >= blockSize;
+ }
+
+ protected int getBlockSize() {
+ return blockSize;
+ }
+
+ protected void setupWriter() throws HiveException {
+ try {
+
+ if ( tmpFile != null ) {
+ return;
+ }
+
+ String suffix = ".tmp";
+ if (this.keyObject != null) {
+ suffix = "." + this.keyObject.toString() + suffix;
+ }
+
+ while (true) {
+ parentFile = File.createTempFile("hive-rowcontainer", "");
+ boolean success = parentFile.delete() && parentFile.mkdir();
+ if (success) {
+ break;
+ }
+ LOG.debug("retry creating tmp row-container directory...");
+ }
+
+ tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+ LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+ // Delete the temp file if the JVM terminate normally through Hadoop job
+ // kill command.
+ // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
+ parentFile.deleteOnExit();
+ tmpFile.deleteOnExit();
+
+ // rFile = new RandomAccessFile(tmpFile, "rw");
+ HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+ tempOutPath = new Path(tmpFile.toString());
+ JobConf localJc = getLocalFSJobConfClone(jc);
+ rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+ hiveOutputFormat, serde.getSerializedClass(), false,
+ tblDesc.getProperties(), tempOutPath, reporter);
+ } catch (Exception e) {
+ clear();
+ LOG.error(e.toString(), e);
+ throw new HiveException(e);
+ }
+
+ }
+
+ protected RecordWriter getRecordWriter() {
+ return rw;
+ }
+
+ protected InputSplit[] getInputSplits() {
+ return inputSplits;
+ }
+
+ protected boolean endOfCurrentReadBlock() {
+ if (tblDesc == null) {
+ return false;
+ }
+ return itrCursor >= this.readBlockSize;
+ }
+
+ protected int getCurrentReadBlockSize() {
+ return readBlockSize;
+ }
+
+ protected void setWriteBlockAsReadBlock() {
+ this.itrCursor = 0;
+ this.readBlockSize = this.addCursor;
+ this.firstReadBlockPointer = this.currentReadBlock;
+ currentReadBlock = currentWriteBlock;
+ }
+
+ protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum)
+ throws IOException {
+ JobConf localJc = getLocalFSJobConfClone(jc);
+ currentSplitPointer = splitNum;
+ if ( rr != null ) {
+ rr.close();
+ }
+ // open record reader to read next split
+ rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+ reporter);
+ currentSplitPointer++;
+ return rr;
+ }
+
+ protected ROW getReadBlockRow(int rowOffset) {
+ itrCursor = rowOffset + 1;
+ return currentReadBlock[rowOffset];
+ }
+
+ protected void resetCurrentReadBlockToFirstReadBlock() {
+ currentReadBlock = firstReadBlockPointer;
+ }
+
+ protected void resetReadBlocks() {
+ this.currentReadBlock = this.currentWriteBlock;
+ this.firstReadBlockPointer = currentReadBlock;
+ }
+
+ protected void close() throws HiveException {
+ clear();
+ currentReadBlock = firstReadBlockPointer = currentWriteBlock = null;
+ }
+
}
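
The refactor extracts the spill decision into willSpill() and the temp-file setup into setupWriter() so that subclasses can reuse them. A minimal, self-contained sketch of the spill-on-full pattern that add() implements, with an in-memory list standing in for spillBlock()'s temp file:

    import java.util.ArrayList;
    import java.util.List;

    public class SpillingBuffer<T> {
      private final Object[] block;              // currentWriteBlock
      private int addCursor = 0;
      private final List<Object[]> spilled = new ArrayList<Object[]>();

      public SpillingBuffer(int blockSize) {
        block = new Object[blockSize];           // BLOCKSIZE is 25000 above
      }

      private boolean willSpill() {              // same test as the patch
        return addCursor >= block.length;
      }

      public void add(T row) {
        if (willSpill()) {                       // flush the full block first
          Object[] copy = new Object[addCursor];
          System.arraycopy(block, 0, copy, 0, addCursor);
          spilled.add(copy);                     // spillBlock(block, addCursor)
          addCursor = 0;
        }
        block[addCursor++] = row;
      }
    }
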
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java Thu Sep 12 01:21:10 2013
@@ -40,11 +40,11 @@ public class FilterStringColLikeStringSc
private Pattern compiledPattern;
private PatternType type = PatternType.NONE;
private String simpleStringPattern;
+ private final Text simplePattern = new Text();
- private transient Text simplePattern = new Text();
private transient ByteBuffer byteBuffer;
private transient CharBuffer charBuffer;
- private transient CharsetDecoder decoder;
+ private transient final CharsetDecoder decoder;
// Doing characters comparison directly instead of regular expression
// matching for simple patterns like "%abc%".
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java Thu Sep 12 01:21:10 2013
@@ -70,9 +70,6 @@ public class HiveHistoryUtil {
private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
+ VALUE + "\"");
- // temp buffer for parsed dataa
- private static Map<String, String> parseBuffer = new HashMap<String, String>();
-
/**
* Parse a single line of history.
*
@@ -81,6 +78,8 @@ public class HiveHistoryUtil {
* @throws IOException
*/
private static void parseLine(String line, Listener l) throws IOException {
+ Map<String, String> parseBuffer = new HashMap<String, String>();
+
// extract the record type
int idx = line.indexOf(' ');
String recType = line.substring(0, idx);
@@ -96,8 +95,5 @@ public class HiveHistoryUtil {
}
l.handle(RecordTypes.valueOf(recType), parseBuffer);
-
- parseBuffer.clear();
}
-
}
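
The parse buffer moves from a shared static field to a local variable, so concurrent parseLine() calls no longer write into (and clear) the same map. A stripped-down sketch of the same shape; the key=value parsing here is illustrative, not HiveHistoryUtil's actual regex:

    import java.util.HashMap;
    import java.util.Map;

    public class PerCallBuffer {
      // Before: a static Map shared across threads, cleared after each call.
      public static Map<String, String> parse(String line) {
        Map<String, String> buffer = new HashMap<String, String>(); // per call
        for (String kv : line.split(" ")) {
          int eq = kv.indexOf('=');
          if (eq > 0) {
            buffer.put(kv.substring(0, eq), kv.substring(eq + 1));
          }
        }
        return buffer;                           // no shared state to race on
      }
    }
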
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Thu Sep 12 01:21:10 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.parse.SplitSample;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -262,6 +263,8 @@ public class CombineHiveInputFormat<K ex
*/
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
init(job);
Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -269,8 +272,11 @@ public class CombineHiveInputFormat<K ex
CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
.getCombineFileInputFormat();
+ InputSplit[] splits = null;
if (combine == null) {
- return super.getSplits(job, numSplits);
+ splits = super.getSplits(job, numSplits);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+ return splits;
}
if (combine.getInputPathsShim(job).length == 0) {
@@ -320,9 +326,10 @@ public class CombineHiveInputFormat<K ex
// If path is a directory
if (fStats.isDir()) {
dirs.offer(path);
- }
- else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
- return super.getSplits(job, numSplits);
+ } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+ splits = super.getSplits(job, numSplits);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+ return splits;
}
while (dirs.peek() != null) {
@@ -331,9 +338,11 @@ public class CombineHiveInputFormat<K ex
for (int idx = 0; idx < fStatus.length; idx++) {
if (fStatus[idx].isDir()) {
dirs.offer(fStatus[idx].getPath());
- }
- else if ((new CompressionCodecFactory(job)).getCodec(fStatus[idx].getPath()) != null) {
- return super.getSplits(job, numSplits);
+ } else if ((new CompressionCodecFactory(job)).getCodec(
+ fStatus[idx].getPath()) != null) {
+ splits = super.getSplits(job, numSplits);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+ return splits;
}
}
}
@@ -341,7 +350,9 @@ public class CombineHiveInputFormat<K ex
}
if (inputFormat instanceof SymlinkTextInputFormat) {
- return super.getSplits(job, numSplits);
+ splits = super.getSplits(job, numSplits);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+ return splits;
}
Path filterPath = path;
@@ -411,10 +422,11 @@ public class CombineHiveInputFormat<K ex
}
LOG.info("number of splits " + result.size());
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
- private void processPaths(JobConf job, CombineFileInputFormatShim combine,
+ private void processPaths(JobConf job, CombineFileInputFormatShim combine,
List<InputSplitShim> iss, Path... path) throws IOException {
JobConf currJob = new JobConf(job);
FileInputFormat.setInputPaths(currJob, path);
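
getSplits() is now bracketed by PerfLogBegin and PerfLogEnd, with the End call repeated before each early return. A self-contained sketch of that bracketing with a stand-in timer; the try/finally form shown is an alternative that covers every exit once, not what the patch itself does:

    public class GetSplitsTiming {
      static class Timer {                       // stand-in for PerfLogger
        long start;
        void begin() { start = System.nanoTime(); }
        void end() {
          System.out.println("GET_SPLITS ns: " + (System.nanoTime() - start));
        }
      }

      static int getSplits(boolean earlyExit) {
        Timer t = new Timer();
        t.begin();                 // perfLogger.PerfLogBegin(LOG, GET_SPLITS)
        try {
          if (earlyExit) {
            return 0;              // the patch repeats PerfLogEnd before this
          }
          return 42;
        } finally {
          t.end();                 // finally reaches every return path once
        }
      }
    }
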
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Sep 12 01:21:10 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.io.HiveIOE
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -257,7 +258,8 @@ public class HiveInputFormat<K extends W
}
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
init(job);
Path[] dirs = FileInputFormat.getInputPaths(job);
@@ -296,7 +298,7 @@ public class HiveInputFormat<K extends W
}
LOG.info("number of splits " + result.size());
-
+ perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
return result.toArray(new HiveInputSplit[result.size()]);
}
@@ -428,6 +430,8 @@ public class HiveInputFormat<K extends W
} else {
ColumnProjectionUtils.setFullyReadColumns(jobConf);
}
+ ColumnProjectionUtils.appendReadColumnNames(jobConf,
+ tableScan.getNeededColumns());
pushFilters(jobConf, tableScan);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Thu Sep 12 01:21:10 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
hashCodeValid = false;
}
+ public HiveKey(byte[] bytes, int hashcode) {
+ super(bytes);
+ myHashCode = hashcode;
+ hashCodeValid = true;
+ }
+
protected int myHashCode;
public void setHashCode(int myHashCode) {
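
The new constructor lets a caller that has already computed the key's hash cache it inside the HiveKey, so hashCodeValid is set and the hash is not recomputed at partition time. A hypothetical caller:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.io.HiveKey;

    public class KeyedWrite {
      // Arrays.hashCode stands in for whatever hash the producer computes.
      public static HiveKey makeKey(byte[] serialized) {
        int hash = Arrays.hashCode(serialized);
        return new HiveKey(serialized, hash);  // cached; hashCodeValid = true
      }
    }
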
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu Sep 12 01:21:10 2013
@@ -1601,7 +1601,7 @@ public class RCFile {
}
}
/* move the last 16 bytes to the prefix area */
- System.arraycopy(buffer, buffer.length - prefix - 1, buffer, 0, prefix);
+ System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
n = (int)Math.min(n, end - in.getPos());
}
} catch (ChecksumException e) { // checksum failure
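
The arraycopy change fixes an off-by-one: to move the last `prefix` bytes to the front of the buffer, the source offset must be buffer.length - prefix. A worked check:

    public class PrefixCopyCheck {
      public static void main(String[] args) {
        byte[] buffer = new byte[100];
        for (int i = 0; i < buffer.length; i++) {
          buffer[i] = (byte) i;
        }
        int prefix = 16;
        // Old: source offset 83 copied indices 83..98, dropping the final
        // byte (99) and including 83, which is not part of the last 16.
        System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
        System.out.println(buffer[0]);           // 84, first of the last 16
        System.out.println(buffer[prefix - 1]);  // 99, the final byte kept
      }
    }
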
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Thu Sep 12 01:21:10 2013
@@ -41,7 +41,7 @@ class BitFieldReader {
current = 0xff & input.next();
bitsLeft = 8;
} else {
- throw new EOFException("Read past end of bit field from " + input);
+ throw new EOFException("Read past end of bit field from " + this);
}
}
@@ -111,4 +111,10 @@ class BitFieldReader {
bitsLeft = (int) (8 - (totalBits % 8));
}
}
+
+ @Override
+ public String toString() {
+ return "bit reader current: " + current + " bits left: " + bitsLeft +
+ " bit size: " + bitSize + " from " + input;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Sep 12 01:21:10 2013
@@ -25,97 +25,149 @@ abstract class InStream extends InputStr
private static class UncompressedStream extends InStream {
private final String name;
- private byte[] array;
- private int offset;
- private final int base;
- private final int limit;
+ private final ByteBuffer[] bytes;
+ private final long[] offsets;
+ private final long length;
+ private long currentOffset;
+ private byte[] range;
+ private int currentRange;
+ private int offsetInRange;
+ private int limitInRange;
- public UncompressedStream(String name, ByteBuffer input) {
+ public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
+ long length) {
this.name = name;
- this.array = input.array();
- base = input.arrayOffset() + input.position();
- offset = base;
- limit = input.arrayOffset() + input.limit();
+ this.bytes = input;
+ this.offsets = offsets;
+ this.length = length;
+ currentRange = 0;
+ offsetInRange = 0;
+ limitInRange = 0;
+ currentOffset = 0;
}
@Override
public int read() {
- if (offset == limit) {
- return -1;
+ if (offsetInRange >= limitInRange) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ seek(currentOffset);
}
- return 0xff & array[offset++];
+ currentOffset += 1;
+ return 0xff & range[offsetInRange++];
}
@Override
public int read(byte[] data, int offset, int length) {
- if (this.offset == limit) {
- return -1;
+ if (offsetInRange >= limitInRange) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ seek(currentOffset);
}
- int actualLength = Math.min(length, limit - this.offset);
- System.arraycopy(array, this.offset, data, offset, actualLength);
- this.offset += actualLength;
+ int actualLength = Math.min(length, limitInRange - offsetInRange);
+ System.arraycopy(range, offsetInRange, data, offset, actualLength);
+ offsetInRange += actualLength;
+ currentOffset += actualLength;
return actualLength;
}
@Override
public int available() {
- return limit - offset;
+ if (offsetInRange < limitInRange) {
+ return limitInRange - offsetInRange;
+ }
+ return (int) (length - currentOffset);
}
@Override
public void close() {
- array = null;
- offset = 0;
+ currentRange = bytes.length;
+ currentOffset = length;
}
@Override
public void seek(PositionProvider index) throws IOException {
- offset = base + (int) index.getNext();
+ seek(index.getNext());
+ }
+
+ public void seek(long desired) {
+ for(int i = 0; i < bytes.length; ++i) {
+ if (offsets[i] <= desired &&
+ desired - offsets[i] < bytes[i].remaining()) {
+ currentOffset = desired;
+ currentRange = i;
+ this.range = bytes[i].array();
+ offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
+ limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
+ offsetInRange += desired - offsets[i];
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
}
@Override
public String toString() {
- return "uncompressed stream " + name + " base: " + base +
- " offset: " + offset + " limit: " + limit;
+ return "uncompressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + offsetInRange + " limit: " + limitInRange;
}
}
private static class CompressedStream extends InStream {
private final String name;
- private byte[] array;
+ private final ByteBuffer[] bytes;
+ private final long[] offsets;
private final int bufferSize;
+ private final long length;
private ByteBuffer uncompressed = null;
private final CompressionCodec codec;
- private int offset;
- private final int base;
- private final int limit;
+ private byte[] compressed = null;
+ private long currentOffset;
+ private int currentRange;
+ private int offsetInCompressed;
+ private int limitInCompressed;
private boolean isUncompressedOriginal;
- public CompressedStream(String name, ByteBuffer input,
+ public CompressedStream(String name, ByteBuffer[] input,
+ long[] offsets, long length,
CompressionCodec codec, int bufferSize
) {
- this.array = input.array();
+ this.bytes = input;
this.name = name;
this.codec = codec;
+ this.length = length;
+ this.offsets = offsets;
this.bufferSize = bufferSize;
- base = input.arrayOffset() + input.position();
- offset = base;
- limit = input.arrayOffset() + input.limit();
+ currentOffset = 0;
+ currentRange = 0;
+ offsetInCompressed = 0;
+ limitInCompressed = 0;
}
private void readHeader() throws IOException {
- if (limit - offset > OutStream.HEADER_SIZE) {
- int chunkLength = ((0xff & array[offset + 2]) << 15) |
- ((0xff & array[offset + 1]) << 7) | ((0xff & array[offset]) >> 1);
+ if (compressed == null || offsetInCompressed >= limitInCompressed) {
+ seek(currentOffset);
+ }
+ if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
+ int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
+ ((0xff & compressed[offsetInCompressed + 1]) << 7) |
+ ((0xff & compressed[offsetInCompressed]) >> 1);
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
}
- boolean isOriginal = (array[offset] & 0x01) == 1;
- offset += OutStream.HEADER_SIZE;
+ boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
+ offsetInCompressed += OutStream.HEADER_SIZE;
if (isOriginal) {
isUncompressedOriginal = true;
- uncompressed = ByteBuffer.wrap(array, offset, chunkLength);
+ uncompressed = bytes[currentRange].duplicate();
+ uncompressed.position(offsetInCompressed -
+ bytes[currentRange].arrayOffset());
+ uncompressed.limit(offsetInCompressed + chunkLength);
} else {
if (isUncompressedOriginal) {
uncompressed = ByteBuffer.allocate(bufferSize);
@@ -125,19 +177,21 @@ abstract class InStream extends InputStr
} else {
uncompressed.clear();
}
- codec.decompress(ByteBuffer.wrap(array, offset, chunkLength),
+ codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
+ chunkLength),
uncompressed);
}
- offset += chunkLength;
+ offsetInCompressed += chunkLength;
+ currentOffset += chunkLength + OutStream.HEADER_SIZE;
} else {
- throw new IllegalStateException("Can't read header");
+ throw new IllegalStateException("Can't read header at " + this);
}
}
@Override
public int read() throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (offset == limit) {
+ if (currentOffset == length) {
return -1;
}
readHeader();
@@ -148,7 +202,7 @@ abstract class InStream extends InputStr
@Override
public int read(byte[] data, int offset, int length) throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (this.offset == this.limit) {
+ if (currentOffset == this.length) {
return -1;
}
readHeader();
@@ -164,7 +218,7 @@ abstract class InStream extends InputStr
@Override
public int available() throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (offset == limit) {
+ if (currentOffset == length) {
return 0;
}
readHeader();
@@ -174,27 +228,74 @@ abstract class InStream extends InputStr
@Override
public void close() {
- array = null;
uncompressed = null;
- offset = 0;
+ currentRange = bytes.length;
+ offsetInCompressed = 0;
+ limitInCompressed = 0;
+ currentOffset = length;
}
@Override
public void seek(PositionProvider index) throws IOException {
- offset = base + (int) index.getNext();
- int uncompBytes = (int) index.getNext();
- if (uncompBytes != 0) {
+ seek(index.getNext());
+ long uncompressedBytes = index.getNext();
+ if (uncompressedBytes != 0) {
readHeader();
- uncompressed.position(uncompressed.position() + uncompBytes);
+ uncompressed.position(uncompressed.position() +
+ (int) uncompressedBytes);
} else if (uncompressed != null) {
+ // mark the uncompressed buffer as done
uncompressed.position(uncompressed.limit());
}
}
+ private void seek(long desired) throws IOException {
+ for(int i = 0; i < bytes.length; ++i) {
+ if (offsets[i] <= desired &&
+ desired - offsets[i] < bytes[i].remaining()) {
+ currentRange = i;
+ compressed = bytes[i].array();
+ offsetInCompressed = (int) (bytes[i].arrayOffset() +
+ bytes[i].position() + (desired - offsets[i]));
+ currentOffset = desired;
+ limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
+ return;
+ }
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.length;
+ if (segments != 0 &&
+ desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+ currentRange = segments - 1;
+ compressed = bytes[currentRange].array();
+ offsetInCompressed = bytes[currentRange].arrayOffset() +
+ bytes[currentRange].limit();
+ currentOffset = desired;
+ limitInCompressed = offsetInCompressed;
+ return;
+ }
+ throw new IOException("Seek outside of data in " + this + " to " +
+ desired);
+ }
+
+ private String rangeString() {
+ StringBuilder builder = new StringBuilder();
+ for(int i=0; i < offsets.length; ++i) {
+ if (i != 0) {
+ builder.append("; ");
+ }
+ builder.append(" range " + i + " = " + offsets[i] + " to " +
+ bytes[i].remaining());
+ }
+ return builder.toString();
+ }
+
@Override
public String toString() {
- return "compressed stream " + name + " base: " + base +
- " offset: " + offset + " limit: " + limit +
+ return "compressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+ rangeString() +
(uncompressed == null ? "" :
" uncompressed: " + uncompressed.position() + " to " +
uncompressed.limit());
@@ -203,14 +304,29 @@ abstract class InStream extends InputStr
public abstract void seek(PositionProvider index) throws IOException;
+ /**
+ * Create an input stream from a list of buffers.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream
+ * @param offsets a list of offsets (the same length as input) that must
+ * contain the first offset of the each set of bytes in input
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
public static InStream create(String name,
- ByteBuffer input,
+ ByteBuffer[] input,
+ long[] offsets,
+ long length,
CompressionCodec codec,
int bufferSize) throws IOException {
if (codec == null) {
- return new UncompressedStream(name, input);
+ return new UncompressedStream(name, input, offsets, length);
} else {
- return new CompressedStream(name, input, codec, bufferSize);
+ return new CompressedStream(name, input, offsets, length, codec,
+ bufferSize);
}
}
}
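
InStream now reads from an array of ByteBuffer ranges instead of one contiguous buffer: offsets[i] records the stream position of the first byte of input[i], and seek() walks the ranges to find the one containing the target offset. A sketch of the new create(); since InStream is package-private, the sketch assumes it lives in the same package, and a null codec selects the uncompressed path:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class InStreamSketch {
      public static void main(String[] args) throws IOException {
        // Two ranges covering stream offsets [0,4) and [4,10).
        ByteBuffer a = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
        ByteBuffer b = ByteBuffer.wrap(new byte[]{4, 5, 6, 7, 8, 9});
        InStream in = InStream.create("sketch",
            new ByteBuffer[]{a, b},
            new long[]{0L, 4L},   // first stream offset of each range
            10L,                  // total stream length
            null,                 // no codec: UncompressedStream
            256 * 1024);
        System.out.println(in.read());  // 0, first byte of the first range
      }
    }
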
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Thu Sep 12 01:21:10 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.io.IOException;
@@ -47,19 +48,67 @@ public final class OrcFile {
* prevent the new reader from reading ORC files generated by any released
* version of Hive.
*/
- public static final int MAJOR_VERSION = 0;
- public static final int MINOR_VERSION = 11;
+ public static enum Version {
+ V_0_11("0.11", 0, 11),
+ V_0_12("0.12", 0, 12);
+
+ public static final Version CURRENT = V_0_12;
+
+ private final String name;
+ private final int major;
+ private final int minor;
+
+ private Version(String name, int major, int minor) {
+ this.name = name;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public static Version byName(String name) {
+ for(Version version: values()) {
+ if (version.name.equals(name)) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ORC version " + name);
+ }
+
+ /**
+ * Get the human readable name for the version.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the major version number.
+ */
+ public int getMajor() {
+ return major;
+ }
+
+ /**
+ * Get the minor version number.
+ */
+ public int getMinor() {
+ return minor;
+ }
+ }
// the table properties that control ORC files
public static final String COMPRESSION = "orc.compress";
- static final String DEFAULT_COMPRESSION = "ZLIB";
public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
- static final String DEFAULT_COMPRESSION_BLOCK_SIZE = "262144";
public static final String STRIPE_SIZE = "orc.stripe.size";
- static final String DEFAULT_STRIPE_SIZE = "268435456";
public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
- static final String DEFAULT_ROW_INDEX_STRIDE = "10000";
public static final String ENABLE_INDEXES = "orc.create.index";
+ public static final String BLOCK_PADDING = "orc.block.padding";
+
+ static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+ static final CompressionKind DEFAULT_COMPRESSION_KIND =
+ CompressionKind.ZLIB;
+ static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
+ static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
+ static final boolean DEFAULT_BLOCK_PADDING = true;
// unused
private OrcFile() {}
@@ -77,7 +126,145 @@ public final class OrcFile {
}
/**
- * Create an ORC file streamFactory.
+ * Options for creating ORC file writers.
+ */
+ public static class WriterOptions {
+ private final Configuration configuration;
+ private FileSystem fileSystemValue = null;
+ private ObjectInspector inspectorValue = null;
+ private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+ private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
+ private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
+ private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
+ private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+ private MemoryManager memoryManagerValue;
+ private Version versionValue;
+
+ WriterOptions(Configuration conf) {
+ configuration = conf;
+ memoryManagerValue = getMemoryManager(conf);
+ String versionName =
+ conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+ if (versionName == null) {
+ versionValue = Version.CURRENT;
+ } else {
+ versionValue = Version.byName(versionName);
+ }
+ }
+
+ /**
+ * Provide the filesystem for the path, if the client has it available.
+ * If it is not provided, it will be found from the path.
+ */
+ public WriterOptions fileSystem(FileSystem value) {
+ fileSystemValue = value;
+ return this;
+ }
+
+ /**
+ * Set the stripe size for the file. The writer stores the contents of the
+ * stripe in memory until this memory limit is reached and the stripe
+ * is flushed to the HDFS file and the next stripe started.
+ */
+ public WriterOptions stripeSize(long value) {
+ stripeSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Set the distance between entries in the row index. The minimum value is
+ * 1000 to prevent the index from overwhelming the data. If the stride is
+ * set to 0, no indexes will be included in the file.
+ */
+ public WriterOptions rowIndexStride(int value) {
+ rowIndexStrideValue = value;
+ return this;
+ }
+
+ /**
+ * The size of the memory buffers used for compressing and storing the
+ * stripe in memory.
+ */
+ public WriterOptions bufferSize(int value) {
+ bufferSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Sets whether the HDFS blocks are padded to prevent stripes from
+ * straddling blocks. Padding improves locality and thus the speed of
+ * reading, but costs space.
+ */
+ public WriterOptions blockPadding(boolean value) {
+ blockPaddingValue = value;
+ return this;
+ }
+
+ /**
+ * Sets the generic compression that is used to compress the data.
+ */
+ public WriterOptions compress(CompressionKind value) {
+ compressValue = value;
+ return this;
+ }
+
+ /**
+ * A required option that sets the object inspector for the rows. Used
+ * to determine the schema for the file.
+ */
+ public WriterOptions inspector(ObjectInspector value) {
+ inspectorValue = value;
+ return this;
+ }
+
+ /**
+ * Sets the version of the file that will be written.
+ */
+ public WriterOptions version(Version value) {
+ versionValue = value;
+ return this;
+ }
+
+ /**
+ * A package local option to set the memory manager.
+ */
+ WriterOptions memory(MemoryManager value) {
+ memoryManagerValue = value;
+ return this;
+ }
+ }
+
+ /**
+ * Create a default set of write options that can be modified.
+ */
+ public static WriterOptions writerOptions(Configuration conf) {
+ return new WriterOptions(conf);
+ }
+
+ /**
+ * Create an ORC file writer. This is the public interface for creating
+ * writers going forward and new options will only be added to this method.
+ * @param path filename to write to
+ * @param options the options
+ * @return a new ORC file writer
+ * @throws IOException
+ */
+ public static Writer createWriter(Path path,
+ WriterOptions opts
+ ) throws IOException {
+ FileSystem fs = opts.fileSystemValue == null ?
+ path.getFileSystem(opts.configuration) : opts.fileSystemValue;
+
+ return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+ opts.stripeSizeValue, opts.compressValue,
+ opts.bufferSizeValue, opts.rowIndexStrideValue,
+ opts.memoryManagerValue, opts.blockPaddingValue,
+ opts.versionValue);
+ }
+
+ /**
+ * Create an ORC file writer. This method is provided for API backward
+ * compatibility with Hive 0.11.
* @param fs file system
* @param path filename to write to
* @param inspector the ObjectInspector that inspects the rows
@@ -86,7 +273,7 @@ public final class OrcFile {
* @param bufferSize the number of bytes to compress at once
* @param rowIndexStride the number of rows between row index entries or
* 0 to suppress all indexes
- * @return a new ORC file streamFactory
+ * @return a new ORC file writer
* @throws IOException
*/
public static Writer createWriter(FileSystem fs,
@@ -97,8 +284,14 @@ public final class OrcFile {
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
- return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
- bufferSize, rowIndexStride, getMemoryManager(conf));
+ return createWriter(path,
+ writerOptions(conf)
+ .fileSystem(fs)
+ .inspector(inspector)
+ .stripeSize(stripeSize)
+ .compress(compress)
+ .bufferSize(bufferSize)
+ .rowIndexStride(rowIndexStride));
}
private static MemoryManager memoryManager = null;
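
A usage sketch of the new builder-style API, equivalent in spirit to the deprecated shim above; the ObjectInspector comes from the caller and the option values are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class WriterSketch {
      public static Writer open(Configuration conf, Path path,
          ObjectInspector inspector) throws IOException {
        return OrcFile.createWriter(path,
            OrcFile.writerOptions(conf)
                .inspector(inspector)            // required: the row schema
                .stripeSize(128L * 1024 * 1024)
                .compress(CompressionKind.ZLIB)
                .rowIndexStride(10000)
                .blockPadding(true)
                .version(OrcFile.Version.V_0_12));
      }
    }
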
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Sep 12 01:21:10 2013
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +34,12 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -48,6 +56,8 @@ public class OrcInputFormat extends Fil
VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+ private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+
private static class OrcRecordReader
implements RecordReader<NullWritable, OrcStruct> {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
@@ -59,14 +69,38 @@ public class OrcInputFormat extends Fil
OrcRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
- this.reader = file.rows(offset, length,
- findIncludedColumns(file.getTypes(), conf));
+ String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ String columnNamesString =
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+ String[] columnNames = null;
+ SearchArgument sarg = null;
List<OrcProto.Type> types = file.getTypes();
if (types.size() == 0) {
numColumns = 0;
} else {
numColumns = types.get(0).getSubtypesCount();
}
+ columnNames = new String[types.size()];
+ LOG.info("included column ids = " +
+ conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null"));
+ LOG.info("included columns names = " +
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null"));
+ boolean[] includeColumn = findIncludedColumns(types, conf);
+ if (serializedPushdown != null && columnNamesString != null) {
+ sarg = SearchArgument.FACTORY.create
+ (Utilities.deserializeExpression(serializedPushdown, conf));
+ LOG.info("ORC pushdown predicate: " + sarg);
+ String[] neededColumnNames = columnNamesString.split(",");
+ int i = 0;
+ for(int columnId: types.get(0).getSubtypesList()) {
+ if (includeColumn[columnId]) {
+ columnNames[columnId] = neededColumnNames[i++];
+ }
+ }
+ } else {
+ LOG.info("No ORC pushdown predicate");
+ }
+ this.reader = file.rows(offset, length, includeColumn, sarg, columnNames);
this.offset = offset;
this.length = length;
}
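
The record reader now wires predicate pushdown in from the conf: the serialized filter expression becomes a SearchArgument, and the needed column names are assigned, in order, to the type ids of the included top-level columns. An isolated, self-contained sketch of that mapping loop with illustrative values:

    import java.util.Arrays;

    public class ColumnNameMapping {
      public static void main(String[] args) {
        int[] subtypes = {1, 2, 3};       // type ids of the root's columns
        boolean[] includeColumn = {true, true, false, true};  // by type id
        String[] neededColumnNames = {"name", "age"};   // from the conf
        String[] columnNames = new String[4];           // by type id
        int i = 0;
        for (int columnId : subtypes) {
          if (includeColumn[columnId]) {
            columnNames[columnId] = neededColumnNames[i++];
          }
        }
        System.out.println(Arrays.toString(columnNames));
        // prints [null, name, null, age]
      }
    }
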
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Thu Sep 12 01:21:10 2013
@@ -47,32 +47,20 @@ public class OrcOutputFormat extends Fil
implements RecordWriter<NullWritable, OrcSerdeRow>,
FileSinkOperator.RecordWriter {
private Writer writer = null;
- private final FileSystem fs;
private final Path path;
- private final Configuration conf;
- private final long stripeSize;
- private final int compressionSize;
- private final CompressionKind compress;
- private final int rowIndexStride;
-
- OrcRecordWriter(FileSystem fs, Path path, Configuration conf,
- String stripeSize, String compress,
- String compressionSize, String rowIndexStride) {
- this.fs = fs;
+ private final OrcFile.WriterOptions options;
+
+ OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
this.path = path;
- this.conf = conf;
- this.stripeSize = Long.valueOf(stripeSize);
- this.compress = CompressionKind.valueOf(compress);
- this.compressionSize = Integer.valueOf(compressionSize);
- this.rowIndexStride = Integer.valueOf(rowIndexStride);
+ this.options = options;
}
@Override
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
- stripeSize, compress, compressionSize, rowIndexStride);
+ options.inspector(row.getInspector());
+ writer = OrcFile.createWriter(path, options);
}
writer.addRow(row.getRow());
}
@@ -81,9 +69,8 @@ public class OrcOutputFormat extends Fil
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, this.conf,
- serdeRow.getInspector(), stripeSize, compress, compressionSize,
- rowIndexStride);
+ options.inspector(serdeRow.getInspector());
+ writer = OrcFile.createWriter(path, options);
}
writer.addRow(serdeRow.getRow());
}
@@ -102,8 +89,8 @@ public class OrcOutputFormat extends Fil
ObjectInspector inspector = ObjectInspectorFactory.
getStandardStructObjectInspector(new ArrayList<String>(),
new ArrayList<ObjectInspector>());
- writer = OrcFile.createWriter(fs, path, this.conf, inspector,
- stripeSize, compress, compressionSize, rowIndexStride);
+ options.inspector(inspector);
+ writer = OrcFile.createWriter(path, options);
}
writer.close();
}
@@ -113,9 +100,8 @@ public class OrcOutputFormat extends Fil
public RecordWriter<NullWritable, OrcSerdeRow>
getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
Progressable reporter) throws IOException {
- return new OrcRecordWriter(fileSystem, new Path(name), conf,
- OrcFile.DEFAULT_STRIPE_SIZE, OrcFile.DEFAULT_COMPRESSION,
- OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE, OrcFile.DEFAULT_ROW_INDEX_STRIDE);
+ return new
+ OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
}
@Override
@@ -126,20 +112,42 @@ public class OrcOutputFormat extends Fil
boolean isCompressed,
Properties tableProperties,
Progressable reporter) throws IOException {
- String stripeSize = tableProperties.getProperty(OrcFile.STRIPE_SIZE,
- OrcFile.DEFAULT_STRIPE_SIZE);
- String compression = tableProperties.getProperty(OrcFile.COMPRESSION,
- OrcFile.DEFAULT_COMPRESSION);
- String compressionSize =
- tableProperties.getProperty(OrcFile.COMPRESSION_BLOCK_SIZE,
- OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE);
- String rowIndexStride =
- tableProperties.getProperty(OrcFile.ROW_INDEX_STRIDE,
- OrcFile.DEFAULT_ROW_INDEX_STRIDE);
- if ("false".equals(tableProperties.getProperty(OrcFile.ENABLE_INDEXES))) {
- rowIndexStride = "0";
+ OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+ if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
+ options.stripeSize(Long.parseLong
+ (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
+ options.compress(CompressionKind.valueOf
+ (tableProperties.getProperty(OrcFile.COMPRESSION)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
+ options.bufferSize(Integer.parseInt
+ (tableProperties.getProperty
+ (OrcFile.COMPRESSION_BLOCK_SIZE)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
+ options.rowIndexStride(Integer.parseInt
+ (tableProperties.getProperty
+ (OrcFile.ROW_INDEX_STRIDE)));
}
- return new OrcRecordWriter(path.getFileSystem(conf), path, conf,
- stripeSize, compression, compressionSize, rowIndexStride);
+
+ if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
+ if ("false".equals(tableProperties.getProperty
+ (OrcFile.ENABLE_INDEXES))) {
+ options.rowIndexStride(0);
+ }
+ }
+
+ if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
+ options.blockPadding(Boolean.parseBoolean
+ (tableProperties.getProperty
+ (OrcFile.BLOCK_PADDING)));
+ }
+
+ return new OrcRecordWriter(path, options);
}
}
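
Writer settings now flow from table properties into WriterOptions only when the key is present, so OrcFile's defaults apply otherwise. A sketch of the properties the code consults; the keys are the OrcFile constants above and the values are illustrative:

    import java.util.Properties;

    public class OrcTableProps {
      public static Properties example() {
        Properties tbl = new Properties();
        tbl.setProperty("orc.compress", "SNAPPY");        // COMPRESSION
        tbl.setProperty("orc.compress.size", "262144");   // -> bufferSize()
        tbl.setProperty("orc.stripe.size", "268435456");  // STRIPE_SIZE
        tbl.setProperty("orc.row.index.stride", "10000"); // ROW_INDEX_STRIDE
        tbl.setProperty("orc.create.index", "false");     // forces stride 0
        tbl.setProperty("orc.block.padding", "false");    // BLOCK_PADDING
        return tbl;
      }
    }
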
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Sep 12 01:21:10 2013
@@ -22,7 +22,8 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
@@ -40,6 +41,9 @@ import org.apache.hadoop.io.Writable;
* It transparently passes the object to/from the ORC file reader/writer.
*/
public class OrcSerde implements SerDe, VectorizedSerde {
+
+ private static final Log LOG = LogFactory.getLog(OrcSerde.class);
+
private final OrcSerdeRow row = new OrcSerdeRow();
private ObjectInspector inspector = null;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.io.IOException;
@@ -118,8 +119,26 @@ public interface Reader {
* @param include true for each column that should be included
* @return a new RecordReader that will read the specified rows.
* @throws IOException
+ * @deprecated
*/
RecordReader rows(long offset, long length,
boolean[] include) throws IOException;
+ /**
+ * Create a RecordReader that will read a section of a file. It starts reading
+ * at the first stripe after the offset and continues to the stripe that
+ * starts at offset + length. It also accepts a list of columns to read and a
+ * search argument.
+ * @param offset the minimum offset of the first stripe to read
+ * @param length the distance from offset of the first address to stop reading
+ * at
+ * @param include true for each column that should be included
+ * @param sarg a search argument that limits the rows that should be read.
+ * @param neededColumns the names of the included columns
+ * @return the record reader for the rows
+ */
+ RecordReader rows(long offset, long length,
+ boolean[] include, SearchArgument sarg,
+ String[] neededColumns) throws IOException;
+
}
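
A minimal sketch of calling the new five-argument overload; passing null for include, sarg, and neededColumns mirrors what the older rows() methods delegate to, namely all columns with no pushdown:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    public class RowsSketch {
      public static RecordReader all(Reader reader) throws IOException {
        // Whole file, every column, no SearchArgument.
        return reader.rows(0, Long.MAX_VALUE, null, null, null);
      }
    }
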
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Sep 12 01:21:10 2013
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
@@ -247,11 +248,13 @@ final class ReaderImpl implements Reader
if (version.size() >= 2) {
minor = version.get(1);
}
- if (major > OrcFile.MAJOR_VERSION ||
- (major == OrcFile.MAJOR_VERSION && minor > OrcFile.MINOR_VERSION)) {
- log.warn("ORC file " + path + " was written by a future Hive version " +
- versionString(version) + ". This file may not be readable by " +
- "this version of Hive.");
+ if (major > OrcFile.Version.CURRENT.getMajor() ||
+ (major == OrcFile.Version.CURRENT.getMajor() &&
+ minor > OrcFile.Version.CURRENT.getMinor())) {
+ log.warn("ORC file " + path +
+ " was written by a future Hive version " +
+ versionString(version) +
+ ". This file may not be readable by this version of Hive.");
}
}
}
@@ -307,7 +310,8 @@ final class ReaderImpl implements Reader
buffer.position(psOffset - footerSize);
buffer.limit(psOffset);
}
- InputStream instream = InStream.create("footer", buffer, codec, bufferSize);
+ InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
+ new long[]{0L}, footerSize, codec, bufferSize);
footer = OrcProto.Footer.parseFrom(instream);
inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
file.close();
@@ -315,15 +319,22 @@ final class ReaderImpl implements Reader
@Override
public RecordReader rows(boolean[] include) throws IOException {
- return rows(0, Long.MAX_VALUE, include);
+ return rows(0, Long.MAX_VALUE, include, null, null);
}
@Override
public RecordReader rows(long offset, long length, boolean[] include
) throws IOException {
+ return rows(offset, length, include, null, null);
+ }
+
+ @Override
+ public RecordReader rows(long offset, long length, boolean[] include,
+ SearchArgument sarg, String[] columnNames
+ ) throws IOException {
return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
- length, footer.getTypesList(), codec, bufferSize,
- include, footer.getRowIndexStride());
+ length, footer.getTypesList(), codec, bufferSize,
+ include, footer.getRowIndexStride(), sarg, columnNames);
}
}