You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/23 22:41:01 UTC
svn commit: r1525692 [8/8] - in /hive/branches/vectorization: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/test/results/clientpositive/
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/results/positive/ h...
Modified: hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml (original)
+++ hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml Mon Sep 23 20:40:54 2013
@@ -163,6 +163,9 @@
</void>
</object>
</void>
+ <void property="conf">
+ <object class="org.apache.hadoop.hive.ql.plan.TableScanDesc"/>
+ </void>
<void property="counterNames">
<object class="java.util.ArrayList">
<void method="add">
@@ -179,6 +182,26 @@
</void>
</object>
</void>
+ <void property="neededColumnIDs">
+ <object class="java.util.ArrayList">
+ <void method="add">
+ <int>0</int>
+ </void>
+ <void method="add">
+ <int>1</int>
+ </void>
+ </object>
+ </void>
+ <void property="neededColumns">
+ <object class="java.util.ArrayList">
+ <void method="add">
+ <string>_col0</string>
+ </void>
+ <void method="add">
+ <string>_col1</string>
+ </void>
+ </object>
+ </void>
<void property="operatorId">
<string>TS_11</string>
</void>
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Mon Sep 23 20:40:54 2013
@@ -32,87 +32,57 @@ public final class ColumnProjectionUtils
public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+ private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
+ private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
+ private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
/**
- * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
- * is included in the list, RCFile's reader will not skip its value.
- *
+ * Sets the <em>READ_ALL_COLUMNS</em> flag and removes any previously
+ * set column ids.
*/
- public static void setReadColumnIDs(Configuration conf, List<Integer> ids) {
- String id = toReadColumnIDString(ids);
- setReadColumnIDConf(conf, id);
+ public static void setReadAllColumns(Configuration conf) {
+ conf.setBoolean(READ_ALL_COLUMNS, true);
+ setReadColumnIDConf(conf, READ_COLUMN_IDS_CONF_STR_DEFAULT);
}
/**
- * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
- * is included in the list, RCFile's reader will not skip its value.
- *
+ * Returns the <em>READ_ALL_COLUMNS</em> flag.
*/
- public static void appendReadColumnIDs(Configuration conf, List<Integer> ids) {
- String id = toReadColumnIDString(ids);
- if (id != null) {
- String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
- String newConfStr = id;
- if (old != null) {
- newConfStr = newConfStr + StringUtils.COMMA_STR + old;
- }
-
- setReadColumnIDConf(conf, newConfStr);
- }
- }
-
- public static void appendReadColumnNames(Configuration conf,
- List<String> cols) {
- if (cols != null) {
- String old = conf.get(READ_COLUMN_NAMES_CONF_STR, "");
- StringBuilder result = new StringBuilder(old);
- boolean first = old.isEmpty();
- for(String col: cols) {
- if (first) {
- first = false;
- } else {
- result.append(',');
- }
- result.append(col);
- }
- conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString());
- }
+ public static boolean isReadAllColumns(Configuration conf) {
+ return conf.getBoolean(READ_ALL_COLUMNS, READ_ALL_COLUMNS_DEFAULT);
}
- private static void setReadColumnIDConf(Configuration conf, String id) {
- if (id == null || id.length() <= 0) {
- conf.set(READ_COLUMN_IDS_CONF_STR, "");
- return;
- }
-
- conf.set(READ_COLUMN_IDS_CONF_STR, id);
+ /**
+ * Appends read columns' ids (starting from zero). Once a column
+ * is included in the list, an underlying record reader of a columnar file format
+ * (e.g. RCFile and ORC) can know which columns are needed.
+ */
+ public static void appendReadColumns(Configuration conf, List<Integer> ids) {
+ String id = toReadColumnIDString(ids);
+ String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
+ String newConfStr = id;
+ if (old != null) {
+ newConfStr = newConfStr + StringUtils.COMMA_STR + old;
+ }
+ setReadColumnIDConf(conf, newConfStr);
+ // Set READ_ALL_COLUMNS to false
+ conf.setBoolean(READ_ALL_COLUMNS, false);
}
- private static String toReadColumnIDString(List<Integer> ids) {
- String id = null;
- if (ids != null) {
- for (int i = 0; i < ids.size(); i++) {
- if (i == 0) {
- id = "" + ids.get(i);
- } else {
- id = id + StringUtils.COMMA_STR + ids.get(i);
- }
- }
- }
- return id;
+ public static void appendReadColumns(
+ Configuration conf, List<Integer> ids, List<String> names) {
+ appendReadColumns(conf, ids);
+ appendReadColumnNames(conf, names);
}
/**
* Returns an array of column ids(start from zero) which is set in the given
* parameter <tt>conf</tt>.
*/
- public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
- if (conf == null) {
- return new ArrayList<Integer>(0);
- }
- String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
+ public static List<Integer> getReadColumnIDs(Configuration conf) {
+ String skips = conf.get(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
String[] list = StringUtils.split(skips);
- ArrayList<Integer> result = new ArrayList<Integer>(list.length);
+ List<Integer> result = new ArrayList<Integer>(list.length);
for (String element : list) {
// it may contain duplicates, remove duplicates
Integer toAdd = Integer.parseInt(element);
@@ -123,11 +93,39 @@ public final class ColumnProjectionUtils
return result;
}
- /**
- * Clears the read column ids set in the conf, and will read all columns.
- */
- public static void setFullyReadColumns(Configuration conf) {
- conf.set(READ_COLUMN_IDS_CONF_STR, "");
+ private static void setReadColumnIDConf(Configuration conf, String id) {
+ if (id.trim().isEmpty()) {
+ conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
+ } else {
+ conf.set(READ_COLUMN_IDS_CONF_STR, id);
+ }
+ }
+
+ private static void appendReadColumnNames(Configuration conf, List<String> cols) {
+ String old = conf.get(READ_COLUMN_NAMES_CONF_STR, "");
+ StringBuilder result = new StringBuilder(old);
+ boolean first = old.isEmpty();
+ for(String col: cols) {
+ if (first) {
+ first = false;
+ } else {
+ result.append(',');
+ }
+ result.append(col);
+ }
+ conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString());
+ }
+
+ private static String toReadColumnIDString(List<Integer> ids) {
+ String id = "";
+ for (int i = 0; i < ids.size(); i++) {
+ if (i == 0) {
+ id = id + ids.get(i);
+ } else {
+ id = id + StringUtils.COMMA_STR + ids.get(i);
+ }
+ }
+ return id;
}
private ColumnProjectionUtils() {
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Mon Sep 23 20:40:54 2013
@@ -21,8 +21,10 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.rmi.server.UID;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -52,6 +54,20 @@ import org.apache.hadoop.io.Writable;
class AvroDeserializer {
private static final Log LOG = LogFactory.getLog(AvroDeserializer.class);
/**
+ * Set of already-seen, valid record reader IDs whose records don't need re-encoding.
+ */
+ private final HashSet<UID> noEncodingNeeded = new HashSet<UID>();
+ /**
+ * Map from record reader ID to its associated re-encoder. It contains only the record readers
+ * whose records need to be re-encoded.
+ */
+ private final HashMap<UID, SchemaReEncoder> reEncoderCache = new HashMap<UID, SchemaReEncoder>();
+ /**
+ * Flag used to print the re-encoding warning message only once, which avoids excessive
+ * logging on every record re-encoding.
+ */
+ private static boolean warnedOnce = false;
+ /**
* When encountering a record with an older schema than the one we're trying
* to read, it is necessary to re-encode with a reader against the newer schema.
* Because Hive doesn't provide a way to pass extra information to the
@@ -64,16 +80,15 @@ class AvroDeserializer {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
private BinaryDecoder binaryDecoder = null;
- private final InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache
- = new InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>>() {
- @Override
- protected GenericDatumReader<GenericRecord> makeInstance(ReaderWriterSchemaPair hv) {
- return new GenericDatumReader<GenericRecord>(hv.getWriter(), hv.getReader());
- }
- };
- public GenericRecord reencode(GenericRecord r, Schema readerSchema)
- throws AvroSerdeException {
+ GenericDatumReader<GenericRecord> gdr = null;
+
+ public SchemaReEncoder(Schema writer, Schema reader) {
+ gdr = new GenericDatumReader<GenericRecord>(writer, reader);
+ }
+
+ public GenericRecord reencode(GenericRecord r)
+ throws AvroSerdeException {
baos.reset();
BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(baos, null);
@@ -84,8 +99,6 @@ class AvroDeserializer {
binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder);
- ReaderWriterSchemaPair pair = new ReaderWriterSchemaPair(r.getSchema(), readerSchema);
- GenericDatumReader<GenericRecord> gdr = gdrCache.retrieve(pair);
return gdr.read(r, binaryDecoder);
} catch (IOException e) {
@@ -95,7 +108,6 @@ class AvroDeserializer {
}
private List<Object> row;
- private SchemaReEncoder reEncoder;
/**
* Deserialize an Avro record, recursing into its component fields and
@@ -127,14 +139,31 @@ class AvroDeserializer {
AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
GenericRecord r = recordWritable.getRecord();
- // Check if we're working with an evolved schema
- if(!r.getSchema().equals(readerSchema)) {
- LOG.warn("Received different schemas. Have to re-encode: " +
- r.getSchema().toString(false));
- if(reEncoder == null) {
- reEncoder = new SchemaReEncoder();
+ UID recordReaderId = recordWritable.getRecordReaderID();
+ //If the record reader (from which the record is originated) is already seen and valid,
+ //no need to re-encode the record.
+ if(!noEncodingNeeded.contains(recordReaderId)) {
+ SchemaReEncoder reEncoder = null;
+ //Check if a re-encoder is already cached for this record reader ID. If it is,
+ //reuse the encoder.
+ if(reEncoderCache.containsKey(recordReaderId)) {
+ reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
+ } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
+ //Create and store new encoder in the map for re-use
+ reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
+ reEncoderCache.put(recordReaderId, reEncoder);
+ } else{
+ LOG.info("Adding new valid RRID :" + recordReaderId);
+ noEncodingNeeded.add(recordReaderId);
+ }
+ if(reEncoder != null) {
+ if (!warnedOnce) {
+ LOG.warn("Received different schemas. Have to re-encode: " +
+ r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
+ warnedOnce = true;
+ }
+ r = reEncoder.reencode(r);
}
- r = reEncoder.reencode(r, readerSchema);
}
workerBase(row, columnNames, columnTypes, r);
@@ -288,4 +317,13 @@ class AvroDeserializer {
return map;
}
+
+ public HashSet<UID> getNoEncodingNeeded() {
+ return noEncodingNeeded;
+ }
+
+ public HashMap<UID, SchemaReEncoder> getReEncoderCache() {
+ return reEncoderCache;
+ }
+
}
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java Mon Sep 23 20:40:54 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hive.serde2.avro;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.rmi.server.UID;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
@@ -28,12 +35,6 @@ import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
/**
* Wrapper around an Avro GenericRecord. Necessary because Hive's deserializer
* will happily deserialize any object - as long as it's a writable.
@@ -41,6 +42,10 @@ import java.io.InputStream;
public class AvroGenericRecordWritable implements Writable{
GenericRecord record;
private BinaryDecoder binaryDecoder;
+ /**
+ * Unique ID identifying which record reader created this record.
+ */
+ private UID recordReaderID;
// There are two areas of exploration for optimization here.
// 1. We're serializing the schema with every object. If we assume the schema
@@ -68,6 +73,7 @@ public class AvroGenericRecordWritable i
// Write schema since we need it to pull the data out. (see point #1 above)
String schemaString = record.getSchema().toString(false);
out.writeUTF(schemaString);
+ recordReaderID.write(out);
// Write record to byte buffer
GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
@@ -80,9 +86,18 @@ public class AvroGenericRecordWritable i
@Override
public void readFields(DataInput in) throws IOException {
Schema schema = Schema.parse(in.readUTF());
+ recordReaderID = UID.read(in);
record = new GenericData.Record(schema);
binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder((InputStream) in, binaryDecoder);
GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema);
record = gdr.read(record, binaryDecoder);
}
+
+ public UID getRecordReaderID() {
+ return recordReaderID;
+ }
+
+ public void setRecordReaderID(UID recordReaderID) {
+ this.recordReaderID = recordReaderID;
+ }
}
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Mon Sep 23 20:40:54 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.serde2.columnar;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -78,9 +79,9 @@ public class ColumnarSerDe extends Colum
* @see SerDe#initialize(Configuration, Properties)
*/
@Override
- public void initialize(Configuration job, Properties tbl) throws SerDeException {
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
- serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName());
+ serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName());
// Create the ObjectInspectors for the fields. Note: Currently
// ColumnarObject uses same ObjectInpector as LazyStruct
@@ -89,14 +90,20 @@ public class ColumnarSerDe extends Colum
.getSeparators(), serdeParams.getNullSequence(), serdeParams
.isEscaped(), serdeParams.getEscapeChar());
- java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(job);
-
- cachedLazyStruct = new ColumnarStruct(cachedObjectInspector, notSkipIDs,
- serdeParams.getNullSequence());
-
int size = serdeParams.getColumnTypes().size();
+ List<Integer> notSkipIDs = new ArrayList<Integer>();
+ if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) {
+ for (int i = 0; i < size; i++ ) {
+ notSkipIDs.add(i);
+ }
+ } else {
+ notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+ }
+ cachedLazyStruct = new ColumnarStruct(
+ cachedObjectInspector, notSkipIDs, serdeParams.getNullSequence());
+
super.initialize(size);
- LOG.debug("ColumnarSerDe initialized with: columnNames="
+ LOG.info("ColumnarSerDe initialized with: columnNames="
+ serdeParams.getColumnNames() + " columnTypes="
+ serdeParams.getColumnTypes() + " separator="
+ Arrays.asList(serdeParams.getSeparators()) + " nullstring="
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Mon Sep 23 20:40:54 2013
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.serde2.columnar;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,22 +49,10 @@ public class ColumnarStruct extends Colu
*
* @param oi
* the ObjectInspector representing the type of this LazyStruct.
- */
- public ColumnarStruct(ObjectInspector oi) {
- this(oi, null, null);
- }
-
- /**
- * Construct a ColumnarStruct object with the TypeInfo. It creates the first
- * level object at the first place
- *
- * @param oi
- * the ObjectInspector representing the type of this LazyStruct.
* @param notSkippedColumnIDs
* the column ids that should not be skipped
*/
- public ColumnarStruct(ObjectInspector oi,
- ArrayList<Integer> notSkippedColumnIDs, Text nullSequence) {
+ public ColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs, Text nullSequence) {
super(oi, notSkippedColumnIDs);
if (nullSequence != null) {
this.nullSequence = nullSequence;
@@ -84,7 +72,7 @@ public class ColumnarStruct extends Colu
}
return fieldLen;
}
-
+
@Override
protected LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector) {
return LazyFactory.createLazyObject(objectInspector);
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java Mon Sep 23 20:40:54 2013
@@ -122,22 +122,13 @@ public abstract class ColumnarStructBase
private FieldInfo[] fieldInfoList = null;
private ArrayList<Object> cachedList;
- public ColumnarStructBase(ObjectInspector oi,
- ArrayList<Integer> notSkippedColumnIDs) {
+ public ColumnarStructBase(ObjectInspector oi, List<Integer> notSkippedColumnIDs) {
List<? extends StructField> fieldRefs = ((StructObjectInspector) oi)
.getAllStructFieldRefs();
int num = fieldRefs.size();
fieldInfoList = new FieldInfo[num];
- // if no columns is set to be skipped, add all columns in
- // 'notSkippedColumnIDs'
- if (notSkippedColumnIDs == null || notSkippedColumnIDs.size() == 0) {
- for (int i = 0; i < num; i++) {
- notSkippedColumnIDs.add(i);
- }
- }
-
for (int i = 0; i < num; i++) {
ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
fieldInfoList[i] = new FieldInfo(
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java Mon Sep 23 20:40:54 2013
@@ -17,22 +17,23 @@
*/
package org.apache.hadoop.hive.serde2.columnar;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
@@ -66,9 +67,17 @@ public class LazyBinaryColumnarSerDe ext
cachedObjectInspector = LazyBinaryFactory.createColumnarStructInspector(
columnNames, columnTypes);
- java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
- cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs);
int size = columnTypes.size();
+ List<Integer> notSkipIDs = new ArrayList<Integer>();
+ if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) {
+ for (int i = 0; i < size; i++ ) {
+ notSkipIDs.add(i);
+ }
+ } else {
+ notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+ }
+ cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs);
+
super.initialize(size);
}
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java Mon Sep 23 20:40:54 2013
@@ -18,26 +18,22 @@
package org.apache.hadoop.hive.serde2.columnar;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
public class LazyBinaryColumnarStruct extends ColumnarStructBase {
- public LazyBinaryColumnarStruct(ObjectInspector oi, ArrayList<Integer> notSkippedColumnIDs) {
+ public LazyBinaryColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs) {
super(oi, notSkippedColumnIDs);
}
- static VInt vInt = new LazyBinaryUtils.VInt();
-
@Override
protected int getLength(ObjectInspector objectInspector, ByteArrayRef cachedByteArrayRef,
int start, int length) {
@@ -48,8 +44,8 @@ public class LazyBinaryColumnarStruct ex
if (category.equals(Category.PRIMITIVE)) {
PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) objectInspector)
.getPrimitiveCategory();
- if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) &&
- (cachedByteArrayRef.getData()[start]
+ if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) &&
+ (cachedByteArrayRef.getData()[start]
== LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0])) {
return 0;
}
Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java Mon Sep 23 20:40:54 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.rmi.server.UID;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
@@ -500,4 +501,64 @@ public class TestAvroDeserializer {
assertEquals(expected, soi.getPrimitiveJavaObject(rowElement));
}
}
+
+ @Test
+ public void verifyCaching() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+ GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
+ innerRecord.put("int1", 42);
+ innerRecord.put("boolean1", true);
+ innerRecord.put("long1", 42432234234l);
+ record.put("aRecord", innerRecord);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ UID recordReaderID = new UID();
+ garw.setRecordReaderID(recordReaderID);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row =
+ (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+ assertEquals(1, de.getNoEncodingNeeded().size());
+ assertEquals(0, de.getReEncoderCache().size());
+
+ // Read the record with the same record reader ID
+ row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+ //Expecting not to change the size of internal structures
+ assertEquals(1, de.getNoEncodingNeeded().size());
+ assertEquals(0, de.getReEncoderCache().size());
+
+ //Read the record with **different** record reader ID
+ garw.setRecordReaderID(new UID()); //New record reader ID
+ row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+ //Expecting to change the size of internal structures
+ assertEquals(2, de.getNoEncodingNeeded().size());
+ assertEquals(0, de.getReEncoderCache().size());
+
+ //Read the record with **different** record reader ID and **evolved** schema
+ Schema evolvedSchema = Schema.parse(s.toString());
+ evolvedSchema.getField("aRecord").schema().addProp("Testing", "meaningless");
+ garw.setRecordReaderID(recordReaderID = new UID()); //New record reader ID
+ row =
+ (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+
+ //Expecting to change the size of internal structures
+ assertEquals(2, de.getNoEncodingNeeded().size());
+ assertEquals(1, de.getReEncoderCache().size());
+
+ //Read the record with existing record reader ID and same **evolved** schema
+ garw.setRecordReaderID(recordReaderID); //Reuse record reader ID
+ row =
+ (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+
+ //Expecting NOT to change the size of internal structures
+ assertEquals(2, de.getNoEncodingNeeded().size());
+ assertEquals(1, de.getReEncoderCache().size());
+
+ }
}
Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java Mon Sep 23 20:40:54 2013
@@ -17,18 +17,19 @@
*/
package org.apache.hadoop.hive.serde2.avro;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.rmi.server.UID;
-import static org.junit.Assert.assertEquals;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
public class TestGenericAvroRecordWritable {
private static final String schemaJSON = "{\n" +
@@ -59,12 +60,14 @@ public class TestGenericAvroRecordWritab
assertEquals("Doctor", gr.get("last"));
AvroGenericRecordWritable garw = new AvroGenericRecordWritable(gr);
+ garw.setRecordReaderID(new UID());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream daos = new DataOutputStream(baos);
garw.write(daos);
AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable(gr);
+ garw2.setRecordReaderID(new UID());
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DataInputStream dais = new DataInputStream(bais);
Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java Mon Sep 23 20:40:54 2013
@@ -17,15 +17,15 @@
*/
package org.apache.hadoop.hive.serde2.avro;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
public class TestSchemaReEncoder {
@Test
public void schemasCanAddFields() throws SerDeException {
@@ -62,8 +62,8 @@ public class TestSchemaReEncoder {
GenericRecord record = new GenericData.Record(originalSchema);
record.put("text", "it is a far better thing I do, yadda, yadda");
assertTrue(GenericData.get().validate(originalSchema, record));
- AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder();
- GenericRecord r2 = schemaReEncoder.reencode(record, evolvedSchema);
+ AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema);
+ GenericRecord r2 = schemaReEncoder.reencode(record);
assertTrue(GenericData.get().validate(evolvedSchema, r2));
assertEquals("Hi!", r2.get("new_kid").toString());
@@ -104,7 +104,8 @@ public class TestSchemaReEncoder {
record.put("a", 19);
assertTrue(GenericData.get().validate(originalSchema2, record));
- r2 = schemaReEncoder.reencode(record, evolvedSchema2);
+ schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema2);
+ r2 = schemaReEncoder.reencode(record);
assertTrue(GenericData.get().validate(evolvedSchema2, r2));
assertEquals(42l, r2.get("b"));
}
Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java Mon Sep 23 20:40:54 2013
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.hive.serde2.avro;
-import org.apache.avro.generic.GenericData;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.rmi.server.UID;
+
+import org.apache.avro.generic.GenericData;
class Utils {
// Force Avro to serialize and de-serialize the record to make sure it has a
@@ -31,6 +32,7 @@ class Utils {
public static AvroGenericRecordWritable
serializeAndDeserializeRecord(GenericData.Record record) throws IOException {
AvroGenericRecordWritable garw = new AvroGenericRecordWritable(record);
+ garw.setRecordReaderID(new UID());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream daos = new DataOutputStream(baos);
garw.write(daos);
@@ -39,6 +41,7 @@ class Utils {
DataInputStream dais = new DataInputStream(bais);
AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable();
+ garw2.setRecordReaderID(new UID());
garw2.readFields(dais);
return garw2;
}