You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/23 20:59:24 UTC
svn commit: r1654355 [16/27] - in /hive/branches/llap: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/con...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Jan 23 19:59:11 2015
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hive.con
import java.io.IOException;
import java.io.Serializable;
+import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -38,14 +39,17 @@ import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.CommonToken;
import org.antlr.runtime.Token;
import org.antlr.runtime.tree.Tree;
import org.antlr.runtime.tree.TreeWizard;
import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -200,9 +204,12 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Implementation of the semantic analyzer. It generates the query plan.
@@ -374,12 +381,9 @@ public class SemanticAnalyzer extends Ba
opToPartList = pctx.getOpToPartList();
opToSamplePruner = pctx.getOpToSamplePruner();
topOps = pctx.getTopOps();
- topSelOps = pctx.getTopSelOps();
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
loadFileWork = pctx.getLoadFileWork();
- joinContext = pctx.getJoinContext();
- smbMapJoinContext = pctx.getSmbMapJoinContext();
ctx = pctx.getContext();
destTableId = pctx.getDestTableId();
idToTableNameMap = pctx.getIdToTableNameMap();
@@ -393,15 +397,15 @@ public class SemanticAnalyzer extends Ba
}
public ParseContext getParseContext() {
- return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
- topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
- fsopToTable, loadTableWork,
- loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList,
+ topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
- reduceSinkOperatorsAddedByEnforceBucketingSorting,
- queryProperties);
+ reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
}
@SuppressWarnings("nls")
@@ -1669,7 +1673,7 @@ public class SemanticAnalyzer extends Ba
throw new SemanticException(e);
}
try {
- fname = ctx.getExternalTmpPath(
+ fname = ctx.getExtTmpPathRelTo(
FileUtils.makeQualified(location, conf)).toString();
} catch (Exception e) {
throw new SemanticException(generateErrorMessage(ast,
@@ -1685,8 +1689,9 @@ public class SemanticAnalyzer extends Ba
} else {
// This is the only place where isQuery is set to true; it defaults to false.
qb.setIsQuery(true);
- fname = ctx.getMRTmpPath().toString();
- ctx.setResDir(new Path(fname));
+ Path stagingPath = getStagingDirectoryPathname(qb);
+ fname = stagingPath.toString();
+ ctx.setResDir(stagingPath);
}
}
qb.getMetaData().setDestForAlias(name, fname,
@@ -1742,6 +1747,160 @@ public class SemanticAnalyzer extends Ba
}
}
+ /**
+ * Checks if a given path is encrypted (valid only for HDFS files)
+ * @param path The path to check for encryption
+ * @return True if the path is encrypted; False if it is not encrypted
+ * @throws HiveException If an error occurs while checking for encryption
+ */
+ private boolean isPathEncrypted(Path path) throws HiveException {
+ HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
+ hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
+ if (hdfsEncryptionShim != null) {
+ try {
+ if (hdfsEncryptionShim.isPathEncrypted(path)) {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new HiveException("Unable to determine if " + path + "is encrypted: " + e, e);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Compares two paths' key encryption strengths.
+ *
+ * @param p1 Path to an HDFS file system
+ * @param p2 Path to an HDFS file system
+ * @return -1 if p1's key is weaker than p2's; 0 if they are equal in strength; 1 if p1's key is stronger
+ * @throws HiveException If an error occurs while comparing key strengths.
+ */
+ private int comparePathKeyStrength(Path p1, Path p2) throws HiveException {
+ HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
+ hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
+ if (hdfsEncryptionShim != null) {
+ try {
+ return hdfsEncryptionShim.comparePathKeyStrength(p1, p2);
+ } catch (Exception e) {
+ throw new HiveException("Unable to compare key strength for " + p1 + " and " + p2 + " : " + e, e);
+ }
+ }
+
+ return 0; // Non-encrypted path (or equals strength)
+ }
+
+ /**
+ * Checks if a given path has read-only access permissions.
+ *
+ * @param path The path to check for read-only permissions.
+ * @return True if the path is read-only; False otherwise.
+ * @throws HiveException If an error occurs while checking file permissions.
+ */
+ private boolean isPathReadOnly(Path path) throws HiveException {
+ HiveConf conf = SessionState.get().getConf();
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ UserGroupInformation ugi = Utils.getUGI();
+ FileStatus status = fs.getFileStatus(path);
+
+ // We just check for write permission. If it fails with an AccessControlException, then it
+ // means the location may be read-only.
+ FileUtils.checkFileAccessWithImpersonation(fs, status, FsAction.WRITE, ugi.getUserName());
+
+ // Path has writing permissions
+ return false;
+ } catch (AccessControlException e) {
+ // An AccessControlException may be raised for other reasons as well,
+ // but we take it to mean that our path is read-only
+ return true;
+ } catch (Exception e) {
+ throw new HiveException("Unable to determine if " + path + " is read only: " + e, e);
+ }
+ }
+
+ /**
+ * Gets the strongest encrypted table path.
+ *
+ * @param qb The QB object that contains a list of all table locations.
+ * @return The strongest encrypted path
+ * @throws HiveException if an error occurred attempting to compare the encryption strength
+ */
+ private Path getStrongestEncryptedTablePath(QB qb) throws HiveException {
+ List<String> tabAliases = new ArrayList<String>(qb.getTabAliases());
+ Path strongestPath = null;
+
+ /* Walk through all found table locations to get the most encrypted table */
+ for (String alias : tabAliases) {
+ Table tab = qb.getMetaData().getTableForAlias(alias);
+ if (tab != null) {
+ Path tablePath = tab.getDataLocation();
+ if (tablePath != null) {
+ try {
+ if (strongestPath == null) {
+ strongestPath = tablePath;
+ } else if ("hdfs".equals(tablePath.toUri().getScheme())
+ && isPathEncrypted(tablePath)
+ && comparePathKeyStrength(tablePath, strongestPath) > 0)
+ {
+ strongestPath = tablePath;
+ }
+ } catch (HiveException e) {
+ throw new HiveException("Unable to find the most secure table path: " + e, e);
+ }
+ }
+ }
+ }
+
+ return strongestPath;
+ }
+
+ /**
+ * Gets the staging directory where MR files will be stored temporarily.
+ * It walks through the QB plan to find the correct location in which to save temporary files. This
+ * temporary location (or staging directory) may be created inside encrypted table locations for
+ * security reasons. If the QB has read-only tables, then the older scratch directory will be used,
+ * or a permission error will be thrown if the requested query table is encrypted and the old scratch
+ * directory is not.
+ *
+ * @param qb The QB object that contains a list of all table locations.
+ * @return The path to the staging directory.
+ * @throws HiveException If an error occurs while identifying the correct staging location.
+ */
+ private Path getStagingDirectoryPathname(QB qb) throws HiveException {
+ Path stagingPath = null, tablePath;
+
+ // Looks for the most encrypted table location (if there is one)
+ tablePath = getStrongestEncryptedTablePath(qb);
+ if (tablePath != null && isPathEncrypted(tablePath)) {
+ // Only HDFS paths can be checked for encryption
+ if ("hdfs".equals(tablePath.toUri().getScheme())) {
+ if (isPathReadOnly(tablePath)) {
+ Path tmpPath = ctx.getMRTmpPath();
+ if (comparePathKeyStrength(tablePath, tmpPath) < 0) {
+ throw new HiveException("Read-only encrypted tables cannot be read " +
+ "if the scratch directory is not encrypted (or encryption is weak)");
+ } else {
+ stagingPath = tmpPath;
+ }
+ }
+ } else {
+ LOG.debug("Encryption is not applicable to table path " + tablePath.toString());
+ }
+
+ if (stagingPath == null) {
+ stagingPath = ctx.getMRTmpPath(tablePath.toUri());
+ }
+ } else {
+ stagingPath = ctx.getMRTmpPath();
+ }
+
+ return stagingPath;
+ }
+
private void replaceViewReferenceWithDefinition(QB qb, Table tab,
String tab_name, String alias) throws SemanticException {
@@ -2642,7 +2801,7 @@ public class SemanticAnalyzer extends Ba
@SuppressWarnings("nls")
// TODO: make aliases unique, otherwise needless rewriting takes place
- Integer genColListRegex(String colRegex, String tabAlias, ASTNode sel,
+ Integer genColListRegex(String colRegex, String tabAlias, ASTNode sel,
ArrayList<ExprNodeDesc> col_list, HashSet<ColumnInfo> excludeCols, RowResolver input,
RowResolver colSrcRR, Integer pos, RowResolver output, List<String> aliases,
boolean ensureUniqueCols) throws SemanticException {
@@ -3117,7 +3276,7 @@ public class SemanticAnalyzer extends Ba
}
}
- private List<Integer> getGroupingSetsForRollup(int size) {
+ protected List<Integer> getGroupingSetsForRollup(int size) {
List<Integer> groupingSetKeys = new ArrayList<Integer>();
for (int i = 0; i <= size; i++) {
groupingSetKeys.add((1 << i) - 1);
@@ -3125,7 +3284,7 @@ public class SemanticAnalyzer extends Ba
return groupingSetKeys;
}
- private List<Integer> getGroupingSetsForCube(int size) {
+ protected List<Integer> getGroupingSetsForCube(int size) {
int count = 1 << size;
List<Integer> results = new ArrayList<Integer>(count);
for (int i = 0; i < count; ++i) {
@@ -3152,7 +3311,7 @@ public class SemanticAnalyzer extends Ba
return new ObjectPair<List<ASTNode>, List<Integer>>(groupByExprs, groupingSets);
}
- private List<Integer> getGroupingSets(List<ASTNode> groupByExpr, QBParseInfo parseInfo,
+ protected List<Integer> getGroupingSets(List<ASTNode> groupByExpr, QBParseInfo parseInfo,
String dest) throws SemanticException {
Map<String, Integer> exprPos = new HashMap<String, Integer>();
for (int i = 0; i < groupByExpr.size(); ++i) {
@@ -3938,7 +4097,7 @@ public class SemanticAnalyzer extends Ba
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, numDistinctUDFs > 0),
+ false, groupByMemoryUsage, memoryThreshold, null, false, -1, numDistinctUDFs > 0),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
input), groupByOutputRowResolver);
op.setColumnExprMap(colExprMap);
@@ -4063,10 +4222,11 @@ public class SemanticAnalyzer extends Ba
}
// This is only needed if a new grouping set key is being created
- int groupingSetsPosition = 0;
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
// Consider the query: select a,b, count(1) from T group by a,b with cube;
// where it is being executed in a single map-reduce job
// The plan is TableScan -> GroupBy1 -> ReduceSink -> GroupBy2 -> FileSink
@@ -4081,7 +4241,6 @@ public class SemanticAnalyzer extends Ba
colExprMap);
}
else {
- groupingSetsPosition = groupByKeys.size();
// The grouping set has not yet been processed. Create a new grouping key
// Consider the query: select a,b, count(1) from T group by a,b with cube;
// where it is being executed in 2 map-reduce jobs
@@ -4297,7 +4456,7 @@ public class SemanticAnalyzer extends Ba
}
// The grouping set key is present after the grouping keys, before the distinct keys
- int groupingSetsPosition = groupByKeys.size();
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
// This dummy key needs to be added as a reduce key
@@ -4309,6 +4468,7 @@ public class SemanticAnalyzer extends Ba
// This function is called for GroupBy1 to create an additional grouping key
// for the grouping set (corresponding to the rollup).
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
createNewGroupingKey(groupByKeys,
outputColumnNames,
groupByOutputRowResolver,
@@ -4865,8 +5025,10 @@ public class SemanticAnalyzer extends Ba
colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
}
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
addGroupingSetKey(
groupByKeys,
groupByInputRowResolver2,
@@ -4922,7 +5084,8 @@ public class SemanticAnalyzer extends Ba
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, containsDistinctAggr),
+ false, groupByMemoryUsage, memoryThreshold, null, false,
+ groupingSetsPosition, containsDistinctAggr),
new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
reduceSinkOperatorInfo2), groupByOutputRowResolver2);
op.setColumnExprMap(colExprMap);
@@ -5665,28 +5828,6 @@ public class SemanticAnalyzer extends Ba
}
}
- @SuppressWarnings("nls")
- private Operator genConversionOps(String dest, QB qb, Operator input)
- throws SemanticException {
-
- Integer dest_type = qb.getMetaData().getDestTypeForAlias(dest);
- switch (dest_type.intValue()) {
- case QBMetaData.DEST_TABLE: {
- qb.getMetaData().getDestTableForAlias(dest);
- break;
- }
- case QBMetaData.DEST_PARTITION: {
- qb.getMetaData().getDestPartitionForAlias(dest).getTable();
- break;
- }
- default: {
- return input;
- }
- }
-
- return input;
- }
-
private int getReducersBucketing(int totalFiles, int maxReducers) {
int numFiles = (int)Math.ceil((double)totalFiles / (double)maxReducers);
while (true) {
@@ -5879,6 +6020,7 @@ public class SemanticAnalyzer extends Ba
Table dest_tab = null; // destination table if any
boolean destTableIsAcid = false; // should the destination table be written to using ACID
+ boolean destTableIsTemporary = false;
Partition dest_part = null;// destination partition if any
Path queryTmpdir = null; // the intermediate destination directory
Path dest_path = null; // the final destination directory
@@ -5896,6 +6038,7 @@ public class SemanticAnalyzer extends Ba
dest_tab = qbm.getDestTableForAlias(dest);
destTableIsAcid = isAcidTable(dest_tab);
+ destTableIsTemporary = dest_tab.isTemporary();
// Is the user trying to insert into a external tables
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -5968,7 +6111,7 @@ public class SemanticAnalyzer extends Ba
if (isNonNativeTable) {
queryTmpdir = dest_path;
} else {
- queryTmpdir = ctx.getExternalTmpPath(dest_path);
+ queryTmpdir = ctx.getExtTmpPathRelTo(dest_path);
}
if (dpCtx != null) {
// set the root of the temporary path where dynamic partition columns will populate
@@ -6149,7 +6292,7 @@ public class SemanticAnalyzer extends Ba
try {
Path qPath = FileUtils.makeQualified(dest_path, conf);
- queryTmpdir = ctx.getExternalTmpPath(qPath);
+ queryTmpdir = ctx.getExtTmpPathRelTo(qPath);
} catch (Exception e) {
throw new SemanticException("Error creating temporary folder on: "
+ dest_path, e);
@@ -6165,6 +6308,7 @@ public class SemanticAnalyzer extends Ba
CreateTableDesc tblDesc = qb.getTableDesc();
if (tblDesc != null) {
field_schemas = new ArrayList<FieldSchema>();
+ destTableIsTemporary = tblDesc.isTemporary();
}
boolean first = true;
@@ -6309,6 +6453,8 @@ public class SemanticAnalyzer extends Ba
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
+
+ fileSinkDesc.setTemporary(destTableIsTemporary);
/* Set List Bucketing context. */
if (lbCtx != null) {
@@ -6327,7 +6473,7 @@ public class SemanticAnalyzer extends Ba
// it should be the same as the MoveWork's sourceDir.
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getExternalTmpPath(queryTmpdir).toString();
+ String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString();
LOG.info("Set stats collection dir : " + statsTmpLoc);
conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
}
@@ -6356,7 +6502,9 @@ public class SemanticAnalyzer extends Ba
+ dest_path + " row schema: " + inputRR.toString());
}
- fsopToTable.put((FileSinkOperator) output, dest_tab);
+ FileSinkOperator fso = (FileSinkOperator) output;
+ fso.getConf().setTable(dest_tab);
+ fsopToTable.put(fso, dest_tab);
return output;
}
@@ -6889,12 +7037,11 @@ public class SemanticAnalyzer extends Ba
}
- @SuppressWarnings("nls")
private Operator genReduceSinkPlan(String dest, QB qb, Operator<?> input,
int numReducers) throws SemanticException {
-
+
RowResolver inputRR = opParseCtx.get(input).getRowResolver();
-
+
// First generate the expression for the partition and sort keys
// The cluster by clause / distribute by clause has the aliases for
// partition function
@@ -6902,15 +7049,14 @@ public class SemanticAnalyzer extends Ba
if (partitionExprs == null) {
partitionExprs = qb.getParseInfo().getDistributeByForClause(dest);
}
- ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
if (partitionExprs != null) {
int ccount = partitionExprs.getChildCount();
for (int i = 0; i < ccount; ++i) {
ASTNode cl = (ASTNode) partitionExprs.getChild(i);
- partitionCols.add(genExprNodeDesc(cl, inputRR));
+ partCols.add(genExprNodeDesc(cl, inputRR));
}
}
-
ASTNode sortExprs = qb.getParseInfo().getClusterByForClause(dest);
if (sortExprs == null) {
sortExprs = qb.getParseInfo().getSortByForClause(dest);
@@ -6930,11 +7076,7 @@ public class SemanticAnalyzer extends Ba
}
}
}
- Operator dummy = Operator.createDummy();
- dummy.setParentOperators(Arrays.asList(input));
-
ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
StringBuilder order = new StringBuilder();
if (sortExprs != null) {
int ccount = sortExprs.getChildCount();
@@ -6955,9 +7097,25 @@ public class SemanticAnalyzer extends Ba
}
ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR);
sortCols.add(exprNode);
- sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input));
}
}
+ return genReduceSinkPlan(input, partCols, sortCols, order.toString(), numReducers);
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genReduceSinkPlan(Operator<?> input,
+ ArrayList<ExprNodeDesc> partitionCols, ArrayList<ExprNodeDesc> sortCols,
+ String sortOrder, int numReducers) throws SemanticException {
+
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+
+ Operator dummy = Operator.createDummy();
+ dummy.setParentOperators(Arrays.asList(input));
+
+ ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc sortCol : sortCols) {
+ sortColsBack.add(ExprNodeDescUtils.backtrack(sortCol, dummy, input));
+ }
// For the generation of the values expression just get the inputs
// signature and generate field expressions for those
RowResolver rsRR = new RowResolver();
@@ -7015,7 +7173,7 @@ public class SemanticAnalyzer extends Ba
// TODO Not 100% sure NOT_ACID is always right here.
ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
- false, -1, partitionCols, order.toString(), numReducers, AcidUtils.Operation.NOT_ACID);
+ false, -1, partitionCols, sortOrder, numReducers, AcidUtils.Operation.NOT_ACID);
Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
new RowSchema(rsRR.getColumnInfos()), input), rsRR);
@@ -7363,6 +7521,7 @@ public class SemanticAnalyzer extends Ba
JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
joinSrcOp, srcOps, omitOpts, joinKeys);
+ joinOp.getConf().setQBJoinTreeProps(joinTree);
joinContext.put(joinOp, joinTree);
Operator op = joinOp;
@@ -7456,7 +7615,7 @@ public class SemanticAnalyzer extends Ba
.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, false),
+ false, groupByMemoryUsage, memoryThreshold, null, false, -1, false),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
inputOperatorInfo), groupByOutputRowResolver);
@@ -8773,6 +8932,23 @@ public class SemanticAnalyzer extends Ba
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(curr);
+ // Check and transform group by *. This will only happen for select distinct *.
+ // Here the "genSelectPlan" is being leveraged.
+ // The main benefits are (1) removing virtual columns that should
+ // not be included in the group by; (2) adding the fully qualified column names to unParseTranslator
+ // so that views are supported. The drawback is that an additional SEL op is added. If it is
+ // not necessary, it will be removed by NonBlockingOpDeDupProc Optimizer because it will match
+ // SEL%SEL% rule.
+ ASTNode selExprList = qbp.getSelForClause(dest);
+ if (selExprList.getToken().getType() == HiveParser.TOK_SELECTDI
+ && selExprList.getChildCount() == 1 && selExprList.getChild(0).getChildCount() == 1) {
+ ASTNode node = (ASTNode) selExprList.getChild(0).getChild(0);
+ if (node.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
+ curr = genSelectPlan(dest, qb, curr, curr);
+ RowResolver rr = opParseCtx.get(curr).getRowResolver();
+ qbp.setSelExprForClause(dest, SemanticAnalyzer.genSelectDIAST(rr));
+ }
+ }
if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
curr = genGroupByPlanMapAggrNoSkew(dest, qb, curr);
@@ -8883,7 +9059,6 @@ public class SemanticAnalyzer extends Ba
.getOrderByForClause(dest) != null ? false : true);
}
} else {
- curr = genConversionOps(dest, qb, curr);
// exact limit can be taken care of by the fetch operator
if (limit != null) {
boolean extraMRStep = true;
@@ -9272,7 +9447,7 @@ public class SemanticAnalyzer extends Ba
}
// Create the root of the operator tree
- TableScanDesc tsDesc = new TableScanDesc(alias, vcList);
+ TableScanDesc tsDesc = new TableScanDesc(alias, vcList, tab);
setupStats(tsDesc, qb.getParseInfo(), tab, alias, rwsch);
SplitSample sample = nameToSplitSample.get(alias_id);
@@ -9294,6 +9469,7 @@ public class SemanticAnalyzer extends Ba
Map<String, String> props = qb.getTabPropsForAlias(alias);
if (props != null) {
topToTableProps.put((TableScanOperator) top, props);
+ tsDesc.setOpProps(props);
}
} else {
rwsch = opParseCtx.get(top).getRowResolver();
@@ -9456,7 +9632,7 @@ public class SemanticAnalyzer extends Ba
tsDesc.setGatherStats(false);
} else {
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getExternalTmpPath(tab.getPath()).toString();
+ String statsTmpLoc = ctx.getExtTmpPathRelTo(tab.getPath()).toString();
LOG.info("Set stats collection dir : " + statsTmpLoc);
conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
}
@@ -9987,9 +10163,11 @@ public class SemanticAnalyzer extends Ba
}
// 4. Generate Parse Context for Optimizer & Physical compiler
- ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
- topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
- fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child,
+ opToPartPruner, opToPartList, topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
@@ -10590,6 +10768,7 @@ public class SemanticAnalyzer extends Ba
List<List<String>> skewedValues = new ArrayList<List<String>>();
Map<List<String>, String> listBucketColValuesMapping = new HashMap<List<String>, String>();
boolean storedAsDirs = false;
+ boolean isUserStorageFormat = false;
RowFormatParams rowFormatParams = new RowFormatParams();
StorageFormat storageFormat = new StorageFormat(conf);
@@ -10607,6 +10786,7 @@ public class SemanticAnalyzer extends Ba
for (int num = 1; num < numCh; num++) {
ASTNode child = (ASTNode) ast.getChild(num);
if (storageFormat.fillStorageFormat(child)) {
+ isUserStorageFormat = true;
continue;
}
switch (child.getToken().getType()) {
@@ -10799,7 +10979,7 @@ public class SemanticAnalyzer extends Ba
CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc(dbDotTab, isExt, isTemporary,
storageFormat.getInputFormat(), storageFormat.getOutputFormat(), location,
storageFormat.getSerde(), storageFormat.getSerdeProps(), tblProps, ifNotExists,
- likeTableName);
+ likeTableName, isUserStorageFormat);
SessionState.get().setCommandType(HiveOperation.CREATETABLE);
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
crtTblLikeDesc), conf));
@@ -11969,130 +12149,34 @@ public class SemanticAnalyzer extends Ba
}
private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
- RowResolver inputRR,
- Operator input) throws SemanticException{
+ RowResolver inputRR, Operator input) throws SemanticException{
+
ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- List<String> outputColumnNames = new ArrayList<String>();
- StringBuilder orderString = new StringBuilder();
+ StringBuilder order = new StringBuilder();
- ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
- for (PartitionExpression partCol : partColList) {
+ for (PartitionExpression partCol : spec.getQueryPartitionSpec().getExpressions()) {
ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
partCols.add(partExpr);
orderCols.add(partExpr);
- orderString.append('+');
+ order.append('+');
}
- ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
- new ArrayList<PTFInvocationSpec.OrderExpression>() :
- spec.getQueryOrderSpec().getExpressions();
- for (int i = 0; i < orderColList.size(); i++) {
- OrderExpression orderCol = orderColList.get(i);
- org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
- if (order.name().equals("ASC")) {
- orderString.append('+');
- } else {
- orderString.append('-');
- }
- ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
- orderCols.add(orderExpr);
- }
-
- ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
- RowResolver rsNewRR = new RowResolver();
- int pos = 0;
- for (ColumnInfo colInfo : colInfoList) {
- ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo);
- valueCols.add(valueColExpr);
- String internalName = SemanticAnalyzer.getColumnInternalName(pos++);
- outputColumnNames.add(internalName);
- colExprMap.put(internalName, valueColExpr);
-
- String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
- ColumnInfo newColInfo = new ColumnInfo(
- internalName, colInfo.getType(), alias[0],
- colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
- rsNewRR.put(alias[0], alias[1], newColInfo);
- String[] altMapping = inputRR.getAlternateMappings(colInfo.getInternalName());
- if ( altMapping != null ) {
- rsNewRR.put(altMapping[0], altMapping[1], newColInfo);
- }
- }
-
- input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
- .getReduceSinkDesc(orderCols,
- valueCols, outputColumnNames, false,
- -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
- new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
- input.setColumnExprMap(colExprMap);
-
-
- // Construct the RR for extract operator
- RowResolver extractRR = new RowResolver();
- LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
- new LinkedHashMap<String[], ColumnInfo>();
- pos = 0;
-
- for (ColumnInfo colInfo : colInfoList) {
- String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
- /*
- * if we have already encountered this colInfo internalName.
- * We encounter it again because it must be put for the Having clause.
- * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
- */
- if ( colsAddedByHaving.containsKey(alias)) {
- continue;
- }
- ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
- ColumnInfo eColInfo = new ColumnInfo(
- SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
- colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
-
- if ( astNode == null ) {
- extractRR.put(alias[0], alias[1], eColInfo);
- }
- else {
- /*
- * in case having clause refers to this column may have been added twice;
- * once with the ASTNode.toStringTree as the alias
- * and then with the real alias.
- */
- extractRR.putExpression(astNode, eColInfo);
- if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
- colsAddedByHaving.put(alias, eColInfo);
+ if (spec.getQueryOrderSpec() != null) {
+ for (OrderExpression orderCol : spec.getQueryOrderSpec().getExpressions()) {
+ String orderString = orderCol.getOrder().name();
+ if (orderString.equals("ASC")) {
+ order.append('+');
+ } else {
+ order.append('-');
}
+ orderCols.add(genExprNodeDesc(orderCol.getExpression(), inputRR));
}
- String[] altMapping = inputRR.getAlternateMappings(colInfo.getInternalName());
- if ( altMapping != null ) {
- extractRR.put(altMapping[0], altMapping[1], eColInfo);
- }
- }
-
- for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
- String[] alias = columnAddedByHaving.getKey();
- ColumnInfo eColInfo = columnAddedByHaving.getValue();
- extractRR.put(alias[0], alias[1], eColInfo);
}
- /*
- * b. Construct Extract Operator.
- */
- input = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new ExtractDesc(
- new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- Utilities.ReduceField.VALUE
- .toString(), "", false)),
- new RowSchema(inputRR.getColumnInfos()),
- input), extractRR);
-
-
- return input;
+ return genReduceSinkPlan(input, partCols, orderCols, order.toString(), -1);
}
-
public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
throws SemanticException
{
@@ -12228,4 +12312,25 @@ public class SemanticAnalyzer extends Ba
protected boolean deleting() {
return false;
}
+ public static ASTNode genSelectDIAST(RowResolver rr) {
+ HashMap<String, LinkedHashMap<String, ColumnInfo>> map = rr.getRslvMap();
+ ASTNode selectDI = new ASTNode(new CommonToken(HiveParser.TOK_SELECTDI, "TOK_SELECTDI"));
+ for (String tabAlias : map.keySet()) {
+ for (Entry<String, ColumnInfo> entry : map.get(tabAlias).entrySet()) {
+ selectDI.addChild(buildSelExprSubTree(tabAlias, entry.getKey()));
+ }
+ }
+ return selectDI;
+ }
+ private static ASTNode buildSelExprSubTree(String tableAlias, String col) {
+ ASTNode selexpr = new ASTNode(new CommonToken(HiveParser.TOK_SELEXPR, "TOK_SELEXPR"));
+ ASTNode tableOrCol = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL,
+ "TOK_TABLE_OR_COL"));
+ ASTNode dot = new ASTNode(new CommonToken(HiveParser.DOT, "."));
+ tableOrCol.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, tableAlias)));
+ dot.addChild(tableOrCol);
+ dot.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, col)));
+ selexpr.addChild(dot);
+ return selexpr;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Fri Jan 23 19:59:11 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -143,7 +142,7 @@ public class TableAccessAnalyzer {
// Must be deterministic order map for consistent q-test output across Java versions
Map<String, List<String>> tableToKeysMap = new LinkedHashMap<String, List<String>>();
- Table tbl = pGraphContext.getTopToTable().get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
tableToKeysMap.put(tbl.getCompleteName(), keyColNames);
tableAccessCtx.addOperatorTableAccess(op, tableToKeysMap);
@@ -174,10 +173,9 @@ public class TableAccessAnalyzer {
// Get the key column names for each side of the join,
// and check if the keys are all constants
// or columns (not expressions). If yes, proceed.
- QBJoinTree joinTree = pGraphContext.getJoinContext().get(op);
- assert(parentOps.size() == joinTree.getBaseSrc().length);
+ assert(parentOps.size() == op.getConf().getBaseSrc().length);
int pos = 0;
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : op.getConf().getBaseSrc()) {
if (src != null) {
assert(parentOps.get(pos) instanceof ReduceSinkOperator);
ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) parentOps.get(pos);
@@ -203,7 +201,7 @@ public class TableAccessAnalyzer {
return null;
}
- Table tbl = pGraphContext.getTopToTable().get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
tableToKeysMap.put(tbl.getCompleteName(), keyColNames);
} else {
return null;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java Fri Jan 23 19:59:11 2015
@@ -26,8 +26,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -60,6 +58,9 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TaskCompiler is a the base class for classes that compile
* operator pipelines into tasks.
@@ -386,9 +387,7 @@ public abstract class TaskCompiler {
ParseContext clone = new ParseContext(conf,
pCtx.getQB(), pCtx.getParseTree(),
pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
- pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(),
- pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getTopToProps(),
- pCtx.getFsopToTable(),
+ pCtx.getOpParseCtx(), pCtx.getJoinOps(), pCtx.getSmbMapJoinOps(),
pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(),
@@ -399,7 +398,7 @@ public abstract class TaskCompiler {
pCtx.getQueryProperties());
clone.setFetchTask(pCtx.getFetchTask());
clone.setLineageInfo(pCtx.getLineageInfo());
- clone.setMapJoinContext(pCtx.getMapJoinContext());
+ clone.setMapJoinOps(pCtx.getMapJoinOps());
return clone;
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java Fri Jan 23 19:59:11 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.spark.SparkCompiler;
/**
* TaskCompilerFactory is a factory class to choose the appropriate
@@ -37,6 +38,8 @@ public class TaskCompilerFactory {
public static TaskCompiler getCompiler(HiveConf conf, ParseContext parseContext) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
return new TezCompiler();
+ } else if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return new SparkCompiler();
} else {
return new MapReduceCompiler();
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java Fri Jan 23 19:59:11 2015
@@ -97,6 +97,10 @@ class UnparseTranslator {
int tokenStartIndex = node.getTokenStartIndex();
int tokenStopIndex = node.getTokenStopIndex();
+ if (tokenStopIndex < 0) {
+ // this is for artificially added tokens
+ return;
+ }
Translation translation = new Translation();
translation.tokenStopIndex = tokenStopIndex;
translation.replacementText = replacementText;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Fri Jan 23 19:59:11 2015
@@ -18,12 +18,22 @@
package org.apache.hadoop.hive.ql.plan;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
public class AbstractOperatorDesc implements OperatorDesc {
protected boolean vectorMode = false;
protected transient Statistics statistics;
protected transient OpTraits opTraits;
+ protected transient Map<String, String> opProps;
+ static {
+ PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps");
+ }
+
@Override
@Explain(skipHeader = true, displayName = "Statistics")
public Statistics getStatistics() {
@@ -51,4 +61,12 @@ public class AbstractOperatorDesc implem
public void setOpTraits(OpTraits opTraits) {
this.opTraits = opTraits;
}
+
+ public Map<String, String> getOpProps() {
+ return opProps;
+ }
+
+ public void setOpProps(Map<String, String> props) {
+ this.opProps = props;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Fri Jan 23 19:59:11 2015
@@ -117,6 +117,31 @@ public abstract class BaseWork extends A
return returnSet;
}
+ /**
+ * Returns a set containing all leaf operators from the operator tree in this work.
+ * @return a set containing all leaf operators in this operator tree.
+ */
+ public Set<Operator<?>> getAllLeafOperators() {
+ Set<Operator<?>> returnSet = new LinkedHashSet<Operator<?>>();
+ Set<Operator<?>> opSet = getAllRootOperators();
+ Stack<Operator<?>> opStack = new Stack<Operator<?>>();
+
+ // add all children
+ opStack.addAll(opSet);
+
+ while (!opStack.empty()) {
+ Operator<?> op = opStack.pop();
+ if (op.getNumChild() == 0) {
+ returnSet.add(op);
+ }
+ if (op.getChildOperators() != null) {
+ opStack.addAll(op.getChildOperators());
+ }
+ }
+
+ return returnSet;
+ }
+
public Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps() {
return allScratchColumnVectorTypeMaps;
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/BucketMapJoinContext.java Fri Jan 23 19:59:11 2015
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -176,6 +177,33 @@ public class BucketMapJoinContext implem
this.bigTablePartSpecToFileMapping = bigTablePartSpecToFileMapping;
}
+ /**
+ * Given a small table input file, find the mapping
+ * big table input file with the smallest bucket number.
+ */
+ public String getMappingBigFile(String alias, String smallFile) {
+ HashSet<String> bigFiles = new HashSet<String>();
+ Map<String, List<String>> mapping = aliasBucketFileNameMapping.get(alias);
+ for (Map.Entry<String, List<String>> entry: mapping.entrySet()) {
+ if (entry.getValue().contains(smallFile)) {
+ bigFiles.add(entry.getKey());
+ }
+ }
+ // There could be several big table input files
+ // mapping to the same small input file.
+ // Find that one with the lowest bucket id.
+ int bucketId = Integer.MAX_VALUE;
+ String bigFile = null;
+ for (String f: bigFiles) {
+ int id = bucketFileNameMapping.get(f);
+ if (id < bucketId) {
+ bucketId = id;
+ bigFile = f;
+ }
+ }
+ return bigFile;
+ }
+
// returns fileId for SMBJoin, which consists part of result file name
// needed to avoid file name conflict when big table is partitioned
public String createFileId(String inputPath) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Fri Jan 23 19:59:11 2015
@@ -149,6 +149,8 @@ public class ConditionalResolverMergeFil
work = ((MapredWork) mrTask.getWork()).getMapWork();
} else if (mrTask.getWork() instanceof TezWork){
work = (MapWork) ((TezWork) mrTask.getWork()).getAllWork().get(0);
+ } else if (mrTask.getWork() instanceof SparkWork) {
+ work = (MapWork) ((SparkWork) mrTask.getWork()).getAllWork().get(0);
} else {
work = (MapWork) mrTask.getWork();
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableLikeDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableLikeDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableLikeDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableLikeDesc.java Fri Jan 23 19:59:11 2015
@@ -39,6 +39,7 @@ public class CreateTableLikeDesc extends
boolean ifNotExists;
String likeTableName;
boolean isTemporary = false;
+ boolean isUserStorageFormat = false;
public CreateTableLikeDesc() {
}
@@ -46,7 +47,7 @@ public class CreateTableLikeDesc extends
public CreateTableLikeDesc(String tableName, boolean isExternal, boolean isTemporary,
String defaultInputFormat, String defaultOutputFormat, String location,
String defaultSerName, Map<String, String> defaultSerdeProps, Map<String, String> tblProps,
- boolean ifNotExists, String likeTableName) {
+ boolean ifNotExists, String likeTableName, boolean isUserStorageFormat) {
this.tableName = tableName;
this.isExternal = isExternal;
this.isTemporary = isTemporary;
@@ -58,6 +59,7 @@ public class CreateTableLikeDesc extends
this.tblProps = tblProps;
this.ifNotExists = ifNotExists;
this.likeTableName = likeTableName;
+ this.isUserStorageFormat = isUserStorageFormat;
}
@Explain(displayName = "if not exists", displayOnlyOnTrue = true)
@@ -186,4 +188,11 @@ public class CreateTableLikeDesc extends
this.isTemporary = isTemporary;
}
+ /**
+ * True if user has specified storage format in query
+ * @return boolean
+ */
+ public boolean isUserStorageFormat() {
+ return this.isUserStorageFormat;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Fri Jan 23 19:59:11 2015
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
/**
* FileSinkDesc.
@@ -47,6 +48,7 @@ public class FileSinkDesc extends Abstra
private String compressCodec;
private String compressType;
private boolean multiFileSpray;
+ private boolean temporary;
// Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
// bucketed or sorted table/partition they cannot be merged.
private boolean canBeMerged;
@@ -89,6 +91,8 @@ public class FileSinkDesc extends Abstra
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
private long txnId = 0; // transaction id for this operation
+ private transient Table table;
+
public FileSinkDesc() {
}
@@ -217,6 +221,21 @@ public class FileSinkDesc extends Abstra
public void setMultiFileSpray(boolean multiFileSpray) {
this.multiFileSpray = multiFileSpray;
}
+
+ /**
+ * @return destination is temporary
+ */
+ public boolean isTemporary() {
+ return temporary;
+ }
+
+ /**
+ * @param totalFiles the totalFiles to set
+ */
+ public void setTemporary(boolean temporary) {
+ this.temporary = temporary;
+ }
+
public boolean canBeMerged() {
return canBeMerged;
@@ -421,4 +440,12 @@ public class FileSinkDesc extends Abstra
public long getTransactionId() {
return txnId;
}
+
+ public Table getTable() {
+ return table;
+ }
+
+ public void setTable(Table table) {
+ this.table = table;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Fri Jan 23 19:59:11 2015
@@ -61,7 +61,7 @@ public class GroupByDesc extends Abstrac
private ArrayList<ExprNodeDesc> keys;
private List<Integer> listGroupingSets;
private boolean groupingSetsPresent;
- private int groupingSetPosition;
+ private int groupingSetPosition = -1;
private ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators;
private ArrayList<java.lang.String> outputColumnNames;
private float groupByMemoryUsage;
@@ -177,6 +177,12 @@ public class GroupByDesc extends Abstrac
return outputColumnNames;
}
+ @Explain(displayName = "pruneGroupingSetId", displayOnlyOnTrue = true)
+ public boolean pruneGroupingSetId() {
+ return groupingSetPosition >= 0 &&
+ outputColumnNames.size() != keys.size() + aggregators.size();
+ }
+
public void setOutputColumnNames(
ArrayList<java.lang.String> outputColumnNames) {
this.outputColumnNames = outputColumnNames;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Fri Jan 23 19:59:11 2015
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
/**
* Join operator Descriptor implementation.
@@ -88,12 +90,26 @@ public class JoinDesc extends AbstractOp
// used only for explain.
private transient ExprNodeDesc [][] joinKeys;
+
+ // Data structures coming originally from QBJoinTree
+ private transient String leftAlias;
+ private transient String[] leftAliases;
+ private transient String[] rightAliases;
+ private transient String[] baseSrc;
+ private transient String id;
+ private transient boolean mapSideJoin;
+ private transient List<String> mapAliases; //map-side join aliases
+ private transient Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+ private transient boolean leftInputJoin;
+ private transient List<String> streamAliases;
+
public JoinDesc() {
}
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
List<String> outputColumnNames, final boolean noOuterJoin,
- final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters, ExprNodeDesc[][] joinKeys) {
+ final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
+ ExprNodeDesc[][] joinKeys) {
this.exprs = exprs;
this.outputColumnNames = outputColumnNames;
this.noOuterJoin = noOuterJoin;
@@ -509,4 +525,88 @@ public class JoinDesc extends AbstractOp
public void setFixedAsSorted(boolean fixedAsSorted) {
this.fixedAsSorted = fixedAsSorted;
}
+
+ public String[] getLeftAliases() {
+ return leftAliases;
+ }
+
+ public String[] getBaseSrc() {
+ return baseSrc;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<String> getMapAliases() {
+ return mapAliases;
+ }
+
+ public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+ return aliasToOpInfo;
+ }
+
+ public boolean isLeftInputJoin() {
+ return leftInputJoin;
+ }
+
+ public String getLeftAlias() {
+ return leftAlias;
+ }
+
+ public void setLeftAlias(String leftAlias) {
+ this.leftAlias = leftAlias;
+ }
+
+ public String[] getRightAliases() {
+ return rightAliases;
+ }
+
+ public List<String> getStreamAliases() {
+ return streamAliases;
+ }
+
+ public boolean isMapSideJoin() {
+ return mapSideJoin;
+ }
+
+ public void setQBJoinTreeProps(JoinDesc joinDesc) {
+ leftAlias = joinDesc.leftAlias;
+ leftAliases = joinDesc.leftAliases;
+ rightAliases = joinDesc.rightAliases;
+ baseSrc = joinDesc.baseSrc;
+ id = joinDesc.id;
+ mapSideJoin = joinDesc.mapSideJoin;
+ mapAliases = joinDesc.mapAliases;
+ aliasToOpInfo = joinDesc.aliasToOpInfo;
+ leftInputJoin = joinDesc.leftInputJoin;
+ streamAliases = joinDesc.streamAliases;
+ }
+
+ public void setQBJoinTreeProps(QBJoinTree joinTree) {
+ leftAlias = joinTree.getLeftAlias();
+ leftAliases = joinTree.getLeftAliases();
+ rightAliases = joinTree.getRightAliases();
+ baseSrc = joinTree.getBaseSrc();
+ id = joinTree.getId();
+ mapSideJoin = joinTree.isMapSideJoin();
+ mapAliases = joinTree.getMapAliases();
+ aliasToOpInfo = joinTree.getAliasToOpInfo();
+ leftInputJoin = joinTree.getJoinSrc() != null;
+ streamAliases = joinTree.getStreamAliases();
+ }
+
+ public void cloneQBJoinTreeProps(JoinDesc joinDesc) {
+ leftAlias = joinDesc.leftAlias;
+ leftAliases = joinDesc.leftAliases == null ? null : joinDesc.leftAliases.clone();
+ rightAliases = joinDesc.rightAliases == null ? null : joinDesc.rightAliases.clone();
+ baseSrc = joinDesc.baseSrc == null ? null : joinDesc.baseSrc.clone();
+ id = joinDesc.id;
+ mapSideJoin = joinDesc.mapSideJoin;
+ mapAliases = joinDesc.mapAliases == null ? null : new ArrayList<String>(joinDesc.mapAliases);
+ aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>(joinDesc.aliasToOpInfo);
+ leftInputJoin = joinDesc.leftInputJoin;
+ streamAliases = joinDesc.streamAliases == null ? null : new ArrayList<String>(joinDesc.streamAliases);
+ }
+
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Fri Jan 23 19:59:11 2015
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import com.google.common.collect.Interner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -41,10 +40,11 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SplitSample;
import org.apache.hadoop.mapred.JobConf;
+import com.google.common.collect.Interner;
+
/**
* MapWork represents all the information used to run a map task on the cluster.
* It is first used when the query planner breaks the logical plan into tasks and
@@ -105,8 +105,10 @@ public class MapWork extends BaseWork {
public static final int SAMPLING_ON_START = 2; // sampling on task running
// the following two are used for join processing
- private QBJoinTree joinTree;
private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
+ private boolean leftInputJoin;
+ private String[] baseSrc;
+ private List<String> mapAliases;
private boolean mapperCannotSpanPartns;
@@ -419,14 +421,6 @@ public class MapWork extends BaseWork {
return useOneNullRowInputFormat;
}
- public QBJoinTree getJoinTree() {
- return joinTree;
- }
-
- public void setJoinTree(QBJoinTree joinTree) {
- this.joinTree = joinTree;
- }
-
public void setMapperCannotSpanPartns(boolean mapperCannotSpanPartns) {
this.mapperCannotSpanPartns = mapperCannotSpanPartns;
}
@@ -579,4 +573,28 @@ public class MapWork extends BaseWork {
public boolean getDoSplitsGrouping() {
return this.doSplitsGrouping;
}
+
+ public boolean isLeftInputJoin() {
+ return leftInputJoin;
+ }
+
+ public void setLeftInputJoin(boolean leftInputJoin) {
+ this.leftInputJoin = leftInputJoin;
+ }
+
+ public String[] getBaseSrc() {
+ return baseSrc;
+ }
+
+ public void setBaseSrc(String[] baseSrc) {
+ this.baseSrc = baseSrc;
+ }
+
+ public List<String> getMapAliases() {
+ return mapAliases;
+ }
+
+ public void setMapAliases(List<String> mapAliases) {
+ this.mapAliases = mapAliases;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Fri Jan 23 19:59:11 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -45,6 +46,8 @@ public class MapredLocalWork implements
private BucketMapJoinContext bucketMapjoinContext;
private Path tmpPath;
private String stageID;
+ // Temp HDFS path for Spark HashTable sink
+ private Path tmpHDFSPath;
private List<Operator<? extends OperatorDesc>> dummyParentOp;
private Map<MapJoinOperator, List<Operator<? extends OperatorDesc>>> directFetchOp;
@@ -52,7 +55,10 @@ public class MapredLocalWork implements
private boolean hasStagedAlias;
public MapredLocalWork() {
-
+ this(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
+ new LinkedHashMap<String, FetchWork>());
+ this.dummyParentOp = new ArrayList<Operator<? extends OperatorDesc>>();
+ this.directFetchOp = new LinkedHashMap<MapJoinOperator, List<Operator<? extends OperatorDesc>>>();
}
public MapredLocalWork(
@@ -60,23 +66,20 @@ public class MapredLocalWork implements
final LinkedHashMap<String, FetchWork> aliasToFetchWork) {
this.aliasToWork = aliasToWork;
this.aliasToFetchWork = aliasToFetchWork;
-
}
public MapredLocalWork(MapredLocalWork clone){
this.tmpPath = clone.tmpPath;
this.inputFileChangeSensitive=clone.inputFileChangeSensitive;
-
}
-
public void setDummyParentOp(List<Operator<? extends OperatorDesc>> op){
this.dummyParentOp=op;
}
public List<Operator<? extends OperatorDesc>> getDummyParentOp(){
- return this.dummyParentOp;
+ return dummyParentOp;
}
@@ -168,6 +171,14 @@ public class MapredLocalWork implements
return tmpPath;
}
+ public void setTmpHDFSPath(Path tmpPath) {
+ this.tmpHDFSPath = tmpPath;
+ }
+
+ public Path getTmpHDFSPath() {
+ return tmpHDFSPath;
+ }
+
public String getBucketFileName(String bigFileName) {
if (!inputFileChangeSensitive || bigFileName == null || bigFileName.isEmpty()) {
return "-";
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java Fri Jan 23 19:59:11 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.Map;
public interface OperatorDesc extends Serializable, Cloneable {
public Object clone() throws CloneNotSupportedException;
@@ -26,4 +27,5 @@ public interface OperatorDesc extends Se
public void setStatistics(Statistics statistics);
public OpTraits getOpTraits();
public void setOpTraits(OpTraits opTraits);
+ public Map<String, String> getOpProps();
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Fri Jan 23 19:59:11 2015
@@ -956,7 +956,7 @@ public final class PlanUtils {
ReadEntity parentViewInfo = getParentViewInfo(alias, parseCtx.getViewAliasToInput());
// Adds tables only for create view (PPD filter can be appended by outer query)
- Table table = parseCtx.getTopToTable().get(topOp);
+ Table table = topOp.getConf().getTableMetadata();
PlanUtils.addInput(inputs, new ReadEntity(table, parentViewInfo));
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Fri Jan 23 19:59:11 2015
@@ -111,6 +111,9 @@ public class ReduceSinkDesc extends Abst
// Write type, since this needs to calculate buckets differently for updates and deletes
private AcidUtils.Operation writeType;
+ // whether we'll enforce the sort order of the RS
+ private transient boolean enforceSort = false;
+
private static transient Log LOG = LogFactory.getLog(ReduceSinkDesc.class);
public ReduceSinkDesc() {
}
@@ -165,6 +168,7 @@ public class ReduceSinkDesc extends Abst
desc.setStatistics(this.getStatistics());
desc.setSkipTag(skipTag);
desc.reduceTraits = reduceTraits.clone();
+ desc.setEnforceSort(enforceSort);
return desc;
}
@@ -407,4 +411,12 @@ public class ReduceSinkDesc extends Abst
public AcidUtils.Operation getWriteType() {
return writeType;
}
+
+ public boolean isEnforceSort() {
+ return enforceSort;
+ }
+
+ public void setEnforceSort(boolean isDeduplicated) {
+ this.enforceSort = isDeduplicated;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Fri Jan 23 19:59:11 2015
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
/**
@@ -36,7 +37,7 @@ public class TableScanDesc extends Abstr
private static final long serialVersionUID = 1L;
static {
- PTFUtils.makeTransient(TableScanDesc.class, "filterObject", "referencedColumns");
+ PTFUtils.makeTransient(TableScanDesc.class, "filterObject", "referencedColumns", "tableMetadata");
}
private String alias;
@@ -93,23 +94,32 @@ public class TableScanDesc extends Abstr
private boolean isMetadataOnly = false;
- @SuppressWarnings("nls")
+ private transient final Table tableMetadata;
+
+
public TableScanDesc() {
+ this(null, null);
}
- public TableScanDesc(final String alias) {
- this.alias = alias;
+ @SuppressWarnings("nls")
+ public TableScanDesc(Table tblMetadata) {
+ this(null, tblMetadata);
}
- public TableScanDesc(final String alias, List<VirtualColumn> vcs) {
+ public TableScanDesc(final String alias, Table tblMetadata) {
+ this(alias, null, tblMetadata);
+ }
+
+ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetadata) {
this.alias = alias;
this.virtualCols = vcs;
+ this.tableMetadata = tblMetadata;
}
@Override
public Object clone() {
List<VirtualColumn> vcs = new ArrayList<VirtualColumn>(getVirtualCols());
- return new TableScanDesc(getAlias(), vcs);
+ return new TableScanDesc(getAlias(), vcs, this.tableMetadata);
}
@Explain(displayName = "alias")
@@ -250,8 +260,12 @@ public class TableScanDesc extends Abstr
public void setIsMetadataOnly(boolean metadata_only) {
isMetadataOnly = metadata_only;
}
-
+
public boolean getIsMetadataOnly() {
return isMetadataOnly;
}
+
+ public Table getTableMetadata() {
+ return tableMetadata;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Fri Jan 23 19:59:11 2015
@@ -897,7 +897,7 @@ public final class OpProcFactory {
HiveConf hiveConf) {
TableScanDesc tableScanDesc = tableScanOp.getConf();
- Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp);
+ Table tbl = tableScanDesc.getTableMetadata();
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
// attach the original predicate to the table scan operator for index
// optimizations that require the pushed predicate before pcr & later
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java Fri Jan 23 19:59:11 2015
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -49,8 +50,14 @@ public final class CommandProcessorFacto
}
public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
- throws SQLException {
- HiveCommand hiveCommand = HiveCommand.find(cmd);
+ throws SQLException {
+ return getForHiveCommandInternal(cmd, conf, false);
+ }
+
+ public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
+ boolean testOnly)
+ throws SQLException {
+ HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
if (hiveCommand == null || isBlank(cmd[0])) {
return null;
}
@@ -58,7 +65,8 @@ public final class CommandProcessorFacto
conf = new HiveConf();
}
Set<String> availableCommands = new HashSet<String>();
- for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split(",")) {
+ for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
+ .split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
@@ -82,6 +90,12 @@ public final class CommandProcessorFacto
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
+ case CRYPTO:
+ try {
+ return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
+ } catch (HiveException e) {
+ throw new SQLException("Fail to start the command processor due to the exception: ", e);
+ }
default:
throw new AssertionError("Unknown HiveCommand " + hiveCommand);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java Fri Jan 23 19:59:11 2015
@@ -29,18 +29,40 @@ public enum HiveCommand {
SET(),
RESET(),
DFS(),
+ CRYPTO(true),
ADD(),
LIST(),
RELOAD(),
DELETE(),
COMPILE();
+
+ public static boolean ONLY_FOR_TESTING = true;
+ private boolean usedOnlyForTesting;
+
+ HiveCommand() {
+ this(false);
+ }
+
+ HiveCommand(boolean onlyForTesting) {
+ this.usedOnlyForTesting = onlyForTesting;
+ }
+
+ public boolean isOnlyForTesting() {
+ return this.usedOnlyForTesting;
+ }
+
private static final Set<String> COMMANDS = new HashSet<String>();
static {
for (HiveCommand command : HiveCommand.values()) {
COMMANDS.add(command.name());
}
}
+
public static HiveCommand find(String[] command) {
+ return find(command, false);
+ }
+
+ public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
if (null == command){
return null;
}
@@ -54,7 +76,13 @@ public enum HiveCommand {
//special handling for SQL "delete from <table> where..."
return null;
} else if (COMMANDS.contains(cmd)) {
- return HiveCommand.valueOf(cmd);
+ HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
+
+ if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
+ return hiveCommand;
+ }
+
+ return null;
}
}
return null;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Fri Jan 23 19:59:11 2015
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.history.HiveHistory;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hive.ql.securit
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
import org.apache.hadoop.hive.ql.util.DosToUnix;
+import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -179,6 +182,13 @@ public class SessionState {
private String userIpAddress;
+ private SparkSession sparkSession;
+
+ /**
+ * Gets information about HDFS encryption
+ */
+ private HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
/**
* Lineage state.
*/
@@ -374,6 +384,23 @@ public class SessionState {
return txnAutoCommit;
}
+ public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
+ if (hdfsEncryptionShim == null) {
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ if ("hdfs".equals(fs.getUri().getScheme())) {
+ hdfsEncryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+ } else {
+ LOG.info("Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.");
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ return hdfsEncryptionShim;
+ }
+
/**
* Singleton Session object per thread.
*
@@ -406,7 +433,6 @@ public class SessionState {
* when switching from one session to another.
*/
public static SessionState start(SessionState startSs) {
-
setCurrentSessionState(startSs);
if (startSs.hiveHist == null){
@@ -1264,6 +1290,16 @@ public class SessionState {
tezSessionState = null;
}
+ if (sparkSession != null) {
+ try {
+ SparkSessionManagerImpl.getInstance().closeSession(sparkSession);
+ } catch (Exception ex) {
+ LOG.error("Error closing spark session.", ex);
+ } finally {
+ sparkSession = null;
+ }
+ }
+
dropSessionPaths(conf);
}
@@ -1358,6 +1394,14 @@ public class SessionState {
this.userIpAddress = userIpAddress;
}
+ public SparkSession getSparkSession() {
+ return sparkSession;
+ }
+
+ public void setSparkSession(SparkSession sparkSession) {
+ this.sparkSession = sparkSession;
+ }
+
/**
* Get the next suffix to use in naming a temporary table created by insert...values
* @return suffix
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java Fri Jan 23 19:59:11 2015
@@ -41,14 +41,16 @@ public class CounterStatsAggregator impl
@Override
public boolean connect(Configuration hconf, Task sourceTask) {
- try {
- jc = new JobClient(toJobConf(hconf));
- RunningJob job = jc.getJob(((MapRedTask)sourceTask).getJobID());
- if (job != null) {
- counters = job.getCounters();
+ if (sourceTask instanceof MapRedTask) {
+ try {
+ jc = new JobClient(toJobConf(hconf));
+ RunningJob job = jc.getJob(((MapRedTask)sourceTask).getJobID());
+ if (job != null) {
+ counters = job.getCounters();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to get Job instance for " + sourceTask.getJobID(),e);
}
- } catch (Exception e) {
- LOG.error("Failed to get Job instance for " + sourceTask.getJobID(),e);
}
return counters != null;
}
@@ -59,9 +61,13 @@ public class CounterStatsAggregator impl
@Override
public String aggregateStats(String counterGrpName, String statType) {
- // In case of counters, aggregation is done by JobTracker / MR AM itself
- // so no need to aggregate, simply return the counter value for requested stat.
- return String.valueOf(counters.getGroup(counterGrpName).getCounter(statType));
+ long value = 0;
+ if (counters != null) {
+ // In case of counters, aggregation is done by JobTracker / MR AM itself
+ // so no need to aggregate, simply return the counter value for requested stat.
+ value = counters.getGroup(counterGrpName).getCounter(statType);
+ }
+ return String.valueOf(value);
}
@Override
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java Fri Jan 23 19:59:11 2015
@@ -52,8 +52,10 @@ public class CounterStatsPublisher imple
for (Map.Entry<String, String> entry : stats.entrySet()) {
try {
reporter.incrCounter(fileID, entry.getKey(), Long.valueOf(entry.getValue()));
- } catch (NumberFormatException e) {
- LOG.error("Invalid counter value " + entry.getValue() + " for " + entry.getKey());
+ } catch (Exception e) {
+ LOG.error("Failed to increment counter value " + entry.getValue() + " for " + entry.getKey()
+ + ": " + e, e);
+ return false;
}
}
return true;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Fri Jan 23 19:59:11 2015
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -262,7 +263,7 @@ public class Initiator extends Compactor
private long sumDirSize(FileSystem fs, Path dir) throws IOException {
long size = 0;
- FileStatus[] buckets = fs.listStatus(dir);
+ FileStatus[] buckets = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (int i = 0; i < buckets.length; i++) {
size += buckets[i].getLen();
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java Fri Jan 23 19:59:11 2015
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.Text;
extended = "date is a string in the format of 'yyyy-MM-dd HH:mm:ss' or "
+ "'yyyy-MM-dd'.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n" + " 30")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 30")
@VectorizedExpressions({VectorUDFDayOfMonthLong.class, VectorUDFDayOfMonthString.class})
public class UDFDayOfMonth extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java Fri Jan 23 19:59:11 2015
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.Text;
@Description(name = "month",
value = "_FUNC_(date) - Returns the month of date",
extended = "Example:\n"
- + " > SELECT _FUNC_('2009-30-07') FROM src LIMIT 1;\n" + " 7")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 7")
@VectorizedExpressions({VectorUDFMonthLong.class, VectorUDFMonthString.class})
public class UDFMonth extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java Fri Jan 23 19:59:11 2015
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.Text;
extended = "date is a string in the format of 'yyyy-MM-dd HH:mm:ss' or "
+ "'yyyy-MM-dd'.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n" + " 2009")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 2009")
@VectorizedExpressions({VectorUDFYearLong.class, VectorUDFYearString.class})
public class UDFYear extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");