Posted to commits@hive.apache.org by ha...@apache.org on 2015/01/05 20:03:49 UTC
svn commit: r1649612 - in /hive/trunk: data/files/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/had...
Author: hashutosh
Date: Mon Jan 5 19:03:49 2015
New Revision: 1649612
URL: http://svn.apache.org/r1649612
Log:
HIVE-7977 : Avoid creating serde for partitions if possible in FetchTask (Navis via Ashutosh Chauhan)
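In outline, the patch makes FetchOperator hold a single table-level serde (tableSerDe) and only instantiate a per-partition serde, plus an ObjectInspector converter, when a partition's serde class or schema properties actually differ from the table's. A condensed sketch of the gating logic, using names from the FetchOperator.java hunks below:

    // Reuse the table serde unless this partition genuinely needs its own.
    if (!isPartitioned || convertedOI == null) {
      currSerDe = tableSerDe;   // common case: no per-partition serde, no conversion
    } else {
      currSerDe = needConversion(currDesc) ? currDesc.getDeserializer(job) : tableSerDe;
      ObjectConverter = ObjectInspectorConverters.getConverter(
          currSerDe.getObjectInspector(), convertedOI);
    }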
Removed:
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java
Modified:
hive/trunk/data/files/datatypes.txt
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
Modified: hive/trunk/data/files/datatypes.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/datatypes.txt?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/data/files/datatypes.txt (original)
+++ hive/trunk/data/files/datatypes.txt Mon Jan 5 19:03:49 2015
@@ -1,3 +1,3 @@
-\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N
+\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
+-1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22 09:00:00.123456789123456789.0123456YWJjZA==2013-01-01abc123abc123X'01FF'
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jan 5 19:03:49 2015
@@ -222,7 +222,7 @@ public class Driver implements CommandPr
String tableName = "result";
List<FieldSchema> lst = null;
try {
- lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+ lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
} catch (Exception e) {
LOG.warn("Error getting schema: "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
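The only change here is threading the session configuration into serde creation; the new getDeserializer(Configuration) overload is defined in the TableDesc.java hunk further down. A minimal usage sketch (SchemaFromSerDe and resultSchema are hypothetical names):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.plan.TableDesc;

    public class SchemaFromSerDe {
      // td describes the fetch result; conf is the active Hive configuration.
      static List<FieldSchema> resultSchema(TableDesc td, Configuration conf) throws Exception {
        // getDeserializer(conf) instantiates the serde via ReflectionUtils with conf
        // and initializes it (see the TableDesc.java hunk below).
        return MetaStoreUtils.getFieldsFromDeserializer("result", td.getDeserializer(conf));
      }
    }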
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Mon Jan 5 19:03:49 2015
@@ -26,8 +26,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.lang3.StringEscapeUtils;
+import com.google.common.collect.Iterators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -49,14 +51,13 @@ import org.apache.hadoop.hive.ql.plan.Pa
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -70,74 +71,81 @@ import org.apache.hadoop.mapred.JobConfi
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.AnnotationUtils;
/**
* FetchTask implementation.
**/
public class FetchOperator implements Serializable {
- static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
- static LogHelper console = new LogHelper(LOG);
+ static final Log LOG = LogFactory.getLog(FetchOperator.class.getName());
+ static final LogHelper console = new LogHelper(LOG);
public static final String FETCH_OPERATOR_DIRECTORY_LIST =
"hive.complete.dir.list";
- private boolean isNativeTable;
private FetchWork work;
- protected Operator<?> operator; // operator tree for processing row further (option)
- private int splitNum;
- private PartitionDesc currPart;
- private TableDesc currTbl;
- private boolean tblDataDone;
- private FooterBuffer footerBuffer = null;
- private int headerCount = 0;
- private int footerCount = 0;
+ private Operator<?> operator; // operator tree for processing row further (optional)
- private boolean hasVC;
- private boolean isPartitioned;
+ private final boolean hasVC;
+ private final boolean isStatReader;
+ private final boolean isPartitioned;
+ private final boolean isNonNativeTable;
private StructObjectInspector vcsOI;
private List<VirtualColumn> vcCols;
private ExecMapperContext context;
+ private transient Deserializer tableSerDe;
+ private transient StructObjectInspector tableOI;
+ private transient StructObjectInspector partKeyOI;
+ private transient StructObjectInspector convertedOI;
+
+ private transient Iterator<Path> iterPath;
+ private transient Iterator<PartitionDesc> iterPartDesc;
+ private transient Iterator<FetchInputFormatSplit> iterSplits = Iterators.emptyIterator();
+
+ private transient Path currPath;
+ private transient PartitionDesc currDesc;
+ private transient Deserializer currSerDe;
+ private transient Converter ObjectConverter;
private transient RecordReader<WritableComparable, Writable> currRecReader;
- private transient FetchInputFormatSplit[] inputSplits;
- private transient InputFormat inputFormat;
+
private transient JobConf job;
private transient WritableComparable key;
private transient Writable value;
private transient Object[] vcValues;
- private transient Deserializer serde;
- private transient Deserializer tblSerde;
- private transient Converter partTblObjectInspectorConverter;
- private transient Iterator<Path> iterPath;
- private transient Iterator<PartitionDesc> iterPartDesc;
- private transient Path currPath;
- private transient StructObjectInspector objectInspector;
- private transient StructObjectInspector rowObjectInspector;
- private transient ObjectInspector partitionedTableOI;
+ private transient int headerCount;
+ private transient int footerCount;
+ private transient FooterBuffer footerBuffer;
+
+ private transient StructObjectInspector outputOI;
private transient Object[] row;
- public FetchOperator() {
- }
-
- public FetchOperator(FetchWork work, JobConf job) {
- this.job = job;
- this.work = work;
- initialize();
+ public FetchOperator(FetchWork work, JobConf job) throws HiveException {
+ this(work, job, null, null);
}
public FetchOperator(FetchWork work, JobConf job, Operator<?> operator,
- List<VirtualColumn> vcCols) {
+ List<VirtualColumn> vcCols) throws HiveException {
this.job = job;
this.work = work;
this.operator = operator;
this.vcCols = vcCols;
+ this.hasVC = vcCols != null && !vcCols.isEmpty();
+ this.isStatReader = work.getTblDesc() == null;
+ this.isPartitioned = !isStatReader && work.isPartitioned();
+ this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
initialize();
}
- private void initialize() {
- if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+ private void initialize() throws HiveException {
+ if (isStatReader) {
+ outputOI = work.getStatRowOI();
+ return;
+ }
+ if (hasVC) {
List<String> names = new ArrayList<String>(vcCols.size());
List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(vcCols.size());
for (VirtualColumn vc : vcCols) {
@@ -147,8 +155,6 @@ public class FetchOperator implements Se
vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
vcValues = new Object[vcCols.size()];
}
- isPartitioned = work.isPartitioned();
- tblDataDone = false;
if (hasVC && isPartitioned) {
row = new Object[3];
} else if (hasVC || isPartitioned) {
@@ -156,21 +162,27 @@ public class FetchOperator implements Se
} else {
row = new Object[1];
}
- if (work.getTblDesc() != null) {
- isNativeTable = !work.getTblDesc().isNonNative();
+ if (isPartitioned) {
+ iterPath = work.getPartDir().iterator();
+ iterPartDesc = work.getPartDesc().iterator();
} else {
- isNativeTable = true;
+ iterPath = Arrays.asList(work.getTblDir()).iterator();
+ iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
}
- setupExecContext();
+ outputOI = setupOutputObjectInspector();
+ context = setupExecContext(operator, work.getPathLists());
}
- private void setupExecContext() {
+ private ExecMapperContext setupExecContext(Operator operator, List<Path> paths) {
+ ExecMapperContext context = null;
if (hasVC || work.getSplitSample() != null) {
context = new ExecMapperContext(job);
if (operator != null) {
operator.setExecContext(context);
}
}
+ setFetchOperatorContext(job, paths);
+ return context;
}
public FetchWork getWork() {
@@ -181,42 +193,6 @@ public class FetchOperator implements Se
this.work = work;
}
- public int getSplitNum() {
- return splitNum;
- }
-
- public void setSplitNum(int splitNum) {
- this.splitNum = splitNum;
- }
-
- public PartitionDesc getCurrPart() {
- return currPart;
- }
-
- public void setCurrPart(PartitionDesc currPart) {
- this.currPart = currPart;
- }
-
- public TableDesc getCurrTbl() {
- return currTbl;
- }
-
- public void setCurrTbl(TableDesc currTbl) {
- this.currTbl = currTbl;
- }
-
- public boolean isTblDataDone() {
- return tblDataDone;
- }
-
- public void setTblDataDone(boolean tblDataDone) {
- this.tblDataDone = tblDataDone;
- }
-
- public boolean isEmptyTable() {
- return work.getTblDir() == null && (work.getPartDir() == null || work.getPartDir().isEmpty());
- }
-
/**
* A cache of InputFormat instances.
*/
@@ -243,146 +219,54 @@ public class FetchOperator implements Se
return format;
}
- private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
- Deserializer serde = table.getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
- return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
- }
-
- private StructObjectInspector getRowInspectorFromPartition(PartitionDesc partition,
- ObjectInspector partitionOI) throws Exception {
-
- String pcols = partition.getTableDesc().getProperties().getProperty(
+ private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
+ String pcols = tableDesc.getProperties().getProperty(
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String[] partKeys = pcols.trim().split("/");
- String pcolTypes = partition.getTableDesc().getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
- row[1] = createPartValue(partKeys, partition.getPartSpec(), partKeyTypes);
+ String pcolTypes = tableDesc.getProperties().getProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- return createRowInspector(getStructOIFrom(partitionOI), partKeys, partKeyTypes);
- }
-
- private StructObjectInspector getRowInspectorFromPartitionedTable(TableDesc table)
- throws Exception {
- Deserializer serde = table.getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
- String pcols = table.getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
String[] partKeys = pcols.trim().split("/");
- String pcolTypes = table.getProperties().getProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
- row[1] = null;
- return createRowInspector(getStructOIFrom(serde.getObjectInspector()), partKeys, partKeyTypes);
- }
-
- private StructObjectInspector getStructOIFrom(ObjectInspector current) throws SerDeException {
- if (objectInspector != null) {
- current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
- } else {
- current = DelegatedObjectInspectorFactory.wrap(current);
- }
- return objectInspector = (StructObjectInspector) current;
- }
-
- private StructObjectInspector createRowInspector(StructObjectInspector current)
- throws SerDeException {
- return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
- Arrays.asList(current, vcsOI)) : current;
- }
-
- private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys, String[] partKeyTypes)
- throws SerDeException {
- List<String> partNames = new ArrayList<String>();
- List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
+ String[] partKeyTypes = pcolTypes.trim().split(":");
+ ObjectInspector[] inspectors = new ObjectInspector[partKeys.length];
for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- partNames.add(key);
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ inspectors[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
- partObjectInspectors.add(oi);
}
- StructObjectInspector partObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
- return ObjectInspectorFactory.getUnionStructObjectInspector(
- hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
- Arrays.asList(current, partObjectInspector));
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList(partKeys), Arrays.asList(inspectors));
}
- private Object[] createPartValue(String[] partKeys, Map<String, String> partSpec, String[] partKeyTypes) {
- Object[] partValues = new Object[partKeys.length];
- for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
- TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
- partValues[i] =
- ObjectInspectorConverters.
- getConverter(PrimitiveObjectInspectorFactory.
- javaStringObjectInspector, oi).convert(partSpec.get(key));
+ private Object[] createPartValue(PartitionDesc partDesc, StructObjectInspector partOI) {
+ Map<String, String> partSpec = partDesc.getPartSpec();
+ List<? extends StructField> fields = partOI.getAllStructFieldRefs();
+ Object[] partValues = new Object[fields.size()];
+ for (int i = 0; i < partValues.length; i++) {
+ StructField field = fields.get(i);
+ String value = partSpec.get(field.getFieldName());
+ ObjectInspector oi = field.getFieldObjectInspector();
+ partValues[i] = ObjectInspectorConverters.getConverter(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi).convert(value);
}
return partValues;
}
- private void getNextPath() throws Exception {
- // first time
- if (iterPath == null) {
- if (work.isNotPartitioned()) {
- if (!tblDataDone) {
- currPath = work.getTblDir();
- currTbl = work.getTblDesc();
- if (isNativeTable) {
- FileSystem fs = currPath.getFileSystem(job);
- if (fs.exists(currPath)) {
- FileStatus[] fStats = listStatusUnderPath(fs, currPath);
- for (FileStatus fStat : fStats) {
- if (fStat.getLen() > 0) {
- tblDataDone = true;
- break;
- }
- }
- }
- } else {
- tblDataDone = true;
- }
-
- if (!tblDataDone) {
- currPath = null;
- }
- return;
- } else {
- currTbl = null;
- currPath = null;
- }
- return;
- } else {
- setFetchOperatorContext(job, work.getPartDir());
- iterPath = work.getPartDir().iterator();
- iterPartDesc = work.getPartDesc().iterator();
- }
- }
-
+ private boolean getNextPath() throws Exception {
while (iterPath.hasNext()) {
- Path nxt = iterPath.next();
- PartitionDesc prt = null;
- if (iterPartDesc != null) {
- prt = iterPartDesc.next();
- }
- FileSystem fs = nxt.getFileSystem(job);
- if (fs.exists(nxt)) {
- FileStatus[] fStats = listStatusUnderPath(fs, nxt);
- for (FileStatus fStat : fStats) {
+ currPath = iterPath.next();
+ currDesc = iterPartDesc.next();
+ if (isNonNativeTable) {
+ return true;
+ }
+ FileSystem fs = currPath.getFileSystem(job);
+ if (fs.exists(currPath)) {
+ for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
if (fStat.getLen() > 0) {
- currPath = nxt;
- if (iterPartDesc != null) {
- currPart = prt;
- }
- return;
+ return true;
}
}
}
}
+ return false;
}
/**
@@ -390,119 +274,53 @@ public class FetchOperator implements Se
* This helps InputFormats make decisions based on the scope of the complete
* operation.
* @param conf the configuration to modify
- * @param partDirs the list of partition directories
+ * @param paths the list of input directories
*/
- static void setFetchOperatorContext(JobConf conf,
- ArrayList<Path> partDirs) {
- if (partDirs != null) {
+ static void setFetchOperatorContext(JobConf conf, List<Path> paths) {
+ if (paths != null) {
StringBuilder buff = new StringBuilder();
- boolean first = true;
- for(Path p: partDirs) {
- if (first) {
- first = false;
- } else {
+ for (Path path : paths) {
+ if (buff.length() > 0) {
buff.append('\t');
}
- buff.append(StringEscapeUtils.escapeJava(p.toString()));
+ buff.append(StringEscapeUtils.escapeJava(path.toString()));
}
conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
}
}
- /**
- * A cache of Object Inspector Settable Properties.
- */
- private static Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
-
private RecordReader<WritableComparable, Writable> getRecordReader() throws Exception {
- if (currPath == null) {
- getNextPath();
- if (currPath == null) {
+ if (!iterSplits.hasNext()) {
+ FetchInputFormatSplit[] splits = getNextSplits();
+ if (splits == null) {
return null;
}
-
- // not using FileInputFormat.setInputPaths() here because it forces a
- // connection
- // to the default file system - which may or may not be online during pure
- // metadata
- // operations
- job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
- .toString()));
-
- // Fetch operator is not vectorized and as such turn vectorization flag off so that
- // non-vectorized record reader is created below.
- if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- }
-
- PartitionDesc partDesc;
- if (currTbl == null) {
- partDesc = currPart;
+ if (!isPartitioned || convertedOI == null) {
+ currSerDe = tableSerDe;
+ ObjectConverter = null;
} else {
- partDesc = new PartitionDesc(currTbl, null);
- }
-
- Class<? extends InputFormat> formatter = partDesc.getInputFileFormatClass();
- inputFormat = getInputFormatFromCache(formatter, job);
- Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
- InputSplit[] splits = inputFormat.getSplits(job, 1);
- FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
- for (int i = 0; i < splits.length; i++) {
- inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
+ currSerDe = needConversion(currDesc) ? currDesc.getDeserializer(job) : tableSerDe;
+ ObjectInspector inputOI = currSerDe.getObjectInspector();
+ ObjectConverter = ObjectInspectorConverters.getConverter(inputOI, convertedOI);
}
- if (work.getSplitSample() != null) {
- inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+ if (isPartitioned) {
+ row[1] = createPartValue(currDesc, partKeyOI);
}
- this.inputSplits = inputSplits;
-
- splitNum = 0;
- serde = partDesc.getDeserializer(job);
- SerDeUtils.initializeSerDe(serde, job, partDesc.getTableDesc().getProperties(),
- partDesc.getProperties());
-
- if (currTbl != null) {
- tblSerde = serde;
- }
- else {
- tblSerde = currPart.getTableDesc().getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(tblSerde, job, currPart.getTableDesc().getProperties(), null);
- }
-
- ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
- serde.getObjectInspector(),
- partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI,
- oiSettableProperties);
-
- partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
- serde.getObjectInspector(), outputOI);
+ iterSplits = Arrays.asList(splits).iterator();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating fetchTask with deserializer typeinfo: "
- + serde.getObjectInspector().getTypeName());
+ + currSerDe.getObjectInspector().getTypeName());
LOG.debug("deserializer properties:\ntable properties: " +
- partDesc.getTableDesc().getProperties() + "\npartition properties: " +
- partDesc.getProperties());
- }
-
- if (currPart != null) {
- getRowInspectorFromPartition(currPart, outputOI);
- }
- }
-
- if (splitNum >= inputSplits.length) {
- if (currRecReader != null) {
- currRecReader.close();
- currRecReader = null;
+ currDesc.getTableDesc().getProperties() + "\npartition properties: " +
+ currDesc.getProperties());
}
- currPath = null;
- return getRecordReader();
}
- final FetchInputFormatSplit target = inputSplits[splitNum];
+ final FetchInputFormatSplit target = iterSplits.next();
@SuppressWarnings("unchecked")
- final RecordReader<WritableComparable, Writable> reader =
- inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+ final RecordReader<WritableComparable, Writable> reader = target.getRecordReader(job);
if (hasVC || work.getSplitSample() != null) {
currRecReader = new HiveRecordReader<WritableComparable, Writable>(reader, job) {
@Override
@@ -517,23 +335,52 @@ public class FetchOperator implements Se
}
};
((HiveContextAwareRecordReader)currRecReader).
- initIOContext(target, job, inputFormat.getClass(), reader);
+ initIOContext(target, job, target.inputFormat.getClass(), reader);
} else {
currRecReader = reader;
}
- splitNum++;
key = currRecReader.createKey();
value = currRecReader.createValue();
+ headerCount = footerCount = 0;
return currRecReader;
}
+ protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+ while (getNextPath()) {
+ // not using FileInputFormat.setInputPaths() here because it forces a connection to the
+ // default file system - which may or may not be online during pure metadata operations
+ job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
+
+ // Fetch operator is not vectorized and as such turn vectorization flag off so that
+ // non-vectorized record reader is created below.
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+
+ Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
+ Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
+ InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+
+ InputSplit[] splits = inputFormat.getSplits(job, 1);
+ FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+ }
+ if (work.getSplitSample() != null) {
+ inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+ }
+ if (inputSplits.length > 0) {
+ return inputSplits;
+ }
+ }
+ return null;
+ }
+
private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
FetchInputFormatSplit[] splits) {
long totalSize = 0;
for (FetchInputFormatSplit split: splits) {
totalSize += split.getLength();
}
- List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
+ List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>(splits.length);
long targetSize = splitSample.getTargetSize(totalSize);
int startIndex = splitSample.getSeedNum() % splits.length;
long size = 0;
@@ -557,18 +404,18 @@ public class FetchOperator implements Se
* Currently only used by FetchTask.
**/
public boolean pushRow() throws IOException, HiveException {
- if(work.getRowsComputedUsingStats() != null) {
+ if (work.getRowsComputedUsingStats() != null) {
for (List<Object> row : work.getRowsComputedUsingStats()) {
operator.processOp(row, 0);
}
- operator.flush();
+ flushRow();
return true;
}
InspectableObject row = getNextRow();
if (row != null) {
pushRow(row);
} else {
- operator.flush();
+ flushRow();
}
return row != null;
}
@@ -577,6 +424,10 @@ public class FetchOperator implements Se
operator.processOp(row.o, 0);
}
+ protected void flushRow() throws HiveException {
+ operator.flush();
+ }
+
private transient final InspectableObject inspectable = new InspectableObject();
/**
@@ -602,28 +453,16 @@ public class FetchOperator implements Se
* If the file contains a footer, use FooterBuffer to cache and remove footer
* records at the end of the file.
*/
- headerCount = 0;
- footerCount = 0;
- TableDesc table = null;
- if (currTbl != null) {
- table = currTbl;
- } else if (currPart != null) {
- table = currPart.getTableDesc();
- }
- if (table != null) {
- headerCount = Utilities.getHeaderCount(table);
- footerCount = Utilities.getFooterCount(table, job);
- }
+ headerCount = Utilities.getHeaderCount(currDesc.getTableDesc());
+ footerCount = Utilities.getFooterCount(currDesc.getTableDesc(), job);
// Skip header lines.
opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value);
// Initialize footer buffer.
- if (opNotEOF) {
- if (footerCount > 0) {
- footerBuffer = new FooterBuffer();
- opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
- }
+ if (opNotEOF && footerCount > 0) {
+ footerBuffer = new FooterBuffer();
+ opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
}
}
@@ -640,25 +479,24 @@ public class FetchOperator implements Se
if (opNotEOF) {
if (operator != null && context != null && context.inputFileChanged()) {
// The child operators cleanup if input file has changed
- try {
- operator.cleanUpInputFileChanged();
- } catch (HiveException e) {
- throw new IOException(e);
- }
+ operator.cleanUpInputFileChanged();
}
if (hasVC) {
- vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde);
- row[isPartitioned ? 2 : 1] = vcValues;
+ row[isPartitioned ? 2 : 1] =
+ MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, currSerDe);
+ }
+ Object deserialized = currSerDe.deserialize(value);
+ if (ObjectConverter != null) {
+ deserialized = ObjectConverter.convert(deserialized);
}
- row[0] = partTblObjectInspectorConverter.convert(serde.deserialize(value));
if (hasVC || isPartitioned) {
+ row[0] = deserialized;
inspectable.o = row;
- inspectable.oi = rowObjectInspector;
- return inspectable;
+ } else {
+ inspectable.o = deserialized;
}
- inspectable.o = row[0];
- inspectable.oi = tblSerde.getObjectInspector();
+ inspectable.oi = currSerDe.getObjectInspector();
return inspectable;
} else {
currRecReader.close();
@@ -688,13 +526,13 @@ public class FetchOperator implements Se
context.clear();
context = null;
}
- this.currTbl = null;
this.currPath = null;
this.iterPath = null;
this.iterPartDesc = null;
+ this.iterSplits = Iterators.emptyIterator();
} catch (Exception e) {
throw new HiveException("Failed with exception " + e.getMessage()
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ + StringUtils.stringifyException(e));
}
}
@@ -703,25 +541,33 @@ public class FetchOperator implements Se
*/
public void setupContext(List<Path> paths) {
this.iterPath = paths.iterator();
- if (work.isNotPartitioned()) {
- this.currTbl = work.getTblDesc();
+ List<PartitionDesc> partitionDescs;
+ if (!isPartitioned) {
+ this.iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
} else {
this.iterPartDesc = work.getPartDescs(paths).iterator();
}
- setupExecContext();
+ this.context = setupExecContext(operator, paths);
}
/**
* returns output ObjectInspector, never null
*/
- public ObjectInspector getOutputObjectInspector() throws HiveException {
- if(null != work.getStatRowOI()) {
- return work.getStatRowOI();
- }
+ public ObjectInspector getOutputObjectInspector() {
+ return outputOI;
+ }
+
+ private StructObjectInspector setupOutputObjectInspector() throws HiveException {
+ TableDesc tableDesc = work.getTblDesc();
try {
- if (work.isNotPartitioned()) {
- return getRowInspectorFromTable(work.getTblDesc());
+ tableSerDe = tableDesc.getDeserializer(job, true);
+ tableOI = (StructObjectInspector) tableSerDe.getObjectInspector();
+ if (!isPartitioned) {
+ return getTableRowOI(tableOI);
}
+ partKeyOI = getPartitionKeyOI(tableDesc);
+
+ PartitionDesc partDesc = new PartitionDesc(tableDesc, null);
List<PartitionDesc> listParts = work.getPartDesc();
// Choose the table descriptor if none of the partitions is present.
// For example, consider the query:
@@ -729,39 +575,50 @@ public class FetchOperator implements Se
// Both T1 and T2 are partitioned tables, but T1 does not have any partitions
// FetchOperator is invoked for T1, and listParts is empty. In that case,
// use T1's schema to get the ObjectInspector.
- if (listParts == null || listParts.isEmpty()) {
- return getRowInspectorFromPartitionedTable(work.getTblDesc());
+ if (listParts == null || listParts.isEmpty() || !needConversion(tableDesc, listParts)) {
+ return getPartitionedRowOI(tableOI);
}
+ convertedOI = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
+ tableOI, tableOI, null, false);
+ return getPartitionedRowOI(convertedOI);
+ } catch (Exception e) {
+ throw new HiveException("Failed with exception " + e.getMessage()
+ + StringUtils.stringifyException(e));
+ }
+ }
- // Choose any partition. It's OI needs to be converted to the table OI
- // Whenever a new partition is being read, a new converter is being created
- PartitionDesc partition = listParts.get(0);
- Deserializer tblSerde = partition.getTableDesc().getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(tblSerde, job, partition.getTableDesc().getProperties(), null);
-
- partitionedTableOI = null;
- ObjectInspector tableOI = tblSerde.getObjectInspector();
-
- // Get the OI corresponding to all the partitions
- for (PartitionDesc listPart : listParts) {
- partition = listPart;
- Deserializer partSerde = listPart.getDeserializer(job);
- SerDeUtils.initializeSerDe(partSerde, job, partition.getTableDesc().getProperties(),
- listPart.getProperties());
-
- partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
- partSerde.getObjectInspector(), tableOI, oiSettableProperties);
- if (!partitionedTableOI.equals(tableOI)) {
- break;
+ private StructObjectInspector getTableRowOI(StructObjectInspector valueOI) {
+ return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
+ Arrays.asList(valueOI, vcsOI)) : valueOI;
+ }
+
+ private StructObjectInspector getPartitionedRowOI(StructObjectInspector valueOI) {
+ return ObjectInspectorFactory.getUnionStructObjectInspector(
+ hasVC ? Arrays.asList(valueOI, partKeyOI, vcsOI) : Arrays.asList(valueOI, partKeyOI));
+ }
+
+ private boolean needConversion(PartitionDesc partitionDesc) {
+ return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
+ }
+
+ // if table and all partitions have the same schema and serde, no need to convert
+ private boolean needConversion(TableDesc tableDesc, List<PartitionDesc> partDescs) {
+ Class<?> tableSerDe = tableDesc.getDeserializerClass();
+ String[] schemaProps = AnnotationUtils.getAnnotation(tableSerDe, SerDeSpec.class).schemaProps();
+ Properties tableProps = tableDesc.getProperties();
+ for (PartitionDesc partitionDesc : partDescs) {
+ if (!tableSerDe.getName().equals(partitionDesc.getDeserializerClassName())) {
+ return true;
+ }
+ Properties partProps = partitionDesc.getProperties();
+ for (String schemaProp : schemaProps) {
+ if (!org.apache.commons.lang3.StringUtils.equals(
+ tableProps.getProperty(schemaProp), partProps.getProperty(schemaProp))) {
+ return true;
}
}
- return getRowInspectorFromPartition(partition, partitionedTableOI);
- } catch (Exception e) {
- throw new HiveException("Failed with exception " + e.getMessage()
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
- } finally {
- currPart = null;
}
+ return false;
}
/**
@@ -797,11 +654,17 @@ public class FetchOperator implements Se
// shrunken size for this split. The counterpart of this in normal mode is
// InputSplitShim.shrinkedLength.
// what's different is that this is evaluated by unit of row using RecordReader.getPos()
- // and that is evaluated by unit of split using InputSplt.getLength().
+ // and that is evaluated by unit of split using InputSplit.getLength().
private long shrinkedLength = -1;
+ private InputFormat inputFormat;
+
+ public FetchInputFormatSplit(InputSplit split, InputFormat inputFormat) {
+ super(split, inputFormat.getClass().getName());
+ this.inputFormat = inputFormat;
+ }
- public FetchInputFormatSplit(InputSplit split, String name) {
- super(split, name);
+ public RecordReader<WritableComparable, Writable> getRecordReader(JobConf job) throws IOException {
+ return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL);
}
}
}
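The heart of the optimization is needConversion above: a partition may share the table's serde when it uses the same deserializer class and agrees with the table on every property that the serde declares schema-relevant through the SerDeSpec annotation. A hedged illustration of such a declaration (the annotation and the serde constants are real; ExampleSerDe is hypothetical):

    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.SerDeSpec;

    // Hypothetical serde: only the column name/type properties affect its schema,
    // so partitions matching the table on these two properties need no conversion.
    @SerDeSpec(schemaProps = {
        serdeConstants.LIST_COLUMNS,       // "columns"
        serdeConstants.LIST_COLUMN_TYPES   // "columns.types"
    })
    public class ExampleSerDe {
    }

Note that needConversion dereferences the annotation without a null check, so it appears to assume every serde class reachable here carries @SerDeSpec.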
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Jan 5 19:03:49 2015
@@ -133,20 +133,20 @@ public class PartitionKeySampler impleme
}
// random sampling
- public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
- Operator<?> operator) {
+ public static FetchOperator createSampler(FetchWork work, HiveConf conf, JobConf job,
+ Operator<?> operator) throws HiveException {
int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
if (samplePercent < 0.0 || samplePercent > 1.0) {
throw new IllegalArgumentException("Percentile value must be within the range of 0 to 1.");
}
- FetchSampler sampler = new FetchSampler(work, job, operator);
+ RandomSampler sampler = new RandomSampler(work, job, operator);
sampler.setSampleNum(sampleNum);
sampler.setSamplePercent(samplePercent);
return sampler;
}
- private static class FetchSampler extends FetchOperator {
+ private static class RandomSampler extends FetchOperator {
private int sampleNum = 1000;
private float samplePercent = 0.1f;
@@ -154,7 +154,8 @@ public class PartitionKeySampler impleme
private int sampled;
- public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
+ public RandomSampler(FetchWork work, JobConf job, Operator<?> operator)
+ throws HiveException {
super(work, job, operator, null);
}
@@ -174,7 +175,7 @@ public class PartitionKeySampler impleme
if (sampled < sampleNum) {
return true;
}
- operator.flush();
+ flushRow();
return false;
}
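Beyond the rename, the sampler now routes its flush through the protected flushRow() hook added to FetchOperator above, giving subclasses a single interception point. A sketch of that extension point with a hypothetical subclass (CountingFetcher is not part of the patch):

    import org.apache.hadoop.hive.ql.exec.FetchOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.FetchWork;
    import org.apache.hadoop.mapred.JobConf;

    class CountingFetcher extends FetchOperator {
      private int flushes;

      CountingFetcher(FetchWork work, JobConf job, Operator<?> operator) throws HiveException {
        super(work, job, operator, null);
      }

      @Override
      protected void flushRow() throws HiveException {
        flushes++;         // observe each flush...
        super.flushRow();  // ...then delegate to operator.flush()
      }
    }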
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Jan 5 19:03:49 2015
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.exec.mr;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
@@ -30,8 +28,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -54,7 +50,7 @@ import org.apache.hadoop.util.StringUtil
/**
* ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
* the bridge between the map-reduce framework and the Hive operator pipeline at
- * execution time. It's main responsabilities are:
+ * execution time. Its main responsibilities are:
*
* - Load and setup the operator pipeline from XML
* - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
@@ -66,7 +62,6 @@ public class ExecMapper extends MapReduc
private static final String PLAN_KEY = "__MAP_PLAN__";
private MapOperator mo;
- private Map<String, FetchOperator> fetchOperators;
private OutputCollector oc;
private JobConf jc;
private boolean abort = false;
@@ -74,7 +69,6 @@ public class ExecMapper extends MapReduc
public static final Log l4j = LogFactory.getLog(ExecMapper.class);
private static boolean done;
- // used to log memory usage periodically
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
@@ -213,15 +207,6 @@ public class ExecMapper extends MapReduc
}
}
- if (fetchOperators != null) {
- MapredLocalWork localWork = mo.getConf().getMapRedLocalWork();
- for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends OperatorDesc> forwardOp = localWork
- .getAliasToWork().get(entry.getKey());
- forwardOp.close(abort);
- }
- }
-
ReportStats rps = new ReportStats(rp, jc);
mo.preorderMap(rps);
return;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Jan 5 19:03:49 2015
@@ -6263,7 +6263,7 @@ public class SemanticAnalyzer extends Ba
} else {
try {
StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
- .getDeserializer().getObjectInspector();
+ .getDeserializer(conf).getObjectInspector();
List<? extends StructField> fields = rowObjectInspector
.getAllStructFieldRefs();
for (int i = 0; i < fields.size(); i++) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Mon Jan 5 19:03:49 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
@@ -28,7 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Li
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
* FetchWork.
@@ -53,7 +54,7 @@ public class FetchWork implements Serial
private SplitSample splitSample;
private transient List<List<Object>> rowsComputedFromStats;
- private transient ObjectInspector statRowOI;
+ private transient StructObjectInspector statRowOI;
/**
* Serialization Null Format for the serde used to fetch data.
@@ -63,12 +64,12 @@ public class FetchWork implements Serial
public FetchWork() {
}
- public FetchWork(List<List<Object>> rowsComputedFromStats,ObjectInspector statRowOI) {
+ public FetchWork(List<List<Object>> rowsComputedFromStats, StructObjectInspector statRowOI) {
this.rowsComputedFromStats = rowsComputedFromStats;
this.statRowOI = statRowOI;
}
- public ObjectInspector getStatRowOI() {
+ public StructObjectInspector getStatRowOI() {
return statRowOI;
}
@@ -174,6 +175,11 @@ public class FetchWork implements Serial
return partDesc;
}
+ public List<Path> getPathLists() {
+ return isPartitioned() ? partDir == null ?
+ null : new ArrayList<Path>(partDir) : Arrays.asList(tblDir);
+ }
+
/**
* Get Partition descriptors in sorted (ascending) order of partition directory
*
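getPathLists() gives callers one uniform view of the input directories (all partition dirs, or the lone table dir); FetchOperator feeds it straight into setFetchOperatorContext, which publishes the tab-separated, Java-escaped list under hive.complete.dir.list. A small sketch re-tracing that consumption (DirListDemo is hypothetical):

    import java.util.List;

    import org.apache.commons.lang3.StringEscapeUtils;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.FetchWork;
    import org.apache.hadoop.mapred.JobConf;

    public class DirListDemo {
      // Mirrors FetchOperator.setFetchOperatorContext in the hunk above:
      // one tab-separated, Java-escaped entry per input directory.
      static void publish(FetchWork work, JobConf job) {
        List<Path> paths = work.getPathLists();
        if (paths != null) {
          StringBuilder buff = new StringBuilder();
          for (Path path : paths) {
            if (buff.length() > 0) {
              buff.append('\t');
            }
            buff.append(StringEscapeUtils.escapeJava(path.toString()));
          }
          job.set("hive.complete.dir.list", buff.toString());
        }
      }
    }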
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Mon Jan 5 19:03:49 2015
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.Des
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.common.util.HiveStringUtils;
/**
@@ -84,8 +85,17 @@ public class TableDesc implements Serial
* Return a deserializer object corresponding to the tableDesc.
*/
public Deserializer getDeserializer(Configuration conf) throws Exception {
- Deserializer de = getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(de, conf, properties, null);
+ return getDeserializer(conf, false);
+ }
+
+ public Deserializer getDeserializer(Configuration conf, boolean ignoreError) throws Exception {
+ Deserializer de = ReflectionUtils.newInstance(
+ getDeserializerClass().asSubclass(Deserializer.class), conf);
+ if (ignoreError) {
+ SerDeUtils.initializeSerDeWithoutErrorCheck(de, conf, properties, null);
+ } else {
+ SerDeUtils.initializeSerDe(de, conf, properties, null);
+ }
return de;
}
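The new two-argument overload is what FetchOperator.setupOutputObjectInspector above relies on: getDeserializer(job, true) builds the table serde while skipping the serde's configuration-error check, so an output ObjectInspector can still be derived. A brief usage sketch (DeserializerModes is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.hive.serde2.Deserializer;

    public class DeserializerModes {
      static Deserializer strict(TableDesc td, Configuration conf) throws Exception {
        return td.getDeserializer(conf);        // initializeSerDe: errors propagate
      }

      static Deserializer lenient(TableDesc td, Configuration conf) throws Exception {
        return td.getDeserializer(conf, true);  // initializeSerDeWithoutErrorCheck
      }
    }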
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java Mon Jan 5 19:03:49 2015
@@ -209,7 +209,7 @@ public final class ObjectInspectorConver
* can contain non-settable fields only if inputOI equals outputOI and equalsCheck is
* true.
*/
- private static ObjectInspector getConvertedOI(
+ public static ObjectInspector getConvertedOI(
ObjectInspector inputOI,
ObjectInspector outputOI,
Map<ObjectInspector, Boolean> oiSettableProperties,
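Making this overload public lets FetchOperator pass equalsCheck=false explicitly: getConvertedOI(tableOI, tableOI, null, false) above forces a fully settable copy of the table OI even though input and output are identical, so converted partition rows can be written into it field by field. A hedged sketch of the pairing with getConverter (ConvertedOIDemo is hypothetical):

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;

    public class ConvertedOIDemo {
      // inputOI: what a partition serde produces; outputOI: the table-level shape.
      static Object toTableShape(ObjectInspector inputOI, ObjectInspector outputOI, Object row) {
        // equalsCheck=false: do not shortcut to the (possibly non-settable)
        // output OI even when the two inspectors are equal.
        ObjectInspector settable =
            ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, null, false);
        Converter converter = ObjectInspectorConverters.getConverter(inputOI, settable);
        return converter.convert(row);
      }
    }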
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java Mon Jan 5 19:03:49 2015
@@ -478,6 +478,9 @@ public class PrimitiveObjectInspectorCon
@Override
public Object convert(Object input) {
+ if (input == null) {
+ return null;
+ }
switch (inputOI.getPrimitiveCategory()) {
case BOOLEAN:
return outputOI.set(hc,
@@ -504,6 +507,9 @@ public class PrimitiveObjectInspectorCon
@Override
public Object convert(Object input) {
+ if (input == null) {
+ return null;
+ }
switch (inputOI.getPrimitiveCategory()) {
case BOOLEAN:
return outputOI.set(hc,
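These null guards matter for the partition-value path above: createPartValue converts partSpec.get(fieldName) through a string-to-primitive converter, and a missing key hands the converter a null. Judging by the hc field and the BOOLEAN case, the patched converters are the HiveChar/HiveVarchar ones; a small hedged illustration of the contract (NullConvertDemo is hypothetical):

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class NullConvertDemo {
      public static void main(String[] args) {
        Converter c = ObjectInspectorConverters.getConverter(
            PrimitiveObjectInspectorFactory.javaBooleanObjectInspector,
            PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                TypeInfoFactory.getVarcharTypeInfo(10)));
        System.out.println(c.convert(Boolean.TRUE)); // a HiveVarchar for the boolean
        System.out.println(c.convert(null));         // now null, instead of an NPE
      }
    }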