You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2013/10/25 19:42:07 UTC
svn commit: r1535796 - in /hive/trunk: hcatalog/src/test/e2e/templeton/
hcatalog/src/test/e2e/templeton/drivers/
hcatalog/src/test/e2e/templeton/tests/ hcatalog/webhcat/svr/src/main/bin/
hcatalog/webhcat/svr/src/main/config/ hcatalog/webhcat/svr/src/ma...
Author: thejas
Date: Fri Oct 25 17:42:07 2013
New Revision: 1535796
URL: http://svn.apache.org/r1535796
Log:
HIVE-5511 : percentComplete returned by job status from WebHCat is null (Eugene Koifman via Thejas Nair)
Added:
hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
Modified:
hive/trunk/hcatalog/src/test/e2e/templeton/README.txt
hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/README.txt
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/README.txt?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/README.txt (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/README.txt Fri Oct 25 17:42:07 2013
@@ -141,8 +141,10 @@ In order for this test suite to work, we
and webhcat.proxyuser.hue.hosts defined, i.e. 'hue' should be allowed to impersonate 'joe'.
[Of course, 'hcat' proxyuser should be configured in core-site.xml for the command to succeed.]
-Furthermore, metastore side file based security should be enabled. To do this 3 properties in
-hive-site.xml should be configured:
+Furthermore, metastore side file based security should be enabled.
+(See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-MetastoreServerSecurity for more info)
+
+To do this, the following 3 properties in hive-site.xml should be configured:
1) hive.security.metastore.authorization.manager set to
org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
2) hive.security.metastore.authenticator.manager set to
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Fri Oct 25 17:42:07 2013
@@ -788,7 +788,8 @@ sub compare
if ( (defined $testCmd->{'check_job_created'})
|| (defined $testCmd->{'check_job_complete'})
- || (defined $testCmd->{'check_job_exit_value'}) ) {
+ || (defined $testCmd->{'check_job_exit_value'})
+ || (defined $testCmd->{'check_job_percent_complete'}) ) {
my $jobid = $json_hash->{'id'};
if (!defined $jobid) {
print $log "$0::$subName WARN check failed: "
@@ -803,7 +804,8 @@ sub compare
. "jobresult not defined ";
$result = 0;
}
- if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
+ if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})
+ || defined($testCmd->{'check_job_percent_complete'})) {
my $jobComplete;
my $NUM_RETRIES = 60;
my $SLEEP_BETWEEN_RETRIES = 5;
@@ -841,6 +843,15 @@ sub compare
$result = 0;
}
}
+ # check the percentComplete value
+ if (defined($testCmd->{'check_job_percent_complete'})) {
+ my $pcValue = $res_hash->{'percentComplete'};
+ my $expectedPercentComplete = $testCmd->{'check_job_percent_complete'};
+ if ( (!defined $pcValue) || $pcValue ne $expectedPercentComplete ) {
+ print $log "check_job_percent_complete failed. got percentComplete $pcValue, expected $expectedPercentComplete";
+ $result = 0;
+ }
+ }
}
#Check userargs
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Fri Oct 25 17:42:07 2013
@@ -73,6 +73,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_percent_complete' => 'map 100% reduce 100%',
'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -166,6 +167,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_percent_complete' => '100% complete',
'check_job_exit_value' => 0,
'check_call_back' => 1,
},
@@ -386,6 +388,7 @@ $cfg =
'status_code' => 200,
'check_job_created' => 1,
'check_job_complete' => 'SUCCESS',
+ 'check_job_percent_complete' => 'map 100% reduce 100%',
'check_job_exit_value' => 0,
'check_call_back' => 1,
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf Fri Oct 25 17:42:07 2013
@@ -54,7 +54,9 @@ $cfg =
},
{
#-ve test - no input file
- 'num' => 2,
+ #The TempletonController job status should be SUCCESS, but its exit value should be 1.
+ #check_job_complete is FAILURE if the YARN log is redirected to stderr, and SUCCESS otherwise (HIVE-5511).
+ 'num' => 2,
'method' => 'POST',
'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming',
'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts',
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh Fri Oct 25 17:42:07 2013
@@ -75,7 +75,7 @@ elif [ -e "${WEBHCAT_PREFIX}/conf/webhca
else
DEFAULT_CONF_DIR="/etc/webhcat"
fi
-WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
+export WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
#users can add various env vars to webhcat-env.sh in the conf
#rather than having to export them before running the command
Added: hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties Fri Oct 25 17:42:07 2013
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#
+# This log4j config overrides hadoop-yarn-server-nodemanager-2.1.0-beta.jar/container-log4j.properties.
+#In Hadoop 2, by default, log information about M/R job progress is not sent to stderr,
+#which is where LaunchMapper expects it, so WebHCat is unable to report the
+#percentComplete attribute in job status. Something broken in YARN prevents its log4j
+#properties from being overridden the normal way, so for now (10/07/2013) we override them
+#with this file, in which log4j.rootLogger specifies an additional 'console' appender (stderr).
+#This file is made available through the DistributedCache. See TrivialExecService and
+#TempletonControllerJob for more info.
+
+hadoop.root.logger=INFO,CLA
+
+# Root logger: the level/appenders given by "hadoop.root.logger", plus the console and EventCounter appenders.
+log4j.rootLogger=${hadoop.root.logger}, console, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# ContainerLog Appender
+#
+
+#Default values
+yarn.app.container.log.dir=null
+yarn.app.container.log.filesize=100
+
+log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
+log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
+log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
+
+log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java Fri Oct 25 17:42:07 2013
@@ -71,6 +71,7 @@ public class AppConfig extends Configura
};
public static final String TEMPLETON_HOME_VAR = "TEMPLETON_HOME";
+ public static final String WEBHCAT_CONF_DIR = "WEBHCAT_CONF_DIR";
public static final String[] TEMPLETON_CONF_FILENAMES = {
"webhcat-default.xml",
@@ -153,6 +154,9 @@ public class AppConfig extends Configura
public static String getTempletonDir() {
return System.getenv(TEMPLETON_HOME_VAR);
}
+  /**
+   * Returns the value of the {@code WEBHCAT_CONF_DIR} environment variable (exported by
+   * webhcat_config.sh), or {@code null} if it is not set.
+   */
+  public static String getWebhcatConfDir() {
+    final String confDir = System.getenv(WEBHCAT_CONF_DIR);
+    return confDir;
+  }
private boolean loadOneFileConfig(String dir, String fname) {
if (dir != null) {
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java Fri Oct 25 17:42:07 2013
@@ -68,7 +68,7 @@ public class CompleteDelegator extends T
try {
state = new JobState(id, Main.getAppConfigInstance());
if (state.getCompleteStatus() == null)
- failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null);
+ failed("Job not yet complete. jobId=" + id + " Status from JobTracker=" + jobStatus, null);
Long notified = state.getNotifiedTime();
if (notified != null) {
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Fri Oct 25 17:42:07 2013
@@ -91,6 +91,12 @@ public class HDFSStorage implements Temp
BufferedReader in = null;
Path p = new Path(getPath(type) + "/" + id + "/" + key);
try {
+ if(!fs.exists(p)) {
+ //check first, otherwise webhcat.log is full of stack traces from FileSystem when
+ //clients check for status ('exitValue', 'completed', etc.)
+ LOG.debug(p + " does not exist.");
+ return null;
+ }
in = new BufferedReader(new InputStreamReader(fs.open(p)));
String line = null;
String val = "";
@@ -102,9 +108,7 @@ public class HDFSStorage implements Temp
}
return val;
} catch (Exception e) {
- //don't print stack trace since clients poll for 'exitValue', 'completed',
- //files which are not there until job completes
- LOG.info("Couldn't find " + p + ": " + e.getMessage());
+ LOG.error("Couldn't find " + p + ": " + e.getMessage(), e);
} finally {
close(in);
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,6 +32,6 @@ class HiveJobIDParser extends JobIDParse
@Override
List<String> parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,7 +32,7 @@ class JarJobIDParser extends JobIDParser
@Override
List<String> parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
Added: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java Fri Oct 25 17:42:07 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+/**
+ * Configuration keys, file names and tuning values shared by the WebHCat job-submission
+ * classes (for example {@link LaunchMapper}). Interface fields are implicitly public static final.
+ */
+public interface JobSubmissionConstants {
+  // Configuration keys read by the launcher map task.
+  String COPY_NAME = "templeton.copy";
+  String STATUSDIR_NAME = "templeton.statusdir";
+  String ENABLE_LOG = "templeton.enablelog";
+  String JOB_TYPE = "templeton.jobtype";
+  String JAR_ARGS_NAME = "templeton.args";
+  String OVERRIDE_CLASSPATH = "templeton.override-classpath";
+  String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j";
+
+  /** Name of the log4j properties file used to override the container's log4j configuration. */
+  String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties";
+
+  // Names of the files written to the status directory.
+  String STDOUT_FNAME = "stdout";
+  String STDERR_FNAME = "stderr";
+  String EXIT_FNAME = "exit";
+
+  /** Seconds to wait for the output watchers to finish once the child process has exited. */
+  int WATCHER_TIMEOUT_SECS = 10;
+  /** Milliseconds between KeepAlive progress heartbeats. */
+  int KEEP_ALIVE_MSEC = 60 * 1000;
+  /** Job argument placeholder replaced by the token file location, or dropped if there is none. */
+  String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__";
+}
Added: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java Fri Oct 25 17:42:07 2013
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.hcatalog.templeton.BadParam;
+import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Note that this class is used in a different JVM than WebHCat server. Thus it should not call
+ * any classes not available on every node in the cluster (outside webhcat jar).
+ * TempletonControllerJob#run() calls Job.setJarByClass(LaunchMapper.class) which
+ * causes webhcat jar to be shipped to target node, but not its transitive closure.
+ * Long term we need to clean up this separation and create a separate jar to ship so that the
+ * dependencies are clear. (This used to be an inner class of TempletonControllerJob)
+ * <p>
+ * The mapper starts the submitted job as a child process, copies the child's stdout/stderr to
+ * the status directory (or to this task's own stdout/stderr when no status directory is set),
+ * records progress and child job ids in {@link JobState}, and finally records the child's exit
+ * value.
+ */
+@InterfaceAudience.Private
+public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> implements
+    JobSubmissionConstants {
+  /**
+   * Messages logged here end up in the 'syslog' of this map task (look there, for example, for
+   * the KeepAlive heartbeat messages). The child process's stdout/stderr are copied separately
+   * by {@link Watcher}.
+   */
+  private static final Log LOG = LogFactory.getLog(LaunchMapper.class);
+
+  /**
+   * Copies the files listed under {@link #COPY_NAME} to the task's working directory and starts
+   * the command line stored under {@link #JAR_ARGS_NAME} as a child process.
+   *
+   * @param context task context supplying the job configuration
+   * @param user user whose environment the child process runs with
+   * @param overrideClasspath optional classpath override for the child process
+   * @return the started child process
+   * @throws IOException if no job arguments are configured, a file cannot be copied, or the
+   *     token file location cannot be quoted (Windows only)
+   */
+  protected Process startJob(Context context, String user, String overrideClasspath)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
+    if (jarArgs == null) {
+      throw new IOException("templeton: no job arguments found under " + JAR_ARGS_NAME);
+    }
+    copyLocal(COPY_NAME, conf);
+
+    ArrayList<String> removeEnv = new ArrayList<String>();
+    //todo: we really need some comments to explain exactly why each of these is removed
+    removeEnv.add("HADOOP_ROOT_LOGGER");
+    removeEnv.add("hadoop-command");
+    removeEnv.add("CLASS");
+    removeEnv.add("mapredcommand");
+    Map<String, String> env = TempletonUtils.hadoopUserEnv(user, overrideClasspath);
+    // Random-access list: the placeholder substitution below uses indexed get/set.
+    List<String> jarArgsList = new ArrayList<String>(Arrays.asList(jarArgs));
+    String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+
+    if (tokenFile != null) {
+      //Token is available, so replace the placeholder
+      tokenFile = tokenFile.replaceAll("\"", "");
+      String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+      if (Shell.WINDOWS) {
+        try {
+          tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+        } catch (BadParam e) {
+          String msg = "cannot pass " + tokenFile + " to mapreduce.job.credentials.binary";
+          LOG.error(msg, e);
+          throw new IOException(msg, e);
+        }
+      }
+      for (int i = 0; i < jarArgsList.size(); i++) {
+        String newArg = jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
+        jarArgsList.set(i, newArg);
+      }
+    } else {
+      //No token, so remove the placeholder arg
+      Iterator<String> it = jarArgsList.iterator();
+      while (it.hasNext()) {
+        if (it.next().contains(TOKEN_FILE_ARG_PLACEHOLDER)) {
+          it.remove();
+        }
+      }
+    }
+    // A missing property means "false", exactly as Boolean.parseBoolean treats null.
+    boolean overrideLog4jProps = Boolean.parseBoolean(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS));
+    return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps);
+  }
+
+  /**
+   * Copies every file listed (encoded) under {@code var} in the configuration to a relative
+   * local path with the same file name, i.e. into the task's working directory.
+   */
+  private void copyLocal(String var, Configuration conf) throws IOException {
+    String[] filenames = TempletonUtils.decodeArray(conf.get(var));
+    if (filenames != null) {
+      for (String filename : filenames) {
+        Path src = new Path(filename);
+        Path dst = new Path(src.getName());
+        FileSystem fs = src.getFileSystem(conf);
+        LOG.info("templeton: copy " + src + " => " + dst);
+        fs.copyToLocalFile(src, dst);
+      }
+    }
+  }
+
+  /**
+   * Validates the job settings, runs the child job while copying its output and sending
+   * keep-alive heartbeats, then records the child's exit value in the status directory (if
+   * any) and in {@link JobState}, and optionally collects the job logs.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String user = conf.get("user.name");
+
+    // Resolve every setting that can fail *before* launching the child, so that a bad status
+    // dir or job type does not leave an already-started process running unsupervised.
+    String statusdir = conf.get(STATUSDIR_NAME);
+    if (statusdir != null) {
+      try {
+        statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir, user);
+      } catch (URISyntaxException e) {
+        String msg = "Invalid status dir URI";
+        LOG.error(msg, e);
+        throw new IOException(msg, e);
+      }
+    }
+    boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
+    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
+
+    Process proc = startJob(context, user, conf.get(OVERRIDE_CLASSPATH));
+
+    JobID jobId = context.getJobID();
+    ExecutorService pool = Executors.newCachedThreadPool();
+    int exitValue;
+    try {
+      executeWatcher(pool, conf, jobId, proc.getInputStream(), statusdir, STDOUT_FNAME);
+      executeWatcher(pool, conf, jobId, proc.getErrorStream(), statusdir, STDERR_FNAME);
+      KeepAlive keepAlive = startCounterKeepAlive(pool, context);
+
+      exitValue = proc.waitFor();
+      keepAlive.sendReport = false;
+      pool.shutdown();
+      // KeepAlive may be sleeping between heartbeats; interrupt whatever is still running.
+      if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
+        pool.shutdownNow();
+      }
+    } finally {
+      // Harmless on the normal path (the pool is already shut down); if anything above threw,
+      // this interrupts the KeepAlive/Watcher threads so they do not outlive the task.
+      pool.shutdownNow();
+    }
+
+    writeExitValue(conf, exitValue, statusdir);
+    JobState state = new JobState(jobId.toString(), conf);
+    try {
+      state.setExitValue(exitValue);
+      state.setCompleteStatus("done");
+    } finally {
+      state.close();
+    }
+
+    if (enablelog && TempletonUtils.isset(statusdir)) {
+      LOG.info("templeton: collecting logs for " + jobId.toString()
+          + " to " + statusdir + "/logs");
+      LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
+      logRetriever.run();
+    }
+
+    if (exitValue != 0) {
+      LOG.info("templeton: job failed with exit code " + exitValue);
+    } else {
+      LOG.info("templeton: job completed with exit code 0");
+    }
+  }
+
+  /**
+   * Starts a {@link Watcher} on {@code pool} that copies {@code in} to the output named
+   * {@code name} (a status-dir file, or this task's own stdout/stderr).
+   */
+  private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in,
+      String statusdir, String name) throws IOException {
+    Watcher w = new Watcher(conf, jobid, in, statusdir, name);
+    pool.execute(w);
+  }
+
+  /** Starts the heartbeat on {@code pool}; the caller stops it by clearing its sendReport flag. */
+  private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException {
+    KeepAlive k = new KeepAlive(context);
+    pool.execute(k);
+    return k;
+  }
+
+  /**
+   * Writes {@code exitValue} to the {@link #EXIT_FNAME} file in {@code statusdir}; does nothing
+   * when no status directory is set.
+   */
+  private void writeExitValue(Configuration conf, int exitValue, String statusdir)
+      throws IOException {
+    if (!TempletonUtils.isset(statusdir)) {
+      return;
+    }
+    Path p = new Path(statusdir, EXIT_FNAME);
+    FileSystem fs = p.getFileSystem(conf);
+    OutputStream out = fs.create(p);
+    LOG.info("templeton: Writing exit value " + exitValue + " to " + p);
+    PrintWriter writer = new PrintWriter(out);
+    try {
+      writer.println(exitValue);
+      // PrintWriter never throws on write errors; surface them in the log at least.
+      if (writer.checkError()) {
+        LOG.error("templeton: failed to write exit value to " + p);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Copies one output stream of the child process, line by line, to a status-dir file named
+   * {@code name} or (without a status dir) to this task's own stdout/stderr, and records any
+   * percent-complete value or child job id found in a line in {@link JobState}.
+   */
+  private static class Watcher implements Runnable {
+    private final InputStream in;
+    private final OutputStream out;
+    private final JobID jobid;
+    private final Configuration conf;
+
+    public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name)
+        throws IOException {
+      this.conf = conf;
+      this.jobid = jobid;
+      this.in = in;
+
+      if (TempletonUtils.isset(statusdir)) {
+        Path p = new Path(statusdir, name);
+        FileSystem fs = p.getFileSystem(conf);
+        out = fs.create(p);
+        LOG.info("templeton: Writing status to " + p);
+      } else if (name.equals(STDERR_FNAME)) {
+        out = System.err;
+      } else {
+        out = System.out;
+      }
+    }
+
+    @Override
+    public void run() {
+      PrintWriter writer = new PrintWriter(out);
+      try {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          writer.println(line);
+          recordProgress(line);
+        }
+      } catch (IOException e) {
+        LOG.error("templeton: execute error: ", e);
+      } finally {
+        writer.flush();
+        if (out != System.err && out != System.out) {
+          // Close the status-dir file even if reading failed, so what was copied is kept;
+          // depending on FileSystem implementation flush() may or may not do anything.
+          writer.close();
+        }
+      }
+    }
+
+    /**
+     * Stores the percent-complete value and/or child job id reported by {@code line}, if any.
+     * Only values actually found in the line are written, so a line carrying just one of them
+     * never passes {@code null} for the other (which, depending on the storage implementation,
+     * could clobber a value recorded from an earlier line).
+     */
+    private void recordProgress(String line) {
+      JobState state = null;
+      try {
+        String percent = TempletonUtils.extractPercentComplete(line);
+        String childid = TempletonUtils.extractChildJobId(line);
+        if (percent != null || childid != null) {
+          state = new JobState(jobid.toString(), conf);
+          if (percent != null) {
+            state.setPercentComplete(percent);
+          }
+          if (childid != null) {
+            state.setChildId(childid);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("templeton: state error: ", e);
+      } finally {
+        if (state != null) {
+          try {
+            state.close();
+          } catch (IOException e) {
+            LOG.warn("templeton: unable to close job state for " + jobid, e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodically calls {@code context.progress()} while {@code sendReport} is set, so the
+   * framework does not kill this task while the child process is running.
+   */
+  private static class KeepAlive implements Runnable {
+    private final Context context;
+    private volatile boolean sendReport;
+
+    public KeepAlive(Context context) {
+      this.sendReport = true;
+      this.context = context;
+    }
+
+    private static StringBuilder makeDots(int count) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < count; i++) {
+        sb.append('.');
+      }
+      return sb;
+    }
+
+    @Override
+    public void run() {
+      try {
+        int count = 0;
+        while (sendReport) {
+          // Periodically report progress on the Context object
+          // to prevent TaskTracker from killing the Templeton
+          // Controller task
+          context.progress();
+          count++;
+          String msg = "KeepAlive Heart beat" + makeDots(count);
+          LOG.info(msg);
+          Thread.sleep(KEEP_ALIVE_MSEC);
+        }
+      } catch (InterruptedException e) {
+        // Ok to be interrupted: the pool's shutdownNow() is how a sleeping heartbeat is stopped.
+        // Restore the flag rather than swallowing it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,6 +32,6 @@ class PigJobIDParser extends JobIDParser
@Override
List<String> parseJobID() throws IOException {
- return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+ return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
}
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java Fri Oct 25 17:42:07 2013
@@ -18,23 +18,10 @@
*/
package org.apache.hive.hcatalog.templeton.tool;
-import java.io.BufferedReader;
+import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
+import java.net.URI;
import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,24 +29,24 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
-import org.apache.hive.hcatalog.templeton.BadParam;
-import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+import org.apache.hive.hcatalog.templeton.AppConfig;
+import org.apache.hive.hcatalog.templeton.Main;
import org.apache.hive.hcatalog.templeton.SecureProxySupport;
import org.apache.hive.hcatalog.templeton.UgiFactory;
import org.apache.thrift.TException;
@@ -83,281 +70,99 @@ import org.apache.thrift.TException;
* parameter supplied in the REST call. WebHcat takes care of cancelling the token when the job
* is complete.
*/
-public class TempletonControllerJob extends Configured implements Tool {
- public static final String COPY_NAME = "templeton.copy";
- public static final String STATUSDIR_NAME = "templeton.statusdir";
- public static final String ENABLE_LOG = "templeton.enablelog";
- public static final String JOB_TYPE = "templeton.jobtype";
- public static final String JAR_ARGS_NAME = "templeton.args";
- public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
-
- public static final String STDOUT_FNAME = "stdout";
- public static final String STDERR_FNAME = "stderr";
- public static final String EXIT_FNAME = "exit";
-
- public static final int WATCHER_TIMEOUT_SECS = 10;
- public static final int KEEP_ALIVE_MSEC = 60 * 1000;
-
- public static final String TOKEN_FILE_ARG_PLACEHOLDER
- = "__WEBHCAT_TOKEN_FILE_LOCATION__";
-
- private static TrivialExecService execService = TrivialExecService.getInstance();
-
+@InterfaceAudience.Private
+public class TempletonControllerJob extends Configured implements Tool, JobSubmissionConstants {
private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
- private final boolean secureMetastoreAccess;
+ //file to add to DistributedCache
+ private static URI overrideLog4jURI = null;
+ private static boolean overrideContainerLog4jProps;
+ //Jar cmd submission likely will be affected, Pig likely not
+ private static final String affectedMsg = "Monitoring of Hadoop jobs submitted through WebHCat " +
+ "may be affected.";
+ private static final String TMP_DIR_PROP = "hadoop.tmp.dir";
/**
- * @param secureMetastoreAccess - if true, a delegation token will be created
- * and added to the job
+ * Copy the file from local file system to tmp dir
*/
- public TempletonControllerJob(boolean secureMetastoreAccess) {
- super();
- this.secureMetastoreAccess = secureMetastoreAccess;
- }
- public static class LaunchMapper
- extends Mapper<NullWritable, NullWritable, Text, Text> {
- protected Process startJob(Context context, String user,
- String overrideClasspath)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- copyLocal(COPY_NAME, conf);
- String[] jarArgs
- = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
-
- ArrayList<String> removeEnv = new ArrayList<String>();
- removeEnv.add("HADOOP_ROOT_LOGGER");
- removeEnv.add("hadoop-command");
- removeEnv.add("CLASS");
- removeEnv.add("mapredcommand");
- Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
- overrideClasspath);
- List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
- String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-
-
- if (tokenFile != null) {
- //Token is available, so replace the placeholder
- tokenFile = tokenFile.replaceAll("\"", "");
- String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
- if (Shell.WINDOWS) {
- try {
- tokenArg = TempletonUtils.quoteForWindows(tokenArg);
- } catch (BadParam e) {
- throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e);
- }
- }
- for(int i=0; i<jarArgsList.size(); i++){
- String newArg =
- jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
- jarArgsList.set(i, newArg);
- }
-
- }else{
- //No token, so remove the placeholder arg
- Iterator<String> it = jarArgsList.iterator();
- while(it.hasNext()){
- String arg = it.next();
- if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){
- it.remove();
- }
- }
- }
- return execService.run(jarArgsList, removeEnv, env);
- }
-
- private void copyLocal(String var, Configuration conf)
- throws IOException {
- String[] filenames = TempletonUtils.decodeArray(conf.get(var));
- if (filenames != null) {
- for (String filename : filenames) {
- Path src = new Path(filename);
- Path dst = new Path(src.getName());
- FileSystem fs = src.getFileSystem(conf);
- System.err.println("templeton: copy " + src + " => " + dst);
- fs.copyToLocalFile(src, dst);
+ private static URI copyLog4JtoFileSystem(final String localFile) throws IOException,
+ InterruptedException {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ return ugi.doAs(new PrivilegedExceptionAction<URI>() {
+ @Override
+ public URI run() throws IOException {
+ AppConfig appConfig = Main.getAppConfigInstance();
+ String fsTmpDir = appConfig.get(TMP_DIR_PROP);
+ if(fsTmpDir == null || fsTmpDir.length() <= 0) {
+ LOG.warn("Could not find 'hadoop.tmp.dir'; " + affectedMsg);
+ return null;
}
- }
- }
-
- @Override
- public void run(Context context)
- throws IOException, InterruptedException {
-
- Configuration conf = context.getConfiguration();
-
- Process proc = startJob(context,
- conf.get("user.name"),
- conf.get(OVERRIDE_CLASSPATH));
-
- String statusdir = conf.get(STATUSDIR_NAME);
-
- if (statusdir != null) {
- try {
- statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
- conf.get("user.name"));
- } catch (URISyntaxException e) {
- throw new IOException("Invalid status dir URI", e);
+ FileSystem fs = FileSystem.get(appConfig);
+ Path dirPath = new Path(fsTmpDir);
+ if(!fs.exists(dirPath)) {
+ LOG.warn(dirPath + " does not exist; " + affectedMsg);
+ return null;
}
+ Path dst = fs.makeQualified(new Path(fsTmpDir, CONTAINER_LOG4J_PROPS));
+ fs.copyFromLocalFile(new Path(localFile), dst);
+ //make readable by all users since TempletonControllerJob#run() is run as submitting user
+ fs.setPermission(dst, new FsPermission((short)0644));
+ return dst.toUri();
}
-
- Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
- LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
-
- ExecutorService pool = Executors.newCachedThreadPool();
- executeWatcher(pool, conf, context.getJobID(),
- proc.getInputStream(), statusdir, STDOUT_FNAME);
- executeWatcher(pool, conf, context.getJobID(),
- proc.getErrorStream(), statusdir, STDERR_FNAME);
- KeepAlive keepAlive = startCounterKeepAlive(pool, context);
-
- proc.waitFor();
- keepAlive.sendReport = false;
- pool.shutdown();
- if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- }
-
- writeExitValue(conf, proc.exitValue(), statusdir);
- JobState state = new JobState(context.getJobID().toString(), conf);
- state.setExitValue(proc.exitValue());
- state.setCompleteStatus("done");
- state.close();
-
- if (enablelog && TempletonUtils.isset(statusdir)) {
- System.err.println("templeton: collecting logs for " + context.getJobID().toString()
- + " to " + statusdir + "/logs");
- LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
- logRetriever.run();
- }
-
- if (proc.exitValue() != 0) {
- System.err.println("templeton: job failed with exit code "
- + proc.exitValue());
- }
- else {
- System.err.println("templeton: job completed with exit code 0");
- }
- }
-
- private void executeWatcher(ExecutorService pool, Configuration conf,
- JobID jobid, InputStream in, String statusdir,
- String name)
- throws IOException {
- Watcher w = new Watcher(conf, jobid, in, statusdir, name);
- pool.execute(w);
- }
-
- private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context)
- throws IOException {
- KeepAlive k = new KeepAlive(context);
- pool.execute(k);
- return k;
- }
-
- private void writeExitValue(Configuration conf, int exitValue, String statusdir)
- throws IOException {
- if (TempletonUtils.isset(statusdir)) {
- Path p = new Path(statusdir, EXIT_FNAME);
- FileSystem fs = p.getFileSystem(conf);
- OutputStream out = fs.create(p);
- System.err.println("templeton: Writing exit value "
- + exitValue + " to " + p);
- PrintWriter writer = new PrintWriter(out);
- writer.println(exitValue);
- writer.close();
- }
- }
+ });
}
-
- private static class Watcher implements Runnable {
- private final InputStream in;
- private OutputStream out;
- private final JobID jobid;
- private final Configuration conf;
-
- public Watcher(Configuration conf, JobID jobid, InputStream in,
- String statusdir, String name)
- throws IOException {
- this.conf = conf;
- this.jobid = jobid;
- this.in = in;
-
- if (name.equals(STDERR_FNAME))
- out = System.err;
- else
- out = System.out;
-
- if (TempletonUtils.isset(statusdir)) {
- Path p = new Path(statusdir, name);
- FileSystem fs = p.getFileSystem(conf);
- out = fs.create(p);
- System.err.println("templeton: Writing status to " + p);
- }
- }
-
- @Override
- public void run() {
- try {
- InputStreamReader isr = new InputStreamReader(in);
- BufferedReader reader = new BufferedReader(isr);
- PrintWriter writer = new PrintWriter(out);
-
- String line;
- while ((line = reader.readLine()) != null) {
- writer.println(line);
- JobState state = null;
+ /**
+ * Location of {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} on the local file system.
+ * @return path of that file inside the WebHCat config dir ({@link AppConfig#getWebhcatConfDir()})
+ */
+ private static String getLog4JPropsLocal() {
+ return AppConfig.getWebhcatConfDir() + File.separator + CONTAINER_LOG4J_PROPS;
+ }
+ static {
+ //initialize once-per-JVM (i.e. one running WebHCat server) state and log it once since it's
+ // the same for every job
+ try {
+ //safe (thread) publication
+ // http://docs.oracle.com/javase/specs/jls/se5.0/html/execution.html#12.4.2
+ LOG.info("Using Hadoop Version: " + ShimLoader.getMajorVersion());
+ overrideContainerLog4jProps = "0.23".equals(ShimLoader.getMajorVersion());
+ if(overrideContainerLog4jProps) {
+ //see detailed note in CONTAINER_LOG4J_PROPS file
+ LOG.info(AppConfig.WEBHCAT_CONF_DIR + "=" + AppConfig.getWebhcatConfDir());
+ File localFile = new File(getLog4JPropsLocal());
+ if(localFile.exists()) {
+ LOG.info("Found " + localFile.getAbsolutePath() + " to use for job submission.");
try {
- String percent = TempletonUtils.extractPercentComplete(line);
- String childid = TempletonUtils.extractChildJobId(line);
-
- if (percent != null || childid != null) {
- state = new JobState(jobid.toString(), conf);
- state.setPercentComplete(percent);
- state.setChildId(childid);
- }
- } catch (IOException e) {
- System.err.println("templeton: state error: " + e);
- } finally {
- if (state != null) {
- try {
- state.close();
- } catch (IOException e) {
- }
- }
+ overrideLog4jURI = copyLog4JtoFileSystem(getLog4JPropsLocal());
+ LOG.info("Job submission will use log4j.properties=" + overrideLog4jURI);
+ }
+ catch(IOException ex) {
+ LOG.warn("Will not add " + CONTAINER_LOG4J_PROPS + " to Distributed Cache. " +
+ "Some fields in job status may be unavailable", ex);
}
}
- writer.flush();
- } catch (IOException e) {
- System.err.println("templeton: execute error: " + e);
+ else {
+ LOG.warn("Could not find " + localFile.getAbsolutePath() + ". " + affectedMsg);
+ }
}
}
+ catch(Throwable t) {
+ //this intentionally doesn't use TempletonControllerJob.class.getName() to be able to
+ //log errors which may be due to class loading
+ String msg = "org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob is not " +
+ "properly initialized. " + affectedMsg;
+ LOG.error(msg, t);
+ }
}
- public static class KeepAlive implements Runnable {
- private Context context;
- public boolean sendReport;
-
- public KeepAlive(Context context)
- {
- this.sendReport = true;
- this.context = context;
- }
+ private final boolean secureMetastoreAccess;
- @Override
- public void run() {
- try {
- while (sendReport) {
- // Periodically report progress on the Context object
- // to prevent TaskTracker from killing the Templeton
- // Controller task
- context.progress();
- System.err.println("KeepAlive Heart beat");
- Thread.sleep(KEEP_ALIVE_MSEC);
- }
- } catch (InterruptedException e) {
- // Ok to be interrupted
- }
- }
+ /**
+ * @param secureMetastoreAccess - if true, a delegation token will be created
+ * and added to the job
+ */
+ public TempletonControllerJob(boolean secureMetastoreAccess) {
+ super();
+ this.secureMetastoreAccess = secureMetastoreAccess;
}
private JobID submittedJobId;
@@ -365,8 +170,7 @@ public class TempletonControllerJob exte
public String getSubmittedId() {
if (submittedJobId == null) {
return null;
- }
- else {
+ } else {
return submittedJobId.toString();
}
}
@@ -376,20 +180,39 @@ public class TempletonControllerJob exte
* @see org.apache.hive.hcatalog.templeton.CompleteDelegator
*/
@Override
- public int run(String[] args)
- throws IOException, InterruptedException, ClassNotFoundException, TException {
+ public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
+ TException {
Configuration conf = getConf();
-
+
conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set("user.name", user);
+ if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+ //must be done before Job object is created
+ conf.set(OVERRIDE_CONTAINER_LOG4J_PROPS, Boolean.TRUE.toString());
+ }
Job job = new Job(conf);
- job.setJarByClass(TempletonControllerJob.class);
- job.setJobName("TempletonControllerJob");
+ job.setJarByClass(LaunchMapper.class);
+ job.setJobName(TempletonControllerJob.class.getSimpleName());
job.setMapperClass(LaunchMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(SingleInputFormat.class);
+ if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+ FileSystem fs = FileSystem.get(conf);
+ if(fs.exists(new Path(overrideLog4jURI))) {
+ ShimLoader.getHadoopShims().getWebHCatShim(conf, UgiFactory.getUgi(user)).addCacheFile(
+ overrideLog4jURI, job);
+ LOG.debug("added " + overrideLog4jURI + " to Dist Cache");
+ }
+ else {
+ //in case this file was deleted by someone issue a warning but don't try to add to
+ // DistributedCache as that will throw and fail job submission
+ LOG.warn("Cannot find " + overrideLog4jURI + " which is created on WebHCat startup/job " +
+ "submission. " + affectedMsg);
+ }
+ }
+
NullOutputFormat<NullWritable, NullWritable> of = new NullOutputFormat<NullWritable, NullWritable>();
job.setOutputFormatClass(of.getClass());
job.setNumReduceTasks(0);
@@ -404,13 +227,16 @@ public class TempletonControllerJob exte
job.submit();
submittedJobId = job.getJobID();
-
if(metastoreTokenStrForm != null) {
//so that it can be cancelled later from CompleteDelegator
DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
submittedJobId.toString(), metastoreTokenStrForm);
- LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " +
- "user=" + user);
+ LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() +
+ " user=" + user);
+ }
+ if(overrideContainerLog4jProps && overrideLog4jURI == null) {
+ //do this here so that log msg has JobID
+ LOG.warn("Could not override container log4j properties for " + submittedJobId);
}
return 0;
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java Fri Oct 25 17:42:07 2013
@@ -85,10 +85,11 @@ public class TempletonUtils {
return (col != null) && (!col.isEmpty());
}
-
- public static final Pattern JAR_COMPLETE
- = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
+ //looking for map 100% reduce 100%
+ public static final Pattern JAR_COMPLETE = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$");
+ //looking for map = 100%, reduce = 100%
+ public static final Pattern HIVE_COMPLETE = Pattern.compile(" map = \\d+%,\\s+reduce = \\d+%$");
/**
* Extract the percent complete line from Pig or Jar jobs.
@@ -101,7 +102,14 @@ public class TempletonUtils {
Matcher pig = PIG_COMPLETE.matcher(line);
if (pig.find())
return pig.group().trim();
-
+
+ Matcher hive = HIVE_COMPLETE.matcher(line);
+ if(hive.find()) {
+ //normalize e.g. "map = 100%,  reduce = 100%" to the JAR_COMPLETE form "map 100% reduce 100%":
+ //drop "= " and collapse the comma plus ALL following whitespace (HIVE_COMPLETE allows one or
+ //more spaces there) into exactly one space, so the map and reduce parts never run together
+ return hive.group().trim().replace("= ", "").replaceAll(",\\s+", " ");
+ }
return null;
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java Fri Oct 25 17:42:07 2013
@@ -18,21 +18,30 @@
*/
package org.apache.hive.hcatalog.templeton.tool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* Execute a local program. This is a singleton service that will
* execute a programs on the local box.
+ *
+ * Note that it is executed from LaunchMapper, which runs in a
+ * different JVM from the WebHCat (Templeton) server. Thus it should not call any classes
+ * that are not available on every node in the cluster (i.e. outside the webhcat jar).
*/
-public class TrivialExecService {
- private static volatile TrivialExecService theSingleton;
+final class TrivialExecService {
+ //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task
private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
-
+ private static volatile TrivialExecService theSingleton;
+ private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS";
/**
* Retrieve the singleton.
*/
@@ -41,32 +50,53 @@ public class TrivialExecService {
theSingleton = new TrivialExecService();
return theSingleton;
}
-
+ /**
+ * Rewrites HADOOP_CLIENT_OPTS so log4j in the child process loads CONTAINER_LOG4J_PROPS from the
+ * current working directory. See {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} for details.
+ */
+ private static void hadoop2LogRedirect(ProcessBuilder processBuilder) {
+ Map<String, String> env = processBuilder.environment();
+ String hcopts = env.get(HADOOP_CLIENT_OPTS);
+ String defaultLog4jOpt = "log4j.configuration=container-log4j.properties";
+ if(hcopts == null || !hcopts.contains(defaultLog4jOpt)) {
+ return;
+ }
+ //TempletonControllerJob ensures that this file is in DistributedCache
+ File log4jProps = new File(JobSubmissionConstants.CONTAINER_LOG4J_PROPS);
+ //File.toURI() gives a well-formed file: URL on every OS; "file://" + absolute path does
+ //not on Windows ("file://C:\..." would make "C:" the URL authority)
+ hcopts = hcopts.replace(defaultLog4jOpt,
+ "log4j.configuration=" + log4jProps.toURI());
+ //helps figure out what log4j is doing, but may confuse some jobs due to extra output to stdout
+ //hcopts = hcopts + " -Dlog4j.debug=true";
+ env.put(HADOOP_CLIENT_OPTS, hcopts);
+ }
public Process run(List<String> cmd, List<String> removeEnv,
- Map<String, String> environmentVariables)
+ Map<String, String> environmentVariables, boolean overrideContainerLog4jProps)
throws IOException {
- logDebugCmd(cmd, environmentVariables);
+ LOG.info("run(cmd, removeEnv, environmentVariables, " + overrideContainerLog4jProps + ")");
+ LOG.info("Starting cmd: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd);
- for (String key : removeEnv)
+ for (String key : removeEnv) {
+ if(pb.environment().containsKey(key)) {
+ LOG.info("Removing env var: " + key + "=" + pb.environment().get(key));
+ }
pb.environment().remove(key);
+ }
pb.environment().putAll(environmentVariables);
+ if(overrideContainerLog4jProps) {
+ hadoop2LogRedirect(pb);
+ }
+ logDebugInfo("Starting process with env:", pb.environment());
return pb.start();
}
-
- private void logDebugCmd(List<String> cmd,
- Map<String, String> environmentVariables) {
- if(!LOG.isDebugEnabled()){
- return;
- }
- LOG.debug("starting " + cmd);
- LOG.debug("With environment variables: " );
- for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
- LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
- }
- LOG.debug("With environment variables already set: " );
- Map<String, String> env = System.getenv();
- for (String envName : env.keySet()) {
- LOG.debug(envName + "=" + env.get(envName));
- }
+ private static void logDebugInfo(String msg, Map<String, String> props) {
+ LOG.info(msg);
+ List<String> keys = new ArrayList<String>();
+ keys.addAll(props.keySet());
+ Collections.sort(keys);
+ for(String key : keys) {
+ LOG.info(key + "=" + props.get(key));
+ }
}
}
Modified: hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java Fri Oct 25 17:42:07 2013
@@ -38,7 +38,7 @@ public class TestTrivialExecService {
Process process = TrivialExecService.getInstance()
.run(list,
new ArrayList<String>(),
- new HashMap<String, String>());
+ new HashMap<String, String>(),false);
out = new BufferedReader(new InputStreamReader(
process.getInputStream()));
err = new BufferedReader(new InputStreamReader(
Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java Fri Oct 25 17:42:07 2013
@@ -18,82 +18,89 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
/**
* This is in org.apache.hadoop.mapred package because it relies on
* JobSubmissionProtocol which is package private
*/
public class WebHCatJTShim20S implements WebHCatJTShim {
- private JobSubmissionProtocol cnx;
+ private JobSubmissionProtocol cnx;
- /**
- * Create a connection to the Job Tracker.
- */
- public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
- throws IOException {
- cnx = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID,
- getAddress(conf),
- ugi,
- conf,
- NetUtils.getSocketFactory(conf,
- JobSubmissionProtocol.class));
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Profile of the job, or null if not found.
- */
- public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- return cnx.getJobProfile(jobid);
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Status of the job, or null if not found.
- */
- public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- return cnx.getJobStatus(jobid);
- }
-
-
- /**
- * Kill a job.
- */
- public void killJob(org.apache.hadoop.mapred.JobID jobid)
- throws IOException {
- cnx.killJob(jobid);
- }
-
- /**
- * Get all the jobs submitted.
- */
- public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
- throws IOException {
- return cnx.getAllJobs();
- }
-
- /**
- * Close the connection to the Job Tracker.
- */
- public void close() {
- RPC.stopProxy(cnx);
- }
- private InetSocketAddress getAddress(Configuration conf) {
- String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
- return NetUtils.createSocketAddr(jobTrackerStr);
- }
+ /**
+ * Create a connection to the Job Tracker.
+ */
+ public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ cnx = (JobSubmissionProtocol)
+ RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID,
+ getAddress(conf),
+ ugi,
+ conf,
+ NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
}
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobProfile(jobid);
+ }
+
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Status of the job, or null if not found.
+ */
+ public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobStatus(jobid);
+ }
+
+
+ /**
+ * Kill a job.
+ */
+ public void killJob(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ cnx.killJob(jobid);
+ }
+
+ /**
+ * Get all the jobs submitted.
+ */
+ public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
+ throws IOException {
+ return cnx.getAllJobs();
+ }
+
+ /**
+ * Close the connection to the Job Tracker.
+ */
+ public void close() {
+ RPC.stopProxy(cnx);
+ }
+ private InetSocketAddress getAddress(Configuration conf) {
+ String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
+ return NetUtils.createSocketAddr(jobTrackerStr);
+ }
+ @Override
+ public void addCacheFile(URI uri, Job job) {
+ DistributedCache.addCacheFile(uri, job.getConfiguration());
+ }
+}
+
Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java (original)
+++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java Fri Oct 25 17:42:07 2013
@@ -18,10 +18,12 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import java.io.IOException;
+import java.net.URI;
public class WebHCatJTShim23 implements WebHCatJTShim {
private JobClient jc;
@@ -88,4 +90,8 @@ public class WebHCatJTShim23 implements
} catch (IOException e) {
}
}
+ @Override
+ public void addCacheFile(URI uri, Job job) {
+ job.addCacheFile(uri);
+ }
}
Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Oct 25 17:42:07 2013
@@ -561,6 +561,11 @@ public interface HadoopShims {
* Close the connection to the Job Tracker.
*/
public void close();
+ /**
+ * Does exactly what org.apache.hadoop.mapreduce.Job#addCacheFile(URI) does in Hadoop 2, i.e.
+ * adds {@code uri} to the job's DistributedCache. Assumes neither parameter is {@code null}.
+ */
+ public void addCacheFile(URI uri, Job job);
}
/**