Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [10/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hado...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/IndexPredicateAnalyzer.java Thu Oct 30 16:22:33 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.index;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -44,6 +45,13 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
@@ -57,14 +65,14 @@ import org.apache.hadoop.hive.ql.udf.gen
public class IndexPredicateAnalyzer {
private final Set<String> udfNames;
- private final Set<String> allowedColumnNames;
+ private final Map<String, Set<String>> columnToUDFs;
private FieldValidator fieldValidator;
private boolean acceptsFields;
public IndexPredicateAnalyzer() {
udfNames = new HashSet<String>();
- allowedColumnNames = new HashSet<String>();
+ columnToUDFs = new HashMap<String, Set<String>>();
}
public void setFieldValidator(FieldValidator fieldValidator) {
@@ -89,7 +97,7 @@ public class IndexPredicateAnalyzer {
* column names are allowed.)
*/
public void clearAllowedColumnNames() {
- allowedColumnNames.clear();
+ columnToUDFs.clear();
}
/**
@@ -98,7 +106,22 @@ public class IndexPredicateAnalyzer {
* @param columnName name of column to be allowed
*/
public void allowColumnName(String columnName) {
- allowedColumnNames.add(columnName);
+ columnToUDFs.put(columnName, udfNames);
+ }
+
+ /**
+ * Add allowed comparison functions (UDF names) per column.
+ * @param columnName
+ * @param udfs
+ */
+ public void addComparisonOp(String columnName, String... udfs) {
+ Set<String> allowed = columnToUDFs.get(columnName);
+ if (allowed == null || allowed == udfNames) {
+ // override
+ columnToUDFs.put(columnName, new HashSet<String>(Arrays.asList(udfs)));
+ } else {
+ allowed.addAll(Arrays.asList(udfs));
+ }
}
/**
@@ -152,6 +175,32 @@ public class IndexPredicateAnalyzer {
return residualPredicate;
}
+ // Check if an ExprNodeColumnDesc is wrapped in expr.
+ // If so, peel it off; otherwise return expr itself.
+ private ExprNodeDesc getColumnExpr(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ return expr;
+ }
+ ExprNodeGenericFuncDesc funcDesc = null;
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ funcDesc = (ExprNodeGenericFuncDesc) expr;
+ }
+ if (null == funcDesc) {
+ return expr;
+ }
+ GenericUDF udf = funcDesc.getGenericUDF();
// check if it's a simple cast expression.
+ if ((udf instanceof GenericUDFBridge || udf instanceof GenericUDFToBinary
+ || udf instanceof GenericUDFToChar || udf instanceof GenericUDFToVarchar
+ || udf instanceof GenericUDFToDecimal || udf instanceof GenericUDFToDate
+ || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp)
+ && funcDesc.getChildren().size() == 1
+ && funcDesc.getChildren().get(0) instanceof ExprNodeColumnDesc) {
+ return expr.getChildren().get(0);
+ }
+ return expr;
+ }
+
private ExprNodeDesc analyzeExpr(
ExprNodeGenericFuncDesc expr,
List<IndexSearchCondition> searchConditions,
@@ -182,11 +231,17 @@ public class IndexPredicateAnalyzer {
}
ExprNodeDesc expr1 = (ExprNodeDesc) nodeOutputs[0];
ExprNodeDesc expr2 = (ExprNodeDesc) nodeOutputs[1];
+ // We may need to peel off the GenericUDFBridge that is added by CBO or the user
+ if (expr1.getTypeInfo().equals(expr2.getTypeInfo())) {
+ expr1 = getColumnExpr(expr1);
+ expr2 = getColumnExpr(expr2);
+ }
+
ExprNodeDesc[] extracted = ExprNodeDescUtils.extractComparePair(expr1, expr2);
if (extracted == null || (extracted.length > 2 && !acceptsFields)) {
return expr;
}
-
+
ExprNodeColumnDesc columnDesc;
ExprNodeConstantDesc constantDesc;
if (extracted[0] instanceof ExprNodeConstantDesc) {
@@ -198,12 +253,13 @@ public class IndexPredicateAnalyzer {
constantDesc = (ExprNodeConstantDesc) extracted[1];
}
- String udfName = genericUDF.getUdfName();
- if (!udfNames.contains(genericUDF.getUdfName())) {
+ Set<String> allowed = columnToUDFs.get(columnDesc.getColumn());
+ if (allowed == null) {
return expr;
}
- if (!allowedColumnNames.contains(columnDesc.getColumn())) {
+ String udfName = genericUDF.getUdfName();
+ if (!allowed.contains(genericUDF.getUdfName())) {
return expr;
}
@@ -216,6 +272,13 @@ public class IndexPredicateAnalyzer {
fields = ExprNodeDescUtils.extractFields(fieldDesc);
}
+ // We also need to update the expr so that the index query can be generated.
+ // Note that Hive does not support UDFToDouble etc. in the query text.
+ List<ExprNodeDesc> list = new ArrayList<ExprNodeDesc>();
+ list.add(expr1);
+ list.add(expr2);
+ expr = new ExprNodeGenericFuncDesc(expr.getTypeInfo(), expr.getGenericUDF(), list);
+
searchConditions.add(
new IndexSearchCondition(
columnDesc,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java Thu Oct 30 16:22:33 2014
@@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext
public static interface RawReader<V>
extends RecordReader<RecordIdentifier, V> {
public ObjectInspector getObjectInspector();
+
+ public boolean isDelete(V value);
}
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Thu Oct 30 16:22:33 2014
@@ -348,7 +348,7 @@ public class AcidUtils {
long bestBaseTxn = 0;
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
- final List<FileStatus> original = new ArrayList<FileStatus>();
+ List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
final List<FileStatus> obsolete = new ArrayList<FileStatus>();
List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
hiddenFileFilter);
@@ -375,16 +375,26 @@ public class AcidUtils {
working.add(delta);
}
} else {
- findOriginals(fs, child, original);
+ // This is just the directory. We need to recurse and find the actual files. But don't
+ // do this until we have determined there is no base. This saves time. Plus,
+ // it is possible that the cleaner is running and removing these original files,
+ // in which case recursing through them could cause us to get an error.
+ originalDirectories.add(child);
}
}
+ final List<FileStatus> original = new ArrayList<FileStatus>();
// if we have a base, the original files are obsolete.
if (bestBase != null) {
- obsolete.addAll(original);
// remove the entries so we don't get confused later and think we should
// use them.
original.clear();
+ } else {
+ // Okay, we're going to need these originals. Recurse through them and figure out what we
+ // really need.
+ for (FileStatus origDir : originalDirectories) {
+ findOriginals(fs, origDir, original);
+ }
}
Collections.sort(working);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Oct 30 16:22:33 2014
@@ -275,7 +275,7 @@ public class HiveInputFormat<K extends W
InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
TableDesc table, List<InputSplit> result) throws IOException {
- Utilities.copyTableJobPropertiesToConf(table, conf);
+ Utilities.copyTablePropertiesToConf(table, conf);
if (tableScan != null) {
pushFilters(conf, tableScan);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Thu Oct 30 16:22:33 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.hive.ql.io;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,8 +45,6 @@ public class IOContext {
};
private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
- private static IOContext ioContext = new IOContext();
-
public static Map<String, IOContext> getMap() {
return inputNameIOContextMap;
}
@@ -72,6 +68,7 @@ public class IOContext {
public static void clear() {
IOContext.threadLocal.remove();
+ inputNameIOContextMap.clear();
}
long currentBlockStart;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Oct 30 16:22:33 2014
@@ -388,6 +388,7 @@ public class OrcInputFormat implements
ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
numBuckets =
Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0);
+ LOG.debug("Number of buckets specified by conf file is " + numBuckets);
int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
int numThreads = HiveConf.getIntVar(conf,
@@ -1134,7 +1135,7 @@ public class OrcInputFormat implements
@Override
public ObjectInspector getObjectInspector() {
- return ((StructObjectInspector) reader.getObjectInspector())
+ return ((StructObjectInspector) records.getObjectInspector())
.getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
.getFieldObjectInspector();
}
@@ -1188,7 +1189,9 @@ public class OrcInputFormat implements
int bucket) throws IOException {
for(FileStatus stat: fs.listStatus(directory)) {
String name = stat.getPath().getName();
- if (Integer.parseInt(name.substring(0, name.indexOf('_'))) == bucket) {
+ String numberPart = name.substring(0, name.indexOf('_'));
+ if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) &&
+ Integer.parseInt(numberPart) == bucket) {
return stat.getPath();
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Thu Oct 30 16:22:33 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.Val
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -37,9 +38,10 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -448,6 +450,10 @@ public class OrcRawRecordMerger implemen
// we always want to read all of the deltas
eventOptions.range(0, Long.MAX_VALUE);
+ // Turn off the sarg before pushing it to a delta. We never want to push a sarg to a delta, as
+ // it can produce wrong results (if the latest valid version of the record is filtered out by
+ // the sarg) or ArrayIndexOutOfBounds errors (when the sarg is applied to a delete record).
+ eventOptions.searchArgument(null, null);
if (deltaDirectory != null) {
for(Path delta: deltaDirectory) {
ReaderKey key = new ReaderKey();
@@ -627,8 +633,16 @@ public class OrcRawRecordMerger implemen
// Parse the configuration parameters
ArrayList<String> columnNames = new ArrayList<String>();
+ Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
if (columnNameProperty != null && columnNameProperty.length() > 0) {
- Collections.addAll(columnNames, columnNameProperty.split(","));
+ String[] colNames = columnNameProperty.split(",");
+ for (int i = 0; i < colNames.length; i++) {
+ if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
+ virtualColumns.addLast(i);
+ } else {
+ columnNames.add(colNames[i]);
+ }
+ }
}
if (columnTypeProperty == null) {
// Default type: all string
@@ -644,6 +658,9 @@ public class OrcRawRecordMerger implemen
ArrayList<TypeInfo> fieldTypes =
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ while (virtualColumns.size() > 0) {
+ fieldTypes.remove(virtualColumns.removeLast());
+ }
StructTypeInfo rowType = new StructTypeInfo();
rowType.setAllStructFieldNames(columnNames);
rowType.setAllStructFieldTypeInfos(fieldTypes);
@@ -651,6 +668,11 @@ public class OrcRawRecordMerger implemen
(OrcStruct.createObjectInspector(rowType));
}
+ @Override
+ public boolean isDelete(OrcStruct value) {
+ return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+ }
+
/**
* Get the number of columns in the underlying rows.
* @return 0 if there are no base and no deltas.
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Oct 30 16:22:33 2014
@@ -27,8 +27,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -40,6 +42,7 @@ import org.apache.hadoop.io.Writable;
* A serde class for ORC.
* It transparently passes the object to/from the ORC file reader/writer.
*/
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
public class OrcSerde implements SerDe, VectorizedSerde {
private static final Log LOG = LogFactory.getLog(OrcSerde.class);
@@ -75,9 +78,9 @@ public class OrcSerde implements SerDe,
@Override
public void initialize(Configuration conf, Properties table) {
// Read the configuration parameters
- String columnNameProperty = table.getProperty("columns");
+ String columnNameProperty = table.getProperty(serdeConstants.LIST_COLUMNS);
// NOTE: if "columns.types" is missing, all columns will be of String type
- String columnTypeProperty = table.getProperty("columns.types");
+ String columnTypeProperty = table.getProperty(serdeConstants.LIST_COLUMN_TYPES);
// Parse the configuration parameters
ArrayList<String> columnNames = new ArrayList<String>();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Oct 30 16:22:33 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -1074,41 +1075,20 @@ class RecordReaderImpl implements Record
result = (LongColumnVector) previousVector;
}
- // Read present/isNull stream
- super.nextVector(result, batchSize);
-
- data.nextVector(result, batchSize);
- nanoVector.isNull = result.isNull;
- nanos.nextVector(nanoVector, batchSize);
-
- if(result.isRepeating && nanoVector.isRepeating) {
- batchSize = 1;
- }
-
- // Non repeating values preset in the vector. Iterate thru the vector and populate the time
+ result.reset();
+ Object obj = null;
for (int i = 0; i < batchSize; i++) {
- if (!result.isNull[i]) {
- long ms = (result.vector[result.isRepeating ? 0 : i] + WriterImpl.BASE_TIMESTAMP)
- * WriterImpl.MILLIS_PER_SECOND;
- long ns = parseNanos(nanoVector.vector[nanoVector.isRepeating ? 0 : i]);
- // the rounding error exists because java always rounds up when dividing integers
- // -42001/1000 = -42; and -42001 % 1000 = -1 (+ 1000)
- // to get the correct value we need
- // (-42 - 1)*1000 + 999 = -42001
- // (42)*1000 + 1 = 42001
- if(ms < 0 && ns != 0) {
- ms -= 1000;
- }
- // Convert millis into nanos and add the nano vector value to it
- result.vector[i] = (ms * 1000000) + ns;
+ obj = next(obj);
+ if (obj == null) {
+ result.noNulls = false;
+ result.isNull[i] = true;
+ } else {
+ TimestampWritable writable = (TimestampWritable) obj;
+ Timestamp timestamp = writable.getTimestamp();
+ result.vector[i] = TimestampUtils.getTimeNanoSec(timestamp);
}
}
- if(!(result.isRepeating && nanoVector.isRepeating)) {
- // both have to repeat for the result to be repeating
- result.isRepeating = false;
- }
-
return result;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java Thu Oct 30 16:22:33 2014
@@ -48,7 +48,6 @@ class VectorizedOrcAcidRowReader
private final OrcStruct value;
private final VectorizedRowBatchCtx rowBatchCtx;
private final ObjectInspector objectInspector;
- private boolean needToSetPartition = true;
private final DataOutputBuffer buffer = new DataOutputBuffer();
VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
@@ -83,23 +82,20 @@ class VectorizedOrcAcidRowReader
if (!innerReader.next(key, value)) {
return false;
}
- if (needToSetPartition) {
- try {
- rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
- } catch (HiveException e) {
- throw new IOException("Problem adding partition column", e);
- }
- needToSetPartition = false;
+ try {
+ rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
+ } catch (HiveException e) {
+ throw new IOException("Problem adding partition column", e);
}
try {
- VectorizedBatchUtil.addRowToBatch(value,
+ VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
innerReader.next(key, value)) {
- VectorizedBatchUtil.addRowToBatch(value,
+ VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
}
} catch (HiveException he) {
throw new IOException("error iterating", he);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -40,6 +41,8 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.ContextUtil;
/**
*
@@ -110,15 +113,19 @@ public class MapredParquetOutputFormat e
}
DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
- return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+
+ return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+ progress,tableProperties);
}
protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
ParquetOutputFormat<ArrayWritable> realOutputFormat,
JobConf jobConf,
String finalOutPath,
- Progressable progress
+ Progressable progress,
+ Properties tableProperties
) throws IOException {
- return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+ return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
+ progress,tableProperties);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Thu Oct 30 16:22:33 2014
@@ -75,6 +75,7 @@ public class DataWritableReadSupport ext
final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
final String columns = configuration.get(IOConstants.COLUMNS);
final Map<String, String> contextMetadata = new HashMap<String, String>();
+ final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
if (columns != null) {
final List<String> listColumns = getColumns(columns);
final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
@@ -82,45 +83,50 @@ public class DataWritableReadSupport ext
lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
}
final List<Type> typeListTable = new ArrayList<Type>();
- for (String col : listColumns) {
- col = col.toLowerCase();
- // listColumns contains partition columns which are metadata only
- if (lowerCaseFileSchemaColumns.containsKey(col)) {
- typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
- } else {
- // below allows schema evolution
- typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+ if(indexAccess) {
+ for (int index = 0; index < listColumns.size(); index++) {
+ //Take columns based on index or pad the field
+ if(index < fileSchema.getFieldCount()) {
+ typeListTable.add(fileSchema.getType(index));
+ } else {
+ //prefixing with '_mask_' to ensure no conflict with named
+ //columns in the file schema
+ typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_mask_"+listColumns.get(index)));
+ }
+ }
+ } else {
+ for (String col : listColumns) {
+ col = col.toLowerCase();
+ // listColumns contains partition columns which are metadata only
+ if (lowerCaseFileSchemaColumns.containsKey(col)) {
+ typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+ } else {
+ // below allows schema evolution
+ typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+ }
}
}
MessageType tableSchema = new MessageType(TABLE_SCHEMA, typeListTable);
contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());
- MessageType requestedSchemaByUser = tableSchema;
final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
final List<Type> typeListWanted = new ArrayList<Type>();
- final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+
for (final Integer idx : indexColumnsWanted) {
if (idx < listColumns.size()) {
String col = listColumns.get(idx);
if (indexAccess) {
- typeListWanted.add(tableSchema.getType(col));
+ typeListWanted.add(fileSchema.getFields().get(idx));
} else {
col = col.toLowerCase();
if (lowerCaseFileSchemaColumns.containsKey(col)) {
typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
- } else {
- // should never occur?
- String msg = "Column " + col + " at index " + idx + " does not exist in " +
- lowerCaseFileSchemaColumns;
- throw new IllegalStateException(msg);
}
}
}
}
- requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
- typeListWanted), fileSchema, configuration);
-
+ MessageType requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
return new ReadContext(requestedSchemaByUser, contextMetadata);
} else {
contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
@@ -147,26 +153,7 @@ public class DataWritableReadSupport ext
throw new IllegalStateException("ReadContext not initialized properly. " +
"Don't know the Hive Schema.");
}
- final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
- parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
+ final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
}
-
- /**
- * Determine the file column names based on the position within the requested columns and
- * use that as the requested schema.
- */
- private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
- Configuration configuration) {
- if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
- final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
- List<Type> requestedTypes = new ArrayList<Type>();
- for(Type t : requestedSchema.getFields()) {
- int index = listColumns.indexOf(t.getName());
- requestedTypes.add(fileSchema.getType(index));
- }
- requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
- }
- return requestedSchema;
- }
-}
\ No newline at end of file
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Thu Oct 30 16:22:33 2014
@@ -23,9 +23,10 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -69,6 +70,7 @@ import parquet.io.api.Binary;
* A ParquetHiveSerDe for Hive (with the deprecated package mapred)
*
*/
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
public class ParquetHiveSerDe extends AbstractSerDe {
public static final Text MAP_KEY = new Text("key");
public static final Text MAP_VALUE = new Text("value");
@@ -105,8 +107,8 @@ public class ParquetHiveSerDe extends Ab
final List<String> columnNames;
final List<TypeInfo> columnTypes;
// Get column names and sort order
- final String columnNameProperty = tbl.getProperty(IOConstants.COLUMNS);
- final String columnTypeProperty = tbl.getProperty(IOConstants.COLUMNS_TYPES);
+ final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+ final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
if (columnNameProperty.length() == 0) {
columnNames = new ArrayList<String>();
@@ -242,7 +244,7 @@ public class ParquetHiveSerDe extends Ab
case BOOLEAN:
return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
case BYTE:
- return new ByteWritable((byte) ((ByteObjectInspector) inspector).get(obj));
+ return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
case DOUBLE:
return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
case FLOAT:
@@ -252,7 +254,7 @@ public class ParquetHiveSerDe extends Ab
case LONG:
return new LongWritable(((LongObjectInspector) inspector).get(obj));
case SHORT:
- return new ShortWritable((short) ((ShortObjectInspector) inspector).get(obj));
+ return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
case STRING:
String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
try {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTime.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTime.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTime.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTime.java Thu Oct 30 16:22:33 2014
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.ql.io.parquet.timestamp;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import parquet.Preconditions;
import parquet.io.api.Binary;
@@ -28,7 +29,10 @@ public class NanoTime {
public static NanoTime fromBinary(Binary bytes) {
Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes");
ByteBuffer buf = bytes.toByteBuffer();
- return new NanoTime(buf.getInt(), buf.getLong());
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buf.getLong();
+ int julianDay = buf.getInt();
+ return new NanoTime(julianDay, timeOfDayNanos);
}
public NanoTime(int julianDay, long timeOfDayNanos) {
@@ -46,8 +50,9 @@ public class NanoTime {
public Binary toBinary() {
ByteBuffer buf = ByteBuffer.allocate(12);
- buf.putInt(julianDay);
+ buf.order(ByteOrder.LITTLE_ENDIAN);
buf.putLong(timeOfDayNanos);
+ buf.putInt(julianDay);
buf.flip();
return Binary.fromByteBuffer(buf);
}
@@ -60,4 +65,4 @@ public class NanoTime {
public String toString() {
return "NanoTime{julianDay="+julianDay+", timeOfDayNanos="+timeOfDayNanos+"}";
}
-}
\ No newline at end of file
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Thu Oct 30 16:22:33 2014
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.ql.io.parquet.write;
import java.io.IOException;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.util.Progressable;
import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.util.ContextUtil;
public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
@@ -43,7 +45,8 @@ public class ParquetRecordWriterWrapper
final OutputFormat<Void, ArrayWritable> realOutputFormat,
final JobConf jobConf,
final String name,
- final Progressable progress) throws IOException {
+ final Progressable progress, Properties tableProperties) throws
+ IOException {
try {
// create a TaskInputOutputContext
TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
@@ -53,7 +56,21 @@ public class ParquetRecordWriterWrapper
taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
LOG.info("creating real writer to write at " + name);
- realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext, new Path(name));
+
+ String compressionName = tableProperties.getProperty(ParquetOutputFormat.COMPRESSION);
+ if (compressionName != null && !compressionName.isEmpty()) {
+ //get override compression properties via "tblproperties" clause if it is set
+ LOG.debug("get override compression properties via tblproperties");
+
+ ContextUtil.getConfiguration(taskContext);
+ CompressionCodecName codecName = CompressionCodecName.fromConf(compressionName);
+ realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(jobConf,
+ new Path(name), codecName);
+ } else {
+ realWriter = ((ParquetOutputFormat) realOutputFormat).getRecordWriter(taskContext,
+ new Path(name));
+ }
+
LOG.info("real writer: " + realWriter);
} catch (final InterruptedException e) {
throw new IOException(e);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Thu Oct 30 16:22:33 2014
@@ -18,17 +18,21 @@
package org.apache.hadoop.hive.ql.io.sarg;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -52,15 +56,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
/**
* The implementation of SearchArguments.
@@ -366,7 +364,7 @@ final class SearchArgumentImpl implement
case STRING:
return StringUtils.stripEnd(lit.getValue().toString(), null);
case FLOAT:
- return ((Number) lit.getValue()).doubleValue();
+ return Double.parseDouble(lit.getValue().toString());
case DATE:
case TIMESTAMP:
case DECIMAL:
@@ -977,7 +975,9 @@ final class SearchArgumentImpl implement
literal instanceof Integer) {
return Long.valueOf(literal.toString());
} else if (literal instanceof Float) {
- return Double.valueOf((Float) literal);
+ // to avoid change in precision when upcasting float to double
+ // we convert the literal to string and parse it as double. (HIVE-8460)
+ return Double.parseDouble(literal.toString());
} else {
throw new IllegalArgumentException("Unknown type for literal " +
literal);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Thu Oct 30 16:22:33 2014
@@ -313,7 +313,7 @@ public class Hive {
* @param name
* @param deleteData
* @param ignoreUnknownDb if true, will ignore NoSuchObjectException
- * @param cascade if true, delete all tables on the DB if exists. Othewise, the query
+ * @param cascade if true, delete all tables on the DB if exists. Otherwise, the query
* will fail if table still exists.
* @throws HiveException
* @throws NoSuchObjectException
@@ -331,7 +331,7 @@ public class Hive {
/**
- * Creates a table metdata and the directory for the table data
+ * Creates a table metadata and the directory for the table data
*
* @param tableName
* name of the table
@@ -355,7 +355,7 @@ public class Hive {
}
/**
- * Creates a table metdata and the directory for the table data
+ * Creates a table metadata and the directory for the table data
*
* @param tableName
* name of the table
@@ -885,16 +885,21 @@ public class Hive {
}
}
- public boolean dropIndex(String baseTableName, String index_name, boolean deleteData) throws HiveException {
+ public boolean dropIndex(String baseTableName, String index_name,
+ boolean throwException, boolean deleteData) throws HiveException {
String[] names = Utilities.getDbTableName(baseTableName);
- return dropIndex(names[0], names[1], index_name, deleteData);
+ return dropIndex(names[0], names[1], index_name, throwException, deleteData);
}
- public boolean dropIndex(String db_name, String tbl_name, String index_name, boolean deleteData) throws HiveException {
+ public boolean dropIndex(String db_name, String tbl_name, String index_name,
+ boolean throwException, boolean deleteData) throws HiveException {
try {
return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData);
} catch (NoSuchObjectException e) {
- throw new HiveException("Partition or table doesn't exist. " + e.getMessage(), e);
+ if (throwException) {
+ throw new HiveException("Index " + index_name + " doesn't exist. ", e);
+ }
+ return false;
} catch (Exception e) {
throw new HiveException(e.getMessage(), e);
}
@@ -2861,10 +2866,6 @@ private void constructOneLBLocationMap(F
}
}
- public static String[] getQualifiedNames(String qualifiedName) {
- return qualifiedName.split("\\.");
- }
-
public void createFunction(Function func) throws HiveException {
try {
getMSC().createFunction(func);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Thu Oct 30 16:22:33 2014
@@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -192,7 +191,7 @@ public class Table implements Serializab
"at least one column must be specified for the table");
}
if (!isView()) {
- if (null == getDeserializerFromMetaStore()) {
+ if (null == getDeserializerFromMetaStore(false)) {
throw new HiveException("must specify a non-null serDe");
}
if (null == getInputFormatClass()) {
@@ -253,14 +252,25 @@ public class Table implements Serializab
final public Deserializer getDeserializer() {
if (deserializer == null) {
- deserializer = getDeserializerFromMetaStore();
+ deserializer = getDeserializerFromMetaStore(false);
}
return deserializer;
}
- private Deserializer getDeserializerFromMetaStore() {
+ final public Class<? extends Deserializer> getDeserializerClass() throws Exception {
+ return MetaStoreUtils.getDeserializerClass(Hive.get().getConf(), tTable);
+ }
+
+ final public Deserializer getDeserializer(boolean skipConfError) {
+ if (deserializer == null) {
+ deserializer = getDeserializerFromMetaStore(skipConfError);
+ }
+ return deserializer;
+ }
+
+ final public Deserializer getDeserializerFromMetaStore(boolean skipConfError) {
try {
- return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable);
+ return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable, skipConfError);
} catch (MetaException e) {
throw new RuntimeException(e);
} catch (HiveException e) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Thu Oct 30 16:22:33 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
@@ -76,6 +77,7 @@ public class ColumnPruner implements Tra
* @param pactx
* the current parse context
*/
+ @Override
public ParseContext transform(ParseContext pactx) throws SemanticException {
pGraphContext = pactx;
opToParseCtxMap = pGraphContext.getOpParseCtx();
@@ -120,6 +122,9 @@ public class ColumnPruner implements Tra
opRules.put(new RuleRegExp("R11",
ScriptOperator.getOperatorName() + "%"),
ColumnPrunerProcFactory.getScriptProc());
+ opRules.put(new RuleRegExp("R12",
+ LimitOperator.getOperatorName() + "%"),
+ ColumnPrunerProcFactory.getLimitProc());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Oct 30 16:22:33 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.Gr
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -215,6 +216,24 @@ public final class ColumnPrunerProcFacto
}
}
+ public static class ColumnPrunerLimitProc extends ColumnPrunerDefaultProc {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ super.process(nd, stack, ctx, nodeOutputs);
+ List<String> cols = ((ColumnPrunerProcCtx)ctx).getPrunedColLists().get(nd);
+ if (null != cols) {
+ pruneOperator(ctx, (LimitOperator) nd, cols);
+ }
+ return null;
+ }
+ }
+
+ public static ColumnPrunerLimitProc getLimitProc() {
+ return new ColumnPrunerLimitProc();
+ }
+
public static ColumnPrunerScriptProc getScriptProc() {
return new ColumnPrunerScriptProc();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Thu Oct 30 16:22:33 2014
@@ -29,9 +29,11 @@ import java.util.Stack;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -57,15 +59,16 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -163,6 +166,10 @@ public final class ConstantPropagateProc
}
LOG.debug("Casting " + desc + " to type " + ti);
ExprNodeConstantDesc c = (ExprNodeConstantDesc) desc;
+ if (null != c.getFoldedFromVal() && priti.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ // avoid double casting to preserve original string representation of constant.
+ return new ExprNodeConstantDesc(c.getFoldedFromVal());
+ }
ObjectInspector origOI =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(desc.getTypeInfo());
ObjectInspector oi =
@@ -187,6 +194,14 @@ public final class ConstantPropagateProc
return new ExprNodeConstantDesc(ti, convObj);
}
+ public static ExprNodeDesc foldExpr(ExprNodeGenericFuncDesc funcDesc) {
+
+ GenericUDF udf = funcDesc.getGenericUDF();
+ if (!isDeterministicUdf(udf)) {
+ return funcDesc;
+ }
+ return evaluateFunction(funcDesc.getGenericUDF(),funcDesc.getChildren(), funcDesc.getChildren());
+ }
/**
* Fold input expression desc.
*
@@ -278,7 +293,7 @@ public final class ConstantPropagateProc
(UDF) Class.forName(bridge.getUdfClassName(), true, Utilities.getSessionSpecifiedClassLoader())
.newInstance();
files = udfInternal.getRequiredFiles();
- jars = udf.getRequiredJars();
+ jars = udfInternal.getRequiredJars();
} catch (Exception e) {
LOG.error("The UDF implementation class '" + udfClassName
+ "' is not present in the class path");
@@ -308,15 +323,22 @@ public final class ConstantPropagateProc
if (udf instanceof GenericUDFOPEqual) {
ExprNodeDesc lOperand = newExprs.get(0);
ExprNodeDesc rOperand = newExprs.get(1);
- ExprNodeColumnDesc c;
ExprNodeConstantDesc v;
- if (lOperand instanceof ExprNodeColumnDesc && rOperand instanceof ExprNodeConstantDesc) {
- c = (ExprNodeColumnDesc) lOperand;
- v = (ExprNodeConstantDesc) rOperand;
- } else if (rOperand instanceof ExprNodeColumnDesc && lOperand instanceof ExprNodeConstantDesc) {
- c = (ExprNodeColumnDesc) rOperand;
+ if (lOperand instanceof ExprNodeConstantDesc) {
v = (ExprNodeConstantDesc) lOperand;
+ } else if (rOperand instanceof ExprNodeConstantDesc) {
+ v = (ExprNodeConstantDesc) rOperand;
} else {
+ // we need a constant on one side.
+ return;
+ }
+ // If both sides are constants, there is nothing to propagate
+ ExprNodeColumnDesc c = getColumnExpr(lOperand);
+ if (null == c) {
+ c = getColumnExpr(rOperand);
+ }
+ if (null == c) {
+ // we need a column expression on the other side.
return;
}
ColumnInfo ci = resolveColumn(rr, c);
@@ -342,21 +364,42 @@ public final class ConstantPropagateProc
}
}
+ private static ExprNodeColumnDesc getColumnExpr(ExprNodeDesc expr) {
+ while (FunctionRegistry.isOpCast(expr)) {
+ expr = expr.getChildren().get(0);
+ }
+ return (expr instanceof ExprNodeColumnDesc) ? (ExprNodeColumnDesc)expr : null;
+ }
+
private static ExprNodeDesc shortcutFunction(GenericUDF udf, List<ExprNodeDesc> newExprs) {
if (udf instanceof GenericUDFOPAnd) {
for (int i = 0; i < 2; i++) {
ExprNodeDesc childExpr = newExprs.get(i);
+ ExprNodeDesc other = newExprs.get(Math.abs(i - 1));
if (childExpr instanceof ExprNodeConstantDesc) {
ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
if (Boolean.TRUE.equals(c.getValue())) {
// if true, prune it
- return newExprs.get(Math.abs(i - 1));
+ return other;
} else {
// if false return false
return childExpr;
}
+ } else // Try to fold (key = 86) and (key is not null) to (key = 86)
+ if (childExpr instanceof ExprNodeGenericFuncDesc &&
+ ((ExprNodeGenericFuncDesc)childExpr).getGenericUDF() instanceof GenericUDFOPNotNull &&
+ childExpr.getChildren().get(0) instanceof ExprNodeColumnDesc && other instanceof ExprNodeGenericFuncDesc
+ && ((ExprNodeGenericFuncDesc)other).getGenericUDF() instanceof GenericUDFBaseCompare
+ && other.getChildren().size() == 2) {
+ ExprNodeColumnDesc colDesc = getColumnExpr(other.getChildren().get(0));
+ if (null == colDesc) {
+ colDesc = getColumnExpr(other.getChildren().get(1));
+ }
+ if (null != colDesc && colDesc.isSame(childExpr.getChildren().get(0))) {
+ return other;
+ }
}
}
}
@@ -470,6 +513,16 @@ public final class ConstantPropagateProc
// FIXME: add null support.
return null;
+ } else if (desc instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeDesc evaluatedFn = foldExpr((ExprNodeGenericFuncDesc)desc);
+ if (null == evaluatedFn || !(evaluatedFn instanceof ExprNodeConstantDesc)) {
+ return null;
+ }
+ ExprNodeConstantDesc constant = (ExprNodeConstantDesc) evaluatedFn;
+ Object writableValue = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+ (PrimitiveTypeInfo) constant.getTypeInfo()).getPrimitiveWritableObject(constant.getValue());
+ arguments[i] = new DeferredJavaObject(writableValue);
+ argois[i] = ObjectInspectorUtils.getConstantObjectInspector(constant.getWritableObjectInspector(), writableValue);
} else {
return null;
}
@@ -502,7 +555,12 @@ public final class ConstantPropagateProc
LOG.error("Unable to evaluate " + udf + ". Return value unrecoginizable.");
return null;
}
- return new ExprNodeConstantDesc(o);
+ String constStr = null;
+ if(arguments.length == 1 && FunctionRegistry.isOpCast(udf)) {
+ // remember original string representation of constant.
+ constStr = arguments[0].get().toString();
+ }
+ return new ExprNodeConstantDesc(o).setFoldedFromVal(constStr);
} catch (HiveException e) {
LOG.error("Evaluation function " + udf.getClass()
+ " failed in Constant Propagatation Optimizer.");
@@ -675,6 +733,21 @@ public final class ConstantPropagateProc
if (colList != null) {
for (int i = 0; i < colList.size(); i++) {
ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false);
+ if (!(colList.get(i) instanceof ExprNodeConstantDesc) && newCol instanceof ExprNodeConstantDesc) {
+ // Let's try to store the original column name, if this column got folded
+ // This is useful for optimizations like GroupByOptimizer
+ String colName = colList.get(i).getExprString();
+ if (HiveConf.getPositionFromInternalName(colName) == -1) {
+ // if it's not an internal name, this is what we want.
+ ((ExprNodeConstantDesc)newCol).setFoldedFromCol(colName);
+ } else {
+ // If it was an internal column, let's try to get the name from columnExprMap
+ ExprNodeDesc desc = columnExprMap.get(colName);
+ if (desc instanceof ExprNodeConstantDesc) {
+ ((ExprNodeConstantDesc)newCol).setFoldedFromCol(((ExprNodeConstantDesc)desc).getFoldedFromCol());
+ }
+ }
+ }
colList.set(i, newCol);
if (columnExprMap != null) {
columnExprMap.put(columnNames.get(i), newCol);
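
The two provenance fields introduced in this file work as a pair: setFoldedFromVal() keeps the original text of a constant produced by folding a unary cast, and setFoldedFromCol() keeps the source column name when a select-list column folds to a constant. A minimal stand-in for the extended constant descriptor (a hypothetical class, not ExprNodeConstantDesc itself):

    public class FoldedConstantSketch {
      static final class FoldedConstant {
        final Object value;
        String foldedFromVal; // original text, recorded when a unary cast folds
        String foldedFromCol; // source column, recorded when a column folds to a constant
        FoldedConstant(Object value) { this.value = value; }
      }

      public static void main(String[] args) {
        // CAST('87' AS INT) folds to 87 but remembers the literal text "87".
        FoldedConstant c = new FoldedConstant(87);
        c.foldedFromVal = "87";
        // A select-list column that folded to a constant remembers its name.
        c.foldedFromCol = "key";
        System.out.println(c.value + " (val=" + c.foldedFromVal + ", col=" + c.foldedFromCol + ")");
      }
    }
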
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Thu Oct 30 16:22:33 2014
@@ -85,13 +85,18 @@ public class ConvertJoinMapJoin implemen
JoinOperator joinOp = (JoinOperator) nd;
- if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
- && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+ TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+ if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
// we are just converting to a common merge join operator (the shuffle
// join of the map-reduce case).
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- return null;
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+ if (retval == null) {
+ return retval;
+ } else {
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ return null;
+ }
}
// if we have traits, and table info is present in the traits, we know the
@@ -99,7 +104,6 @@ public class ConvertJoinMapJoin implemen
// reducers from the parent operators.
int numBuckets = -1;
int estimatedBuckets = -1;
- TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
if (parentOp.getOpTraits().getNumBuckets() > 0) {
@@ -126,53 +130,15 @@ public class ConvertJoinMapJoin implemen
LOG.info("Estimated number of buckets " + numBuckets);
int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
if (mapJoinConversionPos < 0) {
- // we cannot convert to bucket map join, we cannot convert to
- // map join either based on the size. Check if we can convert to SMB join.
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
- convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
- return null;
- }
- Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
- try {
- bigTableMatcherClass =
- (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
- context.parseContext.getConf(),
- HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
- } catch (ClassNotFoundException e) {
- throw new SemanticException(e.getMessage());
- }
-
- BigTableSelectorForAutoSMJ bigTableMatcher =
- ReflectionUtils.newInstance(bigTableMatcherClass, null);
- JoinDesc joinDesc = joinOp.getConf();
- JoinCondDesc[] joinCondns = joinDesc.getConds();
- Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
- if (joinCandidates.isEmpty()) {
- // This is a full outer join. This can never be a map-join
- // of any type. So return false.
- return false;
- }
- mapJoinConversionPos =
- bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
- if (mapJoinConversionPos < 0) {
- // contains aliases from sub-query
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- return null;
- }
-
- if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
- convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
- tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+ Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+ if (retval == null) {
+ return retval;
} else {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ // the only remaining case is a full outer join, which can never be an SMB join. Convert
+ // to a regular join.
+ convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+ return null;
}
- return null;
}
if (numBuckets > 1) {
@@ -206,6 +172,57 @@ public class ConvertJoinMapJoin implemen
return null;
}
+ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+ // we cannot convert to a bucket map join, and based on size we cannot
+ // convert to a map join either. Check if we can convert to an SMB join.
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+ convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+ return null;
+ }
+ Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+ try {
+ bigTableMatcherClass =
+ (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+ context.parseContext.getConf(),
+ HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+ } catch (ClassNotFoundException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ BigTableSelectorForAutoSMJ bigTableMatcher =
+ ReflectionUtils.newInstance(bigTableMatcherClass, null);
+ JoinDesc joinDesc = joinOp.getConf();
+ JoinCondDesc[] joinCondns = joinDesc.getConds();
+ Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+ if (joinCandidates.isEmpty()) {
+ // This is a full outer join. This can never be a map-join
+ // of any type. So return false.
+ return false;
+ }
+ int mapJoinConversionPos =
+ bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+ if (mapJoinConversionPos < 0) {
+ // contains aliases from a sub-query
+ // we are just converting to a common merge join operator (the shuffle
+ // join of the map-reduce case).
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ return null;
+ }
+
+ if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+ convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+ tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+ } else {
+ // we are just converting to a common merge join operator (the shuffle
+ // join of the map-reduce case).
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ }
+ return null;
+  }
+
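
Note the return convention checkAndConvertSMBJoin() inherits from the inlined code it replaces: null means the join has been handled (converted to an SMB join or a common merge join), while Boolean.FALSE signals a full outer join, which can never be a map-join of any kind. A compact sketch of that convention with a hypothetical method:

    public class SmbReturnConventionSketch {
      //   null          -> join handled; the caller simply propagates null.
      //   Boolean.FALSE -> full outer join; no map-join variant is possible.
      static Object checkAndConvertSMBJoin(boolean fullOuterJoin) {
        if (fullOuterJoin) {
          return Boolean.FALSE;
        }
        // ... convert to an SMB join or a common merge join here ...
        return null;
      }

      public static void main(String[] args) {
        System.out.println(checkAndConvertSMBJoin(true));  // false
        System.out.println(checkAndConvertSMBJoin(false)); // null
      }
    }
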
// replaces the join operator with a new CommonJoinOperator, removes the
// parent reduce sinks
private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
@@ -228,7 +245,7 @@ public class ConvertJoinMapJoin implemen
@SuppressWarnings("unchecked")
CommonMergeJoinOperator mergeJoinOp =
(CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
- isSubQuery, mapJoinConversionPos, mapJoinDesc));
+ isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
OpTraits opTraits =
new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
.getSortCols());
@@ -630,7 +647,6 @@ public class ConvertJoinMapJoin implemen
hasDynamicPartitionPruning = true;
break;
}
-
if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
// crossing reduce sink or file sink means the pruning isn't for this parent.
break;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Thu Oct 30 16:22:33 2014
@@ -332,18 +332,26 @@ public class GroupByOptimizer implements
continue;
}
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
- if (selectColList instanceof ExprNodeColumnDesc) {
+ ExprNodeDesc selectCol = selectDesc.getColList().get(pos);
+ if (selectCol instanceof ExprNodeColumnDesc) {
String newValue =
- tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn());
+ tableColsMapping.get(((ExprNodeColumnDesc) selectCol).getColumn());
tableColsMapping.put(outputColumnName, newValue);
}
else {
tableColsMapping.remove(outputColumnName);
- if ((selectColList instanceof ExprNodeConstantDesc) ||
- (selectColList instanceof ExprNodeNullDesc)) {
+ if (selectCol instanceof ExprNodeNullDesc) {
newConstantCols.add(outputColumnName);
}
+ if (selectCol instanceof ExprNodeConstantDesc) {
+ // Let's see if this constant was folded by the optimizer.
+ String origCol = ((ExprNodeConstantDesc) selectCol).getFoldedFromCol();
+ if (origCol != null) {
+ tableColsMapping.put(outputColumnName, origCol);
+ } else {
+ newConstantCols.add(outputColumnName);
+ }
+ }
}
}
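
On the consumer side, the branch above consults getFoldedFromCol() before declaring an output column constant: when the provenance is present, the output column keeps mapping to the original table column, so the sorted/bucketed group-by detection survives constant folding. A small sketch of the mapping decision (hypothetical names, mirroring the branch above):

    import java.util.HashMap;
    import java.util.Map;

    public class FoldedFromColSketch {
      public static void main(String[] args) {
        Map<String, String> tableColsMapping = new HashMap<>();
        String outputColumnName = "_col0";
        String origCol = "key"; // what getFoldedFromCol() would return, or null
        if (origCol != null) {
          // The constant is a folded column: keep the mapping alive.
          tableColsMapping.put(outputColumnName, origCol);
        }
        System.out.println(tableColsMapping); // {_col0=key}
      }
    }
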
@@ -351,7 +359,6 @@ public class GroupByOptimizer implements
}
}
- boolean sortGroupBy = true;
// compute groupby columns from groupby keys
List<String> groupByCols = new ArrayList<String>();
// If the group by expression is anything other than a list of columns,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Thu Oct 30 16:22:33 2014
@@ -1,7 +1,5 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
@@ -22,21 +20,6 @@ import org.apache.hadoop.hive.ql.plan.Te
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
public class MergeJoinProc implements NodeProcessor {
-
- public Operator<? extends OperatorDesc> getLeafOperator(Operator<? extends OperatorDesc> op) {
- for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
- // FileSink or ReduceSink operators are used to create vertices. See
- // TezCompiler.
- if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
- return childOp;
- } else {
- return getLeafOperator(childOp);
- }
- }
-
- return null;
- }
-
@Override
public Object
process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
@@ -60,13 +43,13 @@ public class MergeJoinProc implements No
// merge work already exists for this merge join operator, add the dummy store work to the
// merge work. Else create a merge work, add above work to the merge work
MergeJoinWork mergeWork = null;
- if (context.opMergeJoinWorkMap.containsKey(getLeafOperator(mergeJoinOp))) {
+ if (context.opMergeJoinWorkMap.containsKey(mergeJoinOp)) {
// we already have the merge work corresponding to this merge join operator
- mergeWork = context.opMergeJoinWorkMap.get(getLeafOperator(mergeJoinOp));
+ mergeWork = context.opMergeJoinWorkMap.get(mergeJoinOp);
} else {
mergeWork = new MergeJoinWork();
tezWork.add(mergeWork);
- context.opMergeJoinWorkMap.put(getLeafOperator(mergeJoinOp), mergeWork);
+ context.opMergeJoinWorkMap.put(mergeJoinOp, mergeWork);
}
mergeWork.setMergeJoinOperator(mergeJoinOp);
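
With getLeafOperator() removed, opMergeJoinWorkMap is keyed by the CommonMergeJoinOperator itself; the deleted helper only ever followed the first child, so the leaf it returned was somewhat arbitrary. A trivial sketch of the resulting lookup-or-create pattern, using stand-in types:

    import java.util.HashMap;
    import java.util.Map;

    public class MergeWorkLookupSketch {
      static final class MergeJoinOp {}   // stand-in for CommonMergeJoinOperator
      static final class MergeJoinWork {} // stand-in for MergeJoinWork

      public static void main(String[] args) {
        Map<MergeJoinOp, MergeJoinWork> opMergeJoinWorkMap = new HashMap<>();
        MergeJoinOp mergeJoinOp = new MergeJoinOp();
        // Keyed by operator identity: the same operator always finds its work.
        MergeJoinWork mergeWork = opMergeJoinWorkMap.get(mergeJoinOp);
        if (mergeWork == null) {
          mergeWork = new MergeJoinWork();
          opMergeJoinWorkMap.put(mergeJoinOp, mergeWork);
        }
        System.out.println(opMergeJoinWorkMap.size()); // 1
      }
    }
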
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,8 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.correlation.CorrelationOptimizer;
import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.ql.ppd.Syn
public class Optimizer {
private ParseContext pctx;
private List<Transform> transformations;
+ private static final Log LOG = LogFactory.getLog(Optimizer.class.getName());
/**
* Create the list of transformations.
@@ -61,9 +64,19 @@ public class Optimizer {
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
- transformations.add(new PredicateTransitivePropagate());
+ transformations.add(new PredicateTransitivePropagate());
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+ transformations.add(new ConstantPropagate());
+ }
transformations.add(new SyntheticJoinPredicate());
transformations.add(new PredicatePushDown());
+ }
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+ // We run constant propagation twice because after predicate pushdown, filter expressions
+ // are combined and may become eligible for reduction (such as IS NOT NULL filters).
+ transformations.add(new ConstantPropagate());
+ }
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
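
The net effect of this reshuffling is that constant propagation brackets predicate pushdown: once before, so already-simplified predicates can be pushed down, and once after, because pushdown combines filters (for example an IS NOT NULL next to an equality on the same column) into forms that only then become foldable. A sketch of the resulting registration order, assuming both HIVEOPTPPD and HIVEOPTCONSTANTPROPAGATION are enabled:

    import java.util.ArrayList;
    import java.util.List;

    public class TransformOrderSketch {
      public static void main(String[] args) {
        boolean ppd = true, constProp = true; // hypothetical config snapshot
        List<String> order = new ArrayList<>();
        if (ppd) {
          order.add("PredicateTransitivePropagate");
          if (constProp) {
            order.add("ConstantPropagate"); // pass 1: before pushdown
          }
          order.add("SyntheticJoinPredicate");
          order.add("PredicatePushDown");
        }
        if (constProp) {
          order.add("ConstantPropagate");   // pass 2: after pushdown
        }
        if (ppd) {
          order.add("PartitionPruner");
        }
        System.out.println(order);
      }
    }
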
@@ -71,16 +84,18 @@ public class Optimizer {
transformations.add(new ListBucketingPruner());
}
}
+
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) ||
HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) {
transformations.add(new GroupByOptimizer());
}
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
- transformations.add(new ConstantPropagate());
- }
transformations.add(new ColumnPruner());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
- transformations.add(new SkewJoinOptimizer());
+ if (!isTezExecEngine) {
+ transformations.add(new SkewJoinOptimizer());
+ } else {
+ LOG.warn("Skew join is currently not supported in tez! Disabling the skew join optimization.");
+ }
}
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
transformations.add(new RewriteGBUsingIndex());
@@ -140,7 +155,9 @@ public class Optimizer {
transformations.add(new AnnotateWithOpTraits());
}
- transformations.add(new SimpleFetchOptimizer()); // must be called last
+ if (!HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVEFETCHTASKCONVERSION).equals("none")) {
+ transformations.add(new SimpleFetchOptimizer()); // must be called last
+ }
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEFETCHTASKAGGR)) {
transformations.add(new SimpleFetchAggregation());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Thu Oct 30 16:22:33 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -55,6 +56,8 @@ import org.apache.hadoop.hive.ql.plan.Te
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.FIXED;
+
public class ReduceSinkMapJoinProc implements NodeProcessor {
protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -167,7 +170,7 @@ public class ReduceSinkMapJoinProc imple
if (joinConf.isBucketMapJoin()) {
// disable auto parallelism for bucket map joins
- parentRS.getConf().setAutoParallel(false);
+ parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0];
if (joinConf.getCustomBucketMapJoin()) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java Thu Oct 30 16:22:33 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -31,9 +33,13 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
/**
* SetReducerParallelism determines how many reducers should
* be run for a given reduce sink.
@@ -86,7 +92,14 @@ public class SetReducerParallelism imple
maxReducers, false);
LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
desc.setNumReducers(numReducers);
- desc.setAutoParallel(true);
+
+ final Collection<ExprNodeDescEqualityWrapper> keyCols = ExprNodeDescEqualityWrapper.transform(desc.getKeyCols());
+ final Collection<ExprNodeDescEqualityWrapper> partCols = ExprNodeDescEqualityWrapper.transform(desc.getPartitionCols());
+ if (keyCols != null && keyCols.equals(partCols)) {
+ desc.setReducerTraits(EnumSet.of(UNIFORM, AUTOPARALLEL));
+ } else {
+ desc.setReducerTraits(EnumSet.of(AUTOPARALLEL));
+ }
}
} else {
LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java Thu Oct 30 16:22:33 2014
@@ -112,7 +112,7 @@ public final class RewriteParseContextGe
((SemanticAnalyzer) sem).initParseCtx(subPCtx);
LOG.info("Starting Sub-query Semantic Analysis");
- sem.doPhase1(child, qb, sem.initPhase1Ctx());
+ sem.doPhase1(child, qb, sem.initPhase1Ctx(), null);
LOG.info("Completed phase 1 of Sub-query Semantic Analysis");
sem.getMetaData(qb);