Posted to commits@hive.apache.org by om...@apache.org on 2014/04/09 22:00:45 UTC
svn commit: r1586116 - in /hive/branches/branch-0.13: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/exec/vector/ ql/sr...
Author: omalley
Date: Wed Apr 9 20:00:45 2014
New Revision: 1586116
URL: http://svn.apache.org/r1586116
Log:
HIVE-6604. Fix ORC ACID format to work with vectorization. (omalley)
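In outline, the patch renames VectorizedBatchUtil's helpers to lowerCamelCase, gives addRowToBatch a caller-supplied DataOutputBuffer that string column bytes are copied into, adds a DECIMAL case, routes vectorized reads of ACID splits through a new VectorizedOrcAcidRowReader instead of throwing, and has OrcRecordUpdater record per-operation counts under a new "hive.acid.stats" metadata key. A minimal sketch of the calling convention the new addRowToBatch signature implies (the helper name and loop are illustrative only; the Hive/Hadoop types are the ones used in the patch):

    // Hypothetical driver: one DataOutputBuffer per reader, reset before each
    // batch and handed to every addRowToBatch call so that string bytes stay
    // valid for the life of the batch.
    static void fillBatch(VectorizedRowBatchCtx rbCtx, Writable[] rows,
                          VectorizedRowBatch batch, DataOutputBuffer buffer)
        throws HiveException, SerDeException {
      buffer.reset();                      // space from the previous batch can be reused
      for (int i = 0; i < rows.length; i++) {
        rbCtx.addRowToBatch(i, rows[i], batch, buffer);
      }
      batch.size = rows.length;
    }
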
Modified:
hive/branches/branch-0.13/ (props changed)
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
Propchange: hive/branches/branch-0.13/
------------------------------------------------------------------------------
Merged /hive/trunk:r1586113
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Wed Apr 9 20:00:45 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.v
import java.sql.Timestamp;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -89,7 +88,7 @@ public class VectorColumnAssignFactory {
}
protected void assignNull(int index) {
- VectorizedBatchUtil.SetNullColIsNullValue(outCol, index);
+ VectorizedBatchUtil.setNullColIsNullValue(outCol, index);
}
@Override
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Wed Apr 9 20:00:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -45,7 +48,7 @@ public class VectorizedBatchUtil {
* @param cv
* @param rowIndex
*/
- public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
+ public static void setNullColIsNullValue(ColumnVector cv, int rowIndex) {
cv.isNull[rowIndex] = true;
if (cv.noNulls) {
cv.noNulls = false;
@@ -56,12 +59,10 @@ public class VectorizedBatchUtil {
* Iterates thru all the column vectors and sets noNull to
* specified value.
*
- * @param valueToSet
- * noNull value to set
* @param batch
* Batch on which noNull is set
*/
- public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
+ public static void setNoNullFields(VectorizedRowBatch batch) {
for (int i = 0; i < batch.numCols; i++) {
batch.cols[i].noNulls = true;
}
@@ -75,8 +76,11 @@ public class VectorizedBatchUtil {
* @param batch Vectorized batch to which the row is added at rowIndex
* @throws HiveException
*/
- public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex,
- VectorizedRowBatch batch) throws HiveException {
+ public static void addRowToBatch(Object row, StructObjectInspector oi,
+ int rowIndex,
+ VectorizedRowBatch batch,
+ DataOutputBuffer buffer
+ ) throws HiveException {
List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
// Iterate thru the cols and load the batch
for (int i = 0; i < fieldRefs.size(); i++) {
@@ -100,7 +104,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -111,7 +115,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -122,7 +126,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -133,7 +137,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -144,7 +148,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -155,7 +159,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -166,7 +170,7 @@ public class VectorizedBatchUtil {
dcv.isNull[rowIndex] = false;
} else {
dcv.vector[rowIndex] = Double.NaN;
- SetNullColIsNullValue(dcv, rowIndex);
+ setNullColIsNullValue(dcv, rowIndex);
}
}
break;
@@ -177,7 +181,7 @@ public class VectorizedBatchUtil {
dcv.isNull[rowIndex] = false;
} else {
dcv.vector[rowIndex] = Double.NaN;
- SetNullColIsNullValue(dcv, rowIndex);
+ setNullColIsNullValue(dcv, rowIndex);
}
}
break;
@@ -189,7 +193,7 @@ public class VectorizedBatchUtil {
lcv.isNull[rowIndex] = false;
} else {
lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
+ setNullColIsNullValue(lcv, rowIndex);
}
}
break;
@@ -198,12 +202,30 @@ public class VectorizedBatchUtil {
if (writableCol != null) {
bcv.isNull[rowIndex] = false;
Text colText = (Text) writableCol;
- bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
+ int start = buffer.getLength();
+ int length = colText.getLength();
+ try {
+ buffer.write(colText.getBytes(), 0, length);
+ } catch (IOException ioe) {
+ throw new IllegalStateException("bad write", ioe);
+ }
+ bcv.setRef(rowIndex, buffer.getData(), start, length);
} else {
- SetNullColIsNullValue(bcv, rowIndex);
+ setNullColIsNullValue(bcv, rowIndex);
}
}
break;
+ case DECIMAL:
+ DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ dcv.isNull[rowIndex] = false;
+ HiveDecimalWritable wobj = (HiveDecimalWritable) writableCol;
+ dcv.vector[rowIndex].update(wobj.getHiveDecimal().unscaledValue(),
+ (short) wobj.getScale());
+ } else {
+ setNullColIsNullValue(dcv, rowIndex);
+ }
+ break;
default:
throw new HiveException("Vectorizaton is not supported for datatype:"
+ poi.getPrimitiveCategory());
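
The STRING case above no longer points the BytesColumnVector at the Text object's own backing array; the bytes are first appended to the caller-supplied DataOutputBuffer and the column references that copy. The likely reason (an assumption, not stated in the patch) is that deserializers reuse a single Text per column, so a direct setRef would be overwritten by later rows in the same batch. A standalone, non-Hive illustration of the copy-then-reference pattern:

    import java.io.IOException;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;

    // Demonstrates that copying a reused Text's bytes into a shared
    // DataOutputBuffer keeps every row's bytes stable.
    public class BufferCopyDemo {
      public static void main(String[] args) throws IOException {
        DataOutputBuffer buffer = new DataOutputBuffer();
        Text reused = new Text();            // one Text, reused for every row
        String[] rows = {"first", "second", "third"};
        int[] starts = new int[rows.length];
        int[] lengths = new int[rows.length];
        for (int i = 0; i < rows.length; i++) {
          reused.set(rows[i]);               // clobbers the previous row's bytes in 'reused'
          starts[i] = buffer.getLength();
          lengths[i] = reused.getLength();
          buffer.write(reused.getBytes(), 0, lengths[i]);   // copy into stable storage
        }
        for (int i = 0; i < rows.length; i++) {
          // Every row is still readable from the shared buffer.
          System.out.println(new String(buffer.getData(), starts[i], lengths[i], "UTF-8"));
        }
      }
    }
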
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java Wed Apr 9 20:00:45 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -60,7 +61,7 @@ public class VectorizedColumnarSerDe ext
/**
* Serialize a vectorized row batch
*
- * @param obj
+ * @param vrg
* Vectorized row batch to serialize
* @param objInspector
* The ObjectInspector for the row object
@@ -220,7 +221,7 @@ public class VectorizedColumnarSerDe ext
// Ideally this should throw UnsupportedOperationException as the serde is
// vectorized serde. But since RC file reader does not support vectorized reading this
- // is left as it is. This function will be called from VectorizedRowBatchCtx::AddRowToBatch
+ // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch
// to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized
// reading this serde and be standalone serde with no dependency on ColumnarSerDe.
return super.deserialize(blob);
@@ -251,10 +252,13 @@ public class VectorizedColumnarSerDe ext
VectorizedRowBatch reuseBatch) throws SerDeException {
BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
+ DataOutputBuffer buffer = new DataOutputBuffer();
for (int i = 0; i < rowsInBlob; i++) {
Object row = deserialize(refArray[i]);
try {
- VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch);
+ VectorizedBatchUtil.addRowToBatch(row,
+ (StructObjectInspector) cachedObjectInspector, i,
+ reuseBatch, buffer);
} catch (HiveException e) {
throw new SerDeException(e);
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Apr 9 20:00:45 2014
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.serde2.Des
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
@@ -55,6 +54,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
@@ -332,14 +332,17 @@ public class VectorizedRowBatchCtx {
* Row blob (serialized version of row)
* @param batch
* Vectorized batch to which the row is added
+ * @param buffer a buffer to copy strings into
* @throws HiveException
* @throws SerDeException
*/
- public void addRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
- throws HiveException, SerDeException
+ public void addRowToBatch(int rowIndex, Writable rowBlob,
+ VectorizedRowBatch batch,
+ DataOutputBuffer buffer
+ ) throws HiveException, SerDeException
{
Object row = this.deserializer.deserialize(rowBlob);
- VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch);
+ VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer);
}
/**
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Apr 9 20:00:45 2014
@@ -389,7 +389,7 @@ public class HiveInputFormat<K extends W
}
if (partDesc == null) {
throw new IOException("cannot find dir = " + dir.toString()
- + " in partToPartitionInfo!");
+ + " in " + pathToPartitionInfo);
}
return partDesc;
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Wed Apr 9 20:00:45 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.RCFi
import org.apache.hadoop.hive.ql.io.RCFile.Reader;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -42,9 +43,6 @@ import org.apache.hadoop.mapred.RecordRe
/**
* RCFileRecordReader.
- *
- * @param <K>
- * @param <V>
*/
public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -59,6 +57,7 @@ public class VectorizedRCFileRecordReade
private final LongWritable keyCache = new LongWritable();
private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
private boolean addPartitionCols = true;
+ private final DataOutputBuffer buffer = new DataOutputBuffer();
private static RCFileSyncCache syncCache = new RCFileSyncCache();
@@ -164,7 +163,8 @@ public class VectorizedRCFileRecordReade
public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
// Reset column fields noNull values to true
- VectorizedBatchUtil.SetNoNullFields(true, value);
+ VectorizedBatchUtil.setNoNullFields(value);
+ buffer.reset();
value.selectedInUse = false;
for (int i = 0; i < value.numCols; i++) {
value.cols[i].isRepeating = false;
@@ -187,7 +187,7 @@ public class VectorizedRCFileRecordReade
in.getCurrentRow(colsCache);
// Currently RCFile reader does not support reading vectorized
// data. Populating the batch by adding one row at a time.
- rbCtx.addRowToBatch(i, (Writable) colsCache, value);
+ rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer);
} else {
break;
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Wed Apr 9 20:00:45 2014
@@ -1000,24 +1000,24 @@ public class OrcInputFormat implements
}
OrcSplit split = (OrcSplit) inputSplit;
- // TODO vectorized reader doesn't work with the new format yet
- if (vectorMode) {
- if (!split.getDeltas().isEmpty() || !split.isOriginal()) {
- throw new IOException("Vectorization and ACID tables are incompatible."
- );
- }
- return createVectorizedReader(inputSplit, conf, reporter);
- }
reporter.setStatus(inputSplit.toString());
// if we are strictly old-school, just use the old code
if (split.isOriginal() && split.getDeltas().isEmpty()) {
- return new OrcRecordReader(OrcFile.createReader(split.getPath(),
- OrcFile.readerOptions(conf)), conf, split);
+ if (vectorMode) {
+ return createVectorizedReader(inputSplit, conf, reporter);
+ } else {
+ return new OrcRecordReader(OrcFile.createReader(split.getPath(),
+ OrcFile.readerOptions(conf)), conf, split);
+ }
}
Options options = new Options(conf).reporter(reporter);
final RowReader<OrcStruct> inner = getReader(inputSplit, options);
+ if (vectorMode) {
+ return (org.apache.hadoop.mapred.RecordReader)
+ new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+ }
final RecordIdentifier id = inner.createKey();
// Return a RecordReader that is compatible with the Hive 0.12 reader
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Wed Apr 9 20:00:45 2014
@@ -305,6 +305,7 @@ public class OrcOutputFormat extends Fil
public void write(Writable w) throws IOException {
OrcStruct orc = (OrcStruct) w;
watcher.addKey(
+ ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(),
((LongWritable)
orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(),
((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(),
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java Wed Apr 9 20:00:45 2014
@@ -37,10 +37,12 @@ import org.apache.hadoop.io.LongWritable
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
import java.util.ArrayList;
import java.util.List;
@@ -53,8 +55,10 @@ public class OrcRecordUpdater implements
public static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
public static final String ACID_FORMAT = "_orc_acid_version";
+ public static final String ACID_STATS = "hive.acid.stats";
public static final int ORC_ACID_VERSION = 0;
+
final static int INSERT_OPERATION = 0;
final static int UPDATE_OPERATION = 1;
final static int DELETE_OPERATION = 2;
@@ -70,6 +74,8 @@ public class OrcRecordUpdater implements
final static int DELTA_BUFFER_SIZE = 16 * 1024;
final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
private final AcidOutputFormat.Options options;
private final Path path;
private final FileSystem fs;
@@ -84,6 +90,33 @@ public class OrcRecordUpdater implements
private long insertedRows = 0;
private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ static class AcidStats {
+ long inserts;
+ long updates;
+ long deletes;
+
+ AcidStats() {
+ // nothing
+ }
+
+ AcidStats(String serialized) {
+ String[] parts = serialized.split(",");
+ inserts = Long.parseLong(parts[0]);
+ updates = Long.parseLong(parts[1]);
+ deletes = Long.parseLong(parts[2]);
+ }
+
+ String serialize() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(inserts);
+ builder.append(",");
+ builder.append(updates);
+ builder.append(",");
+ builder.append(deletes);
+ return builder.toString();
+ }
+ }
+
static Path getSideFile(Path main) {
return new Path(main + "_flush_length");
}
@@ -219,7 +252,7 @@ public class OrcRecordUpdater implements
this.originalTransaction.set(originalTransaction);
this.rowId.set(rowId);
item.setFieldValue(OrcRecordUpdater.ROW, row);
- indexBuilder.addKey(originalTransaction, bucket.get(), rowId);
+ indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
writer.addRow(item);
}
@@ -323,6 +356,7 @@ public class OrcRecordUpdater implements
long lastTransaction;
int lastBucket;
long lastRowId;
+ AcidStats acidStats = new AcidStats();
@Override
public void preStripeWrite(OrcFile.WriterContext context
@@ -338,11 +372,26 @@ public class OrcRecordUpdater implements
@Override
public void preFooterWrite(OrcFile.WriterContext context
) throws IOException {
- context.getWriter().addUserMetadata(OrcRecordUpdater.ACID_KEY_INDEX_NAME,
- ByteBuffer.wrap(lastKey.toString().getBytes(utf8)));
- }
-
- void addKey(long transaction, int bucket, long rowId) {
+ context.getWriter().addUserMetadata(ACID_KEY_INDEX_NAME,
+ UTF8.encode(lastKey.toString()));
+ context.getWriter().addUserMetadata(ACID_STATS,
+ UTF8.encode(acidStats.serialize()));
+ }
+
+ void addKey(int op, long transaction, int bucket, long rowId) {
+ switch (op) {
+ case INSERT_OPERATION:
+ acidStats.inserts += 1;
+ break;
+ case UPDATE_OPERATION:
+ acidStats.updates += 1;
+ break;
+ case DELETE_OPERATION:
+ acidStats.deletes += 1;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown operation " + op);
+ }
lastTransaction = transaction;
lastBucket = bucket;
lastRowId = rowId;
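
KeyIndexBuilder now also tallies inserts, updates and deletes on each addKey call and, from preFooterWrite, stores them in the file's user metadata under the new "hive.acid.stats" key as a comma-separated "inserts,updates,deletes" string. A standalone round trip of that format (the inner class mirrors AcidStats from the patch; the outer class name and values are illustrative only):

    // Mirrors the AcidStats wire format introduced above: "inserts,updates,deletes".
    public class AcidStatsDemo {
      static class AcidStats {
        long inserts, updates, deletes;
        AcidStats() { }
        AcidStats(String serialized) {
          String[] parts = serialized.split(",");
          inserts = Long.parseLong(parts[0]);
          updates = Long.parseLong(parts[1]);
          deletes = Long.parseLong(parts[2]);
        }
        String serialize() {
          return inserts + "," + updates + "," + deletes;
        }
      }

      public static void main(String[] args) {
        AcidStats stats = new AcidStats();
        stats.inserts = 100;                        // e.g. 100 base rows inserted
        String stored = stats.serialize();          // the value kept under hive.acid.stats
        AcidStats readBack = new AcidStats(stored); // what a reader would parse back
        System.out.println(stored + " -> inserts=" + readBack.inserts);
      }
    }
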
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Wed Apr 9 20:00:45 2014
@@ -59,7 +59,6 @@ public class VectorizedOrcInputFormat ex
VectorizedOrcRecordReader(Reader file, Configuration conf,
FileSplit fileSplit) throws IOException {
List<OrcProto.Type> types = file.getTypes();
- // TODO fix to work with ACID
Reader.Options options = new Reader.Options();
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
@@ -93,7 +92,6 @@ public class VectorizedOrcInputFormat ex
addPartitionCols = false;
}
reader.nextBatch(value);
- rbCtx.convertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -108,13 +106,11 @@ public class VectorizedOrcInputFormat ex
@Override
public VectorizedRowBatch createValue() {
- VectorizedRowBatch result = null;
try {
- result = rbCtx.createVectorizedRowBatch();
+ return rbCtx.createVectorizedRowBatch();
} catch (HiveException e) {
throw new RuntimeException("Error creating a batch", e);
}
- return result;
}
@Override
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed Apr 9 20:00:45 2014
@@ -2069,6 +2069,9 @@ class WriterImpl implements Writer, Memo
flushStripe();
// write a footer
if (stripesAtLastFlush != stripes.size()) {
+ if (callback != null) {
+ callback.preFooterWrite(callbackContext);
+ }
int metaLength = writeMetadata(rawWriter.getPos());
int footLength = writeFooter(rawWriter.getPos() - metaLength);
rawWriter.writeByte(writePostScript(footLength, metaLength));
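
With this change, close() gives the writer callback a chance to run before the metadata and footer are serialized, which is what lets KeyIndexBuilder (above) attach the ACID key index and stats as user metadata in time to be persisted. A minimal sketch of such a callback, using only the hooks visible in this patch (the interface it would implement is assumed to be the one KeyIndexBuilder implements; the metadata key is made up):

    // Hypothetical callback that stamps a marker into the user metadata just
    // before the footer is written. Signatures follow KeyIndexBuilder above.
    class MarkerCallback /* implements OrcFile.WriterCallback (assumed) */ {
      public void preStripeWrite(OrcFile.WriterContext context) throws IOException {
        // nothing to record per stripe
      }
      public void preFooterWrite(OrcFile.WriterContext context) throws IOException {
        context.getWriter().addUserMetadata("my.marker",
            java.nio.charset.Charset.forName("UTF-8").encode("v1"));
      }
    }
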
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java Wed Apr 9 20:00:45 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -193,6 +194,7 @@ public class TestVectorizedRowBatchCtx {
private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException {
RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf);
+ DataOutputBuffer buffer = new DataOutputBuffer();
// Get object inspector
StructObjectInspector oi = (StructObjectInspector) serDe
@@ -204,7 +206,7 @@ public class TestVectorizedRowBatchCtx {
// Create the context
VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null, null);
VectorizedRowBatch batch = ctx.createVectorizedRowBatch();
- VectorizedBatchUtil.SetNoNullFields(true, batch);
+ VectorizedBatchUtil.setNoNullFields(batch);
// Iterate thru the rows and populate the batch
LongWritable rowID = new LongWritable();
@@ -213,7 +215,7 @@ public class TestVectorizedRowBatchCtx {
BytesRefArrayWritable cols = new BytesRefArrayWritable();
reader.getCurrentRow(cols);
cols.resetValid(colCount);
- ctx.addRowToBatch(i, cols, batch);
+ ctx.addRowToBatch(i, cols, batch, buffer);
}
reader.close();
batch.size = 10;
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1586116&r1=1586115&r2=1586116&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Wed Apr 9 20:00:45 2014
@@ -27,6 +27,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.TimeZone;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
@@ -47,10 +51,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -71,11 +80,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.shims.CombineHiveKey;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -95,6 +106,225 @@ import org.junit.rules.TestName;
public class TestInputOutputFormat {
Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp"));
+ static final int MILLIS_IN_DAY = 1000 * 60 * 60 * 24;
+ private static final SimpleDateFormat DATE_FORMAT =
+ new SimpleDateFormat("yyyy/MM/dd");
+ private static final SimpleDateFormat TIME_FORMAT =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+ private static final TimeZone LOCAL_TIMEZONE = TimeZone.getDefault();
+
+ static {
+ TimeZone gmt = TimeZone.getTimeZone("GMT+0");
+ DATE_FORMAT.setTimeZone(gmt);
+ TIME_FORMAT.setTimeZone(gmt);
+ TimeZone local = TimeZone.getDefault();
+ }
+
+ public static class BigRow implements Writable {
+ boolean booleanValue;
+ byte byteValue;
+ short shortValue;
+ int intValue;
+ long longValue;
+ float floatValue;
+ double doubleValue;
+ String stringValue;
+ HiveDecimal decimalValue;
+ Date dateValue;
+ Timestamp timestampValue;
+
+ BigRow(long x) {
+ booleanValue = x % 2 == 0;
+ byteValue = (byte) x;
+ shortValue = (short) x;
+ intValue = (int) x;
+ longValue = x;
+ floatValue = x;
+ doubleValue = x;
+ stringValue = Long.toHexString(x);
+ decimalValue = HiveDecimal.create(x);
+ long millisUtc = x * MILLIS_IN_DAY;
+ millisUtc -= LOCAL_TIMEZONE.getOffset(millisUtc);
+ dateValue = new Date(millisUtc);
+ timestampValue = new Timestamp(millisUtc);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException("no write");
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException("no read");
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("bigrow{booleanValue: ");
+ builder.append(booleanValue);
+ builder.append(", byteValue: ");
+ builder.append(byteValue);
+ builder.append(", shortValue: ");
+ builder.append(shortValue);
+ builder.append(", intValue: ");
+ builder.append(intValue);
+ builder.append(", longValue: ");
+ builder.append(longValue);
+ builder.append(", floatValue: ");
+ builder.append(floatValue);
+ builder.append(", doubleValue: ");
+ builder.append(doubleValue);
+ builder.append(", stringValue: ");
+ builder.append(stringValue);
+ builder.append(", decimalValue: ");
+ builder.append(decimalValue);
+ builder.append(", dateValue: ");
+ builder.append(DATE_FORMAT.format(dateValue));
+ builder.append(", timestampValue: ");
+ builder.append(TIME_FORMAT.format(timestampValue));
+ builder.append("}");
+ return builder.toString();
+ }
+ }
+
+ public static class BigRowField implements StructField {
+ private final int id;
+ private final String fieldName;
+ private final ObjectInspector inspector;
+
+ BigRowField(int id, String fieldName, ObjectInspector inspector) {
+ this.id = id;
+ this.fieldName = fieldName;
+ this.inspector = inspector;
+ }
+
+ @Override
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ public ObjectInspector getFieldObjectInspector() {
+ return inspector;
+ }
+
+ @Override
+ public String getFieldComment() {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "field " + id + " " + fieldName;
+ }
+ }
+
+ public static class BigRowInspector extends StructObjectInspector {
+ static final List<BigRowField> FIELDS = new ArrayList<BigRowField>();
+ static {
+ FIELDS.add(new BigRowField(0, "booleanValue",
+ PrimitiveObjectInspectorFactory.javaBooleanObjectInspector));
+ FIELDS.add(new BigRowField(1, "byteValue",
+ PrimitiveObjectInspectorFactory.javaByteObjectInspector));
+ FIELDS.add(new BigRowField(2, "shortValue",
+ PrimitiveObjectInspectorFactory.javaShortObjectInspector));
+ FIELDS.add(new BigRowField(3, "intValue",
+ PrimitiveObjectInspectorFactory.javaIntObjectInspector));
+ FIELDS.add(new BigRowField(4, "longValue",
+ PrimitiveObjectInspectorFactory.javaLongObjectInspector));
+ FIELDS.add(new BigRowField(5, "floatValue",
+ PrimitiveObjectInspectorFactory.javaFloatObjectInspector));
+ FIELDS.add(new BigRowField(6, "doubleValue",
+ PrimitiveObjectInspectorFactory.javaDoubleObjectInspector));
+ FIELDS.add(new BigRowField(7, "stringValue",
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector));
+ FIELDS.add(new BigRowField(8, "decimalValue",
+ PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector));
+ FIELDS.add(new BigRowField(9, "dateValue",
+ PrimitiveObjectInspectorFactory.javaDateObjectInspector));
+ FIELDS.add(new BigRowField(10, "timestampValue",
+ PrimitiveObjectInspectorFactory.javaTimestampObjectInspector));
+ }
+
+
+ @Override
+ public List<? extends StructField> getAllStructFieldRefs() {
+ return FIELDS;
+ }
+
+ @Override
+ public StructField getStructFieldRef(String fieldName) {
+ for(StructField field: FIELDS) {
+ if (field.getFieldName().equals(fieldName)) {
+ return field;
+ }
+ }
+ throw new IllegalArgumentException("Can't find field " + fieldName);
+ }
+
+ @Override
+ public Object getStructFieldData(Object data, StructField fieldRef) {
+ BigRow obj = (BigRow) data;
+ switch (((BigRowField) fieldRef).id) {
+ case 0:
+ return obj.booleanValue;
+ case 1:
+ return obj.byteValue;
+ case 2:
+ return obj.shortValue;
+ case 3:
+ return obj.intValue;
+ case 4:
+ return obj.longValue;
+ case 5:
+ return obj.floatValue;
+ case 6:
+ return obj.doubleValue;
+ case 7:
+ return obj.stringValue;
+ case 8:
+ return obj.decimalValue;
+ case 9:
+ return obj.dateValue;
+ case 10:
+ return obj.timestampValue;
+ }
+ throw new IllegalArgumentException("No such field " + fieldRef);
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object data) {
+ BigRow obj = (BigRow) data;
+ List<Object> result = new ArrayList<Object>(11);
+ result.add(obj.booleanValue);
+ result.add(obj.byteValue);
+ result.add(obj.shortValue);
+ result.add(obj.intValue);
+ result.add(obj.longValue);
+ result.add(obj.floatValue);
+ result.add(obj.doubleValue);
+ result.add(obj.stringValue);
+ result.add(obj.decimalValue);
+ result.add(obj.dateValue);
+ result.add(obj.timestampValue);
+ return result;
+ }
+
+ @Override
+ public String getTypeName() {
+ return "struct<booleanValue:boolean,byteValue:tinyint," +
+ "shortValue:smallint,intValue:int,longValue:bigint," +
+ "floatValue:float,doubleValue:double,stringValue:string," +
+ "decimalValue:decimal>";
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.STRUCT;
+ }
+ }
public static class MyRow implements Writable {
int x;
@@ -959,6 +1189,7 @@ public class TestInputOutputFormat {
* explode.
* @param workDir a local filesystem work directory
* @param warehouseDir a mock filesystem warehouse directory
+ * @param tableName the table name
* @param objectInspector object inspector for the row
* @param isVectorized should run vectorized
* @return a JobConf that contains the necessary information
@@ -966,16 +1197,19 @@ public class TestInputOutputFormat {
*/
JobConf createMockExecutionEnvironment(Path workDir,
Path warehouseDir,
+ String tableName,
ObjectInspector objectInspector,
boolean isVectorized
) throws IOException {
+ Utilities.clearWorkMap();
JobConf conf = new JobConf();
conf.set("hive.exec.plan", workDir.toString());
conf.set("mapred.job.tracker", "local");
conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
conf.set("fs.mock.impl", MockFileSystem.class.getName());
conf.set("mapred.mapper.class", ExecMapper.class.getName());
- Path root = new Path(warehouseDir, "table/p=0");
+ Path root = new Path(warehouseDir, tableName + "/p=0");
+ ((MockFileSystem) root.getFileSystem(conf)).clear();
conf.set("mapred.input.dir", root.toString());
StringBuilder columnIds = new StringBuilder();
StringBuilder columnNames = new StringBuilder();
@@ -999,6 +1233,7 @@ public class TestInputOutputFormat {
fs.clear();
Properties tblProps = new Properties();
+ tblProps.put("name", tableName);
tblProps.put("serialization.lib", OrcSerde.class.getName());
tblProps.put("columns", columnNames.toString());
tblProps.put("columns.types", columnTypes.toString());
@@ -1014,7 +1249,7 @@ public class TestInputOutputFormat {
LinkedHashMap<String, ArrayList<String>> aliasMap =
new LinkedHashMap<String, ArrayList<String>>();
ArrayList<String> aliases = new ArrayList<String>();
- aliases.add("tbl");
+ aliases.add(tableName);
aliasMap.put(root.toString(), aliases);
mapWork.setPathToAliases(aliasMap);
LinkedHashMap<String, PartitionDesc> partMap =
@@ -1027,8 +1262,9 @@ public class TestInputOutputFormat {
// write the plan out
FileSystem localFs = FileSystem.getLocal(conf).getRaw();
- FSDataOutputStream planStream =
- localFs.create(new Path(workDir, "map.xml"));
+ Path mapXml = new Path(workDir, "map.xml");
+ localFs.delete(mapXml, true);
+ FSDataOutputStream planStream = localFs.create(mapXml);
Utilities.serializePlan(mapWork, planStream, conf);
planStream.close();
return conf;
@@ -1048,7 +1284,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- inspector, true);
+ "vectorization", inspector, true);
// write the orc file to the mock file system
Writer writer =
@@ -1095,7 +1331,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- inspector, true);
+ "vectorBuckets", inspector, true);
// write the orc file to the mock file system
Writer writer =
@@ -1132,23 +1368,18 @@ public class TestInputOutputFormat {
// test acid with vectorization, no combine
@Test
public void testVectorizationWithAcid() throws Exception {
- // get the object inspector for MyRow
- StructObjectInspector inspector;
- synchronized (TestOrcFile.class) {
- inspector = (StructObjectInspector)
- ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
- ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- }
+ StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- inspector, true);
+ "vectorizationAcid", inspector, true);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(0).inspector(inspector));
- for(int i=0; i < 10; ++i) {
- writer.insert(10, new MyRow(i, 2 * i));
+ for(int i=0; i < 100; ++i) {
+ BigRow row = new BigRow(i);
+ writer.insert(10, row);
}
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
@@ -1161,14 +1392,44 @@ public class TestInputOutputFormat {
InputSplit[] splits = inputFormat.getSplits(conf, 10);
assertEquals(1, splits.length);
- try {
- org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>
+ org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>
reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL);
- assertTrue("should throw here", false);
- } catch (IOException ioe) {
- assertEquals("java.io.IOException: Vectorization and ACID tables are incompatible.",
- ioe.getMessage());
+ NullWritable key = reader.createKey();
+ VectorizedRowBatch value = reader.createValue();
+ assertEquals(true, reader.next(key, value));
+ assertEquals(100, value.count());
+ LongColumnVector booleanColumn = (LongColumnVector) value.cols[0];
+ LongColumnVector byteColumn = (LongColumnVector) value.cols[1];
+ LongColumnVector shortColumn = (LongColumnVector) value.cols[2];
+ LongColumnVector intColumn = (LongColumnVector) value.cols[3];
+ LongColumnVector longColumn = (LongColumnVector) value.cols[4];
+ DoubleColumnVector floatColumn = (DoubleColumnVector) value.cols[5];
+ DoubleColumnVector doubleCoulmn = (DoubleColumnVector) value.cols[6];
+ BytesColumnVector stringColumn = (BytesColumnVector) value.cols[7];
+ DecimalColumnVector decimalColumn = (DecimalColumnVector) value.cols[8];
+ LongColumnVector dateColumn = (LongColumnVector) value.cols[9];
+ LongColumnVector timestampColumn = (LongColumnVector) value.cols[10];
+ for(int i=0; i < 100; i++) {
+ assertEquals("checking boolean " + i, i % 2 == 0 ? 1 : 0,
+ booleanColumn.vector[i]);
+ assertEquals("checking byte " + i, (byte) i,
+ byteColumn.vector[i]);
+ assertEquals("checking short " + i, (short) i, shortColumn.vector[i]);
+ assertEquals("checking int " + i, i, intColumn.vector[i]);
+ assertEquals("checking long " + i, i, longColumn.vector[i]);
+ assertEquals("checking float " + i, i, floatColumn.vector[i], 0.0001);
+ assertEquals("checking double " + i, i, doubleCoulmn.vector[i], 0.0001);
+ assertEquals("checking string " + i, new Text(Long.toHexString(i)),
+ stringColumn.getWritableObject(i));
+ assertEquals("checking decimal " + i, new Decimal128(i),
+ decimalColumn.vector[i]);
+ assertEquals("checking date " + i, i, dateColumn.vector[i]);
+ long millis = (long) i * MILLIS_IN_DAY;
+ millis -= LOCAL_TIMEZONE.getOffset(millis);
+ assertEquals("checking timestamp " + i, millis * 1000000L,
+ timestampColumn.vector[i]);
}
+ assertEquals(false, reader.next(key, value));
}
// test non-vectorized, non-acid, combine
@@ -1182,7 +1443,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- inspector, false);
+ "combination", inspector, false);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1251,7 +1512,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- inspector, false);
+ "combinationAcid", inspector, false);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));