Posted to commits@hive.apache.org by th...@apache.org on 2013/10/25 19:42:07 UTC

svn commit: r1535796 - in /hive/trunk: hcatalog/src/test/e2e/templeton/ hcatalog/src/test/e2e/templeton/drivers/ hcatalog/src/test/e2e/templeton/tests/ hcatalog/webhcat/svr/src/main/bin/ hcatalog/webhcat/svr/src/main/config/ hcatalog/webhcat/svr/src/ma...

Author: thejas
Date: Fri Oct 25 17:42:07 2013
New Revision: 1535796

URL: http://svn.apache.org/r1535796
Log:
HIVE-5511 : percentComplete returned by job status from WebHCat is null (Eugene Koifman via Thejas Nair)

Added:
    hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
Modified:
    hive/trunk/hcatalog/src/test/e2e/templeton/README.txt
    hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
    hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
    hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
    hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
    hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
    hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
    hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/trunk/hcatalog/src/test/e2e/templeton/README.txt
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/README.txt?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/README.txt (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/README.txt Fri Oct 25 17:42:07 2013
@@ -141,8 +141,10 @@ In order for this test suite to work, we
 and webhcat.proxyuser.hue.hosts defined, i.e. 'hue' should be allowed to impersonate 'joe'.
 [Of course, 'hcat' proxyuser should be configured in core-site.xml for the command to succeed.]
 
-Furthermore, metastore side file based security should be enabled.  To do this 3 properties in
-hive-site.xml should be configured:
+Furthermore, metastore-side file-based security should be enabled.
+(See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Authorization#LanguageManualAuthorization-MetastoreServerSecurity for more info.)
+
+To do this, 3 properties in hive-site.xml should be configured:
 1) hive.security.metastore.authorization.manager set to 
     org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
 2) hive.security.metastore.authenticator.manager set to 

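As an illustration, here is a minimal Java sketch of the first of these settings applied programmatically (the sketch and the HiveConf usage are illustrative assumptions, not part of this patch; the remaining property values are truncated in the hunk above and are intentionally not filled in):

  import org.apache.hadoop.hive.conf.HiveConf;

  public class MetastoreSecuritySketch {
    public static void main(String[] args) {
      HiveConf conf = new HiveConf();
      // property 1 from the README above: storage-based authorization in the metastore
      conf.set("hive.security.metastore.authorization.manager",
          "org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider");
      System.out.println(conf.get("hive.security.metastore.authorization.manager"));
    }
  }
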
Modified: hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/drivers/TestDriverCurl.pm Fri Oct 25 17:42:07 2013
@@ -788,7 +788,8 @@ sub compare
 
     if ( (defined $testCmd->{'check_job_created'})
          || (defined $testCmd->{'check_job_complete'})
-         || (defined $testCmd->{'check_job_exit_value'}) ) {    
+         || (defined $testCmd->{'check_job_exit_value'})
+         || (defined $testCmd->{'check_job_percent_complete'}) ) {    
       my $jobid = $json_hash->{'id'};
       if (!defined $jobid) {
         print $log "$0::$subName WARN check failed: " 
@@ -803,7 +804,8 @@ sub compare
             . "jobresult not defined ";
           $result = 0;
         }
-        if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})) {
+        if (defined($testCmd->{'check_job_complete'}) || defined($testCmd->{'check_job_exit_value'})
+            || defined($testCmd->{'check_job_percent_complete'})) {
           my $jobComplete;
           my $NUM_RETRIES = 60;
           my $SLEEP_BETWEEN_RETRIES = 5;
@@ -841,6 +843,15 @@ sub compare
                 $result = 0;
               }
             }
+            # check the percentComplete value
+            if (defined($testCmd->{'check_job_percent_complete'})) {
+              my $pcValue = $res_hash->{'percentComplete'};
+              my $expectedPercentComplete = $testCmd->{'check_job_percent_complete'};
+              if ( (!defined $pcValue) || $pcValue ne $expectedPercentComplete ) {
+                print $log "check_job_percent_complete failed. got percentComplete $pcValue, expected $expectedPercentComplete";
+                $result = 0;
+              }
+            }
           }
 
 	  #Check userargs

Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Fri Oct 25 17:42:07 2013
@@ -73,6 +73,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS',
+     'check_job_percent_complete' => 'map 100% reduce 100%',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
@@ -166,6 +167,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_percent_complete' => '100% complete',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
     },
@@ -386,6 +388,7 @@ $cfg = 
      'status_code' => 200,
      'check_job_created' => 1,
      'check_job_complete' => 'SUCCESS', 
+     'check_job_percent_complete' => 'map 100% reduce 100%',
      'check_job_exit_value' => 0,
      'check_call_back' => 1,
 

Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf (original)
+++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission_streaming.conf Fri Oct 25 17:42:07 2013
@@ -54,7 +54,9 @@ $cfg = 
     },
     {
      #-ve test - no input file
-     'num' => 2,
+     #TempletonController job status should be success, but the exit value should be 1.
+     #If the YARN log is redirected to stderr, check_job_complete is FAILURE; if not, SUCCESS (HIVE-5511).
+     'num' => 2,
      'method' => 'POST',
      'url' => ':TEMPLETON_URL:/templeton/v1/mapreduce/streaming',
      'post_options' => ['user.name=:UNAME:','input=:INPDIR_HDFS:/nums.txt','input=:INPDIR_HDFS:/nums.txt','output=:OUTDIR:/mycounts', 

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/bin/webhcat_config.sh Fri Oct 25 17:42:07 2013
@@ -75,7 +75,7 @@ elif [ -e "${WEBHCAT_PREFIX}/conf/webhca
 else
   DEFAULT_CONF_DIR="/etc/webhcat"
 fi
-WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
+export WEBHCAT_CONF_DIR="${WEBHCAT_CONF_DIR:-$DEFAULT_CONF_DIR}"
 
 #users can add various env vars to webhcat-env.sh in the conf
 #rather than having to export them before running the command

Added: hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/config/override-container-log4j.properties Fri Oct 25 17:42:07 2013
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#
+# This log4j config overrides hadoop-yarn-server-nodemanager-2.1.0-beta.jar/container-log4j.properties.
+#In Hadoop 2, by default, log information about M/R job progress is not sent to stderr,
+#which is where LaunchMapper expects it.  Thus WebHCat is unable to report the
+#percentComplete attribute in job status.  There is something broken in YARN that doesn't allow
+#its log4j properties to be overridden.  Thus for now (10/07/2013) we resort to overriding it
+#using this file, where log4j.rootLogger specifies an additional 'console' appender.  This file is
+#made available through the DistributedCache.  See TrivialExecService and TempletonControllerJob
+#for more info.
+
+hadoop.root.logger=INFO,CLA
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, console, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# ContainerLog Appender
+#
+
+#Default values
+yarn.app.container.log.dir=null
+yarn.app.container.log.filesize=100
+
+log4j.appender.CLA=org.apache.hadoop.yarn.ContainerLogAppender
+log4j.appender.CLA.containerLogDir=${yarn.app.container.log.dir}
+log4j.appender.CLA.totalLogFileSize=${yarn.app.container.log.filesize}
+
+log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
+
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

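To make the intent of the extra 'console' appender concrete, a small sketch (an illustrative assumption, not part of this patch): anything logged through log4j with this file active is written to System.err as well as to the container log, and stderr is the stream that LaunchMapper's Watcher thread scans for progress lines.

  import org.apache.log4j.Logger;

  public class ConsoleAppenderProbe {
    public static void main(String[] args) {
      // with this override file supplied via -Dlog4j.configuration, the message below
      // reaches stderr (the 'console' appender) in addition to the ContainerLogAppender
      Logger.getLogger(ConsoleAppenderProbe.class).info(" map 100% reduce 100%");
    }
  }
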
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/AppConfig.java Fri Oct 25 17:42:07 2013
@@ -71,6 +71,7 @@ public class AppConfig extends Configura
   };
 
   public static final String TEMPLETON_HOME_VAR = "TEMPLETON_HOME";
+  public static final String WEBHCAT_CONF_DIR = "WEBHCAT_CONF_DIR";
 
   public static final String[] TEMPLETON_CONF_FILENAMES = {
     "webhcat-default.xml",
@@ -153,6 +154,9 @@ public class AppConfig extends Configura
   public static String getTempletonDir() {
     return System.getenv(TEMPLETON_HOME_VAR);
   }
+  public static String getWebhcatConfDir() {
+    return System.getenv(WEBHCAT_CONF_DIR);
+  }
 
   private boolean loadOneFileConfig(String dir, String fname) {
     if (dir != null) {

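The new getter only works together with the 'export' added to webhcat_config.sh above: System.getenv() sees only variables that were exported into the server JVM's environment. A trivial sketch of that dependency (illustrative only):

  public class ConfDirSketch {
    public static void main(String[] args) {
      // returns null unless WEBHCAT_CONF_DIR was exported before the JVM started
      String confDir = System.getenv("WEBHCAT_CONF_DIR");
      System.out.println("WEBHCAT_CONF_DIR=" + confDir);
    }
  }
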
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java Fri Oct 25 17:42:07 2013
@@ -68,7 +68,7 @@ public class CompleteDelegator extends T
     try {
       state = new JobState(id, Main.getAppConfigInstance());
       if (state.getCompleteStatus() == null)
-        failed("Job not yet complete. jobId=" + id + " Status from JT=" + jobStatus, null);
+        failed("Job not yet complete. jobId=" + id + " Status from JobTracker=" + jobStatus, null);
 
       Long notified = state.getNotifiedTime();
       if (notified != null) {

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HDFSStorage.java Fri Oct 25 17:42:07 2013
@@ -91,6 +91,12 @@ public class HDFSStorage implements Temp
     BufferedReader in = null;
     Path p = new Path(getPath(type) + "/" + id + "/" + key);
     try {
+      if(!fs.exists(p)) {
+        //check first, otherwise webhcat.log is full of stack traces from FileSystem when
+        //clients check for status ('exitValue', 'completed', etc.)
+        LOG.debug(p + " does not exist.");
+        return null;
+      }
       in = new BufferedReader(new InputStreamReader(fs.open(p)));
       String line = null;
       String val = "";
@@ -102,9 +108,7 @@ public class HDFSStorage implements Temp
       }
       return val;
     } catch (Exception e) {
-      //don't print stack trace since clients poll for 'exitValue', 'completed',
-      //files which are not there until job completes
-      LOG.info("Couldn't find " + p + ": " + e.getMessage());
+      LOG.error("Couldn't find " + p + ": " + e.getMessage(), e);
     } finally {
       close(in);
     }

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/HiveJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,6 +32,6 @@ class HiveJobIDParser extends JobIDParse
 
   @Override
   List<String> parseJobID() throws IOException {
-    return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+    return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
   }
 }

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JarJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,7 +32,7 @@ class JarJobIDParser extends JobIDParser
 
   @Override
   List<String> parseJobID() throws IOException {
-    return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+    return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
   }
 
 }

Added: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/JobSubmissionConstants.java Fri Oct 25 17:42:07 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+public interface JobSubmissionConstants {
+  public static final String COPY_NAME = "templeton.copy";
+  public static final String STATUSDIR_NAME = "templeton.statusdir";
+  public static final String ENABLE_LOG = "templeton.enablelog";
+  public static final String JOB_TYPE = "templeton.jobtype";
+  public static final String JAR_ARGS_NAME = "templeton.args";
+  public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
+  public static final String OVERRIDE_CONTAINER_LOG4J_PROPS = "override.containerLog4j";
+  //name of file
+  static final String CONTAINER_LOG4J_PROPS = "override-container-log4j.properties";
+  public static final String STDOUT_FNAME = "stdout";
+  public static final String STDERR_FNAME = "stderr";
+  public static final String EXIT_FNAME = "exit";
+  public static final int WATCHER_TIMEOUT_SECS = 10;
+  public static final int KEEP_ALIVE_MSEC = 60 * 1000;
+  public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__";
+}

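This is the constant-interface pattern: elsewhere in the patch, LaunchMapper and TempletonControllerJob implement JobSubmissionConstants so the constants can be referenced unqualified. A minimal sketch (hypothetical class, same package assumed):

  package org.apache.hive.hcatalog.templeton.tool;

  class ConstantsSketch implements JobSubmissionConstants {
    // inherited from the interface; resolves to "stderr"
    String errFile = STDERR_FNAME;
  }
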
Added: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java?rev=1535796&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java (added)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/LaunchMapper.java Fri Oct 25 17:42:07 2013
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Shell;
+import org.apache.hive.hcatalog.templeton.BadParam;
+import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Note that this class is used in a different JVM than the WebHCat server.  Thus it should not
+ * call any classes not available on every node in the cluster (outside the webhcat jar).
+ * TempletonControllerJob#run() calls Job.setJarByClass(LaunchMapper.class), which
+ * causes the webhcat jar to be shipped to the target node, but not its transitive closure.
+ * Long term we need to clean up this separation and create a separate jar to ship so that the
+ * dependencies are clear.  (This used to be an inner class of TempletonControllerJob.)
+ */
+@InterfaceAudience.Private
+public class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> implements
+        JobSubmissionConstants {
+  /**
+   * This class currently sends everything to stderr, but it should probably use Log4J - 
+   * it will end up in 'syslog' of this Map task.  For example, look for KeepAlive heartbeat msgs.
+   */
+  private static final Log LOG = LogFactory.getLog(LaunchMapper.class);
+
+
+  protected Process startJob(Context context, String user, String overrideClasspath)
+    throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    copyLocal(COPY_NAME, conf);
+    String[] jarArgs = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
+
+    ArrayList<String> removeEnv = new ArrayList<String>();
+    //todo: we really need some comments to explain exactly why each of these is removed
+    removeEnv.add("HADOOP_ROOT_LOGGER");
+    removeEnv.add("hadoop-command");
+    removeEnv.add("CLASS");
+    removeEnv.add("mapredcommand");
+    Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
+            overrideClasspath);
+    List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
+    String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+
+
+    if (tokenFile != null) {
+      //Token is available, so replace the placeholder
+      tokenFile = tokenFile.replaceAll("\"", "");
+      String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
+      if (Shell.WINDOWS) {
+        try {
+          tokenArg = TempletonUtils.quoteForWindows(tokenArg);
+        } catch (BadParam e) {
+          String msg = "cannot pass " + tokenFile + " to mapreduce.job.credentials.binary";
+          LOG.error(msg, e);
+          throw new IOException(msg, e);
+        }
+      }
+      for(int i=0; i<jarArgsList.size(); i++){
+        String newArg =
+                jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
+        jarArgsList.set(i, newArg);
+      }
+
+    }else{
+      //No token, so remove the placeholder arg
+      Iterator<String> it = jarArgsList.iterator();
+      while(it.hasNext()){
+        String arg = it.next();
+        if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){
+          it.remove();
+        }
+      }
+    }
+    boolean overrideLog4jProps = conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS) == null ?
+            false : Boolean.valueOf(conf.get(OVERRIDE_CONTAINER_LOG4J_PROPS));
+    return TrivialExecService.getInstance().run(jarArgsList, removeEnv, env, overrideLog4jProps);
+  }
+
+  private void copyLocal(String var, Configuration conf) throws IOException {
+    String[] filenames = TempletonUtils.decodeArray(conf.get(var));
+    if (filenames != null) {
+      for (String filename : filenames) {
+        Path src = new Path(filename);
+        Path dst = new Path(src.getName());
+        FileSystem fs = src.getFileSystem(conf);
+        LOG.info("templeton: copy " + src + " => " + dst);
+        fs.copyToLocalFile(src, dst);
+      }
+    }
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+
+    Configuration conf = context.getConfiguration();
+
+    Process proc = startJob(context,
+            conf.get("user.name"),
+            conf.get(OVERRIDE_CLASSPATH));
+
+    String statusdir = conf.get(STATUSDIR_NAME);
+
+    if (statusdir != null) {
+      try {
+        statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
+                conf.get("user.name"));
+      } catch (URISyntaxException e) {
+        String msg = "Invalid status dir URI";
+        LOG.error(msg, e);
+        throw new IOException(msg, e);
+      }
+    }
+
+    Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
+    LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    executeWatcher(pool, conf, context.getJobID(),
+            proc.getInputStream(), statusdir, STDOUT_FNAME);
+    executeWatcher(pool, conf, context.getJobID(),
+            proc.getErrorStream(), statusdir, STDERR_FNAME);
+    KeepAlive keepAlive = startCounterKeepAlive(pool, context);
+
+    proc.waitFor();
+    keepAlive.sendReport = false;
+    pool.shutdown();
+    if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
+      pool.shutdownNow();
+    }
+
+    writeExitValue(conf, proc.exitValue(), statusdir);
+    JobState state = new JobState(context.getJobID().toString(), conf);
+    state.setExitValue(proc.exitValue());
+    state.setCompleteStatus("done");
+    state.close();
+
+    if (enablelog && TempletonUtils.isset(statusdir)) {
+      LOG.info("templeton: collecting logs for " + context.getJobID().toString()
+              + " to " + statusdir + "/logs");
+      LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
+      logRetriever.run();
+    }
+
+    if (proc.exitValue() != 0) {
+      LOG.info("templeton: job failed with exit code "
+              + proc.exitValue());
+    } else {
+      LOG.info("templeton: job completed with exit code 0");
+    }
+  }
+
+  private void executeWatcher(ExecutorService pool, Configuration conf, JobID jobid, InputStream in,
+                              String statusdir, String name) throws IOException {
+    Watcher w = new Watcher(conf, jobid, in, statusdir, name);
+    pool.execute(w);
+  }
+
+  private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context) throws IOException {
+    KeepAlive k = new KeepAlive(context);
+    pool.execute(k);
+    return k;
+  }
+
+  private void writeExitValue(Configuration conf, int exitValue, String statusdir) 
+    throws IOException {
+    if (TempletonUtils.isset(statusdir)) {
+      Path p = new Path(statusdir, EXIT_FNAME);
+      FileSystem fs = p.getFileSystem(conf);
+      OutputStream out = fs.create(p);
+      LOG.info("templeton: Writing exit value " + exitValue + " to " + p);
+      PrintWriter writer = new PrintWriter(out);
+      writer.println(exitValue);
+      writer.close();
+    }
+  }
+
+
+  private static class Watcher implements Runnable {
+    private final InputStream in;
+    private OutputStream out;
+    private final JobID jobid;
+    private final Configuration conf;
+
+    public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name)
+      throws IOException {
+      this.conf = conf;
+      this.jobid = jobid;
+      this.in = in;
+
+      if (name.equals(STDERR_FNAME)) {
+        out = System.err;
+      } else {
+        out = System.out;
+      }
+
+      if (TempletonUtils.isset(statusdir)) {
+        Path p = new Path(statusdir, name);
+        FileSystem fs = p.getFileSystem(conf);
+        out = fs.create(p);
+        LOG.info("templeton: Writing status to " + p);
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        InputStreamReader isr = new InputStreamReader(in);
+        BufferedReader reader = new BufferedReader(isr);
+        PrintWriter writer = new PrintWriter(out);
+
+        String line;
+        while ((line = reader.readLine()) != null) {
+          writer.println(line);
+          JobState state = null;
+          try {
+            String percent = TempletonUtils.extractPercentComplete(line);
+            String childid = TempletonUtils.extractChildJobId(line);
+
+            if (percent != null || childid != null) {
+              state = new JobState(jobid.toString(), conf);
+              state.setPercentComplete(percent);
+              state.setChildId(childid);
+            }
+          } catch (IOException e) {
+            LOG.error("templeton: state error: ", e);
+          } finally {
+            if (state != null) {
+              try {
+                state.close();
+              } catch (IOException e) {
+                LOG.warn(e);
+              }
+            }
+          }
+        }
+        writer.flush();
+        if(out != System.err && out != System.out) {
+          //depending on FileSystem implementation flush() may or may not do anything 
+          writer.close();
+        }
+      } catch (IOException e) {
+        LOG.error("templeton: execute error: ", e);
+      }
+    }
+  }
+
+  private static class KeepAlive implements Runnable {
+    private final Context context;
+    private volatile boolean sendReport;
+
+    public KeepAlive(Context context)
+    {
+      this.sendReport = true;
+      this.context = context;
+    }
+    private static StringBuilder makeDots(int count) {
+      StringBuilder sb = new StringBuilder();
+      for(int i = 0; i < count; i++) {
+        sb.append('.');
+      }
+      return sb;
+    }
+
+    @Override
+    public void run() {
+      try {
+        int count = 0;
+        while (sendReport) {
+          // Periodically report progress on the Context object
+          // to prevent TaskTracker from killing the Templeton
+          // Controller task
+          context.progress();
+          count++;
+          String msg = "KeepAlive Heart beat" + makeDots(count);
+          LOG.info(msg);
+          Thread.sleep(KEEP_ALIVE_MSEC);
+        }
+      } catch (InterruptedException e) {
+        // Ok to be interrupted
+      }
+    }
+  }
+}

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/PigJobIDParser.java Fri Oct 25 17:42:07 2013
@@ -32,6 +32,6 @@ class PigJobIDParser extends JobIDParser
 
   @Override
   List<String> parseJobID() throws IOException {
-    return parseJobID(TempletonControllerJob.STDERR_FNAME, jobidPattern);
+    return parseJobID(JobSubmissionConstants.STDERR_FNAME, jobidPattern);
   }
 }

Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java Fri Oct 25 17:42:07 2013
@@ -18,23 +18,10 @@
  */
 package org.apache.hive.hcatalog.templeton.tool;
 
-import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.URISyntaxException;
+import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,24 +29,24 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
-import org.apache.hive.hcatalog.templeton.BadParam;
-import org.apache.hive.hcatalog.templeton.LauncherDelegator;
+import org.apache.hive.hcatalog.templeton.AppConfig;
+import org.apache.hive.hcatalog.templeton.Main;
 import org.apache.hive.hcatalog.templeton.SecureProxySupport;
 import org.apache.hive.hcatalog.templeton.UgiFactory;
 import org.apache.thrift.TException;
@@ -83,281 +70,99 @@ import org.apache.thrift.TException;
  * parameter supplied in the REST call.  WebHcat takes care of cancelling the token when the job
  * is complete.
  */
-public class TempletonControllerJob extends Configured implements Tool {
-  public static final String COPY_NAME = "templeton.copy";
-  public static final String STATUSDIR_NAME = "templeton.statusdir";
-  public static final String ENABLE_LOG = "templeton.enablelog";
-  public static final String JOB_TYPE = "templeton.jobtype";
-  public static final String JAR_ARGS_NAME = "templeton.args";
-  public static final String OVERRIDE_CLASSPATH = "templeton.override-classpath";
-
-  public static final String STDOUT_FNAME = "stdout";
-  public static final String STDERR_FNAME = "stderr";
-  public static final String EXIT_FNAME = "exit";
-
-  public static final int WATCHER_TIMEOUT_SECS = 10;
-  public static final int KEEP_ALIVE_MSEC = 60 * 1000;
-
-  public static final String TOKEN_FILE_ARG_PLACEHOLDER 
-    = "__WEBHCAT_TOKEN_FILE_LOCATION__";
-
-  private static TrivialExecService execService = TrivialExecService.getInstance();
-
+@InterfaceAudience.Private
+public class TempletonControllerJob extends Configured implements Tool, JobSubmissionConstants {
   private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class);
-  private final boolean secureMetastoreAccess;
+  //file to add to DistributedCache
+  private static URI overrideLog4jURI = null;
+  private static boolean overrideContainerLog4jProps;
+  //Jar cmd submission likely will be affected, Pig likely not
+  private static final String affectedMsg = "Monitoring of Hadoop jobs submitted through WebHCat " +
+          "may be affected.";
+  private static final String TMP_DIR_PROP = "hadoop.tmp.dir";
 
   /**
-   * @param secureMetastoreAccess - if true, a delegation token will be created
-   *                              and added to the job
+   * Copy the file from local file system to tmp dir
    */
-  public TempletonControllerJob(boolean secureMetastoreAccess) {
-    super();
-    this.secureMetastoreAccess = secureMetastoreAccess;
-  }
-  public static class LaunchMapper
-    extends Mapper<NullWritable, NullWritable, Text, Text> {
-    protected Process startJob(Context context, String user,
-                   String overrideClasspath)
-      throws IOException, InterruptedException {
-      Configuration conf = context.getConfiguration();
-      copyLocal(COPY_NAME, conf);
-      String[] jarArgs
-        = TempletonUtils.decodeArray(conf.get(JAR_ARGS_NAME));
-
-      ArrayList<String> removeEnv = new ArrayList<String>();
-      removeEnv.add("HADOOP_ROOT_LOGGER");
-      removeEnv.add("hadoop-command");
-      removeEnv.add("CLASS");
-      removeEnv.add("mapredcommand");
-      Map<String, String> env = TempletonUtils.hadoopUserEnv(user,
-        overrideClasspath);
-      List<String> jarArgsList = new LinkedList<String>(Arrays.asList(jarArgs));
-      String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-
-
-      if (tokenFile != null) {
-        //Token is available, so replace the placeholder
-        tokenFile = tokenFile.replaceAll("\"", "");
-        String tokenArg = "mapreduce.job.credentials.binary=" + tokenFile;
-        if (Shell.WINDOWS) {
-          try {
-            tokenArg = TempletonUtils.quoteForWindows(tokenArg);
-          } catch (BadParam e) {
-            throw new IOException("cannot pass " + tokenFile + " to mapreduce.job.credentials.binary", e);
-          }
-        }
-        for(int i=0; i<jarArgsList.size(); i++){
-          String newArg = 
-            jarArgsList.get(i).replace(TOKEN_FILE_ARG_PLACEHOLDER, tokenArg);
-          jarArgsList.set(i, newArg);
-        }
-
-      }else{
-        //No token, so remove the placeholder arg
-        Iterator<String> it = jarArgsList.iterator();
-        while(it.hasNext()){
-          String arg = it.next();
-          if(arg.contains(TOKEN_FILE_ARG_PLACEHOLDER)){
-            it.remove();
-          }
-        }
-      }
-      return execService.run(jarArgsList, removeEnv, env);
-    }
-
-    private void copyLocal(String var, Configuration conf)
-      throws IOException {
-      String[] filenames = TempletonUtils.decodeArray(conf.get(var));
-      if (filenames != null) {
-        for (String filename : filenames) {
-          Path src = new Path(filename);
-          Path dst = new Path(src.getName());
-          FileSystem fs = src.getFileSystem(conf);
-          System.err.println("templeton: copy " + src + " => " + dst);
-          fs.copyToLocalFile(src, dst);
+  private static URI copyLog4JtoFileSystem(final String localFile) throws IOException,
+          InterruptedException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    return ugi.doAs(new PrivilegedExceptionAction<URI>() {
+      @Override
+      public URI run() throws IOException {
+        AppConfig appConfig = Main.getAppConfigInstance();
+        String fsTmpDir = appConfig.get(TMP_DIR_PROP);
+        if(fsTmpDir == null || fsTmpDir.length() <= 0) {
+          LOG.warn("Could not find 'hadoop.tmp.dir'; " + affectedMsg);
+          return null;
         }
-      }
-    }
-
-    @Override
-    public void run(Context context)
-      throws IOException, InterruptedException {
-
-      Configuration conf = context.getConfiguration();
-
-      Process proc = startJob(context,
-        conf.get("user.name"),
-        conf.get(OVERRIDE_CLASSPATH));
-
-      String statusdir = conf.get(STATUSDIR_NAME);
-
-      if (statusdir != null) {
-        try {
-          statusdir = TempletonUtils.addUserHomeDirectoryIfApplicable(statusdir,
-            conf.get("user.name"));
-        } catch (URISyntaxException e) {
-          throw new IOException("Invalid status dir URI", e);
+        FileSystem fs = FileSystem.get(appConfig);
+        Path dirPath = new Path(fsTmpDir);
+        if(!fs.exists(dirPath)) {
+          LOG.warn(dirPath + " does not exist; " + affectedMsg);
+          return null;
         }
+        Path dst = fs.makeQualified(new Path(fsTmpDir, CONTAINER_LOG4J_PROPS));
+        fs.copyFromLocalFile(new Path(localFile), dst);
+        //make readable by all users since TempletonControllerJob#run() is run as submitting user
+        fs.setPermission(dst, new FsPermission((short)0644));
+        return dst.toUri();
       }
-
-      Boolean enablelog = Boolean.parseBoolean(conf.get(ENABLE_LOG));
-      LauncherDelegator.JobType jobType = LauncherDelegator.JobType.valueOf(conf.get(JOB_TYPE));
-
-      ExecutorService pool = Executors.newCachedThreadPool();
-      executeWatcher(pool, conf, context.getJobID(),
-        proc.getInputStream(), statusdir, STDOUT_FNAME);
-      executeWatcher(pool, conf, context.getJobID(),
-        proc.getErrorStream(), statusdir, STDERR_FNAME);
-      KeepAlive keepAlive = startCounterKeepAlive(pool, context);
-
-      proc.waitFor();
-      keepAlive.sendReport = false;
-      pool.shutdown();
-      if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) {
-        pool.shutdownNow();
-      }
-
-      writeExitValue(conf, proc.exitValue(), statusdir);
-      JobState state = new JobState(context.getJobID().toString(), conf);
-      state.setExitValue(proc.exitValue());
-      state.setCompleteStatus("done");
-      state.close();
-
-      if (enablelog && TempletonUtils.isset(statusdir)) {
-        System.err.println("templeton: collecting logs for " + context.getJobID().toString()
-          + " to " + statusdir + "/logs");
-        LogRetriever logRetriever = new LogRetriever(statusdir, jobType, conf);
-        logRetriever.run();
-      }
-
-      if (proc.exitValue() != 0) {
-        System.err.println("templeton: job failed with exit code "
-          + proc.exitValue());
-      }
-      else {
-        System.err.println("templeton: job completed with exit code 0");
-      }
-    }
-
-    private void executeWatcher(ExecutorService pool, Configuration conf,
-                  JobID jobid, InputStream in, String statusdir,
-                  String name)
-      throws IOException {
-      Watcher w = new Watcher(conf, jobid, in, statusdir, name);
-      pool.execute(w);
-    }
-
-    private KeepAlive startCounterKeepAlive(ExecutorService pool, Context context)
-      throws IOException {
-      KeepAlive k = new KeepAlive(context);
-      pool.execute(k);
-      return k;
-    }
-
-    private void writeExitValue(Configuration conf, int exitValue, String statusdir)
-      throws IOException {
-      if (TempletonUtils.isset(statusdir)) {
-        Path p = new Path(statusdir, EXIT_FNAME);
-        FileSystem fs = p.getFileSystem(conf);
-        OutputStream out = fs.create(p);
-        System.err.println("templeton: Writing exit value "
-          + exitValue + " to " + p);
-        PrintWriter writer = new PrintWriter(out);
-        writer.println(exitValue);
-        writer.close();
-      }
-    }
+    });
   }
-
-  private static class Watcher implements Runnable {
-    private final InputStream in;
-    private OutputStream out;
-    private final JobID jobid;
-    private final Configuration conf;
-
-    public Watcher(Configuration conf, JobID jobid, InputStream in,
-             String statusdir, String name)
-      throws IOException {
-      this.conf = conf;
-      this.jobid = jobid;
-      this.in = in;
-
-      if (name.equals(STDERR_FNAME))
-        out = System.err;
-      else
-        out = System.out;
-
-      if (TempletonUtils.isset(statusdir)) {
-        Path p = new Path(statusdir, name);
-        FileSystem fs = p.getFileSystem(conf);
-        out = fs.create(p);
-        System.err.println("templeton: Writing status to " + p);
-      }
-    }
-
-    @Override
-    public void run() {
-      try {
-        InputStreamReader isr = new InputStreamReader(in);
-        BufferedReader reader = new BufferedReader(isr);
-        PrintWriter writer = new PrintWriter(out);
-
-        String line;
-        while ((line = reader.readLine()) != null) {
-          writer.println(line);
-          JobState state = null;
+  /**
+   * Returns the location of the container log4j properties file
+   * on the local file system.
+   */
+  private static String getLog4JPropsLocal() {
+    return AppConfig.getWebhcatConfDir() + File.separator + CONTAINER_LOG4J_PROPS;
+  }
+  static {
+    //initialize once-per-JVM (i.e. one running WebHCat server) state and log it once since it's 
+    // the same for every job
+    try {
+      //safe (thread) publication 
+      // http://docs.oracle.com/javase/specs/jls/se5.0/html/execution.html#12.4.2
+      LOG.info("Using Hadoop Version: " + ShimLoader.getMajorVersion());
+      overrideContainerLog4jProps = "0.23".equals(ShimLoader.getMajorVersion());
+      if(overrideContainerLog4jProps) {
+        //see detailed note in CONTAINER_LOG4J_PROPS file
+        LOG.info(AppConfig.WEBHCAT_CONF_DIR + "=" + AppConfig.getWebhcatConfDir());
+        File localFile = new File(getLog4JPropsLocal());
+        if(localFile.exists()) {
+          LOG.info("Found " + localFile.getAbsolutePath() + " to use for job submission.");
           try {
-            String percent = TempletonUtils.extractPercentComplete(line);
-            String childid = TempletonUtils.extractChildJobId(line);
-
-            if (percent != null || childid != null) {
-              state = new JobState(jobid.toString(), conf);
-              state.setPercentComplete(percent);
-              state.setChildId(childid);
-            }
-          } catch (IOException e) {
-            System.err.println("templeton: state error: " + e);
-          } finally {
-            if (state != null) {
-              try {
-                state.close();
-              } catch (IOException e) {
-              }
-            }
+            overrideLog4jURI = copyLog4JtoFileSystem(getLog4JPropsLocal());
+            LOG.info("Job submission will use log4j.properties=" + overrideLog4jURI);
+          }
+          catch(IOException ex) {
+            LOG.warn("Will not add " + CONTAINER_LOG4J_PROPS + " to Distributed Cache.  " +
+                    "Some fields in job status may be unavailable", ex);
           }
         }
-        writer.flush();
-      } catch (IOException e) {
-        System.err.println("templeton: execute error: " + e);
+        else {
+          LOG.warn("Could not find " + localFile.getAbsolutePath() + ". " + affectedMsg);
+        }
       }
     }
+    catch(Throwable t) {
+      //this intentionally doesn't use TempletonControllerJob.class.getName() to be able to
+      //log errors which may be due to class loading
+      String msg = "org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob is not " +
+              "properly initialized. " + affectedMsg;
+      LOG.error(msg, t);
+    }
   }
 
-  public static class KeepAlive implements Runnable {
-    private Context context;
-    public boolean sendReport;
-
-    public KeepAlive(Context context)
-    {
-      this.sendReport = true;
-      this.context = context;
-    }
+  private final boolean secureMetastoreAccess;
 
-    @Override
-    public void run() {
-      try {
-        while (sendReport) {
-          // Periodically report progress on the Context object
-          // to prevent TaskTracker from killing the Templeton
-          // Controller task
-          context.progress();
-          System.err.println("KeepAlive Heart beat");
-          Thread.sleep(KEEP_ALIVE_MSEC);
-        }
-      } catch (InterruptedException e) {
-        // Ok to be interrupted
-      }
-    }
+  /**
+   * @param secureMetastoreAccess - if true, a delegation token will be created
+   *                              and added to the job
+   */
+  public TempletonControllerJob(boolean secureMetastoreAccess) {
+    super();
+    this.secureMetastoreAccess = secureMetastoreAccess;
   }
 
   private JobID submittedJobId;
@@ -365,8 +170,7 @@ public class TempletonControllerJob exte
   public String getSubmittedId() {
     if (submittedJobId == null) {
       return null;
-    }
-    else {
+    } else {
       return submittedJobId.toString();
     }
   }
@@ -376,20 +180,39 @@ public class TempletonControllerJob exte
    * @see org.apache.hive.hcatalog.templeton.CompleteDelegator
    */
   @Override
-  public int run(String[] args)
-    throws IOException, InterruptedException, ClassNotFoundException, TException {
+  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, 
+          TException {
     Configuration conf = getConf();
-    
+
     conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args));
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     conf.set("user.name", user);
+    if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+      //must be done before Job object is created
+      conf.set(OVERRIDE_CONTAINER_LOG4J_PROPS, Boolean.TRUE.toString());
+    }
     Job job = new Job(conf);
-    job.setJarByClass(TempletonControllerJob.class);
-    job.setJobName("TempletonControllerJob");
+    job.setJarByClass(LaunchMapper.class);
+    job.setJobName(TempletonControllerJob.class.getSimpleName());
     job.setMapperClass(LaunchMapper.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);
     job.setInputFormatClass(SingleInputFormat.class);
+    if(overrideContainerLog4jProps && overrideLog4jURI != null) {
+      FileSystem fs = FileSystem.get(conf);
+      if(fs.exists(new Path(overrideLog4jURI))) {
+        ShimLoader.getHadoopShims().getWebHCatShim(conf, UgiFactory.getUgi(user)).addCacheFile(
+                overrideLog4jURI, job);
+        LOG.debug("added " + overrideLog4jURI + " to Dist Cache");
+      }
+      else {
+        //in case this file was deleted by someone issue a warning but don't try to add to 
+        // DistributedCache as that will throw and fail job submission
+        LOG.warn("Cannot find " + overrideLog4jURI + " which is created on WebHCat startup/job " +
+                "submission.  " + affectedMsg);
+      }
+    }
+
     NullOutputFormat<NullWritable, NullWritable> of = new NullOutputFormat<NullWritable, NullWritable>();
     job.setOutputFormatClass(of.getClass());
     job.setNumReduceTasks(0);
@@ -404,13 +227,16 @@ public class TempletonControllerJob exte
     job.submit();
 
     submittedJobId = job.getJobID();
-
     if(metastoreTokenStrForm != null) {
       //so that it can be cancelled later from CompleteDelegator
       DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
               submittedJobId.toString(), metastoreTokenStrForm);
-      LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " +
-              "user=" + user);
+      LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() +
+              " user=" + user);
+    }
+    if(overrideContainerLog4jProps && overrideLog4jURI == null) {
+      //do this here so that log msg has JobID
+      LOG.warn("Could not override container log4j properties for " + submittedJobId);
     }
     return 0;
   }

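The shim call used above (getWebHCatShim(...).addCacheFile(overrideLog4jURI, job)) hides the API difference between Hadoop versions; on Hadoop 2 the effect is roughly the following sketch (an approximation for illustration with a hypothetical HDFS path; it is not the shim's actual code):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.filecache.DistributedCache;

  public class CacheFileSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // ship the override log4j file to every node that runs the controller task
      DistributedCache.addCacheFile(
          new URI("hdfs:///tmp/override-container-log4j.properties"), conf);
    }
  }
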
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java Fri Oct 25 17:42:07 2013
@@ -85,10 +85,11 @@ public class TempletonUtils {
     return (col != null) && (!col.isEmpty());
   }
 
-
-  public static final Pattern JAR_COMPLETE
-    = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
+  //looking for map 100% reduce 100%
+  public static final Pattern JAR_COMPLETE = Pattern.compile(" map \\d+%\\s+reduce \\d+%$");
   public static final Pattern PIG_COMPLETE = Pattern.compile(" \\d+% complete$");
+  //looking for map = 100%,  reduce = 100%
+  public static final Pattern HIVE_COMPLETE = Pattern.compile(" map = \\d+%,\\s+reduce = \\d+%$");
 
   /**
    * Extract the percent complete line from Pig or Jar jobs.
@@ -101,7 +102,19 @@ public class TempletonUtils {
     Matcher pig = PIG_COMPLETE.matcher(line);
     if (pig.find())
       return pig.group().trim();
-
+    
+    Matcher hive = HIVE_COMPLETE.matcher(line);
+    if(hive.find()) {
+      StringBuilder sb = new StringBuilder(hive.group().trim());
+      String[] toRemove = {"= ", ", "};
+      for(String pattern : toRemove) {
+        int pos;
+        while((pos = sb.indexOf(pattern)) >= 0) {
+          sb.delete(pos, pos + pattern.length());
+        }
+      }
+      return sb.toString();//normalized to look like JAR_COMPLETE
+    }
     return null;
   }
 

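A worked example of the new HIVE_COMPLETE branch (the sample line is hypothetical and assumes no trailing text after the reduce percentage; TempletonUtils is assumed to be on the classpath):

  import org.apache.hive.hcatalog.templeton.tool.TempletonUtils;

  public class PercentCompleteSketch {
    public static void main(String[] args) {
      String hiveLine = "Stage-1 map = 100%,  reduce = 100%";
      // HIVE_COMPLETE matches, then "= " and ", " are stripped, printing
      // "map 100% reduce 100%" -- the same shape JAR_COMPLETE produces
      System.out.println(TempletonUtils.extractPercentComplete(hiveLine));
    }
  }
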
Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TrivialExecService.java Fri Oct 25 17:42:07 2013
@@ -18,21 +18,30 @@
  */
 package org.apache.hive.hcatalog.templeton.tool;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Execute a local program.  This is a singleton service that will
  * execute a programs on the local box.
+ *
+ * Note that this is executed from LaunchMapper, which runs in a different JVM
+ * from the WebHCat (Templeton) server.  Thus it should not call any classes that
+ * are not available on every node in the cluster (i.e. outside the webhcat jar).
  */
-public class TrivialExecService {
-  private static volatile TrivialExecService theSingleton;
+final class TrivialExecService {
+  //with default log4j config, this output ends up in 'syslog' of the LaunchMapper task
   private static final Log LOG = LogFactory.getLog(TrivialExecService.class);
-
+  private static volatile TrivialExecService theSingleton;
+  private static final String HADOOP_CLIENT_OPTS = "HADOOP_CLIENT_OPTS";
   /**
    * Retrieve the singleton.
    */
@@ -41,32 +50,53 @@ public class TrivialExecService {
       theSingleton = new TrivialExecService();
     return theSingleton;
   }
-
+  /**
+   * See the {@link JobSubmissionConstants#CONTAINER_LOG4J_PROPS} file for details.
+   */
+  private static void hadoop2LogRedirect(ProcessBuilder processBuilder) {
+    Map<String, String> env = processBuilder.environment();
+    if(!env.containsKey(HADOOP_CLIENT_OPTS)) {
+      return;
+    }
+    String hcopts = env.get(HADOOP_CLIENT_OPTS);
+    if(!hcopts.contains("log4j.configuration=container-log4j.properties")) {
+      return;
+    }
+    //TempletonControllerJob ensures that this file is in DistributedCache
+    File log4jProps = new File(JobSubmissionConstants.CONTAINER_LOG4J_PROPS);
+    hcopts = hcopts.replace("log4j.configuration=container-log4j.properties",
+            "log4j.configuration=file://" + log4jProps.getAbsolutePath());
+    //helps figure out what log4j is doing, but may confuse 
+    //some jobs due to extra output to stdout
+    //hcopts = hcopts + " -Dlog4j.debug=true";
+    env.put(HADOOP_CLIENT_OPTS, hcopts);
+  }
   public Process run(List<String> cmd, List<String> removeEnv,
-             Map<String, String> environmentVariables)
+             Map<String, String> environmentVariables, boolean overrideContainerLog4jProps)
     throws IOException {
-    logDebugCmd(cmd, environmentVariables);
+    LOG.info("run(cmd, removeEnv, environmentVariables, " + overrideContainerLog4jProps + ")");
+    LOG.info("Starting cmd: " + cmd);
     ProcessBuilder pb = new ProcessBuilder(cmd);
-    for (String key : removeEnv)
+    for (String key : removeEnv) {
+      if(pb.environment().containsKey(key)) {
+        LOG.info("Removing env var: " + key + "=" + pb.environment().get(key));
+      }
       pb.environment().remove(key);
+    }
     pb.environment().putAll(environmentVariables);
+    if(overrideContainerLog4jProps) {
+      hadoop2LogRedirect(pb);
+    }
+    logDebugInfo("Starting process with env:", pb.environment());
     return pb.start();
   }
-
-  private void logDebugCmd(List<String> cmd,
-    Map<String, String> environmentVariables) {
-    if(!LOG.isDebugEnabled()){
-      return;
-    }
-    LOG.debug("starting " + cmd);
-    LOG.debug("With environment variables: " );
-    for(Map.Entry<String, String> keyVal : environmentVariables.entrySet()){
-      LOG.debug(keyVal.getKey() + "=" + keyVal.getValue());
-    }
-    LOG.debug("With environment variables already set: " );
-    Map<String, String> env = System.getenv();
-    for (String envName : env.keySet()) {
-      LOG.debug(envName + "=" + env.get(envName));
-    }
+  private static void logDebugInfo(String msg, Map<String, String> props) {
+    LOG.info(msg);
+    List<String> keys = new ArrayList<String>();
+    keys.addAll(props.keySet());
+    Collections.sort(keys);
+    for(String key : keys) {
+      LOG.info(key + "=" + props.get(key));
+    }
   }
 }

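hadoop2LogRedirect() above is a targeted string rewrite: it only touches the log4j.configuration
token inside HADOOP_CLIENT_OPTS and leaves the rest of the variable intact. A small sketch of the
before/after, assuming CONTAINER_LOG4J_PROPS resolves to the override-container-log4j.properties
file added by this commit (shipped via the DistributedCache, so it appears in the task's working
directory):

    // Illustrative opts string; real containers may carry more flags.
    String hcopts = "-Xmx256m -Dlog4j.configuration=container-log4j.properties";
    java.io.File log4jProps = new java.io.File("override-container-log4j.properties");
    hcopts = hcopts.replace(
        "log4j.configuration=container-log4j.properties",
        "log4j.configuration=file://" + log4jProps.getAbsolutePath());
    // -> "-Xmx256m -Dlog4j.configuration=file:///<task-cwd>/override-container-log4j.properties"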
Modified: hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java (original)
+++ hive/trunk/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/tool/TestTrivialExecService.java Fri Oct 25 17:42:07 2013
@@ -38,7 +38,7 @@ public class TestTrivialExecService {
       Process process = TrivialExecService.getInstance()
         .run(list,
           new ArrayList<String>(),
-          new HashMap<String, String>());
+          new HashMap<String, String>(), false);
       out = new BufferedReader(new InputStreamReader(
         process.getInputStream()));
       err = new BufferedReader(new InputStreamReader(

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java Fri Oct 25 17:42:07 2013
@@ -18,82 +18,89 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 
 /**
  * This is in org.apache.hadoop.mapred package because it relies on 
  * JobSubmissionProtocol which is package private
  */
 public class WebHCatJTShim20S implements WebHCatJTShim {
-    private JobSubmissionProtocol cnx;
+  private JobSubmissionProtocol cnx;
 
-    /**
-     * Create a connection to the Job Tracker.
-     */
-    public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
-            throws IOException {
-      cnx = (JobSubmissionProtocol)
-              RPC.getProxy(JobSubmissionProtocol.class,
-                      JobSubmissionProtocol.versionID,
-                      getAddress(conf),
-                      ugi,
-                      conf,
-                      NetUtils.getSocketFactory(conf,
-                              JobSubmissionProtocol.class));
-    }
-
-    /**
-     * Grab a handle to a job that is already known to the JobTracker.
-     *
-     * @return Profile of the job, or null if not found.
-     */
-    public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-      return cnx.getJobProfile(jobid);
-    }
-
-    /**
-     * Grab a handle to a job that is already known to the JobTracker.
-     *
-     * @return Status of the job, or null if not found.
-     */
-    public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-      return cnx.getJobStatus(jobid);
-    }
-
-
-    /**
-     * Kill a job.
-     */
-    public void killJob(org.apache.hadoop.mapred.JobID jobid)
-            throws IOException {
-      cnx.killJob(jobid);
-    }
-
-    /**
-     * Get all the jobs submitted.
-     */
-    public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
-            throws IOException {
-      return cnx.getAllJobs();
-    }
-
-    /**
-     * Close the connection to the Job Tracker.
-     */
-    public void close() {
-      RPC.stopProxy(cnx);
-    }
-    private InetSocketAddress getAddress(Configuration conf) {
-      String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
-      return NetUtils.createSocketAddr(jobTrackerStr);
-    }
+  /**
+   * Create a connection to the Job Tracker.
+   */
+  public WebHCatJTShim20S(Configuration conf, UserGroupInformation ugi)
+          throws IOException {
+    cnx = (JobSubmissionProtocol)
+            RPC.getProxy(JobSubmissionProtocol.class,
+                    JobSubmissionProtocol.versionID,
+                    getAddress(conf),
+                    ugi,
+                    conf,
+                    NetUtils.getSocketFactory(conf,
+                            JobSubmissionProtocol.class));
   }
 
+  /**
+   * Grab a handle to a job that is already known to the JobTracker.
+   *
+   * @return Profile of the job, or null if not found.
+   */
+  public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    return cnx.getJobProfile(jobid);
+  }
+
+  /**
+   * Grab a handle to a job that is already known to the JobTracker.
+   *
+   * @return Status of the job, or null if not found.
+   */
+  public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    return cnx.getJobStatus(jobid);
+  }
+
+
+  /**
+   * Kill a job.
+   */
+  public void killJob(org.apache.hadoop.mapred.JobID jobid)
+          throws IOException {
+    cnx.killJob(jobid);
+  }
+
+  /**
+   * Get all the jobs submitted.
+   */
+  public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
+          throws IOException {
+    return cnx.getAllJobs();
+  }
+
+  /**
+   * Close the connection to the Job Tracker.
+   */
+  public void close() {
+    RPC.stopProxy(cnx);
+  }
+  private InetSocketAddress getAddress(Configuration conf) {
+    String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
+    return NetUtils.createSocketAddr(jobTrackerStr);
+  }
+  @Override
+  public void addCacheFile(URI uri, Job job) {
+    DistributedCache.addCacheFile(uri, job.getConfiguration());
+  }
+}
+

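The one functional addition in the re-indented shim above is addCacheFile(). Hadoop 1 has no
Job.addCacheFile(URI), so the 0.20S shim routes through the static DistributedCache helper; the
0.23 shim below can call the Job method directly:

    // Hadoop 1 (WebHCatJTShim20S): attach the file via the DistributedCache helper.
    DistributedCache.addCacheFile(uri, job.getConfiguration());
    // Hadoop 2 (WebHCatJTShim23): the same operation is a first-class Job method.
    job.addCacheFile(uri);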
Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java (original)
+++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java Fri Oct 25 17:42:07 2013
@@ -18,10 +18,12 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class WebHCatJTShim23 implements WebHCatJTShim {
   private JobClient jc;
@@ -88,4 +90,8 @@ public class WebHCatJTShim23 implements 
     } catch (IOException e) {
     }
   }
+  @Override
+  public void addCacheFile(URI uri, Job job) {
+    job.addCacheFile(uri);
+  }
 }

Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1535796&r1=1535795&r2=1535796&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Oct 25 17:42:07 2013
@@ -561,6 +561,11 @@ public interface HadoopShims {
      * Close the connection to the Job Tracker.
      */
     public void close();
+    /**
+     * Does exactly what org.apache.hadoop.mapreduce.Job#addCacheFile(URI) does in Hadoop 2.
+     * Assumes that both parameters are not {@code null}.
+     */
+    public void addCacheFile(URI uri, Job job);
   }
 
   /**
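With the method on the shim interface, callers can ship the override log4j file without caring
which Hadoop line they run on. A hedged sketch of the call site (the getWebHCatShim() factory and
the in-scope conf/ugi/job/overrideLog4jURI names are assumptions; only addCacheFile() itself is
defined in this diff):

    // Sketch: dispatch to the version-specific DistributedCache mechanism.
    HadoopShims.WebHCatJTShim tracker =
        ShimLoader.getHadoopShims().getWebHCatShim(conf, ugi); // assumed factory
    URI log4jUri = new Path(overrideLog4jURI).toUri();         // org.apache.hadoop.fs.Path
    tracker.addCacheFile(log4jUri, job); // resolves to the 0.20S or 0.23 implementation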