Posted to commits@hive.apache.org by mm...@apache.org on 2018/07/12 18:46:12 UTC
hive git commit: HIVE-20091: Tez: Add security credentials for FileSinkOperator output (Matt McCline, reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master e0c2b9d97 -> 3fa7f0c6e
HIVE-20091: Tez: Add security credentials for FileSinkOperator output (Matt McCline, reviewed by Gunther Hagleitner)
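
Background: on a secure cluster, Tez obtains delegation tokens at DAG
submission only for URIs that have been registered on the DAG. Before
this change, addCredentials() registered just the MapWork input paths
and nothing at all for ReduceWork, so output locations written by
FileSinkOperator were never covered, and a query writing to a
filesystem other than its inputs' could fail at token acquisition.
The patch below walks each work unit's operator tree, collects the
output URIs recorded in every FileSinkDesc, and registers those with
the DAG as well; a short sketch of the Tez API involved follows the
diffstat.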
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3fa7f0c6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3fa7f0c6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3fa7f0c6
Branch: refs/heads/master
Commit: 3fa7f0c6e5a42d0e1fc7fb7bb8c2d7b1a86acee8
Parents: e0c2b9d
Author: Matt McCline <mm...@hortonworks.com>
Authored: Thu Jul 12 13:46:05 2018 -0500
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Thu Jul 12 13:46:05 2018 -0500
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/tez/DagUtils.java | 101 ++++++++++++++++++-
1 file changed, 98 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
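The Tez call at the center of the fix is DAG.addURIsForCredentials(),
which tells Tez to obtain credentials (e.g. HDFS delegation tokens)
for the given URIs when the DAG is submitted. A minimal sketch of how
it is used, with hypothetical namenode addresses and paths; only
DAG.create() and addURIsForCredentials() are real Tez API:

import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.tez.dag.api.DAG;

public class CredentialUrisSketch {
  public static void main(String[] args) {
    DAG dag = DAG.create("hive-query");

    Set<URI> uris = new HashSet<URI>();
    // Hypothetical paths: input on one namespace, output on another.
    uris.add(URI.create("hdfs://nn1:8020/user/hive/warehouse/src")); // input: registered before this patch
    uris.add(URI.create("hdfs://nn2:8020/user/hive/warehouse/dst")); // output: what this patch adds

    // Tez fetches delegation tokens for every registered URI at submission time.
    dag.addURIsForCredentials(uris);
  }
}

Note that the patch registers both FileSinkDesc.getDirName() and
FileSinkDesc.getDestPath(), since the directory being written and the
final destination can live on different filesystems.
----------------------------------------------------------------------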
http://git-wip-us.apache.org/repos/asf/hive/blob/3fa7f0c6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 0e75f6e..de0abd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -18,12 +18,13 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.util.Collection;
-
import java.util.concurrent.ConcurrentHashMap;
+
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+
import javax.security.auth.login.LoginException;
import java.io.File;
@@ -37,9 +38,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -79,10 +83,22 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
@@ -169,6 +185,57 @@ public class DagUtils {
   */
   private final ConcurrentHashMap<String, Object> copyNotifiers = new ConcurrentHashMap<>();
+  class CollectFileSinkUrisNodeProcessor implements NodeProcessor {
+
+    private final Set<URI> uris;
+
+    public CollectFileSinkUrisNodeProcessor(Set<URI> uris) {
+      this.uris = uris;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        OperatorDesc desc = op.getConf();
+        if (desc instanceof FileSinkDesc) {
+          FileSinkDesc fileSinkDesc = (FileSinkDesc) desc;
+          Path dirName = fileSinkDesc.getDirName();
+          if (dirName != null) {
+            uris.add(dirName.toUri());
+          }
+          Path destPath = fileSinkDesc.getDestPath();
+          if (destPath != null) {
+            uris.add(destPath.toUri());
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  private void addCollectFileSinkUrisRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+    opRules.put(new RuleRegExp("R1", FileSinkOperator.getOperatorName() + ".*"), np);
+  }
+
+  private void collectFileSinkUris(List<Node> topNodes, Set<URI> uris) {
+
+    CollectFileSinkUrisNodeProcessor np = new CollectFileSinkUrisNodeProcessor(uris);
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    addCollectFileSinkUrisRules(opRules, np);
+
+    Dispatcher disp = new DefaultRuleDispatcher(np, opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    try {
+      ogw.startWalking(topNodes, null);
+    } catch (SemanticException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<Path> paths = mapWork.getPathToAliases().keySet();
     if (!paths.isEmpty()) {
@@ -184,15 +251,43 @@ public class DagUtils {
       if (LOG.isDebugEnabled()) {
         for (URI uri: uris) {
-          LOG.debug("Marking URI as needing credentials: "+uri);
+          LOG.debug("Marking MapWork input URI as needing credentials: " + uri);
         }
       }

       dag.addURIsForCredentials(uris);
     }
+
+    Set<URI> fileSinkUris = new HashSet<URI>();
+
+    List<Node> topNodes = new ArrayList<Node>();
+    LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
+    for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) {
+      topNodes.add(operator);
+    }
+    collectFileSinkUris(topNodes, fileSinkUris);
+
+    if (LOG.isDebugEnabled()) {
+      for (URI fileSinkUri: fileSinkUris) {
+        LOG.debug("Marking MapWork output URI as needing credentials: " + fileSinkUri);
+      }
+    }
+    dag.addURIsForCredentials(fileSinkUris);
   }

   private void addCredentials(ReduceWork reduceWork, DAG dag) {
-    // nothing at the moment
+
+    Set<URI> fileSinkUris = new HashSet<URI>();
+
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(reduceWork.getReducer());
+    collectFileSinkUris(topNodes, fileSinkUris);
+
+    if (LOG.isDebugEnabled()) {
+      for (URI fileSinkUri: fileSinkUris) {
+        LOG.debug("Marking ReduceWork output URI as needing credentials: " + fileSinkUri);
+      }
+    }
+    dag.addURIsForCredentials(fileSinkUris);
   }

   /*
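----------------------------------------------------------------------
A note on the graph-walking machinery used above: DefaultGraphWalker
performs a depth-first walk over the operator tree, keeping the path
from the root on a stack, and DefaultRuleDispatcher fires the
registered NodeProcessor when the walk reaches a node matching the
RuleRegExp; the rule here is "FS.*" because
FileSinkOperator.getOperatorName() returns "FS". The same walk in
miniature, with hypothetical stand-in types (none of the names below
are Hive classes):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Miniature of the walk the patch performs: depth-first over the
// operator tree, with the current root-to-node path kept on a stack;
// when a file sink ("FS") is reached, output locations found on the
// path are collected.
public class CollectSinksSketch {

  static class Op {
    final String name;       // operator short name, e.g. "TS", "SEL", "FS"
    final String outputDir;  // non-null only for file sinks
    final List<Op> children = new ArrayList<>();
    Op(String name, String outputDir) { this.name = name; this.outputDir = outputDir; }
  }

  static void walk(Op node, Deque<Op> stack, Set<String> uris) {
    stack.push(node);
    if (node.name.startsWith("FS")) { // the "FS.*" rule fires on file sinks
      for (Op onPath : stack) {       // like the real processor, scan the whole path
        if (onPath.outputDir != null) {
          uris.add(onPath.outputDir);
        }
      }
    }
    for (Op child : node.children) {
      walk(child, stack, uris);
    }
    stack.pop();
  }

  public static void main(String[] args) {
    Op ts = new Op("TS", null);
    Op sel = new Op("SEL", null);
    Op fs = new Op("FS", "hdfs://nn2:8020/user/hive/warehouse/dst");
    ts.children.add(sel);
    sel.children.add(fs);

    Set<String> uris = new HashSet<>();
    walk(ts, new ArrayDeque<>(), uris);
    System.out.println(uris); // [hdfs://nn2:8020/user/hive/warehouse/dst]
  }
}

Because the collected URIs go into a Set, re-scanning the whole stack
each time the rule fires cannot produce duplicates.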