Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/22 06:05:10 UTC
svn commit: r1653769 [8/14] - in /hive/branches/spark: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ dev-s...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jan 22 05:05:05 2015
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hive.con
import java.io.IOException;
import java.io.Serializable;
+import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -38,14 +39,17 @@ import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.CommonToken;
import org.antlr.runtime.Token;
import org.antlr.runtime.tree.Tree;
import org.antlr.runtime.tree.TreeWizard;
import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -200,9 +204,12 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Implementation of the semantic analyzer. It generates the query plan.
@@ -374,12 +381,9 @@ public class SemanticAnalyzer extends Ba
opToPartList = pctx.getOpToPartList();
opToSamplePruner = pctx.getOpToSamplePruner();
topOps = pctx.getTopOps();
- topSelOps = pctx.getTopSelOps();
opParseCtx = pctx.getOpParseCtx();
loadTableWork = pctx.getLoadTableWork();
loadFileWork = pctx.getLoadFileWork();
- joinContext = pctx.getJoinContext();
- smbMapJoinContext = pctx.getSmbMapJoinContext();
ctx = pctx.getContext();
destTableId = pctx.getDestTableId();
idToTableNameMap = pctx.getIdToTableNameMap();
@@ -393,15 +397,15 @@ public class SemanticAnalyzer extends Ba
}
public ParseContext getParseContext() {
- return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
- topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
- fsopToTable, loadTableWork,
- loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList,
+ topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
opToPartToSkewedPruner, viewAliasToInput,
- reduceSinkOperatorsAddedByEnforceBucketingSorting,
- queryProperties);
+ reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
}
@SuppressWarnings("nls")
@@ -1669,7 +1673,7 @@ public class SemanticAnalyzer extends Ba
throw new SemanticException(e);
}
try {
- fname = ctx.getExternalTmpPath(
+ fname = ctx.getExtTmpPathRelTo(
FileUtils.makeQualified(location, conf)).toString();
} catch (Exception e) {
throw new SemanticException(generateErrorMessage(ast,
@@ -1685,8 +1689,9 @@ public class SemanticAnalyzer extends Ba
} else {
// This is the only place where isQuery is set to true; it defaults to false.
qb.setIsQuery(true);
- fname = ctx.getMRTmpPath().toString();
- ctx.setResDir(new Path(fname));
+ Path stagingPath = getStagingDirectoryPathname(qb);
+ fname = stagingPath.toString();
+ ctx.setResDir(stagingPath);
}
}
qb.getMetaData().setDestForAlias(name, fname,
@@ -1742,6 +1747,160 @@ public class SemanticAnalyzer extends Ba
}
}
+ /**
+ * Checks if a given path is encrypted (valid only for HDFS files)
+ * @param path The path to check for encryption
+ * @return True if the path is encrypted; False if it is not encrypted
+ * @throws HiveException If an error occurs while checking for encryption
+ */
+ private boolean isPathEncrypted(Path path) throws HiveException {
+ HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
+ hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
+ if (hdfsEncryptionShim != null) {
+ try {
+ if (hdfsEncryptionShim.isPathEncrypted(path)) {
+ return true;
+ }
+ } catch (Exception e) {
+ throw new HiveException("Unable to determine if " + path + "is encrypted: " + e, e);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Compares two paths' key encryption strengths.
+ *
+ * @param p1 Path to an HDFS file system
+ * @param p2 Path to an HDFS file system
+ * @return -1 if p1's key is weaker than p2's; 0 if their strengths are equal; 1 if p1's key is stronger
+ * @throws HiveException If an error occurs while comparing key strengths.
+ */
+ private int comparePathKeyStrength(Path p1, Path p2) throws HiveException {
+ HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
+ hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
+ if (hdfsEncryptionShim != null) {
+ try {
+ return hdfsEncryptionShim.comparePathKeyStrength(p1, p2);
+ } catch (Exception e) {
+ throw new HiveException("Unable to compare key strength for " + p1 + " and " + p2 + " : " + e, e);
+ }
+ }
+
+ return 0; // Non-encrypted path (or equal strength)
+ }
+
+ /**
+ * Checks if a given path has read-only access permissions.
+ *
+ * @param path The path to check for read-only permissions.
+ * @return True if the path is read-only; False otherwise.
+ * @throws HiveException If an error occurs while checking file permissions.
+ */
+ private boolean isPathReadOnly(Path path) throws HiveException {
+ HiveConf conf = SessionState.get().getConf();
+ try {
+ FileSystem fs = path.getFileSystem(conf);
+ UserGroupInformation ugi = Utils.getUGI();
+ FileStatus status = fs.getFileStatus(path);
+
+ // We just check for write permissions. If it fails with AccessControlException, then it
+ // means the location may be read-only.
+ FileUtils.checkFileAccessWithImpersonation(fs, status, FsAction.WRITE, ugi.getUserName());
+
+ // Path has write permissions
+ return false;
+ } catch (AccessControlException e) {
+ // An AccessControlException may be caused by other errors as well,
+ // but we treat it as if the path is read-only
+ return true;
+ } catch (Exception e) {
+ throw new HiveException("Unable to determine if " + path + " is read only: " + e, e);
+ }
+ }
+
+ /**
+ * Gets the strongest encrypted table path.
+ *
+ * @param qb The QB object that contains a list of all table locations.
+ * @return The strongest encrypted path
+ * @throws HiveException if an error occurred attempting to compare the encryption strength
+ */
+ private Path getStrongestEncryptedTablePath(QB qb) throws HiveException {
+ List<String> tabAliases = new ArrayList<String>(qb.getTabAliases());
+ Path strongestPath = null;
+
+ /* Walk through all the table locations found to get the most strongly encrypted table */
+ for (String alias : tabAliases) {
+ Table tab = qb.getMetaData().getTableForAlias(alias);
+ if (tab != null) {
+ Path tablePath = tab.getDataLocation();
+ if (tablePath != null) {
+ try {
+ if (strongestPath == null) {
+ strongestPath = tablePath;
+ } else if ("hdfs".equals(tablePath.toUri().getScheme())
+ && isPathEncrypted(tablePath)
+ && comparePathKeyStrength(tablePath, strongestPath) > 0)
+ {
+ strongestPath = tablePath;
+ }
+ } catch (HiveException e) {
+ throw new HiveException("Unable to find the most secure table path: " + e, e);
+ }
+ }
+ }
+ }
+
+ return strongestPath;
+ }
+
+ /**
+ * Gets the staging directory where MR files will be stored temporarily.
+ * It walks through the QB plan to find the correct location to save temporary files. For
+ * security reasons, this temporary location (or staging directory) may be created inside
+ * encrypted table locations. If the QB has read-only tables, then the old scratch directory
+ * will be used, or a permission error will be thrown if the requested query table is
+ * encrypted and the old scratch directory is not.
+ *
+ * @param qb The QB object that contains a list of all table locations.
+ * @return The path to the staging directory.
+ * @throws HiveException If an error occurs while identifying the correct staging location.
+ */
+ private Path getStagingDirectoryPathname(QB qb) throws HiveException {
+ Path stagingPath = null, tablePath;
+
+ // Looks for the most encrypted table location (if there is one)
+ tablePath = getStrongestEncryptedTablePath(qb);
+ if (tablePath != null && isPathEncrypted(tablePath)) {
+ // Only HDFS paths can be checked for encryption
+ if ("hdfs".equals(tablePath.toUri().getScheme())) {
+ if (isPathReadOnly(tablePath)) {
+ Path tmpPath = ctx.getMRTmpPath();
+ if (comparePathKeyStrength(tablePath, tmpPath) < 0) {
+ throw new HiveException("Read-only encrypted tables cannot be read " +
+ "if the scratch directory is not encrypted (or encryption is weak)");
+ } else {
+ stagingPath = tmpPath;
+ }
+ }
+ } else {
+ LOG.debug("Encryption is not applicable to table path " + tablePath.toString());
+ }
+
+ if (stagingPath == null) {
+ stagingPath = ctx.getMRTmpPath(tablePath.toUri());
+ }
+ } else {
+ stagingPath = ctx.getMRTmpPath();
+ }
+
+ return stagingPath;
+ }
+
private void replaceViewReferenceWithDefinition(QB qb, Table tab,
String tab_name, String alias) throws SemanticException {
@@ -2642,7 +2801,7 @@ public class SemanticAnalyzer extends Ba
@SuppressWarnings("nls")
// TODO: make aliases unique, otherwise needless rewriting takes place
- Integer genColListRegex(String colRegex, String tabAlias, ASTNode sel,
+ Integer genColListRegex(String colRegex, String tabAlias, ASTNode sel,
ArrayList<ExprNodeDesc> col_list, HashSet<ColumnInfo> excludeCols, RowResolver input,
RowResolver colSrcRR, Integer pos, RowResolver output, List<String> aliases,
boolean ensureUniqueCols) throws SemanticException {
@@ -3938,7 +4097,7 @@ public class SemanticAnalyzer extends Ba
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, numDistinctUDFs > 0),
+ false, groupByMemoryUsage, memoryThreshold, null, false, -1, numDistinctUDFs > 0),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
input), groupByOutputRowResolver);
op.setColumnExprMap(colExprMap);
@@ -4063,10 +4222,11 @@ public class SemanticAnalyzer extends Ba
}
// This is only needed if a new grouping set key is being created
- int groupingSetsPosition = 0;
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
// Consider the query: select a,b, count(1) from T group by a,b with cube;
// where it is being executed in a single map-reduce job
// The plan is TableScan -> GroupBy1 -> ReduceSink -> GroupBy2 -> FileSink
@@ -4081,7 +4241,6 @@ public class SemanticAnalyzer extends Ba
colExprMap);
}
else {
- groupingSetsPosition = groupByKeys.size();
// The grouping set has not yet been processed. Create a new grouping key
// Consider the query: select a,b, count(1) from T group by a,b with cube;
// where it is being executed in 2 map-reduce jobs
@@ -4297,7 +4456,7 @@ public class SemanticAnalyzer extends Ba
}
// The grouping set key is present after the grouping keys, before the distinct keys
- int groupingSetsPosition = groupByKeys.size();
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
// This dummy key needs to be added as a reduce key
@@ -4309,6 +4468,7 @@ public class SemanticAnalyzer extends Ba
// This function is called for GroupBy1 to create an additional grouping key
// for the grouping set (corresponding to the rollup).
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
createNewGroupingKey(groupByKeys,
outputColumnNames,
groupByOutputRowResolver,
@@ -4865,8 +5025,10 @@ public class SemanticAnalyzer extends Ba
colExprMap.put(field, groupByKeys.get(groupByKeys.size() - 1));
}
+ int groupingSetsPosition = -1;
// For grouping sets, add a dummy grouping key
if (groupingSetsPresent) {
+ groupingSetsPosition = groupByKeys.size();
addGroupingSetKey(
groupByKeys,
groupByInputRowResolver2,
@@ -4922,7 +5084,8 @@ public class SemanticAnalyzer extends Ba
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, containsDistinctAggr),
+ false, groupByMemoryUsage, memoryThreshold, null, false,
+ groupingSetsPosition, containsDistinctAggr),
new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
reduceSinkOperatorInfo2), groupByOutputRowResolver2);
op.setColumnExprMap(colExprMap);
@@ -5857,6 +6020,7 @@ public class SemanticAnalyzer extends Ba
Table dest_tab = null; // destination table if any
boolean destTableIsAcid = false; // should the destination table be written to using ACID
+ boolean destTableIsTemporary = false;
Partition dest_part = null;// destination partition if any
Path queryTmpdir = null; // the intermediate destination directory
Path dest_path = null; // the final destination directory
@@ -5874,6 +6038,7 @@ public class SemanticAnalyzer extends Ba
dest_tab = qbm.getDestTableForAlias(dest);
destTableIsAcid = isAcidTable(dest_tab);
+ destTableIsTemporary = dest_tab.isTemporary();
// Is the user trying to insert into a external tables
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -5946,7 +6111,7 @@ public class SemanticAnalyzer extends Ba
if (isNonNativeTable) {
queryTmpdir = dest_path;
} else {
- queryTmpdir = ctx.getExternalTmpPath(dest_path);
+ queryTmpdir = ctx.getExtTmpPathRelTo(dest_path);
}
if (dpCtx != null) {
// set the root of the temporary path where dynamic partition columns will populate
@@ -6127,7 +6292,7 @@ public class SemanticAnalyzer extends Ba
try {
Path qPath = FileUtils.makeQualified(dest_path, conf);
- queryTmpdir = ctx.getExternalTmpPath(qPath);
+ queryTmpdir = ctx.getExtTmpPathRelTo(qPath);
} catch (Exception e) {
throw new SemanticException("Error creating temporary folder on: "
+ dest_path, e);
@@ -6143,6 +6308,7 @@ public class SemanticAnalyzer extends Ba
CreateTableDesc tblDesc = qb.getTableDesc();
if (tblDesc != null) {
field_schemas = new ArrayList<FieldSchema>();
+ destTableIsTemporary = tblDesc.isTemporary();
}
boolean first = true;
@@ -6287,6 +6453,8 @@ public class SemanticAnalyzer extends Ba
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
+
+ fileSinkDesc.setTemporary(destTableIsTemporary);
/* Set List Bucketing context. */
if (lbCtx != null) {
@@ -6305,7 +6473,7 @@ public class SemanticAnalyzer extends Ba
// it should be the same as the MoveWork's sourceDir.
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getExternalTmpPath(queryTmpdir).toString();
+ String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString();
LOG.info("Set stats collection dir : " + statsTmpLoc);
conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
}
@@ -6334,7 +6502,9 @@ public class SemanticAnalyzer extends Ba
+ dest_path + " row schema: " + inputRR.toString());
}
- fsopToTable.put((FileSinkOperator) output, dest_tab);
+ FileSinkOperator fso = (FileSinkOperator) output;
+ fso.getConf().setTable(dest_tab);
+ fsopToTable.put(fso, dest_tab);
return output;
}
@@ -6867,12 +7037,11 @@ public class SemanticAnalyzer extends Ba
}
- @SuppressWarnings("nls")
private Operator genReduceSinkPlan(String dest, QB qb, Operator<?> input,
int numReducers) throws SemanticException {
-
+
RowResolver inputRR = opParseCtx.get(input).getRowResolver();
-
+
// First generate the expression for the partition and sort keys
// The cluster by clause / distribute by clause has the aliases for
// partition function
@@ -6880,15 +7049,14 @@ public class SemanticAnalyzer extends Ba
if (partitionExprs == null) {
partitionExprs = qb.getParseInfo().getDistributeByForClause(dest);
}
- ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
if (partitionExprs != null) {
int ccount = partitionExprs.getChildCount();
for (int i = 0; i < ccount; ++i) {
ASTNode cl = (ASTNode) partitionExprs.getChild(i);
- partitionCols.add(genExprNodeDesc(cl, inputRR));
+ partCols.add(genExprNodeDesc(cl, inputRR));
}
}
-
ASTNode sortExprs = qb.getParseInfo().getClusterByForClause(dest);
if (sortExprs == null) {
sortExprs = qb.getParseInfo().getSortByForClause(dest);
@@ -6908,11 +7076,7 @@ public class SemanticAnalyzer extends Ba
}
}
}
- Operator dummy = Operator.createDummy();
- dummy.setParentOperators(Arrays.asList(input));
-
ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
StringBuilder order = new StringBuilder();
if (sortExprs != null) {
int ccount = sortExprs.getChildCount();
@@ -6933,9 +7097,25 @@ public class SemanticAnalyzer extends Ba
}
ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR);
sortCols.add(exprNode);
- sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input));
}
}
+ return genReduceSinkPlan(input, partCols, sortCols, order.toString(), numReducers);
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genReduceSinkPlan(Operator<?> input,
+ ArrayList<ExprNodeDesc> partitionCols, ArrayList<ExprNodeDesc> sortCols,
+ String sortOrder, int numReducers) throws SemanticException {
+
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+
+ Operator dummy = Operator.createDummy();
+ dummy.setParentOperators(Arrays.asList(input));
+
+ ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc sortCol : sortCols) {
+ sortColsBack.add(ExprNodeDescUtils.backtrack(sortCol, dummy, input));
+ }
// For the generation of the values expression just get the inputs
// signature and generate field expressions for those
RowResolver rsRR = new RowResolver();
@@ -6993,7 +7173,7 @@ public class SemanticAnalyzer extends Ba
// TODO Not 100% sure NOT_ACID is always right here.
ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
- false, -1, partitionCols, order.toString(), numReducers, AcidUtils.Operation.NOT_ACID);
+ false, -1, partitionCols, sortOrder, numReducers, AcidUtils.Operation.NOT_ACID);
Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
new RowSchema(rsRR.getColumnInfos()), input), rsRR);
@@ -7341,6 +7521,7 @@ public class SemanticAnalyzer extends Ba
JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
joinSrcOp, srcOps, omitOpts, joinKeys);
+ joinOp.getConf().setQBJoinTreeProps(joinTree);
joinContext.put(joinOp, joinTree);
Operator op = joinOp;
@@ -7434,7 +7615,7 @@ public class SemanticAnalyzer extends Ba
.getFloatVar(conf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild(
new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations,
- false, groupByMemoryUsage, memoryThreshold, null, false, 0, false),
+ false, groupByMemoryUsage, memoryThreshold, null, false, -1, false),
new RowSchema(groupByOutputRowResolver.getColumnInfos()),
inputOperatorInfo), groupByOutputRowResolver);
@@ -8751,6 +8932,23 @@ public class SemanticAnalyzer extends Ba
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(curr);
+ // Check and transform group by *. This will only happen for select distinct *.
+ // Here "genSelectPlan" is being leveraged.
+ // The main benefits are (1) it removes virtual columns that should
+ // not be included in the group by; (2) it adds the fully qualified column names to the
+ // unParseTranslator so that views are supported. The drawback is that an additional SEL op
+ // is added. If it is not necessary, it will be removed by the NonBlockingOpDeDupProc
+ // optimizer because it matches the SEL%SEL% rule.
+ ASTNode selExprList = qbp.getSelForClause(dest);
+ if (selExprList.getToken().getType() == HiveParser.TOK_SELECTDI
+ && selExprList.getChildCount() == 1 && selExprList.getChild(0).getChildCount() == 1) {
+ ASTNode node = (ASTNode) selExprList.getChild(0).getChild(0);
+ if (node.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
+ curr = genSelectPlan(dest, qb, curr, curr);
+ RowResolver rr = opParseCtx.get(curr).getRowResolver();
+ qbp.setSelExprForClause(dest, SemanticAnalyzer.genSelectDIAST(rr));
+ }
+ }
if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
curr = genGroupByPlanMapAggrNoSkew(dest, qb, curr);
@@ -9249,7 +9447,7 @@ public class SemanticAnalyzer extends Ba
}
// Create the root of the operator tree
- TableScanDesc tsDesc = new TableScanDesc(alias, vcList);
+ TableScanDesc tsDesc = new TableScanDesc(alias, vcList, tab);
setupStats(tsDesc, qb.getParseInfo(), tab, alias, rwsch);
SplitSample sample = nameToSplitSample.get(alias_id);
@@ -9271,6 +9469,7 @@ public class SemanticAnalyzer extends Ba
Map<String, String> props = qb.getTabPropsForAlias(alias);
if (props != null) {
topToTableProps.put((TableScanOperator) top, props);
+ tsDesc.setOpProps(props);
}
} else {
rwsch = opParseCtx.get(top).getRowResolver();
@@ -9433,7 +9632,7 @@ public class SemanticAnalyzer extends Ba
tsDesc.setGatherStats(false);
} else {
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getExternalTmpPath(tab.getPath()).toString();
+ String statsTmpLoc = ctx.getExtTmpPathRelTo(tab.getPath()).toString();
LOG.info("Set stats collection dir : " + statsTmpLoc);
conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
}
@@ -9964,9 +10163,11 @@ public class SemanticAnalyzer extends Ba
}
// 4. Generate Parse Context for Optimizer & Physical compiler
- ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, opToPartPruner, opToPartList,
- topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, topToTableProps,
- fsopToTable, loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child,
+ opToPartPruner, opToPartList, topOps, opParseCtx,
+ new HashSet<JoinOperator>(joinContext.keySet()),
+ new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
@@ -11948,130 +12149,34 @@ public class SemanticAnalyzer extends Ba
}
private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
- RowResolver inputRR,
- Operator input) throws SemanticException{
+ RowResolver inputRR, Operator input) throws SemanticException{
+
ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- List<String> outputColumnNames = new ArrayList<String>();
- StringBuilder orderString = new StringBuilder();
+ StringBuilder order = new StringBuilder();
- ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
- for (PartitionExpression partCol : partColList) {
+ for (PartitionExpression partCol : spec.getQueryPartitionSpec().getExpressions()) {
ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
partCols.add(partExpr);
orderCols.add(partExpr);
- orderString.append('+');
+ order.append('+');
}
- ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
- new ArrayList<PTFInvocationSpec.OrderExpression>() :
- spec.getQueryOrderSpec().getExpressions();
- for (int i = 0; i < orderColList.size(); i++) {
- OrderExpression orderCol = orderColList.get(i);
- org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
- if (order.name().equals("ASC")) {
- orderString.append('+');
- } else {
- orderString.append('-');
- }
- ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
- orderCols.add(orderExpr);
- }
-
- ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
- RowResolver rsNewRR = new RowResolver();
- int pos = 0;
- for (ColumnInfo colInfo : colInfoList) {
- ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo);
- valueCols.add(valueColExpr);
- String internalName = SemanticAnalyzer.getColumnInternalName(pos++);
- outputColumnNames.add(internalName);
- colExprMap.put(internalName, valueColExpr);
-
- String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
- ColumnInfo newColInfo = new ColumnInfo(
- internalName, colInfo.getType(), alias[0],
- colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
- rsNewRR.put(alias[0], alias[1], newColInfo);
- String[] altMapping = inputRR.getAlternateMappings(colInfo.getInternalName());
- if ( altMapping != null ) {
- rsNewRR.put(altMapping[0], altMapping[1], newColInfo);
- }
- }
-
- input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
- .getReduceSinkDesc(orderCols,
- valueCols, outputColumnNames, false,
- -1, partCols, orderString.toString(), -1, AcidUtils.Operation.NOT_ACID),
- new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
- input.setColumnExprMap(colExprMap);
-
-
- // Construct the RR for extract operator
- RowResolver extractRR = new RowResolver();
- LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
- new LinkedHashMap<String[], ColumnInfo>();
- pos = 0;
-
- for (ColumnInfo colInfo : colInfoList) {
- String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
- /*
- * if we have already encountered this colInfo internalName.
- * We encounter it again because it must be put for the Having clause.
- * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
- */
- if ( colsAddedByHaving.containsKey(alias)) {
- continue;
- }
- ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
- ColumnInfo eColInfo = new ColumnInfo(
- SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
- colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
-
- if ( astNode == null ) {
- extractRR.put(alias[0], alias[1], eColInfo);
- }
- else {
- /*
- * in case having clause refers to this column may have been added twice;
- * once with the ASTNode.toStringTree as the alias
- * and then with the real alias.
- */
- extractRR.putExpression(astNode, eColInfo);
- if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
- colsAddedByHaving.put(alias, eColInfo);
+ if (spec.getQueryOrderSpec() != null) {
+ for (OrderExpression orderCol : spec.getQueryOrderSpec().getExpressions()) {
+ String orderString = orderCol.getOrder().name();
+ if (orderString.equals("ASC")) {
+ order.append('+');
+ } else {
+ order.append('-');
}
+ orderCols.add(genExprNodeDesc(orderCol.getExpression(), inputRR));
}
- String[] altMapping = inputRR.getAlternateMappings(colInfo.getInternalName());
- if ( altMapping != null ) {
- extractRR.put(altMapping[0], altMapping[1], eColInfo);
- }
- }
-
- for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
- String[] alias = columnAddedByHaving.getKey();
- ColumnInfo eColInfo = columnAddedByHaving.getValue();
- extractRR.put(alias[0], alias[1], eColInfo);
}
- /*
- * b. Construct Extract Operator.
- */
- input = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new ExtractDesc(
- new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- Utilities.ReduceField.VALUE
- .toString(), "", false)),
- new RowSchema(inputRR.getColumnInfos()),
- input), extractRR);
-
-
- return input;
+ return genReduceSinkPlan(input, partCols, orderCols, order.toString(), -1);
}
-
public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
throws SemanticException
{
@@ -12207,4 +12312,25 @@ public class SemanticAnalyzer extends Ba
protected boolean deleting() {
return false;
}
+ public static ASTNode genSelectDIAST(RowResolver rr) {
+ HashMap<String, LinkedHashMap<String, ColumnInfo>> map = rr.getRslvMap();
+ ASTNode selectDI = new ASTNode(new CommonToken(HiveParser.TOK_SELECTDI, "TOK_SELECTDI"));
+ for (String tabAlias : map.keySet()) {
+ for (Entry<String, ColumnInfo> entry : map.get(tabAlias).entrySet()) {
+ selectDI.addChild(buildSelExprSubTree(tabAlias, entry.getKey()));
+ }
+ }
+ return selectDI;
+ }
+ private static ASTNode buildSelExprSubTree(String tableAlias, String col) {
+ ASTNode selexpr = new ASTNode(new CommonToken(HiveParser.TOK_SELEXPR, "TOK_SELEXPR"));
+ ASTNode tableOrCol = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL,
+ "TOK_TABLE_OR_COL"));
+ ASTNode dot = new ASTNode(new CommonToken(HiveParser.DOT, "."));
+ tableOrCol.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, tableAlias)));
+ dot.addChild(tableOrCol);
+ dot.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, col)));
+ selexpr.addChild(dot);
+ return selexpr;
+ }
}
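A note on the new genSelectDIAST/buildSelExprSubTree helpers above: they rebuild a SELECT DISTINCT
expression list from a RowResolver so that the group-by-star rewrite can feed fully qualified
columns back into the parse info. A minimal sketch of the resulting tree, assuming a resolver rr
that exposes columns key and value under a single alias t (the alias, column names, and variable
are hypothetical, not taken from the patch):

    ASTNode selectDI = SemanticAnalyzer.genSelectDIAST(rr);
    // selectDI.toStringTree() would print roughly:
    //   (TOK_SELECTDI (TOK_SELEXPR (. (TOK_TABLE_OR_COL t) key))
    //                 (TOK_SELEXPR (. (TOK_TABLE_OR_COL t) value)))

Because these nodes are built from fresh CommonTokens, they carry no token start/stop indexes,
which is presumably why UnparseTranslator (further below) now skips translations whose
tokenStopIndex is negative.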
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TableAccessAnalyzer.java Thu Jan 22 05:05:05 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -143,7 +142,7 @@ public class TableAccessAnalyzer {
// Must be deterministic order map for consistent q-test output across Java versions
Map<String, List<String>> tableToKeysMap = new LinkedHashMap<String, List<String>>();
- Table tbl = pGraphContext.getTopToTable().get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
tableToKeysMap.put(tbl.getCompleteName(), keyColNames);
tableAccessCtx.addOperatorTableAccess(op, tableToKeysMap);
@@ -174,10 +173,9 @@ public class TableAccessAnalyzer {
// Get the key column names for each side of the join,
// and check if the keys are all constants
// or columns (not expressions). If yes, proceed.
- QBJoinTree joinTree = pGraphContext.getJoinContext().get(op);
- assert(parentOps.size() == joinTree.getBaseSrc().length);
+ assert(parentOps.size() == op.getConf().getBaseSrc().length);
int pos = 0;
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : op.getConf().getBaseSrc()) {
if (src != null) {
assert(parentOps.get(pos) instanceof ReduceSinkOperator);
ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) parentOps.get(pos);
@@ -203,7 +201,7 @@ public class TableAccessAnalyzer {
return null;
}
- Table tbl = pGraphContext.getTopToTable().get(tso);
+ Table tbl = tso.getConf().getTableMetadata();
tableToKeysMap.put(tbl.getCompleteName(), keyColNames);
} else {
return null;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java Thu Jan 22 05:05:05 2015
@@ -26,8 +26,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -60,6 +58,9 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
/**
* TaskCompiler is a the base class for classes that compile
* operator pipelines into tasks.
@@ -386,9 +387,7 @@ public abstract class TaskCompiler {
ParseContext clone = new ParseContext(conf,
pCtx.getQB(), pCtx.getParseTree(),
pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(),
- pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(),
- pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getTopToProps(),
- pCtx.getFsopToTable(),
+ pCtx.getOpParseCtx(), pCtx.getJoinOps(), pCtx.getSmbMapJoinOps(),
pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(),
pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(),
pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(),
@@ -399,7 +398,7 @@ public abstract class TaskCompiler {
pCtx.getQueryProperties());
clone.setFetchTask(pCtx.getFetchTask());
clone.setLineageInfo(pCtx.getLineageInfo());
- clone.setMapJoinContext(pCtx.getMapJoinContext());
+ clone.setMapJoinOps(pCtx.getMapJoinOps());
return clone;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/UnparseTranslator.java Thu Jan 22 05:05:05 2015
@@ -97,6 +97,10 @@ class UnparseTranslator {
int tokenStartIndex = node.getTokenStartIndex();
int tokenStopIndex = node.getTokenStopIndex();
+ if (tokenStopIndex < 0) {
+ // this is for artificially added tokens
+ return;
+ }
Translation translation = new Translation();
translation.tokenStopIndex = tokenStopIndex;
translation.replacementText = replacementText;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Thu Jan 22 05:05:05 2015
@@ -82,7 +82,7 @@ public class SparkProcessAnalyzeTable im
ParseContext parseContext = context.parseContext;
@SuppressWarnings("rawtypes")
- Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
+ Class<? extends InputFormat> inputFormat = tableScan.getConf().getTableMetadata()
.getInputFormatClass();
QB queryBlock = parseContext.getQB();
QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Thu Jan 22 05:05:05 2015
@@ -18,12 +18,22 @@
package org.apache.hadoop.hive.ql.plan;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.PTFUtils;
+
public class AbstractOperatorDesc implements OperatorDesc {
protected boolean vectorMode = false;
protected transient Statistics statistics;
protected transient OpTraits opTraits;
+ protected transient Map<String, String> opProps;
+ static {
+ PTFUtils.makeTransient(AbstractOperatorDesc.class, "opProps");
+ }
+
@Override
@Explain(skipHeader = true, displayName = "Statistics")
public Statistics getStatistics() {
@@ -51,4 +61,12 @@ public class AbstractOperatorDesc implem
public void setOpTraits(OpTraits opTraits) {
this.opTraits = opTraits;
}
+
+ public Map<String, String> getOpProps() {
+ return opProps;
+ }
+
+ public void setOpProps(Map<String, String> props) {
+ this.opProps = props;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Thu Jan 22 05:05:05 2015
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
/**
* FileSinkDesc.
@@ -47,6 +48,7 @@ public class FileSinkDesc extends Abstra
private String compressCodec;
private String compressType;
private boolean multiFileSpray;
+ private boolean temporary;
// Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
// bucketed or sorted table/partition they cannot be merged.
private boolean canBeMerged;
@@ -89,6 +91,8 @@ public class FileSinkDesc extends Abstra
private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
private long txnId = 0; // transaction id for this operation
+ private transient Table table;
+
public FileSinkDesc() {
}
@@ -217,6 +221,21 @@ public class FileSinkDesc extends Abstra
public void setMultiFileSpray(boolean multiFileSpray) {
this.multiFileSpray = multiFileSpray;
}
+
+ /**
+ * @return true if the destination is temporary
+ */
+ public boolean isTemporary() {
+ return temporary;
+ }
+
+ /**
+ * @param temporary whether the destination is temporary
+ */
+ public void setTemporary(boolean temporary) {
+ this.temporary = temporary;
+ }
+
public boolean canBeMerged() {
return canBeMerged;
@@ -421,4 +440,12 @@ public class FileSinkDesc extends Abstra
public long getTransactionId() {
return txnId;
}
+
+ public Table getTable() {
+ return table;
+ }
+
+ public void setTable(Table table) {
+ this.table = table;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Thu Jan 22 05:05:05 2015
@@ -61,7 +61,7 @@ public class GroupByDesc extends Abstrac
private ArrayList<ExprNodeDesc> keys;
private List<Integer> listGroupingSets;
private boolean groupingSetsPresent;
- private int groupingSetPosition;
+ private int groupingSetPosition = -1;
private ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators;
private ArrayList<java.lang.String> outputColumnNames;
private float groupByMemoryUsage;
@@ -177,6 +177,12 @@ public class GroupByDesc extends Abstrac
return outputColumnNames;
}
+ @Explain(displayName = "pruneGroupingSetId", displayOnlyOnTrue = true)
+ public boolean pruneGroupingSetId() {
+ return groupingSetPosition >= 0 &&
+ outputColumnNames.size() != keys.size() + aggregators.size();
+ }
+
public void setOutputColumnNames(
ArrayList<java.lang.String> outputColumnNames) {
this.outputColumnNames = outputColumnNames;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Thu Jan 22 05:05:05 2015
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
/**
* Join operator Descriptor implementation.
@@ -88,12 +90,26 @@ public class JoinDesc extends AbstractOp
// used only for explain.
private transient ExprNodeDesc [][] joinKeys;
+
+ // Data structures coming originally from QBJoinTree
+ private transient String leftAlias;
+ private transient String[] leftAliases;
+ private transient String[] rightAliases;
+ private transient String[] baseSrc;
+ private transient String id;
+ private transient boolean mapSideJoin;
+ private transient List<String> mapAliases; //map-side join aliases
+ private transient Map<String, Operator<? extends OperatorDesc>> aliasToOpInfo;
+ private transient boolean leftInputJoin;
+ private transient List<String> streamAliases;
+
public JoinDesc() {
}
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
List<String> outputColumnNames, final boolean noOuterJoin,
- final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters, ExprNodeDesc[][] joinKeys) {
+ final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
+ ExprNodeDesc[][] joinKeys) {
this.exprs = exprs;
this.outputColumnNames = outputColumnNames;
this.noOuterJoin = noOuterJoin;
@@ -509,4 +525,88 @@ public class JoinDesc extends AbstractOp
public void setFixedAsSorted(boolean fixedAsSorted) {
this.fixedAsSorted = fixedAsSorted;
}
+
+ public String[] getLeftAliases() {
+ return leftAliases;
+ }
+
+ public String[] getBaseSrc() {
+ return baseSrc;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public List<String> getMapAliases() {
+ return mapAliases;
+ }
+
+ public Map<String, Operator<? extends OperatorDesc>> getAliasToOpInfo() {
+ return aliasToOpInfo;
+ }
+
+ public boolean isLeftInputJoin() {
+ return leftInputJoin;
+ }
+
+ public String getLeftAlias() {
+ return leftAlias;
+ }
+
+ public void setLeftAlias(String leftAlias) {
+ this.leftAlias = leftAlias;
+ }
+
+ public String[] getRightAliases() {
+ return rightAliases;
+ }
+
+ public List<String> getStreamAliases() {
+ return streamAliases;
+ }
+
+ public boolean isMapSideJoin() {
+ return mapSideJoin;
+ }
+
+ public void setQBJoinTreeProps(JoinDesc joinDesc) {
+ leftAlias = joinDesc.leftAlias;
+ leftAliases = joinDesc.leftAliases;
+ rightAliases = joinDesc.rightAliases;
+ baseSrc = joinDesc.baseSrc;
+ id = joinDesc.id;
+ mapSideJoin = joinDesc.mapSideJoin;
+ mapAliases = joinDesc.mapAliases;
+ aliasToOpInfo = joinDesc.aliasToOpInfo;
+ leftInputJoin = joinDesc.leftInputJoin;
+ streamAliases = joinDesc.streamAliases;
+ }
+
+ public void setQBJoinTreeProps(QBJoinTree joinTree) {
+ leftAlias = joinTree.getLeftAlias();
+ leftAliases = joinTree.getLeftAliases();
+ rightAliases = joinTree.getRightAliases();
+ baseSrc = joinTree.getBaseSrc();
+ id = joinTree.getId();
+ mapSideJoin = joinTree.isMapSideJoin();
+ mapAliases = joinTree.getMapAliases();
+ aliasToOpInfo = joinTree.getAliasToOpInfo();
+ leftInputJoin = joinTree.getJoinSrc() != null;
+ streamAliases = joinTree.getStreamAliases();
+ }
+
+ public void cloneQBJoinTreeProps(JoinDesc joinDesc) {
+ leftAlias = joinDesc.leftAlias;
+ leftAliases = joinDesc.leftAliases == null ? null : joinDesc.leftAliases.clone();
+ rightAliases = joinDesc.rightAliases == null ? null : joinDesc.rightAliases.clone();
+ baseSrc = joinDesc.baseSrc == null ? null : joinDesc.baseSrc.clone();
+ id = joinDesc.id;
+ mapSideJoin = joinDesc.mapSideJoin;
+ mapAliases = joinDesc.mapAliases == null ? null : new ArrayList<String>(joinDesc.mapAliases);
+ aliasToOpInfo = new HashMap<String, Operator<? extends OperatorDesc>>(joinDesc.aliasToOpInfo);
+ leftInputJoin = joinDesc.leftInputJoin;
+ streamAliases = joinDesc.streamAliases == null ? null : new ArrayList<String>(joinDesc.streamAliases);
+ }
+
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Thu Jan 22 05:05:05 2015
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import com.google.common.collect.Interner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -41,10 +40,11 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SplitSample;
import org.apache.hadoop.mapred.JobConf;
+import com.google.common.collect.Interner;
+
/**
* MapWork represents all the information used to run a map task on the cluster.
* It is first used when the query planner breaks the logical plan into tasks and
@@ -105,8 +105,10 @@ public class MapWork extends BaseWork {
public static final int SAMPLING_ON_START = 2; // sampling on task running
// the following two are used for join processing
- private QBJoinTree joinTree;
private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
+ private boolean leftInputJoin;
+ private String[] baseSrc;
+ private List<String> mapAliases;
private boolean mapperCannotSpanPartns;
@@ -419,14 +421,6 @@ public class MapWork extends BaseWork {
return useOneNullRowInputFormat;
}
- public QBJoinTree getJoinTree() {
- return joinTree;
- }
-
- public void setJoinTree(QBJoinTree joinTree) {
- this.joinTree = joinTree;
- }
-
public void setMapperCannotSpanPartns(boolean mapperCannotSpanPartns) {
this.mapperCannotSpanPartns = mapperCannotSpanPartns;
}
@@ -579,4 +573,28 @@ public class MapWork extends BaseWork {
public boolean getDoSplitsGrouping() {
return this.doSplitsGrouping;
}
+
+ public boolean isLeftInputJoin() {
+ return leftInputJoin;
+ }
+
+ public void setLeftInputJoin(boolean leftInputJoin) {
+ this.leftInputJoin = leftInputJoin;
+ }
+
+ public String[] getBaseSrc() {
+ return baseSrc;
+ }
+
+ public void setBaseSrc(String[] baseSrc) {
+ this.baseSrc = baseSrc;
+ }
+
+ public List<String> getMapAliases() {
+ return mapAliases;
+ }
+
+ public void setMapAliases(List<String> mapAliases) {
+ this.mapAliases = mapAliases;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java Thu Jan 22 05:05:05 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.Map;
public interface OperatorDesc extends Serializable, Cloneable {
public Object clone() throws CloneNotSupportedException;
@@ -26,4 +27,5 @@ public interface OperatorDesc extends Se
public void setStatistics(Statistics statistics);
public OpTraits getOpTraits();
public void setOpTraits(OpTraits opTraits);
+ public Map<String, String> getOpProps();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Thu Jan 22 05:05:05 2015
@@ -956,7 +956,7 @@ public final class PlanUtils {
ReadEntity parentViewInfo = getParentViewInfo(alias, parseCtx.getViewAliasToInput());
// Adds tables only for create view (PPD filter can be appended by outer query)
- Table table = parseCtx.getTopToTable().get(topOp);
+ Table table = topOp.getConf().getTableMetadata();
PlanUtils.addInput(inputs, new ReadEntity(table, parentViewInfo));
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Thu Jan 22 05:05:05 2015
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
/**
@@ -36,7 +37,7 @@ public class TableScanDesc extends Abstr
private static final long serialVersionUID = 1L;
static {
- PTFUtils.makeTransient(TableScanDesc.class, "filterObject", "referencedColumns");
+ PTFUtils.makeTransient(TableScanDesc.class, "filterObject", "referencedColumns", "tableMetadata");
}
private String alias;
@@ -93,23 +94,32 @@ public class TableScanDesc extends Abstr
private boolean isMetadataOnly = false;
- @SuppressWarnings("nls")
+ private transient final Table tableMetadata;
+
+
public TableScanDesc() {
+ this(null, null);
}
- public TableScanDesc(final String alias) {
- this.alias = alias;
+ @SuppressWarnings("nls")
+ public TableScanDesc(Table tblMetadata) {
+ this(null, tblMetadata);
}
- public TableScanDesc(final String alias, List<VirtualColumn> vcs) {
+ public TableScanDesc(final String alias, Table tblMetadata) {
+ this(alias, null, tblMetadata);
+ }
+
+ public TableScanDesc(final String alias, List<VirtualColumn> vcs, Table tblMetadata) {
this.alias = alias;
this.virtualCols = vcs;
+ this.tableMetadata = tblMetadata;
}
@Override
public Object clone() {
List<VirtualColumn> vcs = new ArrayList<VirtualColumn>(getVirtualCols());
- return new TableScanDesc(getAlias(), vcs);
+ return new TableScanDesc(getAlias(), vcs, this.tableMetadata);
}
@Explain(displayName = "alias")
@@ -250,8 +260,12 @@ public class TableScanDesc extends Abstr
public void setIsMetadataOnly(boolean metadata_only) {
isMetadataOnly = metadata_only;
}
-
+
public boolean getIsMetadataOnly() {
return isMetadataOnly;
}
+
+ public Table getTableMetadata() {
+ return tableMetadata;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Thu Jan 22 05:05:05 2015
@@ -897,7 +897,7 @@ public final class OpProcFactory {
HiveConf hiveConf) {
TableScanDesc tableScanDesc = tableScanOp.getConf();
- Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp);
+ Table tbl = tableScanDesc.getTableMetadata();
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
// attach the original predicate to the table scan operator for index
// optimizations that require the pushed predicate before pcr & later
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -49,8 +50,14 @@ public final class CommandProcessorFacto
}
public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
- throws SQLException {
- HiveCommand hiveCommand = HiveCommand.find(cmd);
+ throws SQLException {
+ return getForHiveCommandInternal(cmd, conf, false);
+ }
+
+ public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
+ boolean testOnly)
+ throws SQLException {
+ HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
if (hiveCommand == null || isBlank(cmd[0])) {
return null;
}
@@ -58,7 +65,8 @@ public final class CommandProcessorFacto
conf = new HiveConf();
}
Set<String> availableCommands = new HashSet<String>();
- for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split(",")) {
+ for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
+ .split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
@@ -82,6 +90,12 @@ public final class CommandProcessorFacto
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
+ case CRYPTO:
+ try {
+ return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
+ } catch (HiveException e) {
+ throw new SQLException("Fail to start the command processor due to the exception: ", e);
+ }
default:
throw new AssertionError("Unknown HiveCommand " + hiveCommand);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java Thu Jan 22 05:05:05 2015
@@ -29,18 +29,40 @@ public enum HiveCommand {
SET(),
RESET(),
DFS(),
+ CRYPTO(true),
ADD(),
LIST(),
RELOAD(),
DELETE(),
COMPILE();
+
+ public static boolean ONLY_FOR_TESTING = true;
+ private boolean usedOnlyForTesting;
+
+ HiveCommand() {
+ this(false);
+ }
+
+ HiveCommand(boolean onlyForTesting) {
+ this.usedOnlyForTesting = onlyForTesting;
+ }
+
+ public boolean isOnlyForTesting() {
+ return this.usedOnlyForTesting;
+ }
+
private static final Set<String> COMMANDS = new HashSet<String>();
static {
for (HiveCommand command : HiveCommand.values()) {
COMMANDS.add(command.name());
}
}
+
public static HiveCommand find(String[] command) {
+ return find(command, false);
+ }
+
+ public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
if (null == command){
return null;
}
@@ -54,7 +76,13 @@ public enum HiveCommand {
//special handling for SQL "delete from <table> where..."
return null;
} else if (COMMANDS.contains(cmd)) {
- return HiveCommand.valueOf(cmd);
+ HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
+
+ if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
+ return hiveCommand;
+ }
+
+ return null;
}
}
return null;
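[A hedged sketch, not part of this commit, of how the new testOnly flag gates lookups; the CRYPTO arguments shown are only illustrative.]

import org.apache.hadoop.hive.ql.processors.HiveCommand;

public class HiveCommandFindSketch {
  public static void main(String[] args) {
    // Production lookup: test-only commands such as CRYPTO are hidden.
    System.out.println(HiveCommand.find(new String[] {"crypto", "create_key"}));       // null
    // Test lookup: only test-only commands are returned.
    System.out.println(HiveCommand.find(new String[] {"crypto", "create_key"}, true)); // CRYPTO
    System.out.println(HiveCommand.find(new String[] {"set", "x=1"}, true));           // null
  }
}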
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Thu Jan 22 05:05:05 2015
@@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.securit
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
import org.apache.hadoop.hive.ql.util.DosToUnix;
+import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -184,6 +185,11 @@ public class SessionState {
private SparkSession sparkSession;
/**
+ * Gets information about HDFS encryption
+ */
+ private HadoopShims.HdfsEncryptionShim hdfsEncryptionShim;
+
+ /**
* Lineage state.
*/
LineageState ls;
@@ -378,6 +384,23 @@ public class SessionState {
return txnAutoCommit;
}
+ public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
+ if (hdfsEncryptionShim == null) {
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ if ("hdfs".equals(fs.getUri().getScheme())) {
+ hdfsEncryptionShim = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf);
+ } else {
+ LOG.info("Could not get hdfsEncryptionShim, it is only applicable to hdfs filesystem.");
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ return hdfsEncryptionShim;
+ }
+
/**
* Singleton Session object per thread.
*
@@ -410,7 +433,6 @@ public class SessionState {
* when switching from one session to another.
*/
public static SessionState start(SessionState startSs) {
-
setCurrentSessionState(startSs);
if (startSs.hiveHist == null){
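[A hedged usage sketch of the new accessor: callers must tolerate a null shim, which the lazy initializer above returns when the session's default filesystem is not HDFS.]

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims;

public class EncryptionShimLookupSketch {
  static boolean hdfsEncryptionAvailable() throws HiveException {
    // Requires an active SessionState (SessionState.start(...)) on this thread.
    HadoopShims.HdfsEncryptionShim shim = SessionState.get().getHdfsEncryptionShim();
    return shim != null;
  }
}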
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Thu Jan 22 05:05:05 2015
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -262,7 +263,7 @@ public class Initiator extends Compactor
private long sumDirSize(FileSystem fs, Path dir) throws IOException {
long size = 0;
- FileStatus[] buckets = fs.listStatus(dir);
+ FileStatus[] buckets = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (int i = 0; i < buckets.length; i++) {
size += buckets[i].getLen();
}
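[A minimal sketch of the effect of the added filter: entries whose names start with '.' or '_' (e.g. _SUCCESS, hidden staging files) no longer count toward the directory size.]

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;

public class SumDirSizeSketch {
  static long sumVisibleFileSizes(FileSystem fs, Path dir) throws IOException {
    long size = 0;
    // Same pattern as Initiator.sumDirSize, with hidden files filtered out up front.
    for (FileStatus bucket : fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
      size += bucket.getLen();
    }
    return size;
  }
}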
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFDayOfMonth.java Thu Jan 22 05:05:05 2015
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.Text;
extended = "date is a string in the format of 'yyyy-MM-dd HH:mm:ss' or "
+ "'yyyy-MM-dd'.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n" + " 30")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 30")
@VectorizedExpressions({VectorUDFDayOfMonthLong.class, VectorUDFDayOfMonthString.class})
public class UDFDayOfMonth extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
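[The corrected @Description examples matter because these UDFs parse with SimpleDateFormat("yyyy-MM-dd"), where the month is the middle field; a small standalone sketch of the intended behavior.]

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class DayOfMonthExampleSketch {
  public static void main(String[] args) throws ParseException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    Calendar calendar = Calendar.getInstance();
    // '2009-07-30' is July 30th; the old example string '2009-30-07' was not.
    calendar.setTime(formatter.parse("2009-07-30"));
    System.out.println(calendar.get(Calendar.DAY_OF_MONTH)); // 30, as the docstring now says
  }
}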
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFMonth.java Thu Jan 22 05:05:05 2015
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.Text;
@Description(name = "month",
value = "_FUNC_(date) - Returns the month of date",
extended = "Example:\n"
- + " > SELECT _FUNC_('2009-30-07') FROM src LIMIT 1;\n" + " 7")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 7")
@VectorizedExpressions({VectorUDFMonthLong.class, VectorUDFMonthString.class})
public class UDFMonth extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFYear.java Thu Jan 22 05:05:05 2015
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.Text;
extended = "date is a string in the format of 'yyyy-MM-dd HH:mm:ss' or "
+ "'yyyy-MM-dd'.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n" + " 2009")
+ + " > SELECT _FUNC_('2009-07-30') FROM src LIMIT 1;\n" + " 2009")
@VectorizedExpressions({VectorUDFYearLong.class, VectorUDFYearString.class})
public class UDFYear extends UDF {
private final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateAdd.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateAdd.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateAdd.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateAdd.java Thu Jan 22 05:05:05 2015
@@ -61,8 +61,8 @@ import org.apache.hadoop.io.IntWritable;
+ " 'yyyy-MM-dd'. num_days is a number. The time part of start_date is "
+ "ignored.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n"
- + " '2009-31-07'")
+ + " > SELECT _FUNC_('2009-07-30', 1) FROM src LIMIT 1;\n"
+ + " '2009-07-31'")
@VectorizedExpressions({VectorUDFDateAddColScalar.class, VectorUDFDateAddScalarCol.class, VectorUDFDateAddColCol.class})
public class GenericUDFDateAdd extends GenericUDF {
private transient SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateDiff.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateDiff.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateDiff.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateDiff.java Thu Jan 22 05:05:05 2015
@@ -57,7 +57,7 @@ import org.apache.hadoop.io.IntWritable;
+ "'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd'. The time parts are ignored."
+ "If date1 is earlier than date2, the result is negative.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', '2009-31-07') FROM src LIMIT 1;\n"
+ + " > SELECT _FUNC_('2009-07-30', '2009-07-31') FROM src LIMIT 1;\n"
+ " 1")
@VectorizedExpressions({VectorUDFDateDiffColScalar.class, VectorUDFDateDiffColCol.class, VectorUDFDateDiffScalarCol.class})
public class GenericUDFDateDiff extends GenericUDF {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateSub.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateSub.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateSub.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFDateSub.java Thu Jan 22 05:05:05 2015
@@ -61,8 +61,8 @@ import org.apache.hadoop.io.IntWritable;
+ " 'yyyy-MM-dd'. num_days is a number. The time part of start_date is "
+ "ignored.\n"
+ "Example:\n "
- + " > SELECT _FUNC_('2009-30-07', 1) FROM src LIMIT 1;\n"
- + " '2009-29-07'")
+ + " > SELECT _FUNC_('2009-07-30', 1) FROM src LIMIT 1;\n"
+ + " '2009-07-29'")
@VectorizedExpressions({VectorUDFDateSubColScalar.class, VectorUDFDateSubScalarCol.class, VectorUDFDateSubColCol.class})
public class GenericUDFDateSub extends GenericUDF {
private transient SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFInFile.java Thu Jan 22 05:05:05 2015
@@ -19,11 +19,11 @@
package org.apache.hadoop.hive.ql.udf.generic;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashSet;
import org.apache.hadoop.hive.ql.exec.Description;
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IOUtils;
/**
* IN_FILE(str, filename) returns true if 'str' appears in the file specified
@@ -106,29 +107,37 @@ public class GenericUDFInFile extends Ge
arguments[0].get(), strObjectInspector).toString();
if (set == null) {
- String fileName = (String)ObjectInspectorUtils.copyToStandardJavaObject(
+ String filePath = (String)ObjectInspectorUtils.copyToStandardJavaObject(
arguments[1].get(), fileObjectInspector);
- try {
- load(new FileInputStream((new File(fileName)).getName()));
- } catch (FileNotFoundException e) {
- throw new HiveException(e);
- }
+ loadFromFile(filePath);
}
- return Boolean.valueOf(set.contains(str));
+ return set.contains(str);
}
- /**
- * Load the file from an InputStream.
- * @param is The InputStream contains the file data.
- * @throws HiveException
- */
- public void load(InputStream is) throws HiveException {
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(is));
+ private BufferedReader getReaderFor(String filePath) throws HiveException {
+ try {
+ Path fullFilePath = FileSystems.getDefault().getPath(filePath);
+ Path fileName = fullFilePath.getFileName();
+ if (Files.exists(fileName)) {
+ return Files.newBufferedReader(fileName, Charset.defaultCharset());
+ }
+ else
+ if (Files.exists(fullFilePath)) {
+ return Files.newBufferedReader(fullFilePath, Charset.defaultCharset());
+ }
+ else {
+ throw new HiveException("Could not find \"" + fileName + "\" or \"" + fullFilePath + "\" in IN_FILE() UDF.");
+ }
+ }
+ catch(IOException exception) {
+ throw new HiveException(exception);
+ }
+ }
+ private void loadFromFile(String filePath) throws HiveException {
set = new HashSet<String>();
-
+ BufferedReader reader = getReaderFor(filePath);
try {
String line;
while((line = reader.readLine()) != null) {
@@ -137,6 +146,20 @@ public class GenericUDFInFile extends Ge
} catch (Exception e) {
throw new HiveException(e);
}
+ finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ @Override
+ public void copyToNewInstance(Object newInstance) throws UDFArgumentException {
+ super.copyToNewInstance(newInstance); // Asserts the class invariant. (Same types.)
+ GenericUDFInFile that = (GenericUDFInFile)newInstance;
+ if (that != this) {
+ that.set = (this.set == null ? null : (HashSet<String>)this.set.clone());
+ that.strObjectInspector = this.strObjectInspector;
+ that.fileObjectInspector = this.fileObjectInspector;
+ }
}
@Override
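[A sketch of the lookup order the rewritten loader uses: the bare file name (the form an ADD FILE/distributed-cache copy would have in the working directory) is tried first, then the full path; a summary under those assumptions, not the committed code itself.]

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;

public class InFileReaderSketch {
  static BufferedReader open(String filePath) throws IOException {
    Path fullFilePath = FileSystems.getDefault().getPath(filePath);
    Path fileName = fullFilePath.getFileName();
    // Prefer the local copy named after the file; fall back to the absolute path.
    Path chosen = Files.exists(fileName) ? fileName : fullFilePath;
    return Files.newBufferedReader(chosen, Charset.defaultCharset());
  }
}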
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLower.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLower.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLower.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLower.java Thu Jan 22 05:05:05 2015
@@ -51,7 +51,7 @@ public class GenericUDFLower extends Gen
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
- if (arguments.length < 0) {
+ if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"LOWER requires 1 argument, got " + arguments.length);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFUpper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFUpper.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFUpper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFUpper.java Thu Jan 22 05:05:05 2015
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
/**
* UDFUpper.
@@ -52,7 +51,7 @@ public class GenericUDFUpper extends Gen
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
- if (arguments.length < 0) {
+ if (arguments.length != 1) {
throw new UDFArgumentLengthException(
"UPPER requires 1 argument, got " + arguments.length);
}
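[Both LOWER and UPPER previously used an always-false check (arguments.length < 0); a hedged, test-style sketch of how the corrected check rejects a wrong argument count.]

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLower;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class LowerArgCheckSketch {
  public static void main(String[] args) throws Exception {
    ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    try {
      // Two arguments: with the fix this now fails fast instead of slipping through.
      new GenericUDFLower().initialize(new ObjectInspector[] {stringOI, stringOI});
    } catch (UDFArgumentLengthException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}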
Modified: hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Thu Jan 22 05:05:05 2015
@@ -101,7 +101,7 @@ public class TestSymlinkTextInputFormat
}
/**
- * Test combine symlink text input file. Two input dir, and each contails one
+ * Test combine symlink text input file. Two input dir, and each contains one
* file, and then create one symlink file containing these 2 files. Normally
* without combine, it will return at least 2 splits
*/
@@ -166,7 +166,11 @@ public class TestSymlinkTextInputFormat
}
String cmd = "select key*1 from " + tblName;
- drv.compile(cmd);
+ ecode = drv.compile(cmd);
+ if (ecode != 0) {
+ throw new Exception("Select compile: " + cmd
+ + " failed with exit code= " + ecode);
+ }
//create scratch dir
Context ctx = new Context(newJob);
Modified: hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java (original)
+++ hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java Thu Jan 22 05:05:05 2015
@@ -18,70 +18,82 @@
package org.apache.hadoop.hive.ql.lockmgr.zookeeper;
-import static org.mockito.Mockito.*;
-
-import java.util.Collections;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
-import com.google.common.base.Joiner;
-
public class TestZookeeperLockManager {
- private static final Joiner SLASH = Joiner.on("/");
- private static final String PARENT = "hive";
- private static final String TABLE = "t1";
- private static final String PARENT_LOCK_PATH = SLASH.join("", PARENT, TABLE);
- private static final String TABLE_LOCK_PATH = SLASH.join("", PARENT, TABLE, "00001");
private HiveConf conf;
- private ZooKeeper zooKeeper;
+ private TestingServer server;
+ private CuratorFramework client;
private HiveLockObject hiveLock;
private ZooKeeperHiveLock zLock;
+ private HiveLockObjectData lockObjData;
+ private static final String PARENT = "hive";
+ private static final String TABLE = "t1";
+ private static final String PARENT_LOCK_PATH = "/hive/t1";
+ private static final String TABLE_LOCK_PATH = "/hive/t1/00001";
@Before
public void setup() {
conf = new HiveConf();
- zooKeeper = mock(ZooKeeper.class);
- hiveLock = mock(HiveLockObject.class);
- when(hiveLock.getName()).thenReturn(TABLE);
+ lockObjData = new HiveLockObjectData("1", "10", "SHARED", "show tables");
+ hiveLock = new HiveLockObject(TABLE, lockObjData);
zLock = new ZooKeeperHiveLock(TABLE_LOCK_PATH, hiveLock, HiveLockMode.SHARED);
- }
- @Test
- public void testDeleteNoChildren() throws Exception {
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- verifyNoMoreInteractions(zooKeeper);
+ while (server == null)
+ {
+ try {
+ server = new TestingServer();
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
+ client.start();
+ } catch (Exception e) {
+ System.err.println("Getting bind exception - retrying to allocate server");
+ server = null;
+ }
+ }
}
- /**
- * Tests two threads racing to delete PARENT_LOCK_PATH
- */
- @Test
- public void testDeleteNoChildrenNodeDoesNotExist() throws Exception {
- doThrow(new KeeperException.NoNodeException()).when(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verify(zooKeeper).delete(PARENT_LOCK_PATH, -1);
- verifyNoMoreInteractions(zooKeeper);
+
+ @After
+ public void teardown() throws Exception
+ {
+ client.close();
+ server.close();
+ server = null;
}
+
@Test
- public void testDeleteWithChildren() throws Exception {
- when(zooKeeper.getChildren(PARENT_LOCK_PATH, false)).thenReturn(Collections.singletonList("somechild"));
- ZooKeeperHiveLockManager.unlockPrimitive(conf, zooKeeper, zLock, PARENT);
- verify(zooKeeper).delete(TABLE_LOCK_PATH, -1);
- verify(zooKeeper).getChildren(PARENT_LOCK_PATH, false);
- verifyNoMoreInteractions(zooKeeper);
+ public void testDeleteNoChildren() throws Exception
+ {
+ client.create().creatingParentsIfNeeded().forPath(TABLE_LOCK_PATH, lockObjData.toString().getBytes());
+ byte[] data = client.getData().forPath(TABLE_LOCK_PATH);
+ Assert.assertArrayEquals(lockObjData.toString().getBytes(), data);
+ ZooKeeperHiveLockManager.unlockPrimitive(zLock, PARENT, client);
+ try {
+ data = client.getData().forPath(TABLE_LOCK_PATH);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+ }
+ try {
+ data = client.getData().forPath(PARENT_LOCK_PATH);
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertEquals( e instanceof KeeperException.NoNodeException, true);
+ }
}
@Test
@@ -99,3 +111,4 @@ public class TestZookeeperLockManager {
Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
}
}
+
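[A minimal standalone sketch, assuming curator-test is on the classpath, of the embedded-ZooKeeper pattern the test now uses in place of Mockito mocks.]

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;

public class CuratorTestingServerSketch {
  public static void main(String[] args) throws Exception {
    TestingServer server = new TestingServer();          // in-process ZooKeeper
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .retryPolicy(new RetryOneTime(1))
        .build();
    client.start();
    client.create().creatingParentsIfNeeded().forPath("/hive/t1/00001", "lock".getBytes());
    System.out.println(new String(client.getData().forPath("/hive/t1/00001"))); // lock
    client.close();
    server.close();
  }
}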
Modified: hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java (original)
+++ hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java Thu Jan 22 05:05:05 2015
@@ -95,7 +95,7 @@ public class TestGenTezWork {
rs = new ReduceSinkOperator();
rs.setConf(new ReduceSinkDesc());
ts = new TableScanOperator();
- ts.setConf(new TableScanDesc());
+ ts.setConf(new TableScanDesc(null));
ts.getChildOperators().add(rs);
rs.getParentOperators().add(ts);
rs.getChildOperators().add(fs);
Modified: hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java (original)
+++ hive/branches/spark/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java Thu Jan 22 05:05:05 2015
@@ -193,7 +193,7 @@ public class TestIUD {
}
@Test
public void testInsertIntoTableAsSelectFromNamedVirtTable() throws ParseException {
- ASTNode ast = parse("insert into table page_view select a,b as c from (values (1,2),(3,4)) as VC(a,b) where b = 9");
+ ASTNode ast = parse("insert into page_view select a,b as c from (values (1,2),(3,4)) as VC(a,b) where b = 9");
Assert.assertEquals("AST doesn't match",
"(TOK_QUERY " +
"(TOK_FROM " +
@@ -209,7 +209,7 @@ public class TestIUD {
}
@Test
public void testInsertIntoTableFromAnonymousTable1Row() throws ParseException {
- ASTNode ast = parse("insert into table page_view values(1,2)");
+ ASTNode ast = parse("insert into page_view values(1,2)");
Assert.assertEquals("AST doesn't match",
"(TOK_QUERY " +
"(TOK_FROM " +
@@ -232,5 +232,16 @@ public class TestIUD {
"(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME page_view))) " +
"(TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))",
ast.toStringTree());
+ //same query as above less the "table" keyword KW_TABLE
+ ast = parse("insert into page_view values(-1,2),(3,+4)");
+ Assert.assertEquals("AST doesn't match",
+ "(TOK_QUERY " +
+ "(TOK_FROM " +
+ "(TOK_VIRTUAL_TABLE " +
+ "(TOK_VIRTUAL_TABREF TOK_ANONYMOUS) " +
+ "(TOK_VALUES_TABLE (TOK_VALUE_ROW (- 1) 2) (TOK_VALUE_ROW 3 (+ 4))))) " +
+ "(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME page_view))) " +
+ "(TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))",
+ ast.toStringTree());
}
}