Posted to commits@hive.apache.org by br...@apache.org on 2014/08/10 03:33:55 UTC
svn commit: r1617040 [11/13] - in /hive/branches/spark: ./
ant/src/org/apache/hadoop/hive/ant/ beeline/
beeline/src/java/org/apache/hive/beeline/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ dat...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Sun Aug 10 01:33:50 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.Col
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.util.StringUtils;
+import parquet.column.ColumnDescriptor;
import parquet.hadoop.api.ReadSupport;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
@@ -46,8 +47,8 @@ public class DataWritableReadSupport ext
private static final String TABLE_SCHEMA = "table_schema";
public static final String HIVE_SCHEMA_KEY = "HIVE_TABLE_SCHEMA";
- public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
-
+ public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
+
/**
* From a string of column names (including Hive columns), return a list
* of column names
@@ -75,12 +76,16 @@ public class DataWritableReadSupport ext
final Map<String, String> contextMetadata = new HashMap<String, String>();
if (columns != null) {
final List<String> listColumns = getColumns(columns);
-
+ final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
+ for (ColumnDescriptor c : fileSchema.getColumns()) {
+ lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
+ }
final List<Type> typeListTable = new ArrayList<Type>();
- for (final String col : listColumns) {
+ for (String col : listColumns) {
+ col = col.toLowerCase();
// listColumns contains partition columns which are metadata only
- if (fileSchema.containsField(col)) {
- typeListTable.add(fileSchema.getType(col));
+ if (lowerCaseFileSchemaColumns.containsKey(col)) {
+ typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
} else {
// below allows schema evolution
typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
@@ -93,10 +98,24 @@ public class DataWritableReadSupport ext
final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
final List<Type> typeListWanted = new ArrayList<Type>();
+ final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
for (final Integer idx : indexColumnsWanted) {
- typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
+ String col = listColumns.get(idx);
+ if (indexAccess) {
+ typeListWanted.add(tableSchema.getType(col));
+ } else {
+ col = col.toLowerCase();
+ if (lowerCaseFileSchemaColumns.containsKey(col)) {
+ typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+ } else {
+ // should never occur?
+ String msg = "Column " + col + " at index " + idx + " does not exist in " +
+ lowerCaseFileSchemaColumns;
+ throw new IllegalStateException(msg);
+ }
+ }
}
- requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
+ requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
typeListWanted), fileSchema, configuration);
return new ReadContext(requestedSchemaByUser, contextMetadata);
@@ -127,29 +146,24 @@ public class DataWritableReadSupport ext
}
final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-
return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
}
-
+
/**
- * Determine the file column names based on the position within the requested columns and
+ * Determine the file column names based on the position within the requested columns and
* use that as the requested schema.
*/
- private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
+ private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
Configuration configuration) {
- if(configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
+ if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-
List<Type> requestedTypes = new ArrayList<Type>();
-
for(Type t : requestedSchema.getFields()) {
int index = listColumns.indexOf(t.getName());
requestedTypes.add(fileSchema.getType(index));
}
-
requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
}
-
return requestedSchema;
}
}
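The net effect of the DataWritableReadSupport change is that Hive-to-Parquet column resolution becomes case-insensitive: the file schema's top-level column names are indexed by their lower-cased form, and each requested Hive column is looked up through that index. Below is a minimal, self-contained sketch of that resolution step; the class and method names are illustrative only and are not part of the Hive or Parquet APIs.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CaseInsensitiveColumnResolver {

      // Index the file schema's column names by their lower-cased form.
      static Map<String, String> buildLowerCaseIndex(List<String> fileColumns) {
        Map<String, String> index = new HashMap<String, String>();
        for (String name : fileColumns) {
          index.put(name.toLowerCase(), name);
        }
        return index;
      }

      // Resolve requested (Hive) column names against the index, ignoring case.
      static List<String> resolve(List<String> requested, Map<String, String> index) {
        List<String> resolved = new ArrayList<String>();
        for (String col : requested) {
          String fileName = index.get(col.toLowerCase());
          // When a column is absent from the file, the patch adds an OPTIONAL BINARY
          // placeholder so schema evolution keeps working; this sketch just keeps the name.
          resolved.add(fileName != null ? fileName : col);
        }
        return resolved;
      }

      public static void main(String[] args) {
        Map<String, String> index = buildLowerCaseIndex(Arrays.asList("UserId", "Amount"));
        // Prints [UserId, Amount, ds]: matches are case-insensitive, "ds" stands in for a partition column.
        System.out.println(resolve(Arrays.asList("userid", "amount", "ds"), index));
      }
    }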
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sun Aug 10 01:33:50 2014
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -1241,7 +1242,7 @@ public class Hive {
*/
FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf());
FileSystem loadPathFS = loadPath.getFileSystem(getConf());
- if (oldPartPathFS.equals(loadPathFS)) {
+ if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) {
newPartPath = oldPartPath;
}
}
@@ -2576,6 +2577,16 @@ private void constructOneLBLocationMap(F
}
}
+ public AggrStats getAggrColStatsFor(String dbName, String tblName,
+ List<String> colNames, List<String> partName) {
+ try {
+ return getMSC().getAggrColStatsFor(dbName, tblName, colNames, partName);
+ } catch (Exception e) {
+ LOG.debug(StringUtils.stringifyException(e));
+ return null;
+ }
+ }
+
public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
throws HiveException {
try {
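The loadPartition change compares filesystems with FileUtils.equalsFileSystem instead of FileSystem.equals. Hadoop FileSystem instances generally do not override equals, so a direct comparison is effectively an identity check; comparing the filesystems' URIs (scheme and authority) is the more robust test. The following standalone sketch illustrates that idea only; it is an assumption about the helper's behaviour, not the actual Hive implementation.

    import java.net.URI;

    public class FsEqualitySketch {

      // Two filesystems are treated as the same when their URIs agree on scheme and authority.
      static boolean sameFileSystem(URI a, URI b) {
        if (a == null || b == null) {
          return false;
        }
        return equalsIgnoreNull(a.getScheme(), b.getScheme())
            && equalsIgnoreNull(a.getAuthority(), b.getAuthority());
      }

      private static boolean equalsIgnoreNull(String x, String y) {
        return x == null ? y == null : x.equalsIgnoreCase(y);
      }

      public static void main(String[] args) {
        URI oldPartPath = URI.create("hdfs://nn1:8020/warehouse/t/part=1");
        URI loadPath = URI.create("hdfs://nn1:8020/tmp/hive-staging/000000_0");
        System.out.println(sameFileSystem(oldPartPath, loadPath));                // true
        System.out.println(sameFileSystem(oldPartPath, URI.create("s3n://b/x"))); // false
      }
    }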
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Sun Aug 10 01:33:50 2014
@@ -348,9 +348,9 @@ public final class ConstantPropagateProc
ExprNodeDesc childExpr = newExprs.get(i);
if (childExpr instanceof ExprNodeConstantDesc) {
ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
- if (c.getValue() == Boolean.TRUE) {
+ if (Boolean.TRUE.equals(c.getValue())) {
- // if true, prune it
+ // if true, prune it
return newExprs.get(Math.abs(i - 1));
} else {
@@ -366,7 +366,7 @@ public final class ConstantPropagateProc
ExprNodeDesc childExpr = newExprs.get(i);
if (childExpr instanceof ExprNodeConstantDesc) {
ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
- if (c.getValue() == Boolean.FALSE) {
+ if (Boolean.FALSE.equals(c.getValue())) {
// if false, prune it
return newExprs.get(Math.abs(i - 1));
@@ -565,10 +565,10 @@ public final class ConstantPropagateProc
ExprNodeDesc newCondn = foldExpr(condn, constants, cppCtx, op, 0, true);
if (newCondn instanceof ExprNodeConstantDesc) {
ExprNodeConstantDesc c = (ExprNodeConstantDesc) newCondn;
- if (c.getValue() == Boolean.TRUE) {
+ if (Boolean.TRUE.equals(c.getValue())) {
cppCtx.addOpToDelete(op);
LOG.debug("Filter expression " + condn + " holds true. Will delete it.");
- } else if (c.getValue() == Boolean.FALSE) {
+ } else if (Boolean.FALSE.equals(c.getValue())) {
LOG.warn("Filter expression " + condn + " holds false!");
}
}
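The Boolean comparisons above change from reference equality to Boolean.TRUE.equals(...) / Boolean.FALSE.equals(...). The equals form is null-safe and does not depend on the constant's value being the interned Boolean instance, as this small standalone example shows.

    public class BooleanCompareExample {
      public static void main(String[] args) {
        Object interned = Boolean.TRUE;
        Object distinct = new Boolean(true);   // same value, different instance
        Object missing = null;

        System.out.println(interned == Boolean.TRUE);       // true
        System.out.println(distinct == Boolean.TRUE);       // false: reference comparison
        System.out.println(Boolean.TRUE.equals(distinct));  // true: value comparison
        System.out.println(Boolean.TRUE.equals(missing));   // false, and no NullPointerException
      }
    }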
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Sun Aug 10 01:33:50 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.exec.*;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
@@ -62,9 +63,12 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -107,6 +111,12 @@ import org.apache.hadoop.hive.ql.udf.UDF
import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
import org.apache.hadoop.hive.ql.udf.UDFYear;
import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
public class Vectorizer implements PhysicalPlanResolver {
@@ -133,6 +143,7 @@ public class Vectorizer implements Physi
patternBuilder.append("|short");
patternBuilder.append("|timestamp");
patternBuilder.append("|boolean");
+ patternBuilder.append("|binary");
patternBuilder.append("|string");
patternBuilder.append("|byte");
patternBuilder.append("|float");
@@ -256,7 +267,15 @@ public class Vectorizer implements Physi
class VectorizationDispatcher implements Dispatcher {
+ private PhysicalContext pctx;
+
+ private int keyColCount;
+ private int valueColCount;
+
public VectorizationDispatcher(PhysicalContext pctx) {
+ this.pctx = pctx;
+ keyColCount = 0;
+ valueColCount = 0;
}
@Override
@@ -270,6 +289,9 @@ public class Vectorizer implements Physi
for (BaseWork w: work.getAllWork()) {
if (w instanceof MapWork) {
convertMapWork((MapWork)w);
+ } else if (w instanceof ReduceWork) {
+ // We are only vectorizing Reduce under Tez.
+ convertReduceWork((ReduceWork)w);
}
}
}
@@ -283,6 +305,13 @@ public class Vectorizer implements Physi
}
}
+ private void addMapWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
+ + FileSinkOperator.getOperatorName()), np);
+ opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+ + ReduceSinkOperator.getOperatorName()), np);
+ }
+
private boolean validateMapWork(MapWork mapWork) throws SemanticException {
// Validate the input format
@@ -297,11 +326,8 @@ public class Vectorizer implements Physi
}
}
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- ValidationNodeProcessor vnp = new ValidationNodeProcessor();
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
- + FileSinkOperator.getOperatorName()), vnp);
- opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
- + ReduceSinkOperator.getOperatorName()), vnp);
+ MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+ addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
// iterate over the mapper operator tree
@@ -320,14 +346,11 @@ public class Vectorizer implements Physi
}
private void vectorizeMapWork(MapWork mapWork) throws SemanticException {
- LOG.info("Vectorizing task...");
+ LOG.info("Vectorizing MapWork...");
mapWork.setVectorMode(true);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mapWork);
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*" +
- ReduceSinkOperator.getOperatorName()), vnp);
- opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
- + FileSinkOperator.getOperatorName()), vnp);
+ MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork);
+ addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
// iterate over the mapper operator tree
@@ -348,9 +371,114 @@ public class Vectorizer implements Physi
return;
}
+
+ private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
+ boolean ret = validateReduceWork(reduceWork);
+ if (ret) {
+ vectorizeReduceWork(reduceWork);
+ }
+ }
+
+ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+ try {
+ // Check key ObjectInspector.
+ ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
+ if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+ return false;
+ }
+ StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
+ keyColCount = keyStructObjectInspector.getAllStructFieldRefs().size();
+
+ // Tez doesn't use tagging...
+ if (reduceWork.getNeedsTagging()) {
+ return false;
+ }
+
+ // Check value ObjectInspector.
+ ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
+ if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+ return false;
+ }
+ StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+ valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ return true;
+ }
+
+ private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+ opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
+ opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+ }
+
+ private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+ // Validate input to ReduceWork.
+ if (!getOnlyStructObjectInspectors(reduceWork)) {
+ return false;
+ }
+ // Now check the reduce operator tree.
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ ReduceWorkValidationNodeProcessor vnp = new ReduceWorkValidationNodeProcessor();
+ addReduceWorkRules(opRules, vnp);
+ Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+    // iterate over the reduce operator tree
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.add(reduceWork.getReducer());
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+ ogw.startWalking(topNodes, nodeOutput);
+ for (Node n : nodeOutput.keySet()) {
+ if (nodeOutput.get(n) != null) {
+ if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void vectorizeReduceWork(ReduceWork reduceWork) throws SemanticException {
+ LOG.info("Vectorizing ReduceWork...");
+ reduceWork.setVectorMode(true);
+
+ // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as expected.
+ // We need to descend down, otherwise it breaks our algorithm that determines VectorizationContext...
+    // So we use PreOrderWalker instead of DefaultGraphWalker.
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ ReduceWorkVectorizationNodeProcessor vnp = new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+ addReduceWorkRules(opRules, vnp);
+ Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+ GraphWalker ogw = new PreOrderWalker(disp);
+    // iterate over the reduce operator tree
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.add(reduceWork.getReducer());
+ LOG.info("vectorizeReduceWork reducer Operator: " + reduceWork.getReducer().getName() + "...");
+ HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+ ogw.startWalking(topNodes, nodeOutput);
+
+ // Necessary since we are vectorizing the root operator in reduce.
+ reduceWork.setReducer(vnp.getRootVectorOp());
+
+ Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
+ if (reducer.getType().equals(OperatorType.EXTRACT)) {
+ ((VectorExtractOperator)reducer).setKeyAndValueColCounts(keyColCount, valueColCount);
+ }
+
+ Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
+ reduceWork.setScratchColumnVectorTypes(columnVectorTypes);
+ Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
+ reduceWork.setScratchColumnMap(columnMap);
+
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
+ LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+ }
+ }
}
- class ValidationNodeProcessor implements NodeProcessor {
+ class MapWorkValidationNodeProcessor implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
@@ -361,9 +489,9 @@ public class Vectorizer implements Physi
op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
return new Boolean(true);
}
- boolean ret = validateOperator(op);
+ boolean ret = validateMapWorkOperator(op);
if (!ret) {
- LOG.info("Operator: " + op.getName() + " could not be vectorized.");
+ LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
return new Boolean(false);
}
}
@@ -371,24 +499,37 @@ public class Vectorizer implements Physi
}
}
- class VectorizationNodeProcessor implements NodeProcessor {
+ class ReduceWorkValidationNodeProcessor implements NodeProcessor {
- private final MapWork mWork;
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ for (Node n : stack) {
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+ boolean ret = validateReduceWorkOperator(op);
+ if (!ret) {
+ LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
+ return new Boolean(false);
+ }
+ }
+ return new Boolean(true);
+ }
+ }
+
+ // This class has common code used by both MapWorkVectorizationNodeProcessor and
+ // ReduceWorkVectorizationNodeProcessor.
+ class VectorizationNodeProcessor implements NodeProcessor {
// This is used to extract scratch column types for each file key
- private final Map<String, VectorizationContext> scratchColumnContext =
+ protected final Map<String, VectorizationContext> scratchColumnContext =
new HashMap<String, VectorizationContext>();
- private final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+ protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
- private final Set<Operator<? extends OperatorDesc>> opsDone =
+ protected final Set<Operator<? extends OperatorDesc>> opsDone =
new HashSet<Operator<? extends OperatorDesc>>();
- public VectorizationNodeProcessor(MapWork mWork) {
- this.mWork = mWork;
- }
-
public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
Map<String, Map<Integer, String>> scratchColumnVectorTypes =
new HashMap<String, Map<Integer, String>>();
@@ -411,16 +552,90 @@ public class Vectorizer implements Physi
return scratchColumnMap;
}
+ public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, Operator<? extends OperatorDesc> op)
+ throws SemanticException {
+ VectorizationContext vContext = null;
+ if (stack.size() <= 1) {
+ throw new SemanticException(String.format("Expected operator stack for operator %s to have at least 2 operators", op.getName()));
+ }
+ // Walk down the stack of operators until we found one willing to give us a context.
+ // At the bottom will be the root operator, guaranteed to have a context
+ int i= stack.size()-2;
+ while (vContext == null) {
+ if (i < 0) {
+ throw new SemanticException(String.format("Did not find vectorization context for operator %s in operator stack", op.getName()));
+ }
+ Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
+ vContext = vContextsByTSOp.get(opParent);
+ --i;
+ }
+ return vContext;
+ }
+
+ public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+ Operator<? extends OperatorDesc> currentOp = op;
+ while (currentOp.getParentOperators().size() > 0) {
+ currentOp = currentOp.getParentOperators().get(0);
+ if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+ // No need to vectorize
+ if (!opsDone.contains(op)) {
+ opsDone.add(op);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
+ throws SemanticException {
+ Operator<? extends OperatorDesc> vectorOp = op;
+ try {
+ if (!opsDone.contains(op)) {
+ vectorOp = vectorizeOperator(op, vContext);
+ opsDone.add(op);
+ if (vectorOp != op) {
+ opsDone.add(vectorOp);
+ }
+ if (vectorOp instanceof VectorizationContextRegion) {
+ VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+ VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+ vContextsByTSOp.put(op, vOutContext);
+ scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
+ }
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e);
+ }
+ return vectorOp;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ throw new SemanticException("Must be overridden");
+ }
+ }
+
+ class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+
+ private final MapWork mWork;
+
+ public MapWorkVectorizationNodeProcessor(MapWork mWork) {
+ this.mWork = mWork;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ LOG.info("MapWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
VectorizationContext vContext = null;
if (op instanceof TableScanOperator) {
- vContext = getVectorizationContext((TableScanOperator) op, physicalContext);
+ vContext = getVectorizationContext(op, physicalContext);
for (String onefile : mWork.getPathToAliases().keySet()) {
List<String> aliases = mWork.getPathToAliases().get(onefile);
for (String alias : aliases) {
@@ -438,45 +653,76 @@ public class Vectorizer implements Physi
}
vContextsByTSOp.put(op, vContext);
} else {
- assert stack.size() > 1;
- // Walk down the stack of operators until we found one willing to give us a context.
- // At the bottom will be the TS operator, guaranteed to have a context
- int i= stack.size()-2;
- while (vContext == null) {
- Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
- vContext = vContextsByTSOp.get(opParent);
- --i;
- }
+ vContext = walkStackToFindVectorizationContext(stack, op);
}
assert vContext != null;
- if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
- op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
- // No need to vectorize
- if (!opsDone.contains(op)) {
- opsDone.add(op);
- }
+      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batches. So, don't vectorize
+ // any operators below GROUPBY.
+ if (nonVectorizableChildOfGroupBy(op)) {
+ return null;
+ }
+
+ doVectorize(op, vContext);
+
+ return null;
+ }
+ }
+
+ class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+
+ private final ReduceWork rWork;
+ private int keyColCount;
+ private int valueColCount;
+ private Map<String, Integer> reduceColumnNameMap;
+
+ private Operator<? extends OperatorDesc> rootVectorOp;
+
+ public Operator<? extends OperatorDesc> getRootVectorOp() {
+ return rootVectorOp;
+ }
+
+ public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, int valueColCount) {
+ this.rWork = rWork;
+ reduceColumnNameMap = rWork.getReduceColumnNameMap();
+ this.keyColCount = keyColCount;
+ this.valueColCount = valueColCount;
+ rootVectorOp = null;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
+
+ VectorizationContext vContext = null;
+
+ boolean saveRootVectorOp = false;
+
+ if (op.getParentOperators().size() == 0) {
+ vContext = getReduceVectorizationContext(reduceColumnNameMap);
+ vContextsByTSOp.put(op, vContext);
+ saveRootVectorOp = true;
} else {
- try {
- if (!opsDone.contains(op)) {
- Operator<? extends OperatorDesc> vectorOp =
- vectorizeOperator(op, vContext);
- opsDone.add(op);
- if (vectorOp != op) {
- opsDone.add(vectorOp);
- }
- if (vectorOp instanceof VectorizationContextRegion) {
- VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
- VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
- vContextsByTSOp.put(op, vOutContext);
- scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
- }
- }
- } catch (HiveException e) {
- throw new SemanticException(e);
- }
+ vContext = walkStackToFindVectorizationContext(stack, op);
+ }
+
+ assert vContext != null;
+
+      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batches. So, don't vectorize
+ // any operators below GROUPBY.
+ if (nonVectorizableChildOfGroupBy(op)) {
+ return null;
}
+
+ Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+ if (saveRootVectorOp && op != vectorOp) {
+ rootVectorOp = vectorOp;
+ }
+
return null;
}
}
@@ -519,7 +765,7 @@ public class Vectorizer implements Physi
return pctx;
}
- boolean validateOperator(Operator<? extends OperatorDesc> op) {
+ boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) {
boolean ret = false;
switch (op.getType()) {
case MAPJOIN:
@@ -555,6 +801,32 @@ public class Vectorizer implements Physi
return ret;
}
+ boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
+ boolean ret = false;
+ switch (op.getType()) {
+ case EXTRACT:
+ ret = validateExtractOperator((ExtractOperator) op);
+ break;
+ case FILTER:
+ ret = validateFilterOperator((FilterOperator) op);
+ break;
+ case SELECT:
+ ret = validateSelectOperator((SelectOperator) op);
+ break;
+ case REDUCESINK:
+ ret = validateReduceSinkOperator((ReduceSinkOperator) op);
+ break;
+ case FILESINK:
+ case LIMIT:
+ ret = true;
+ break;
+ default:
+ ret = false;
+ break;
+ }
+ return ret;
+ }
+
private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
SMBJoinDesc desc = op.getConf();
// Validation is the same as for map join, since the 'small' tables are not vectorized
@@ -617,6 +889,15 @@ public class Vectorizer implements Physi
return validateAggregationDesc(op.getConf().getAggregators());
}
+ private boolean validateExtractOperator(ExtractOperator op) {
+ ExprNodeDesc expr = op.getConf().getCol();
+ boolean ret = validateExprNodeDesc(expr);
+ if (!ret) {
+ return false;
+ }
+ return true;
+ }
+
private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
}
@@ -728,7 +1009,7 @@ public class Vectorizer implements Physi
return supportedDataTypesPattern.matcher(type.toLowerCase()).matches();
}
- private VectorizationContext getVectorizationContext(TableScanOperator op,
+ private VectorizationContext getVectorizationContext(Operator op,
PhysicalContext pctx) {
RowSchema rs = op.getSchema();
@@ -741,8 +1022,26 @@ public class Vectorizer implements Physi
}
}
- VectorizationContext vc = new VectorizationContext(cmap, columnCount);
- return vc;
+ return new VectorizationContext(cmap, columnCount);
+ }
+
+ private VectorizationContext getReduceVectorizationContext(Map<String, Integer> reduceColumnNameMap) {
+ return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
+ }
+
+ private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, Operator<? extends OperatorDesc> vectorOp) {
+ if (op.getParentOperators() != null) {
+ vectorOp.setParentOperators(op.getParentOperators());
+ for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
+ p.replaceChild(op, vectorOp);
+ }
+ }
+ if (op.getChildOperators() != null) {
+ vectorOp.setChildOperators(op.getChildOperators());
+ for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
+ c.replaceParent(op, vectorOp);
+ }
+ }
}
Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
@@ -757,6 +1056,7 @@ public class Vectorizer implements Physi
case FILESINK:
case REDUCESINK:
case LIMIT:
+ case EXTRACT:
vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
break;
default:
@@ -765,18 +1065,7 @@ public class Vectorizer implements Physi
}
if (vectorOp != op) {
- if (op.getParentOperators() != null) {
- vectorOp.setParentOperators(op.getParentOperators());
- for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
- p.replaceChild(op, vectorOp);
- }
- }
- if (op.getChildOperators() != null) {
- vectorOp.setChildOperators(op.getChildOperators());
- for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
- c.replaceParent(op, vectorOp);
- }
- }
+ fixupParentChildOperators(op, vectorOp);
((AbstractOperatorDesc) vectorOp.getConf()).setVectorMode(true);
}
return vectorOp;
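A pattern now shared by the MapWork and ReduceWork processors is walkStackToFindVectorizationContext: starting from the current operator's parent, walk back along the dispatcher's operator stack until an ancestor that already produced a VectorizationContext is found (the root operator is guaranteed to have one). The simplified sketch below illustrates only that lookup; the types are plain stand-ins, not the Hive operator API.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StackContextLookup {

      // stack lists the operators on the current walk, root first, current operator last;
      // contexts maps operators that already have a vectorization context to that context.
      static String findContext(List<String> stack, Map<String, String> contexts) {
        for (int i = stack.size() - 2; i >= 0; i--) {
          String ctx = contexts.get(stack.get(i));
          if (ctx != null) {
            return ctx;
          }
        }
        throw new IllegalStateException("no vectorization context found on the operator stack");
      }

      public static void main(String[] args) {
        List<String> stack = Arrays.asList("TS", "FIL", "SEL", "RS");
        Map<String, String> contexts = new HashMap<String, String>();
        contexts.put("TS", "ctx-from-table-scan");
        System.out.println(findContext(stack, contexts)); // ctx-from-table-scan
      }
    }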
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java Sun Aug 10 01:33:50 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -62,6 +63,8 @@ public class AnnotateWithStatistics impl
+ MapJoinOperator.getOperatorName() + "%"), StatsRulesProcFactory.getJoinRule());
opRules.put(new RuleRegExp("LIM", LimitOperator.getOperatorName() + "%"),
StatsRulesProcFactory.getLimitRule());
+ opRules.put(new RuleRegExp("RS", ReduceSinkOperator.getOperatorName() + "%"),
+ StatsRulesProcFactory.getReduceSinkRule());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Sun Aug 10 01:33:50 2014
@@ -100,9 +100,9 @@ public class StatsRulesProcFactory {
}
Table table = aspCtx.getParseContext().getTopToTable().get(tsop);
-      // gather statistics for the first time and attach it to the table scan operator
- Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
try {
+        // gather statistics for the first time and attach it to the table scan operator
+ Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
tsop.setStatistics(stats.clone());
if (LOG.isDebugEnabled()) {
@@ -110,6 +110,9 @@ public class StatsRulesProcFactory {
}
} catch (CloneNotSupportedException e) {
throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
+ } catch (HiveException e) {
+ LOG.debug(e);
+ throw new SemanticException(e);
}
return null;
}
@@ -598,12 +601,18 @@ public class StatsRulesProcFactory {
}
dvProd *= dv;
} else {
-
- // partial column statistics on grouping attributes case.
- // if column statistics on grouping attribute is missing, then
- // assume worst case.
- // GBY rule will emit half the number of rows if dvProd is 0
- dvProd = 0;
+ if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
+ // the column must be an aggregate column inserted by GBY. We
+ // don't have to account for this column when computing product
+ // of NDVs
+ continue;
+ } else {
+ // partial column statistics on grouping attributes case.
+ // if column statistics on grouping attribute is missing, then
+ // assume worst case.
+ // GBY rule will emit half the number of rows if dvProd is 0
+ dvProd = 0;
+ }
break;
}
}
@@ -684,7 +693,17 @@ public class StatsRulesProcFactory {
aggColStats.add(cs);
}
}
- stats.addToColumnStats(aggColStats);
+
+ // add the new aggregate column and recompute data size
+ if (aggColStats.size() > 0) {
+ stats.addToColumnStats(aggColStats);
+
+ // only if the column stats is available, update the data size from
+ // the column stats
+ if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
+ updateStats(stats, stats.getNumRows(), true);
+ }
+ }
// if UDAF present and if column expression map is empty then it must
// be full aggregation query like count(*) in which case number of
@@ -731,15 +750,24 @@ public class StatsRulesProcFactory {
* <p>
* In the absence of histograms, we can use the following general case
* <p>
- * <b>Single attribute</b>
+ * <b>2 Relations, 1 attribute</b>
* <p>
* T(RXS) = (T(R)*T(S))/max(V(R,Y), V(S,Y)) where Y is the join attribute
* <p>
- * <b>Multiple attributes</b>
+ * <b>2 Relations, 2 attributes</b>
* <p>
* T(RXS) = T(R)*T(S)/max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2)), where y1 and y2 are the join
* attributes
* <p>
+ * <b>3 Relations, 1 attribute</b>
+ * <p>
+ * T(RXSXQ) = T(R)*T(S)*T(Q)/top2largest(V(R,y), V(S,y), V(Q,y)), where y is the join attribute
+ * <p>
+ * <b>3 Relations, 2 attributes</b>
+ * <p>
+ * T(RXSXQ) = T(R)*T(S)*T(Q)/top2largest(V(R,y1), V(S,y1), V(Q,y1)) * top2largest(V(R,y2), V(S,y2), V(Q,y2)),
+ * where y1 and y2 are the join attributes
+ * <p>
* <i>Worst case:</i> If no column statistics are available, then T(RXS) = joinFactor * max(T(R),
* T(S)) * (numParents - 1) will be used as heuristics. joinFactor is from hive.stats.join.factor
* hive config. In the worst case, since we do not know any information about join keys (and hence
@@ -780,9 +808,12 @@ public class StatsRulesProcFactory {
// statistics object that is combination of statistics from all
// relations involved in JOIN
Statistics stats = new Statistics();
- long prodRows = 1;
+ List<Long> rowCountParents = Lists.newArrayList();
List<Long> distinctVals = Lists.newArrayList();
+
+ // 2 relations, multiple attributes
boolean multiAttr = false;
+ int numAttr = 1;
Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
Map<Integer, List<String>> joinKeys = Maps.newHashMap();
@@ -792,12 +823,13 @@ public class StatsRulesProcFactory {
ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
Statistics parentStats = parent.getStatistics();
- prodRows *= parentStats.getNumRows();
+ rowCountParents.add(parentStats.getNumRows());
List<ExprNodeDesc> keyExprs = parent.getConf().getKeyCols();
// multi-attribute join key
if (keyExprs.size() > 1) {
multiAttr = true;
+ numAttr = keyExprs.size();
}
// compute fully qualified join key column names. this name will be
@@ -808,16 +840,9 @@ public class StatsRulesProcFactory {
StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
joinKeys.put(pos, fqCols);
- Map<String, ExprNodeDesc> colExprMap = parent.getColumnExprMap();
- RowSchema rs = parent.getSchema();
-
// get column statistics for all output columns
- List<ColStatistics> cs =
- StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
- for (ColStatistics c : cs) {
- if (c != null) {
- joinedColStats.put(c.getFullyQualifiedColName(), c);
- }
+ for (ColStatistics cs : parentStats.getColumnStats()) {
+ joinedColStats.put(cs.getFullyQualifiedColName(), cs);
}
// since new statistics is derived from all relations involved in
@@ -831,10 +856,10 @@ public class StatsRulesProcFactory {
long denom = 1;
if (multiAttr) {
List<Long> perAttrDVs = Lists.newArrayList();
- int numAttr = joinKeys.get(0).size();
for (int idx = 0; idx < numAttr; idx++) {
for (Integer i : joinKeys.keySet()) {
String col = joinKeys.get(i).get(idx);
+ col = StatsUtils.stripPrefixFromColumnName(col);
ColStatistics cs = joinedColStats.get(col);
if (cs != null) {
perAttrDVs.add(cs.getCountDistint());
@@ -850,6 +875,7 @@ public class StatsRulesProcFactory {
} else {
for (List<String> jkeys : joinKeys.values()) {
for (String jk : jkeys) {
+ jk = StatsUtils.stripPrefixFromColumnName(jk);
ColStatistics cs = joinedColStats.get(jk);
if (cs != null) {
distinctVals.add(cs.getCountDistint());
@@ -859,6 +885,11 @@ public class StatsRulesProcFactory {
denom = getDenominator(distinctVals);
}
+ // Update NDV of joined columns to be min(V(R,y), V(S,y))
+ if (multiAttr) {
+ updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
+ }
+
// column statistics from different sources are put together and rename
// fully qualified column names based on output schema of join operator
Map<String, ExprNodeDesc> colExprMap = jop.getColumnExprMap();
@@ -875,7 +906,6 @@ public class StatsRulesProcFactory {
ColStatistics cs = joinedColStats.get(fqColName);
String outColName = key;
String outTabAlias = ci.getTabAlias();
- outColName = StatsUtils.stripPrefixFromColumnName(outColName);
if (cs != null) {
cs.setColumnName(outColName);
cs.setTableAlias(outTabAlias);
@@ -886,13 +916,21 @@ public class StatsRulesProcFactory {
// update join statistics
stats.setColumnStats(outColStats);
- long newRowCount = prodRows / denom;
+ long newRowCount = computeNewRowCount(rowCountParents, denom);
+
+ if (newRowCount <= 0 && LOG.isDebugEnabled()) {
+ newRowCount = 0;
+ LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
+ + " denominator or overflow might have occurred. Resetting row count to 0."
+ + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
+ }
+
stats.setNumRows(newRowCount);
stats.setDataSize(StatsUtils.getDataSizeFromColumnStats(newRowCount, outColStats));
jop.setStatistics(stats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
+ LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
}
} else {
@@ -927,13 +965,72 @@ public class StatsRulesProcFactory {
jop.setStatistics(wcStats);
if (LOG.isDebugEnabled()) {
- LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+ LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
}
}
return null;
}
+ private long computeNewRowCount(List<Long> rowCountParents, long denom) {
+ double factor = 0.0d;
+ long result = 1;
+ long max = rowCountParents.get(0);
+ long maxIdx = 0;
+
+ // To avoid long overflow, we will divide the max row count by denominator
+ // and use that factor to multiply with other row counts
+ for (int i = 1; i < rowCountParents.size(); i++) {
+ if (rowCountParents.get(i) > max) {
+ max = rowCountParents.get(i);
+ maxIdx = i;
+ }
+ }
+
+ factor = (double) max / (double) denom;
+
+ for (int i = 0; i < rowCountParents.size(); i++) {
+ if (i != maxIdx) {
+ result *= rowCountParents.get(i);
+ }
+ }
+
+ result = (long) (result * factor);
+
+ return result;
+ }
+
+ private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
+ Map<String, ColStatistics> joinedColStats, int numAttr) {
+ int joinColIdx = 0;
+ while (numAttr > 0) {
+ long minNDV = Long.MAX_VALUE;
+
+ // find min NDV for joining columns
+ for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
+ String key = entry.getValue().get(joinColIdx);
+ key = StatsUtils.stripPrefixFromColumnName(key);
+ ColStatistics cs = joinedColStats.get(key);
+ if (cs != null && cs.getCountDistint() < minNDV) {
+ minNDV = cs.getCountDistint();
+ }
+ }
+
+ // set min NDV value to both columns involved in join
+ if (minNDV != Long.MAX_VALUE) {
+ for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
+ String key = entry.getValue().get(joinColIdx);
+ key = StatsUtils.stripPrefixFromColumnName(key);
+ ColStatistics cs = joinedColStats.get(key);
+ cs.setCountDistint(minNDV);
+ }
+ }
+
+ joinColIdx++;
+ numAttr--;
+ }
+ }
+
private long getDenominator(List<Long> distinctVals) {
if (distinctVals.isEmpty()) {
@@ -951,16 +1048,23 @@ public class StatsRulesProcFactory {
return Collections.max(distinctVals);
} else {
+      // remember the min value and exclude it from the denominator
+ long minNDV = distinctVals.get(0);
+ int minIdx = 0;
+
+ for (int i = 1; i < distinctVals.size(); i++) {
+ if (distinctVals.get(i) < minNDV) {
+ minNDV = distinctVals.get(i);
+ minIdx = i;
+ }
+ }
+
// join from multiple relations:
- // denom = max(v1, v2) * max(v2, v3) * max(v3, v4)
+ // denom = Product of all NDVs except the least of all
long denom = 1;
- for (int i = 0; i < distinctVals.size() - 1; i++) {
- long v1 = distinctVals.get(i);
- long v2 = distinctVals.get(i + 1);
- if (v1 >= v2) {
- denom *= v1;
- } else {
- denom *= v2;
+ for (int i = 0; i < distinctVals.size(); i++) {
+ if (i != minIdx) {
+ denom *= distinctVals.get(i);
}
}
return denom;
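As a worked example of the new estimation logic (all numbers invented for illustration): take a three-way join whose parents have row counts T(R) = T(S) = T(Q) = 4,000,000 and whose join-key NDVs are 50, 100 and 200. getDenominator drops the smallest NDV (50) and returns 100 * 200 = 20,000. Multiplying the three row counts directly would give 6.4 * 10^19, which overflows a signed 64-bit long, so computeNewRowCount instead divides the largest row count by the denominator first: factor = 4,000,000 / 20,000 = 200, and the estimate becomes 4,000,000 * 4,000,000 * 200 = 3.2 * 10^15 rows, the same value the direct formula T(R)*T(S)*T(Q)/denom would produce without the overflow risk.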
@@ -980,8 +1084,6 @@ public class StatsRulesProcFactory {
LimitOperator lop = (LimitOperator) nd;
Operator<? extends OperatorDesc> parent = lop.getParentOperators().get(0);
Statistics parentStats = parent.getStatistics();
- AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
- HiveConf conf = aspCtx.getConf();
try {
long limit = -1;
@@ -1029,6 +1131,73 @@ public class StatsRulesProcFactory {
}
/**
+ * ReduceSink operator does not change any of the statistics. But it renames
+ * the column statistics from its parent based on the output key and value
+ * column names to make it easy for the downstream operators. This is different
+ * from the default stats rule, which just aggregates and passes along the statistics
+ * without renaming them based on the operator's output schema.
+ */
+ public static class ReduceSinkStatsRule extends DefaultStatsRule implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator rop = (ReduceSinkOperator) nd;
+ Operator<? extends OperatorDesc> parent = rop.getParentOperators().get(0);
+ Statistics parentStats = parent.getStatistics();
+ if (parentStats != null) {
+ AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
+ HiveConf conf = aspCtx.getConf();
+
+ List<String> outKeyColNames = rop.getConf().getOutputKeyColumnNames();
+ List<String> outValueColNames = rop.getConf().getOutputValueColumnNames();
+ Map<String, ExprNodeDesc> colExprMap = rop.getColumnExprMap();
+ try {
+ Statistics outStats = parentStats.clone();
+ if (satisfyPrecondition(parentStats)) {
+ List<ColStatistics> colStats = Lists.newArrayList();
+ for (String key : outKeyColNames) {
+ String prefixedKey = "KEY." + key;
+ ExprNodeDesc end = colExprMap.get(prefixedKey);
+ if (end != null) {
+ ColStatistics cs = StatsUtils
+ .getColStatisticsFromExpression(conf, parentStats, end);
+ if (cs != null) {
+ cs.setColumnName(key);
+ colStats.add(cs);
+ }
+ }
+ }
+
+ for (String val : outValueColNames) {
+ String prefixedVal = "VALUE." + val;
+ ExprNodeDesc end = colExprMap.get(prefixedVal);
+ if (end != null) {
+ ColStatistics cs = StatsUtils
+ .getColStatisticsFromExpression(conf, parentStats, end);
+ if (cs != null) {
+ cs.setColumnName(val);
+ colStats.add(cs);
+ }
+ }
+ }
+
+ outStats.setColumnStats(colStats);
+ }
+ rop.setStatistics(outStats);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
+ }
+ } catch (CloneNotSupportedException e) {
+ throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
+ }
+ }
+ return null;
+ }
+
+ }
+
+ /**
* Default rule is to aggregate the statistics from all its parent operators.
*/
public static class DefaultStatsRule implements NodeProcessor {
@@ -1105,6 +1274,10 @@ public class StatsRulesProcFactory {
return new LimitStatsRule();
}
+ public static NodeProcessor getReduceSinkRule() {
+ return new ReduceSinkStatsRule();
+ }
+
public static NodeProcessor getDefaultRule() {
return new DefaultStatsRule();
}
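The new ReduceSinkStatsRule re-publishes the parent's column statistics under the ReduceSink's own output names by looking each output column up under its "KEY." or "VALUE." prefix in the column-expression map. The toy sketch below shows just that renaming step; plain Strings stand in for ExprNodeDesc and ColStatistics, and the column names are hypothetical.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReduceSinkRenameSketch {
      public static void main(String[] args) {
        // Expressions feeding the ReduceSink, keyed by their prefixed output names.
        Map<String, String> colExprMap = new HashMap<String, String>();
        colExprMap.put("KEY.reducesinkkey0", "stats(t.userid)");
        colExprMap.put("VALUE._col1", "stats(t.amount)");

        List<String> outKeyColNames = Arrays.asList("reducesinkkey0");
        List<String> outValueColNames = Arrays.asList("_col1");

        List<String> renamed = new ArrayList<String>();
        for (String key : outKeyColNames) {
          String cs = colExprMap.get("KEY." + key);
          if (cs != null) {
            renamed.add(key + " -> " + cs);   // statistics carried forward under the plain key name
          }
        }
        for (String val : outValueColNames) {
          String cs = colExprMap.get("VALUE." + val);
          if (cs != null) {
            renamed.add(val + " -> " + cs);
          }
        }
        // Prints [reducesinkkey0 -> stats(t.userid), _col1 -> stats(t.amount)]
        System.out.println(renamed);
      }
    }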
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java Sun Aug 10 01:33:50 2014
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -102,6 +103,10 @@ public class ColumnStatsSemanticAnalyzer
private Map<String,String> getPartKeyValuePairsFromAST(ASTNode tree) {
ASTNode child = ((ASTNode) tree.getChild(0).getChild(1));
Map<String,String> partSpec = new HashMap<String, String>();
+ if (null == child) {
+ // case of analyze table T compute statistics for columns;
+ return partSpec;
+ }
String partKey;
String partValue;
for (int i = 0; i < child.getChildCount(); i++) {
@@ -361,6 +366,9 @@ public class ColumnStatsSemanticAnalyzer
checkIfTemporaryTable();
checkForPartitionColumns(colNames, Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys()));
validateSpecifiedColumnNames(colNames);
+ if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) {
+ isPartitionStats = true;
+ }
if (isPartitionStats) {
isTableLevel = false;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Sun Aug 10 01:33:50 2014
@@ -52,6 +52,11 @@ public abstract class BaseWork extends A
private String name;
+ // Vectorization.
+ protected Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
+ protected Map<String, Map<String, Integer>> scratchColumnMap = null;
+ protected boolean vectorMode = false;
+
public void setGatheringStats(boolean gatherStats) {
this.gatheringStats = gatherStats;
}
@@ -107,5 +112,31 @@ public abstract class BaseWork extends A
return returnSet;
}
+ public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+ return scratchColumnVectorTypes;
+ }
+
+ public void setScratchColumnVectorTypes(
+ Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
+ this.scratchColumnVectorTypes = scratchColumnVectorTypes;
+ }
+
+ public Map<String, Map<String, Integer>> getScratchColumnMap() {
+ return scratchColumnMap;
+ }
+
+ public void setScratchColumnMap(Map<String, Map<String, Integer>> scratchColumnMap) {
+ this.scratchColumnMap = scratchColumnMap;
+ }
+
+ @Override
+ public void setVectorMode(boolean vectorMode) {
+ this.vectorMode = vectorMode;
+ }
+
+ public boolean getVectorMode() {
+ return vectorMode;
+ }
+
public abstract void configureJobConf(JobConf job);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Sun Aug 10 01:33:50 2014
@@ -116,10 +116,6 @@ public class MapWork extends BaseWork {
private boolean useOneNullRowInputFormat;
- private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
- private Map<String, Map<String, Integer>> scratchColumnMap = null;
- private boolean vectorMode = false;
-
public MapWork() {}
public MapWork(String name) {
@@ -519,32 +515,6 @@ public class MapWork extends BaseWork {
}
}
- public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
- return scratchColumnVectorTypes;
- }
-
- public void setScratchColumnVectorTypes(
- Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
- this.scratchColumnVectorTypes = scratchColumnVectorTypes;
- }
-
- public Map<String, Map<String, Integer>> getScratchColumnMap() {
- return scratchColumnMap;
- }
-
- public void setScratchColumnMap(Map<String, Map<String, Integer>> scratchColumnMap) {
- this.scratchColumnMap = scratchColumnMap;
- }
-
- public boolean getVectorMode() {
- return vectorMode;
- }
-
- @Override
- public void setVectorMode(boolean vectorMode) {
- this.vectorMode = vectorMode;
- }
-
public void logPathToAliases() {
if (LOG.isDebugEnabled()) {
LOG.debug("LOGGING PATH TO ALIASES");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Sun Aug 10 01:33:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -30,7 +31,18 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* ReduceWork represents all the information used to run a reduce task on the cluster.
@@ -84,6 +96,11 @@ public class ReduceWork extends BaseWork
// for auto reduce parallelism - max reducers requested
private int maxReduceTasks;
+ private ObjectInspector keyObjectInspector = null;
+ private ObjectInspector valueObjectInspector = null;
+
+ private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+
/**
* If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
* to keySerializeInfo of the ReduceSink
@@ -95,7 +112,90 @@ public class ReduceWork extends BaseWork
}
public TableDesc getKeyDesc() {
- return keyDesc;
+ return keyDesc;
+ }
+
+ private ObjectInspector getObjectInspector(TableDesc desc) {
+ ObjectInspector objectInspector;
+ try {
+ Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+ .getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
+ objectInspector = deserializer.getObjectInspector();
+ } catch (Exception e) {
+ return null;
+ }
+ return objectInspector;
+ }
+
+ public ObjectInspector getKeyObjectInspector() {
+ if (keyObjectInspector == null) {
+ keyObjectInspector = getObjectInspector(keyDesc);
+ }
+ return keyObjectInspector;
+ }
+
+ // Only works when not tagging.
+ public ObjectInspector getValueObjectInspector() {
+ if (needsTagging) {
+ return null;
+ }
+ if (valueObjectInspector == null) {
+ valueObjectInspector = getObjectInspector(tagToValueDesc.get(0));
+ }
+ return valueObjectInspector;
+ }
+
+ private int addToReduceColumnNameMap(StructObjectInspector structObjectInspector, int startIndex, String prefix) {
+ List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+ int index = startIndex;
+ for (StructField field: fields) {
+ reduceColumnNameMap.put(prefix + "." + field.getFieldName(), index);
+ index++;
+ }
+ return index;
+ }
+
+ public Boolean fillInReduceColumnNameMap() {
+ ObjectInspector keyObjectInspector = getKeyObjectInspector();
+ if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+ return false;
+ }
+ StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector;
+
+ ObjectInspector valueObjectInspector = getValueObjectInspector();
+ if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+ return false;
+ }
+ StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector;
+
+ int keyCount = addToReduceColumnNameMap(keyStructObjectInspector, 0, Utilities.ReduceField.KEY.toString());
+ addToReduceColumnNameMap(valueStructObjectInspector, keyCount, Utilities.ReduceField.VALUE.toString());
+ return true;
+ }
+
+ public Map<String, Integer> getReduceColumnNameMap() {
+ if (needsTagging) {
+ return null;
+ }
+ if (reduceColumnNameMap.size() == 0) {
+ if (!fillInReduceColumnNameMap()) {
+ return null;
+ }
+ }
+ return reduceColumnNameMap;
+ }
+
+ public List<String> getReduceColumnNames() {
+ if (needsTagging) {
+ return null;
+ }
+ if (reduceColumnNameMap.size() == 0) {
+ if (!fillInReduceColumnNameMap()) {
+ return null;
+ }
+ }
+ return new ArrayList<String>(reduceColumnNameMap.keySet());
}
public List<TableDesc> getTagToValueDesc() {
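[Note on the ReduceWork change] The reduce column name map built above pairs each key field and value field with a flat position, using the Utilities.ReduceField prefixes ("KEY", "VALUE"). A minimal, self-contained sketch of the resulting layout, assuming a key with one field and a value with two fields (the field names below are illustrative, not taken from this commit):

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ReduceColumnNameMapSketch {
      public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so iteration yields the KEY.* entries
        // first and the VALUE.* entries second, matching getReduceColumnNames().
        Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
        List<String> keyFields = Arrays.asList("reducesinkkey0");     // illustrative key field
        List<String> valueFields = Arrays.asList("_col0", "_col1");   // illustrative value fields
        int index = 0;
        for (String f : keyFields) {
          reduceColumnNameMap.put("KEY." + f, index++);
        }
        for (String f : valueFields) {
          reduceColumnNameMap.put("VALUE." + f, index++);
        }
        System.out.println(reduceColumnNameMap);
        // {KEY.reducesinkkey0=0, VALUE._col0=1, VALUE._col1=2}
      }
    }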
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java Sun Aug 10 01:33:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.securi
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.List;
@@ -35,6 +36,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.shims.ShimLoader;
/**
* StorageBasedAuthorizationProvider is an implementation of
@@ -288,7 +293,7 @@ public class StorageBasedAuthorizationPr
* If the given path does not exists, it checks for its parent folder.
*/
protected void checkPermissions(final Configuration conf, final Path path,
- final EnumSet<FsAction> actions) throws IOException, LoginException {
+ final EnumSet<FsAction> actions) throws IOException, LoginException, HiveException {
if (path == null) {
throw new IllegalArgumentException("path is null");
@@ -297,8 +302,7 @@ public class StorageBasedAuthorizationPr
final FileSystem fs = path.getFileSystem(conf);
if (fs.exists(path)) {
- checkPermissions(fs, path, actions,
- authenticator.getUserName(), authenticator.getGroupNames());
+ checkPermissions(fs, path, actions, authenticator.getUserName());
} else if (path.getParent() != null) {
// find the ancestor which exists to check its permissions
Path par = path.getParent();
@@ -309,8 +313,7 @@ public class StorageBasedAuthorizationPr
par = par.getParent();
}
- checkPermissions(fs, par, actions,
- authenticator.getUserName(), authenticator.getGroupNames());
+ checkPermissions(fs, par, actions, authenticator.getUserName());
}
}
@@ -320,56 +323,23 @@ public class StorageBasedAuthorizationPr
*/
@SuppressWarnings("deprecation")
protected static void checkPermissions(final FileSystem fs, final Path path,
- final EnumSet<FsAction> actions, String user, List<String> groups) throws IOException,
- AccessControlException {
-
- String superGroupName = getSuperGroupName(fs.getConf());
- if (userBelongsToSuperGroup(superGroupName, groups)) {
- LOG.info("User \"" + user + "\" belongs to super-group \"" + superGroupName + "\". " +
- "Permission granted for actions: (" + actions + ").");
- return;
- }
-
- final FileStatus stat;
+ final EnumSet<FsAction> actions, String user) throws IOException,
+ AccessControlException, HiveException {
try {
- stat = fs.getFileStatus(path);
+ FileStatus stat = fs.getFileStatus(path);
+ for (FsAction action : actions) {
+ FileUtils.checkFileAccessWithImpersonation(fs, stat, action, user);
+ }
} catch (FileNotFoundException fnfe) {
// File named by path doesn't exist; nothing to validate.
return;
} catch (org.apache.hadoop.fs.permission.AccessControlException ace) {
// Older hadoop version will throw this @deprecated Exception.
throw accessControlException(ace);
+ } catch (Exception err) {
+ throw new HiveException(err);
}
-
- final FsPermission dirPerms = stat.getPermission();
- final String grp = stat.getGroup();
-
- for (FsAction action : actions) {
- if (user.equals(stat.getOwner())) {
- if (dirPerms.getUserAction().implies(action)) {
- continue;
- }
- }
- if (groups.contains(grp)) {
- if (dirPerms.getGroupAction().implies(action)) {
- continue;
- }
- }
- if (dirPerms.getOtherAction().implies(action)) {
- continue;
- }
- throw new AccessControlException("action " + action + " not permitted on path "
- + path + " for user " + user);
- }
- }
-
- private static String getSuperGroupName(Configuration configuration) {
- return configuration.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, "");
- }
-
- private static boolean userBelongsToSuperGroup(String superGroupName, List<String> groups) {
- return groups.contains(superGroupName);
}
protected Path getDbLocation(Database db) throws HiveException {
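[Note on the StorageBasedAuthorizationProvider change] The inlined owner/group/other evaluation removed above is now delegated to FileUtils.checkFileAccessWithImpersonation, which also handles the impersonated-user case. For reference, the classic POSIX-style check the old code performed can be sketched as follows; this is a simplified illustration, not the removed code verbatim:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PosixStylePermCheckSketch {
      // True if 'user' (a member of 'groups') may perform 'action' on a file with the
      // given owner, group and permission bits, evaluated owner -> group -> other.
      static boolean permitted(FsPermission perms, String owner, String group,
          String user, List<String> groups, FsAction action) {
        if (user.equals(owner) && perms.getUserAction().implies(action)) {
          return true;
        }
        if (groups.contains(group) && perms.getGroupAction().implies(action)) {
          return true;
        }
        return perms.getOtherAction().implies(action);
      }

      public static void main(String[] args) {
        // rwxr-x--- : owner has full access, group read/execute, others none
        FsPermission perms = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE);
        System.out.println(permitted(perms, "hive", "hadoop", "bob",
            Arrays.asList("hadoop"), FsAction.READ)); // true, granted via the group bits
      }
    }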
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java Sun Aug 10 01:33:50 2014
@@ -394,7 +394,7 @@ public class SQLAuthorizationUtils {
if (FileUtils.isActionPermittedForFileHierarchy(fs, fileStatus, userName, FsAction.READ)) {
availPrivs.addPrivilege(SQLPrivTypeGrant.SELECT_NOGRANT);
}
- } catch (IOException e) {
+ } catch (Exception e) {
String msg = "Error getting permissions for " + filePath + ": " + e.getMessage();
throw new HiveAuthzPluginException(msg, e);
}
@@ -412,5 +412,8 @@ public class SQLAuthorizationUtils {
}
}
+ static HiveAuthzPluginException getPluginException(String prefix, Exception e) {
+ return new HiveAuthzPluginException(prefix + ": " + e.getMessage(), e);
+ }
}
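[Note on the SQLAuthorizationUtils change] The new getPluginException helper centralizes the "prefix: cause-message" formatting that the SQLStdHiveAccessController hunks below switch over to. A self-contained sketch of the wrapping pattern, using a stand-in exception class so the example runs on its own:

    public class PluginExceptionSketch {
      // Stand-in for HiveAuthzPluginException, only so this sketch compiles by itself.
      static class StubPluginException extends Exception {
        StubPluginException(String msg, Throwable cause) { super(msg, cause); }
      }

      // Mirrors the shape of SQLAuthorizationUtils.getPluginException: the message is
      // built in one place, so each catch block only supplies the prefix.
      static StubPluginException getPluginException(String prefix, Exception e) {
        return new StubPluginException(prefix + ": " + e.getMessage(), e);
      }

      public static void main(String[] args) {
        try {
          throw new RuntimeException("metastore unreachable");
        } catch (Exception e) {
          StubPluginException wrapped = getPluginException("Error granting privileges", e);
          System.out.println(wrapped.getMessage());
          // Error granting privileges: metastore unreachable
        }
      }
    }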
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java Sun Aug 10 01:33:50 2014
@@ -123,8 +123,8 @@ public class SQLStdHiveAccessController
}
return currentRoles;
} catch (Exception e) {
- throw new HiveAuthzPluginException("Failed to retrieve roles for " + currentUserName + ": "
- + e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Failed to retrieve roles for "
+ + currentUserName, e);
}
}
@@ -179,7 +179,7 @@ public class SQLStdHiveAccessController
try {
metastoreClient.grant_privileges(privBag);
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error granting privileges: " + e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Error granting privileges", e);
}
}
@@ -239,7 +239,7 @@ public class SQLStdHiveAccessController
// that has desired behavior.
metastoreClient.revoke_privileges(new PrivilegeBag(revokePrivs), grantOption);
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error revoking privileges", e);
+ throw SQLAuthorizationUtils.getPluginException("Error revoking privileges", e);
}
}
@@ -260,7 +260,7 @@ public class SQLStdHiveAccessController
metastoreClientFactory.getHiveMetastoreClient().create_role(
new Role(roleName, 0, grantorName));
} catch (TException e) {
- throw new HiveAuthzPluginException("Error create role : " + e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Error create role", e);
}
}
@@ -274,7 +274,7 @@ public class SQLStdHiveAccessController
try {
metastoreClientFactory.getHiveMetastoreClient().drop_role(roleName);
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error dropping role", e);
+ throw SQLAuthorizationUtils.getPluginException("Error dropping role", e);
}
}
@@ -295,11 +295,11 @@ public class SQLStdHiveAccessController
grantorPrinc.getName(),
AuthorizationUtils.getThriftPrincipalType(grantorPrinc.getType()), grantOption);
} catch (MetaException e) {
- throw new HiveAuthzPluginException(e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Error granting role", e);
} catch (Exception e) {
String msg = "Error granting roles for " + hivePrincipal.getName() + " to role "
- + roleName + ": " + e.getMessage();
- throw new HiveAuthzPluginException(msg, e);
+ + roleName;
+ throw SQLAuthorizationUtils.getPluginException(msg, e);
}
}
}
@@ -321,8 +321,8 @@ public class SQLStdHiveAccessController
AuthorizationUtils.getThriftPrincipalType(hivePrincipal.getType()), grantOption);
} catch (Exception e) {
String msg = "Error revoking roles for " + hivePrincipal.getName() + " to role "
- + roleName + ": " + e.getMessage();
- throw new HiveAuthzPluginException(msg, e);
+ + roleName;
+ throw SQLAuthorizationUtils.getPluginException(msg, e);
}
}
}
@@ -338,7 +338,7 @@ public class SQLStdHiveAccessController
try {
return metastoreClientFactory.getHiveMetastoreClient().listRoleNames();
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error listing all roles", e);
+ throw SQLAuthorizationUtils.getPluginException("Error listing all roles", e);
}
}
@@ -353,10 +353,12 @@ public class SQLStdHiveAccessController
try {
return getHiveRoleGrants(metastoreClientFactory.getHiveMetastoreClient(), roleName);
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error getting principals for all roles", e);
+ throw SQLAuthorizationUtils.getPluginException("Error getting principals for all roles", e);
}
}
+
+
public static List<HiveRoleGrant> getHiveRoleGrants(IMetaStoreClient client, String roleName)
throws Exception {
GetPrincipalsInRoleRequest request = new GetPrincipalsInRoleRequest(roleName);
@@ -435,7 +437,7 @@ public class SQLStdHiveAccessController
return resPrivInfos;
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error showing privileges: "+ e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Error showing privileges", e);
}
}
@@ -550,11 +552,7 @@ public class SQLStdHiveAccessController
*/
boolean isUserAdmin() throws HiveAuthzPluginException {
List<HiveRoleGrant> roles;
- try {
- roles = getCurrentRoles();
- } catch (Exception e) {
- throw new HiveAuthzPluginException(e);
- }
+ roles = getCurrentRoles();
for (HiveRoleGrant role : roles) {
if (role.getRoleName().equalsIgnoreCase(HiveMetaStore.ADMIN)) {
return true;
@@ -565,11 +563,7 @@ public class SQLStdHiveAccessController
private boolean doesUserHasAdminOption(List<String> roleNames) throws HiveAuthzPluginException {
List<HiveRoleGrant> currentRoles;
- try {
- currentRoles = getCurrentRoles();
- } catch (Exception e) {
- throw new HiveAuthzPluginException(e);
- }
+ currentRoles = getCurrentRoles();
for (String roleName : roleNames) {
boolean roleFound = false;
for (HiveRoleGrant currentRole : currentRoles) {
@@ -606,8 +600,8 @@ public class SQLStdHiveAccessController
}
return hiveRoleGrants;
} catch (Exception e) {
- throw new HiveAuthzPluginException("Error getting role grant information for user "
- + principal.getName() + ": " + e.getMessage(), e);
+ throw SQLAuthorizationUtils.getPluginException("Error getting role grant information for user "
+ + principal.getName(), e);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Sun Aug 10 01:33:50 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.stats;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -30,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -101,7 +100,7 @@ public class StatsUtils {
* @throws HiveException
*/
public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
- Table table, TableScanOperator tableScanOperator) {
+ Table table, TableScanOperator tableScanOperator) throws HiveException {
Statistics stats = new Statistics();
@@ -197,7 +196,8 @@ public class StatsUtils {
stats.addToDataSize(ds);
// if at least a partition does not contain row count then mark basic stats state as PARTIAL
- if (containsNonPositives(rowCounts)) {
+ if (containsNonPositives(rowCounts) &&
+ stats.getBasicStatsState().equals(State.COMPLETE)) {
stats.setBasicStatsState(State.PARTIAL);
}
boolean haveFullStats = fetchColStats;
@@ -206,17 +206,26 @@ public class StatsUtils {
for (Partition part : partList.getNotDeniedPartns()) {
partNames.add(part.getName());
}
- Map<String, List<ColStatistics>> partStats =
- getPartColumnStats(table, schema, partNames, neededColumns);
- if (partStats != null) {
- for (String partName : partNames) {
- List<ColStatistics> partStat = partStats.get(partName);
- haveFullStats &= (partStat != null);
- if (partStat != null) {
- stats.updateColumnStatsState(deriveStatType(partStat, neededColumns));
- stats.addToColumnStats(partStat);
- }
+ Map<String, String> colToTabAlias = new HashMap<String, String>();
+ neededColumns = processNeededColumns(schema, neededColumns, colToTabAlias);
+ AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), neededColumns, partNames);
+ if (null == aggrStats) {
+ haveFullStats = false;
+ } else {
+ List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
+ if (colStats.size() != neededColumns.size()) {
+ LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to retrieve"
+ + " for " + colStats.size() + " columns");
+ }
+ List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), colToTabAlias);
+ stats.addToColumnStats(columnStats);
+ State colState = deriveStatType(columnStats, neededColumns);
+ if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
+ LOG.debug("Column stats requested for : " + partNames.size() +" partitions. "
+ + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
+ stats.updateColumnStatsState(State.PARTIAL);
}
+ stats.setColumnStatsState(colState);
}
}
// There are some partitions with no state (or we didn't fetch any state).
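[Note on the StatsUtils aggregate-stats change] The aggregate-stats path above derives the column-stats state from two counts: how many of the requested columns came back, and for how many of the requested partitions stats were found. A minimal sketch of that derivation, with a stand-in State enum in place of Hive's statistics state type (the counts and names are illustrative):

    public class AggrStatsStateSketch {
      enum State { NONE, PARTIAL, COMPLETE }  // stand-in for Hive's stats state enum

      // All requested columns covered -> COMPLETE, some -> PARTIAL, none -> NONE;
      // if not every partition contributed stats, a non-NONE result degrades to PARTIAL.
      static State deriveState(int columnsRequested, int columnsReturned,
          int partitionsRequested, int partitionsFound) {
        State colState;
        if (columnsReturned == 0) {
          colState = State.NONE;
        } else if (columnsReturned < columnsRequested) {
          colState = State.PARTIAL;
        } else {
          colState = State.COMPLETE;
        }
        if (partitionsFound != partitionsRequested && colState != State.NONE) {
          return State.PARTIAL;
        }
        return colState;
      }

      public static void main(String[] args) {
        System.out.println(deriveState(3, 3, 10, 10)); // COMPLETE
        System.out.println(deriveState(3, 2, 10, 10)); // PARTIAL
        System.out.println(deriveState(3, 3, 10, 7));  // PARTIAL
      }
    }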
@@ -460,12 +469,7 @@ public class StatsUtils {
try {
List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
dbName, tabName, neededColsInTable);
- stats = new ArrayList<ColStatistics>(colStat.size());
- for (ColumnStatisticsObj statObj : colStat) {
- ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
- cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
- stats.add(cs);
- }
+ stats = convertColStats(colStat, tabName, colToTabAlias);
} catch (HiveException e) {
LOG.error("Failed to retrieve table statistics: ", e);
stats = null;
@@ -473,43 +477,16 @@ public class StatsUtils {
return stats;
}
- /**
- * Get table level column statistics from metastore for needed columns
- * @param table
- * - table
- * @param schema
- * - output schema
- * @param neededColumns
- * - list of needed columns
- * @return column statistics
- */
- public static Map<String, List<ColStatistics>> getPartColumnStats(Table table,
- List<ColumnInfo> schema, List<String> partNames, List<String> neededColumns) {
- String dbName = table.getDbName();
- String tabName = table.getTableName();
- Map<String, String> colToTabAlias = new HashMap<String, String>(schema.size());
- List<String> neededColsInTable = processNeededColumns(schema, neededColumns, colToTabAlias);
- Map<String, List<ColStatistics>> stats = null;
- try {
- Map<String, List<ColumnStatisticsObj>> colStat = Hive.get().getPartitionColumnStatistics(
- dbName, tabName, partNames, neededColsInTable);
- stats = new HashMap<String, List<ColStatistics>>(colStat.size());
- for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStat.entrySet()) {
- List<ColStatistics> partStat = new ArrayList<ColStatistics>(entry.getValue().size());
- for (ColumnStatisticsObj statObj : entry.getValue()) {
- ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
- cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
- partStat.add(cs);
- }
- stats.put(entry.getKey(), partStat);
- }
- } catch (HiveException e) {
- LOG.error("Failed to retrieve partitions statistics: ", e);
- stats = null;
+ private static List<ColStatistics> convertColStats(List<ColumnStatisticsObj> colStats, String tabName,
+ Map<String,String> colToTabAlias) {
+ List<ColStatistics> stats = new ArrayList<ColStatistics>(colStats.size());
+ for (ColumnStatisticsObj statObj : colStats) {
+ ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
+ cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
+ stats.add(cs);
}
return stats;
}
-
private static List<String> processNeededColumns(List<ColumnInfo> schema,
List<String> neededColumns, Map<String, String> colToTabAlias) {
for (ColumnInfo col : schema) {
@@ -884,12 +861,9 @@ public class StatsUtils {
if (colExprMap != null) {
for (ColumnInfo ci : rowSchema.getSignature()) {
String outColName = ci.getInternalName();
+ outColName = StatsUtils.stripPrefixFromColumnName(outColName);
String outTabAlias = ci.getTabAlias();
ExprNodeDesc end = colExprMap.get(outColName);
- if (end == null) {
- outColName = StatsUtils.stripPrefixFromColumnName(outColName);
- end = colExprMap.get(outColName);
- }
ColStatistics colStat = getColStatisticsFromExpression(conf, parentStats, end);
if (colStat != null) {
outColName = StatsUtils.stripPrefixFromColumnName(outColName);
@@ -1150,7 +1124,7 @@ public class StatsUtils {
*/
public static String stripPrefixFromColumnName(String colName) {
String stripedName = colName;
- if (colName.startsWith("KEY._") || colName.startsWith("VALUE._")) {
+ if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
// strip off KEY./VALUE. from column name
stripedName = colName.split("\\.")[1];
}
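[Note on the stripPrefixFromColumnName change] With the broadened prefix match above, any internal name starting with KEY or VALUE is reduced to its second dot-separated segment. A tiny sketch of the effect (the column names are illustrative):

    public class StripPrefixSketch {
      // Same shape as stripPrefixFromColumnName after this change: a name starting with
      // "KEY" or "VALUE" keeps only the second dot-separated segment.
      static String stripPrefix(String colName) {
        if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
          return colName.split("\\.")[1];
        }
        return colName;
      }

      public static void main(String[] args) {
        System.out.println(stripPrefix("KEY.reducesinkkey0")); // reducesinkkey0
        System.out.println(stripPrefix("VALUE._col3"));        // _col3
        System.out.println(stripPrefix("_col1"));              // _col1 (unchanged)
      }
    }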
@@ -1218,15 +1192,16 @@ public class StatsUtils {
for (Map.Entry<String, ExprNodeDesc> entry : map.entrySet()) {
if (entry.getValue().isSame(end)) {
outColName = entry.getKey();
+ outColName = stripPrefixFromColumnName(outColName);
}
}
if (end instanceof ExprNodeColumnDesc) {
ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
if (outColName == null) {
outColName = encd.getColumn();
+ outColName = stripPrefixFromColumnName(outColName);
}
String tabAlias = encd.getTabAlias();
- outColName = stripPrefixFromColumnName(outColName);
result.add(getFullyQualifiedColumnName(tabAlias, outColName));
} else if (end instanceof ExprNodeGenericFuncDesc) {
ExprNodeGenericFuncDesc enf = (ExprNodeGenericFuncDesc) end;