You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2013/10/19 00:43:00 UTC
svn commit: r1533658 - in /hive/trunk/hcatalog:
src/test/e2e/templeton/inpdir/ src/test/e2e/templeton/tests/
webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/
webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/
Author: daijy
Date: Fri Oct 18 22:42:59 2013
New Revision: 1533658
URL: http://svn.apache.org/r1533658
Log:
HIVE-5133: webhcat jobs that need to access metastore fails in secure mode (Eugene Koifman via Daniel Dai)
Added:
hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java
Modified:
hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
Added: hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig?rev=1533658&view=auto
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig (added)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig Fri Oct 18 22:42:59 2013
@@ -0,0 +1,21 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+l = load '$INPDIR/nums.txt' as (i:int, j:int);
+store l into 'hcattest_pig' using org.apache.hive.hcatalog.pig.HCatStorer();
+s = load 'hcattest_pig' using org.apache.hive.hcatalog.pig.HCatLoader();
+store s into '$OUTDIR/loadstore.out';
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Fri Oct 18 22:42:59 2013
@@ -234,7 +234,7 @@ $cfg =
{
#a simple load store script with log enabled
- 'num' => 9,
+ 'num' => 10,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/pig',
'post_options' => ['user.name=:UNAME:', 'arg=-p', 'arg=INPDIR=:INPDIR_HDFS:','arg=-p', 'arg=OUTDIR=:OUTDIR:', 'file=:INPDIR_HDFS:/loadstore.pig',
@@ -249,7 +249,31 @@ $cfg =
'check_call_back' => 1,
},
- #test 10
+ {
+ #note: this test will fail unless Hive is installed in the default location Pig expects it in
+ #HIVE-5547 will address this limitation
+ 'num' => 11,
+ 'setup' => [
+ {
+ 'method' => 'POST',
+ 'url' => ':TEMPLETON_URL:/templeton/v1/ddl',
+ 'status_code' => 200,
+ 'post_options' => ['user.name=:UNAME:','exec=drop table if exists hcattest_pig; create table hcattest_pig(i int, j int) STORED AS textfile;'],
+ 'json_field_substr_match' => {'stderr' => 'OK'}
+ }
+ ],
+ 'method' => 'POST',
+ 'url' => ':TEMPLETON_URL:/templeton/v1/pig',
+ 'post_options' => ['user.name=:UNAME:', 'arg=-useHCatalog', 'arg=-p', 'arg=INPDIR=:INPDIR_HDFS:', 'arg=-p', 'arg= OUTDIR=:OUTDIR:', 'file=:INPDIR_HDFS:/hcatloadstore.pig'],
+
+ 'json_field_substr_match' => { 'id' => '\d+'},
+ 'status_code' => 200,
+ 'check_job_created' => 1,
+ 'check_job_complete' => 'SUCCESS',
+ 'check_job_exit_value' => 0,
+ 'check_call_back' => 1,
+ },
+ #test 11
#TODO jython test
@@ -420,7 +444,7 @@ $cfg =
{
#test add jar
- 'num' => 9,
+ 'num' => 11,
'ignore23' => 'Log collector does not work with Hadoop 2',
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/hive',
@@ -435,7 +459,7 @@ $cfg =
},
{
#test add jar when the jar is not shipped
- 'num' => 10,
+ 'num' => 12,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/hive',
'post_options' => ['user.name=:UNAME:','execute=add jar piggybank.jar',],
@@ -449,7 +473,7 @@ $cfg =
},
{
#enable logs
- 'num' => 11,
+ 'num' => 13,
'ignore23' => 'Log collector does not work with Hadoop 2',
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/hive',
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java Fri Oct 18 22:42:59 2013
@@ -20,11 +20,15 @@ package org.apache.hive.hcatalog.templet
import java.io.IOException;
import java.net.URL;
-import java.net.MalformedURLException;
import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.templeton.tool.DelegationTokenCache;
import org.apache.hive.hcatalog.templeton.tool.JobState;
import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
@@ -39,11 +43,12 @@ import org.apache.hive.hcatalog.templeto
* this at the same time. That should never happen.
*
* We use a Hadoop config var to notify this class on the completion
- * of a job. Hadoop will call use multiple times in the event of
+ * of a job. Hadoop will call us multiple times in the event of
* failure. Even if the failure is that the client callback failed.
*
* See LauncherDelegator for the HADOOP_END_RETRY* vars that are set.
*/
+@InterfaceAudience.Private
public class CompleteDelegator extends TempletonDelegator {
private static final Log LOG = LogFactory.getLog(CompleteDelegator.class);
@@ -51,28 +56,36 @@ public class CompleteDelegator extends T
super(appConf);
}
- public CompleteBean run(String id)
+ public CompleteBean run(String id, String jobStatus)
throws CallbackFailedException, IOException {
if (id == null)
acceptWithError("No jobid given");
JobState state = null;
+ /* we don't want to cancel the delegation token if we think the callback is going
+ to be retried, for example, because the job is not complete yet */
+ boolean cancelMetastoreToken = false;
try {
state = new JobState(id, Main.getAppConfigInstance());
if (state.getCompleteStatus() == null)
- failed("Job not yet complete. jobId=" + id, null);
+ failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null);
Long notified = state.getNotifiedTime();
- if (notified != null)
+ if (notified != null) {
+ cancelMetastoreToken = true;
return acceptWithError("Callback already run for jobId=" + id +
" at " + new Date(notified));
+ }
String callback = state.getCallback();
- if (callback == null)
+ if (callback == null) {
+ cancelMetastoreToken = true;
return new CompleteBean("No callback registered");
-
+ }
+
try {
doCallback(state.getId(), callback);
+ cancelMetastoreToken = true;
} catch (Exception e) {
failed("Callback failed " + callback + " for " + id, e);
}
@@ -80,8 +93,26 @@ public class CompleteDelegator extends T
state.setNotifiedTime(System.currentTimeMillis());
return new CompleteBean("Callback sent");
} finally {
- if (state != null)
- state.close();
+ state.close();
+ HiveMetaStoreClient client = null;
+ try {
+ if(cancelMetastoreToken) {
+ String metastoreTokenStrForm =
+ DelegationTokenCache.getStringFormTokenCache().getDelegationToken(id);
+ if(metastoreTokenStrForm != null) {
+ client = HCatUtil.getHiveClient(new HiveConf());
+ client.cancelDelegationToken(metastoreTokenStrForm);
+ LOG.debug("Cancelled token for jobId=" + id + " status from JT=" + jobStatus);
+ DelegationTokenCache.getStringFormTokenCache().removeDelegationToken(id);
+ }
+ }
+ }
+ catch(Exception ex) {
+ LOG.warn("Failed to cancel metastore delegation token for jobId=" + id, ex);
+ }
+ finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
}
}
@@ -90,8 +121,7 @@ public class CompleteDelegator extends T
* finished. If the url has the string $jobId in it, it will be
* replaced with the completed jobid.
*/
- public static void doCallback(String jobid, String url)
- throws MalformedURLException, IOException {
+ public static void doCallback(String jobid, String url) throws IOException {
if (url.contains("$jobId"))
url = url.replace("$jobId", jobid);
TempletonUtils.fetchUrl(new URL(url));
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java Fri Oct 18 22:42:59 2013
@@ -65,6 +65,8 @@ public class HiveDelegator extends Launc
args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog));
args.add("--");
TempletonUtils.addCmdForWindows(args);
+ addHiveMetaStoreTokenArg();
+
args.add(appConf.hivePath());
args.add("--service");
@@ -111,9 +113,10 @@ public class HiveDelegator extends Launc
ArrayList<String> args = new ArrayList<String>();
ArrayList<String> allFiles = new ArrayList<String>();
- if (TempletonUtils.isset(srcFile))
+ if (TempletonUtils.isset(srcFile)) {
allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf,
runAs));
+ }
if (TempletonUtils.isset(otherFiles)) {
String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs);
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java Fri Oct 18 22:42:59 2013
@@ -42,14 +42,15 @@ public class JarDelegator extends Launch
public EnqueueBean run(String user, Map<String, Object> userArgs, String jar, String mainClass,
String libjars, String files,
List<String> jarArgs, List<String> defines,
- String statusdir, String callback, String completedUrl,
+ String statusdir, String callback,
+ boolean usehcatalog, String completedUrl,
boolean enablelog, JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
runAs = user;
List<String> args = makeArgs(jar, mainClass,
libjars, files, jarArgs, defines,
- statusdir, completedUrl, enablelog, jobType);
+ statusdir, usehcatalog, completedUrl, enablelog, jobType);
return enqueueController(user, userArgs, callback, args);
}
@@ -57,23 +58,30 @@ public class JarDelegator extends Launch
private List<String> makeArgs(String jar, String mainClass,
String libjars, String files,
List<String> jarArgs, List<String> defines,
- String statusdir, String completedUrl,
+ String statusdir, boolean usehcatalog, String completedUrl,
boolean enablelog, JobType jobType)
throws BadParam, IOException, InterruptedException {
ArrayList<String> args = new ArrayList<String>();
try {
- ArrayList<String> allFiles = new ArrayList();
+ ArrayList<String> allFiles = new ArrayList<String>();
allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs));
args.addAll(makeLauncherArgs(appConf, statusdir,
completedUrl, allFiles, enablelog, jobType));
args.add("--");
TempletonUtils.addCmdForWindows(args);
+
+ //check if the rest command specified explicitly to use hcatalog
+ if(usehcatalog){
+ addHiveMetaStoreTokenArg();
+ }
+
args.add(appConf.clusterHadoop());
args.add("jar");
args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName());
- if (TempletonUtils.isset(mainClass))
+ if (TempletonUtils.isset(mainClass)) {
args.add(mainClass);
+ }
if (TempletonUtils.isset(libjars)) {
String libjarsListAsString =
TempletonUtils.hadoopFsListAsString(libjars, appConf, runAs);
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java Fri Oct 18 22:42:59 2013
@@ -24,10 +24,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.exec.ExecuteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -44,7 +44,8 @@ import org.apache.hive.hcatalog.templeto
public class LauncherDelegator extends TempletonDelegator {
private static final Log LOG = LogFactory.getLog(LauncherDelegator.class);
protected String runAs = null;
- static public enum JobType {JAR, STREAMING, PIG, HIVE};
+ static public enum JobType {JAR, STREAMING, PIG, HIVE}
+ private boolean secureMeatastoreAccess = false;
public LauncherDelegator(AppConfig appConf) {
super(appConf);
@@ -70,7 +71,7 @@ public class LauncherDelegator extends T
*/
public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback,
List<String> args)
- throws NotAuthorizedException, BusyException, ExecuteException,
+ throws NotAuthorizedException, BusyException,
IOException, QueueException {
try {
UserGroupInformation ugi = UgiFactory.getUgi(user);
@@ -82,9 +83,10 @@ public class LauncherDelegator extends T
long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6));
LOG.debug("queued job " + id + " in " + elapsed + " ms");
- if (id == null)
+ if (id == null) {
throw new QueueException("Unable to get job id");
-
+ }
+
registerJob(id, user, callback, userArgs);
return new EnqueueBean(id);
@@ -95,16 +97,14 @@ public class LauncherDelegator extends T
private String queueAsUser(UserGroupInformation ugi, final List<String> args)
throws IOException, InterruptedException {
- String id = ugi.doAs(new PrivilegedExceptionAction<String>() {
+ return ugi.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws Exception {
String[] array = new String[args.size()];
- TempletonControllerJob ctrl = new TempletonControllerJob();
+ TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess);
ToolRunner.run(ctrl, args.toArray(array));
return ctrl.getSubmittedId();
}
});
-
- return id;
}
public List<String> makeLauncherArgs(AppConfig appConf, String statusdir,
@@ -182,8 +182,9 @@ public class LauncherDelegator extends T
*/
public static String makeOverrideClasspath(AppConfig appConf) {
String[] overrides = appConf.overrideJars();
- if (overrides == null)
+ if (overrides == null) {
return null;
+ }
ArrayList<String> cp = new ArrayList<String>();
for (String fname : overrides) {
@@ -204,5 +205,18 @@ public class LauncherDelegator extends T
args.add(name + "=" + val);
}
}
-
+ /**
+ * This is called by subclasses when they determine that the submitted job requires
+ * metastore access (e.g. Pig job that uses HCatalog). This then determines if
+ * secure access is required and causes TempletonControllerJob to set up a delegation token.
+ * @see TempletonControllerJob
+ */
+ void addHiveMetaStoreTokenArg() {
+ //in order for this to work hive-site.xml must be on the classpath
+ HiveConf hiveConf = new HiveConf();
+ if(!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL)) {
+ return;
+ }
+ secureMeatastoreAccess = true;
+ }
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java Fri Oct 18 22:42:59 2013
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;
@@ -36,6 +38,7 @@ import org.apache.hive.hcatalog.templeto
* This is the backend of the pig web service.
*/
public class PigDelegator extends LauncherDelegator {
+ private static final Log LOG = LogFactory.getLog(PigDelegator.class);
public PigDelegator(AppConfig appConf) {
super(appConf);
}
@@ -43,27 +46,43 @@ public class PigDelegator extends Launch
public EnqueueBean run(String user, Map<String, Object> userArgs,
String execute, String srcFile,
List<String> pigArgs, String otherFiles,
- String statusdir, String callback, String completedUrl, boolean enablelog)
+ String statusdir, String callback,
+ boolean usehcatalog, String completedUrl, boolean enablelog)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
ExecuteException, IOException, InterruptedException {
runAs = user;
List<String> args = makeArgs(execute,
srcFile, pigArgs,
- otherFiles, statusdir, completedUrl, enablelog);
+ otherFiles, statusdir, usehcatalog, completedUrl, enablelog);
return enqueueController(user, userArgs, callback, args);
}
+ /**
+ * @param execute pig query string to be executed
+ * @param srcFile pig query file to be executed
+ * @param pigArgs pig command line arguments
+ * @param otherFiles files to be copied to the map reduce cluster
+ * @param statusdir status dir location
+ * @param usehcatalog whether the command uses hcatalog/needs to connect
+ * to hive metastore server
+ * @param completedUrl call back url
+ * @return
+ * @throws BadParam
+ * @throws IOException
+ * @throws InterruptedException
+ */
private List<String> makeArgs(String execute, String srcFile,
List<String> pigArgs, String otherFiles,
- String statusdir, String completedUrl, boolean enablelog)
+ String statusdir, boolean usehcatalog,
+ String completedUrl, boolean enablelog)
throws BadParam, IOException, InterruptedException {
ArrayList<String> args = new ArrayList<String>();
try {
ArrayList<String> allFiles = new ArrayList<String>();
- if (TempletonUtils.isset(srcFile))
- allFiles.add(TempletonUtils.hadoopFsFilename
- (srcFile, appConf, runAs));
+ if (TempletonUtils.isset(srcFile)) {
+ allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs));
+ }
if (TempletonUtils.isset(otherFiles)) {
String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs);
allFiles.addAll(Arrays.asList(ofs));
@@ -85,6 +104,12 @@ public class PigDelegator extends Launch
for (String pigArg : pigArgs) {
args.add(TempletonUtils.quoteForWindows(pigArg));
}
+ //check if the REST command specified explicitly to use hcatalog
+ // or if it says that implicitly using the pig -useHCatalog arg
+ if(usehcatalog || hasPigArgUseHcat(pigArgs)){
+ addHiveMetaStoreTokenArg();
+ }
+
if (TempletonUtils.isset(execute)) {
args.add("-execute");
args.add(TempletonUtils.quoteForWindows(execute));
@@ -101,4 +126,12 @@ public class PigDelegator extends Launch
return args;
}
+
+ /**
+ * Check if the pig arguments has -useHCatalog set
+ * see http://hive.apache.org/docs/hcat_r0.5.0/loadstore.pdf
+ */
+ private boolean hasPigArgUseHcat(List<String> pigArgs) {
+ return pigArgs.contains("-useHCatalog");
+ }
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java Fri Oct 18 22:42:59 2013
@@ -33,10 +33,10 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.io.Text;
-import org.apache.thrift.TException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.thrift.TException;
/**
* Helper class to run jobs using Kerberos security. Always safe to
@@ -44,8 +44,8 @@ import org.apache.hadoop.security.token.
*/
public class SecureProxySupport {
private Path tokenPath;
- private final String HCAT_SERVICE = "hcat";
- private boolean isEnabled;
+ public static final String HCAT_SERVICE = "hcat";
+ private final boolean isEnabled;
private String user;
public SecureProxySupport() {
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java Fri Oct 18 22:42:59 2013
@@ -186,8 +186,9 @@ public class Server {
verifyDdlParam(db, ":db");
HcatDelegator d = new HcatDelegator(appConf, execService);
- if (!TempletonUtils.isset(tablePattern))
+ if (!TempletonUtils.isset(tablePattern)) {
tablePattern = "*";
+ }
return d.listTables(getDoAsUser(), db, tablePattern);
}
@@ -252,10 +253,12 @@ public class Server {
verifyDdlParam(table, ":table");
HcatDelegator d = new HcatDelegator(appConf, execService);
- if ("extended".equals(format))
+ if ("extended".equals(format)) {
return d.descExtendedTable(getDoAsUser(), db, table);
- else
+ }
+ else {
return d.descTable(getDoAsUser(), db, table, false);
+ }
}
/**
@@ -455,8 +458,9 @@ public class Server {
verifyUser();
HcatDelegator d = new HcatDelegator(appConf, execService);
- if (!TempletonUtils.isset(dbPattern))
+ if (!TempletonUtils.isset(dbPattern)) {
dbPattern = "*";
+ }
return d.listDatabases(getDoAsUser(), dbPattern);
}
@@ -508,8 +512,9 @@ public class Server {
BadParam, ExecuteException, IOException {
verifyUser();
verifyDdlParam(db, ":db");
- if (TempletonUtils.isset(option))
+ if (TempletonUtils.isset(option)) {
verifyDdlParam(option, "option");
+ }
HcatDelegator d = new HcatDelegator(appConf, execService);
return d.dropDatabase(getDoAsUser(), db, ifExists, option,
group, permissions);
@@ -579,6 +584,7 @@ public class Server {
/**
* Run a MapReduce Streaming job.
+ * @param callback URL which WebHCat will call when the hive job finishes
*/
@POST
@Path("mapreduce/streaming")
@@ -628,6 +634,11 @@ public class Server {
/**
* Run a MapReduce Jar job.
+ * Params correspond to the REST api params
+ * @param usehcatalog if {@code true}, means the Jar uses HCat and thus needs to access
+ * metastore, which requires additional steps for WebHCat to perform in a secure cluster.
+ * @param callback URL which WebHCat will call when the hive job finishes
+ * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob
*/
@POST
@Path("mapreduce/jar")
@@ -640,6 +651,7 @@ public class Server {
@FormParam("define") List<String> defines,
@FormParam("statusdir") String statusdir,
@FormParam("callback") String callback,
+ @FormParam("usehcatalog") boolean usehcatalog,
@FormParam("enablelog") boolean enablelog)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
ExecuteException, IOException, InterruptedException {
@@ -665,11 +677,18 @@ public class Server {
return d.run(getDoAsUser(), userArgs,
jar, mainClass,
libjars, files, args, defines,
- statusdir, callback, getCompletedUrl(), enablelog, JobType.JAR);
+ statusdir, callback, usehcatalog, getCompletedUrl(), enablelog, JobType.JAR);
}
/**
* Run a Pig job.
* Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs},
{@code usehcatalog} is interpreted as true.
+ * @param usehcatalog if {@code true}, means the Pig script uses HCat and thus needs to access
+ * metastore, which requires additional steps for WebHCat to perform in a secure cluster.
+ * This does nothing to ensure that Pig is installed on target node in the cluster.
+ * @param callback URL which WebHCat will call when the hive job finishes
+ * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob
*/
@POST
@Path("pig")
@@ -680,12 +699,14 @@ public class Server {
@FormParam("files") String otherFiles,
@FormParam("statusdir") String statusdir,
@FormParam("callback") String callback,
+ @FormParam("usehcatalog") boolean usehcatalog,
@FormParam("enablelog") boolean enablelog)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
ExecuteException, IOException, InterruptedException {
verifyUser();
- if (execute == null && srcFile == null)
+ if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
+ }
//add all function arguments to a map
Map<String, Object> userArgs = new HashMap<String, Object>();
@@ -704,7 +725,7 @@ public class Server {
return d.run(getDoAsUser(), userArgs,
execute, srcFile,
pigArgs, otherFiles,
- statusdir, callback, getCompletedUrl(), enablelog);
+ statusdir, callback, usehcatalog, getCompletedUrl(), enablelog);
}
/**
@@ -719,7 +740,7 @@ public class Server {
* used in "add jar" statement in hive script
* @param defines shortcut for command line arguments "--define"
* @param statusdir where the stderr/stdout of templeton controller job goes
- * @param callback callback url when the hive job finishes
+ * @param callback URL which WebHCat will call when the hive job finishes
* @param enablelog whether to collect mapreduce log into statusdir/logs
*/
@POST
@@ -736,8 +757,9 @@ public class Server {
throws NotAuthorizedException, BusyException, BadParam, QueueException,
ExecuteException, IOException, InterruptedException {
verifyUser();
- if (execute == null && srcFile == null)
+ if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
+ }
//add all function arguments to a map
Map<String, Object> userArgs = new HashMap<String, Object>();
@@ -874,10 +896,12 @@ public class Server {
@GET
@Path("internal/complete/{jobid}")
@Produces({MediaType.APPLICATION_JSON})
- public CompleteBean completeJob(@PathParam("jobid") String jobid)
+ public CompleteBean completeJob(@PathParam("jobid") String jobid,
+ @QueryParam("status") String jobStatus)
throws CallbackFailedException, IOException {
+ LOG.debug("Received callback " + theUriInfo.getRequestUri());
CompleteDelegator d = new CompleteDelegator(appConf);
- return d.run(jobid);
+ return d.run(jobid, jobStatus);
}
/**
@@ -887,8 +911,9 @@ public class Server {
String requestingUser = getRequestingUser();
if (requestingUser == null) {
String msg = "No user found.";
- if (!UserGroupInformation.isSecurityEnabled())
+ if (!UserGroupInformation.isSecurityEnabled()) {
msg += " Missing " + PseudoAuthenticator.USER_NAME + " parameter.";
+ }
throw new NotAuthorizedException(msg);
}
if(doAs != null && !doAs.equals(requestingUser)) {
@@ -897,9 +922,10 @@ public class Server {
ProxyUserSupport.validate(requestingUser, getRequestingHost(requestingUser, request), doAs);
}
}
+
/**
* All 'tasks' spawned by WebHCat should be run as this user. W/o doAs query parameter
- * this is just the user making the request (or
+ * this is just the user making the request (or
* {@link org.apache.hadoop.security.authentication.client.PseudoAuthenticator#USER_NAME}
* query param).
* @return value of doAs query parameter or {@link #getRequestingUser()}
@@ -912,8 +938,9 @@ public class Server {
*/
public void verifyParam(String param, String name)
throws BadParam {
- if (param == null)
+ if (param == null) {
throw new BadParam("Missing " + name + " parameter");
+ }
}
/**
@@ -921,8 +948,9 @@ public class Server {
*/
public void verifyParam(List<String> param, String name)
throws BadParam {
- if (param == null || param.isEmpty())
+ if (param == null || param.isEmpty()) {
throw new BadParam("Missing " + name + " parameter");
+ }
}
public static final Pattern DDL_ID = Pattern.compile("[a-zA-Z]\\w*");
@@ -937,8 +965,9 @@ public class Server {
throws BadParam {
verifyParam(param, name);
Matcher m = DDL_ID.matcher(param);
- if (!m.matches())
+ if (!m.matches()) {
throw new BadParam("Invalid DDL identifier " + name);
+ }
}
/**
* Get the user name from the security context, i.e. the user making the HTTP request.
@@ -946,10 +975,12 @@ public class Server {
* value of user.name query param, in kerberos mode it's the kinit'ed user.
*/
private String getRequestingUser() {
- if (theSecurityContext == null)
+ if (theSecurityContext == null) {
return null;
- if (theSecurityContext.getUserPrincipal() == null)
+ }
+ if (theSecurityContext.getUserPrincipal() == null) {
return null;
+ }
//map hue/foo.bar@something.com->hue since user group checks
// and config files are in terms of short name
return UserGroupInformation.createRemoteUser(
@@ -960,16 +991,18 @@ public class Server {
* The callback url on this server when a task is completed.
*/
public String getCompletedUrl() {
- if (theUriInfo == null)
+ if (theUriInfo == null) {
return null;
- if (theUriInfo.getBaseUri() == null)
+ }
+ if (theUriInfo.getBaseUri() == null) {
return null;
+ }
return theUriInfo.getBaseUri() + VERSION
- + "/internal/complete/$jobId";
+ + "/internal/complete/$jobId?status=$jobStatus";
}
/**
- * Returns canonical host name from which the request is made; used for doAs validation
+ * Returns canonical host name from which the request is made; used for doAs validation
*/
private static String getRequestingHost(String requestingUser, HttpServletRequest request) {
final String unkHost = "???";
@@ -998,7 +1031,7 @@ public class Server {
}
private void checkEnableLogPrerequisite(boolean enablelog, String statusdir) throws BadParam {
- if (enablelog == true && !TempletonUtils.isset(statusdir))
+ if (enablelog && !TempletonUtils.isset(statusdir))
throw new BadParam("enablelog is only applicable when statusdir is set");
}
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java Fri Oct 18 22:42:59 2013
@@ -58,7 +58,7 @@ public class StreamingDelegator extends
return d.run(user, userArgs,
appConf.streamingJar(), null,
null, files, args, defines,
- statusdir, callback, completedUrl, enableLog, jobType);
+ statusdir, callback, false, completedUrl, enableLog, jobType);
}
private List<String> makeArgs(List<String> inputs,
Added: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java?rev=1533658&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java Fri Oct 18 22:42:59 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/*
+ * Cache of delegation tokens. When {@link TempletonControllerJob} submits a job that requires
+ * metastore access and this access should be secure, TCJ will add a delegation token to the
+ * submitted job. When the job completes we need to cancel the token since by default the token
+ * lives for 7 days and over time can cause OOM (if not cancelled). Cancelling from
+ * TempletonControllerJob.LaunchMapper mapper (via custom OutputCommitter for example) requires
+ * the jar containing HiveMetastoreClient (and any dependent jars) to be available on the node
+ * running LaunchMapper. Specifying transitive closure of the necessary jars is
+ * configuration/maintenance headache for each release. Caching the token means cancellation is
+ * done from WebHCat server and thus has Hive jars on the classpath.
+ *
+ * While it's possible that WebHCat crashes and loses this in-memory state, this would be an
+ * exceptional condition and since tokens will automatically be cancelled after 7 days,
+ * the fact that this info is not persisted is OK. (Persisting it also complicates things
+ * because that needs to be done securely)
+ * @see TempletonControllerJob
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DelegationTokenCache<JobId, TokenObject> {
+ private ConcurrentHashMap<JobId, TokenObject> tokenCache =
+ new ConcurrentHashMap<JobId, TokenObject>();
+ private static final DelegationTokenCache<String, String> stringFormTokenCache =
+ new DelegationTokenCache<String, String>();
+
+ /*
+ * Returns the singleton instance of jobId->delegation-token-in-string-form cache
+ */
+ public static DelegationTokenCache<String, String> getStringFormTokenCache() {
+ return stringFormTokenCache;
+ }
+ TokenObject storeDelegationToken(JobId jobId, TokenObject token) {
+ return tokenCache.put(jobId, token);
+ }
+ public TokenObject getDelegationToken(JobId jobId) {
+ return tokenCache.get(jobId);
+ }
+ public void removeDelegationToken(JobId jobId) {
+ tokenCache.remove(jobId);
+ }
+}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1533658&r1=1533657&r2=1533658&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java Fri Oct 18 22:42:59 2013
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -41,6 +42,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
@@ -55,9 +58,11 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.templeton.BadParam;
import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+import org.apache.hive.hcatalog.templeton.SecureProxySupport;
+import org.apache.hive.hcatalog.templeton.UgiFactory;
+import org.apache.thrift.TException;
/**
* A Map Reduce job that will start another job.
@@ -70,6 +75,13 @@ import org.apache.hive.hcatalog.templeto
* - run a keep alive thread so the job doesn't end.
* - Optionally, store the stdout, stderr, and exit value of the child
* in hdfs files.
+ *
+ * A note on security. When jobs are submitted through WebHCat that use HCatalog, it means that
+ * metastore access is required. Hive queries, of course, need metastore access. This in turn
+ * requires a delegation token to be obtained for the metastore in a <em>secure cluster</em>. Since we
+ * can't usually parse the job to find out if it is using metastore, we require 'usehcatalog'
+ * parameter supplied in the REST call. WebHCat takes care of cancelling the token when the job
+ * is complete.
*/
public class TempletonControllerJob extends Configured implements Tool {
public static final String COPY_NAME = "templeton.copy";
@@ -89,12 +101,19 @@ public class TempletonControllerJob exte
public static final String TOKEN_FILE_ARG_PLACEHOLDER
= "__WEBHCAT_TOKEN_FILE_LOCATION__";
-
private static TrivialExecService execService = TrivialExecService.getInstance();
private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
+ private final boolean secureMetastoreAccess;
-
+ /**
+ * @param secureMetastoreAccess - if true, a delegation token will be created
+ * and added to the job
+ */
+ public TempletonControllerJob(boolean secureMetastoreAccess) {
+ super();
+ this.secureMetastoreAccess = secureMetastoreAccess;
+ }
public static class LaunchMapper
extends Mapper<NullWritable, NullWritable, Text, Text> {
protected Process startJob(Context context, String user,
@@ -194,8 +213,9 @@ public class TempletonControllerJob exte
proc.waitFor();
keepAlive.sendReport = false;
pool.shutdown();
- if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS))
+ if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
pool.shutdownNow();
+ }
writeExitValue(conf, proc.exitValue(), statusdir);
JobState state = new JobState(context.getJobID().toString(), conf);
@@ -210,11 +230,13 @@ public class TempletonControllerJob exte
logRetriever.run();
}
- if (proc.exitValue() != 0)
+ if (proc.exitValue() != 0) {
System.err.println("templeton: job failed with exit code "
+ proc.exitValue());
- else
+ }
+ else {
System.err.println("templeton: job completed with exit code 0");
+ }
}
private void executeWatcher(ExecutorService pool, Configuration conf,
@@ -248,10 +270,10 @@ public class TempletonControllerJob exte
}
private static class Watcher implements Runnable {
- private InputStream in;
+ private final InputStream in;
private OutputStream out;
- private JobID jobid;
- private Configuration conf;
+ private final JobID jobid;
+ private final Configuration conf;
public Watcher(Configuration conf, JobID jobid, InputStream in,
String statusdir, String name)
@@ -341,21 +363,26 @@ public class TempletonControllerJob exte
private JobID submittedJobId;
public String getSubmittedId() {
- if (submittedJobId == null)
+ if (submittedJobId == null) {
return null;
- else
+ }
+ else {
return submittedJobId.toString();
+ }
}
/**
* Enqueue the job and print out the job id for later collection.
+ * @see org.apache.hive.hcatalog.templeton.CompleteDelegator
*/
@Override
public int run(String[] args)
- throws IOException, InterruptedException, ClassNotFoundException {
+ throws IOException, InterruptedException, ClassNotFoundException, TException {
Configuration conf = getConf();
+
conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
- conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName());
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ conf.set("user.name", user);
Job job = new Job(conf);
job.setJarByClass(TempletonControllerJob.class);
job.setJobName("TempletonControllerJob");
@@ -363,8 +390,7 @@ public class TempletonControllerJob exte
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(SingleInputFormat.class);
- NullOutputFormat<NullWritable, NullWritable> of
- = new NullOutputFormat<NullWritable, NullWritable>();
+ NullOutputFormat<NullWritable, NullWritable> of = new NullOutputFormat<NullWritable, NullWritable>();
job.setOutputFormatClass(of.getClass());
job.setNumReduceTasks(0);
@@ -372,18 +398,51 @@ public class TempletonControllerJob exte
Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token"));
job.getCredentials().addToken(new Text("mr token"), mrdt);
+
+ String metastoreTokenStrForm = addHMSToken(job, user);
+
job.submit();
submittedJobId = job.getJobID();
+ if(metastoreTokenStrForm != null) {
+ //so that it can be cancelled later from CompleteDelegator
+ DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
+ submittedJobId.toString(), metastoreTokenStrForm);
+ LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " +
+ "user=" + user);
+ }
return 0;
}
-
-
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new TempletonControllerJob(), args);
- if (ret != 0)
- System.err.println("TempletonControllerJob failed!");
- System.exit(ret);
+ private String addHMSToken(Job job, String user) throws IOException, InterruptedException,
+ TException {
+ if(!secureMetastoreAccess) {
+ return null;
+ }
+ Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier> hiveToken =
+ new Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier>();
+ String metastoreTokenStrForm = buildHcatDelegationToken(user);
+ hiveToken.decodeFromUrlString(metastoreTokenStrForm);
+ job.getCredentials().addToken(new
+ Text(SecureProxySupport.HCAT_SERVICE), hiveToken);
+ return metastoreTokenStrForm;
+ }
+ private String buildHcatDelegationToken(String user) throws IOException, InterruptedException,
+ TException {
+ final HiveConf c = new HiveConf();
+ LOG.debug("Creating hive metastore delegation token for user " + user);
+ final UserGroupInformation ugi = UgiFactory.getUgi(user);
+ UserGroupInformation real = ugi.getRealUser();
+ return real.doAs(new PrivilegedExceptionAction<String>() {
+ public String run() throws IOException, TException, InterruptedException {
+ final HiveMetaStoreClient client = new HiveMetaStoreClient(c);
+ return ugi.doAs(new PrivilegedExceptionAction<String>() {
+ public String run() throws IOException, TException, InterruptedException {
+ String u = ugi.getUserName();
+ return client.getDelegationToken(u);
+ }
+ });
+ }
+ });
}
}