You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by ec...@apache.org on 2012/06/28 02:28:14 UTC

svn commit: r1354781 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ExecDriver.java exec/MapRedTask.java exec/MapredLocalTask.java io/rcfile/merge/BlockMergeTask.java

Author: ecapriolo
Date: Thu Jun 28 00:28:12 2012
New Revision: 1354781

URL: http://svn.apache.org/viewvc?rev=1354781&view=rev
Log:
HIVE-3127 Pass hconf values as XML instead of command line arguments to child JVM. Kanna Karanam (via egc)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jun 28 00:28:12 2012
@@ -21,13 +21,11 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URL;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,6 +95,7 @@ import org.apache.log4j.varia.NullAppend
 public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
 
   private static final long serialVersionUID = 1L;
+  private static final String JOBCONF_FILENAME = "jobconf.xml";
 
   protected transient JobConf job;
   public static MemoryMXBean memoryMXBean;
@@ -533,7 +532,7 @@ public class ExecDriver extends Task<Map
   }
 
   private static void printUsage() {
-    System.err.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "
+    System.err.println("ExecDriver -plan <plan-file> [-jobconffile <job conf file>]"
         + "[-files <file1>[,<file2>] ...]");
     System.exit(1);
   }
@@ -570,7 +569,7 @@ public class ExecDriver extends Task<Map
   public static void main(String[] args) throws IOException, HiveException {
 
     String planFileName = null;
-    ArrayList<String> jobConfArgs = new ArrayList<String>();
+    String jobConfFileName = null;
     boolean noLog = false;
     String files = null;
     boolean localtask = false;
@@ -578,8 +577,8 @@ public class ExecDriver extends Task<Map
       for (int i = 0; i < args.length; i++) {
         if (args[i].equals("-plan")) {
           planFileName = args[++i];
-        } else if (args[i].equals("-jobconf")) {
-          jobConfArgs.add(args[++i]);
+        } else if (args[i].equals("-jobconffile")) {
+          jobConfFileName = args[++i];
         } else if (args[i].equals("-nolog")) {
           noLog = true;
         } else if (args[i].equals("-files")) {
@@ -599,22 +598,9 @@ public class ExecDriver extends Task<Map
     } else {
       conf = new JobConf(ExecDriver.class);
     }
-    StringBuilder sb = new StringBuilder("JobConf:\n");
 
-    for (String one : jobConfArgs) {
-      int eqIndex = one.indexOf('=');
-      if (eqIndex != -1) {
-        try {
-          String key = one.substring(0, eqIndex);
-          String value = URLDecoder.decode(one.substring(eqIndex + 1), "UTF-8");
-          conf.set(key, value);
-          sb.append(key).append("=").append(value).append("\n");
-        } catch (UnsupportedEncodingException e) {
-          System.err.println("Unexpected error " + e.getMessage() + " while encoding "
-              + one.substring(eqIndex + 1));
-          System.exit(3);
-        }
-      }
+    if (jobConfFileName != null) {
+      conf.addResource(new Path(jobConfFileName));
     }
 
     if (files != null) {
@@ -649,9 +635,6 @@ public class ExecDriver extends Task<Map
       }
     }
 
-    // log the list of job conf parameters for reference
-    LOG.info(sb.toString());
-
     // the plan file should always be in local directory
     Path p = new Path(planFileName);
     FileSystem fs = FileSystem.getLocal(conf);
@@ -703,53 +686,44 @@ public class ExecDriver extends Task<Map
    * Given a Hive Configuration object - generate a command line fragment for passing such
    * configuration information to ExecDriver.
    */
-  public static String generateCmdLine(HiveConf hconf) {
+  public static String generateCmdLine(HiveConf hconf, Context ctx)
+      throws IOException {
+    HiveConf tempConf = new HiveConf();
+    Path hConfFilePath = new Path(ctx.getLocalTmpFileURI(), JOBCONF_FILENAME);
+    OutputStream out = null;
+
+    Properties deltaP = hconf.getChangedProperties();
+    boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+    String hadoopSysDir = "mapred.system.dir";
+    String hadoopWorkDir = "mapred.local.dir";
+
+    for (Object one : deltaP.keySet()) {
+      String oneProp = (String) one;
+
+      if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
+        continue;
+      }
+
+      tempConf.set(oneProp, deltaP.getProperty(oneProp));
+    }
+
+    // Multiple concurrent local mode job submissions can cause collisions in
+    // working dirs and system dirs
+    // Workaround is to rename map red working dir to a temp dir in such cases
+    if (hadoopLocalMode) {
+      tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt());
+      tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt());
+    }
+
     try {
-      StringBuilder sb = new StringBuilder();
-      Properties deltaP = hconf.getChangedProperties();
-      boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
-      String hadoopSysDir = "mapred.system.dir";
-      String hadoopWorkDir = "mapred.local.dir";
-
-      for (Object one : deltaP.keySet()) {
-        String oneProp = (String) one;
-
-        if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
-          continue;
-        }
-
-        String oneValue = deltaP.getProperty(oneProp);
-
-        sb.append("-jobconf ");
-        sb.append(oneProp);
-        sb.append("=");
-        sb.append(URLEncoder.encode(oneValue, "UTF-8"));
-        sb.append(" ");
-      }
-
-      // Multiple concurrent local mode job submissions can cause collisions in
-      // working dirs
-      // Workaround is to rename map red working dir to a temp dir in such cases
-
-      if (hadoopLocalMode) {
-        sb.append("-jobconf ");
-        sb.append(hadoopSysDir);
-        sb.append("=");
-        sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt(),
-            "UTF-8"));
-
-        sb.append(" ");
-        sb.append("-jobconf ");
-        sb.append(hadoopWorkDir);
-        sb.append("=");
-        sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(),
-            "UTF-8"));
-      }
-
-      return sb.toString();
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
+      out = FileSystem.getLocal(hconf).create(hConfFilePath);
+      tempConf.writeXml(out);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
     }
+    return " -jobconffile " + hConfFilePath.toString();
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jun 28 00:28:12 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
-
 /**
  * Extension of ExecDriver:
  * - can optionally spawn a map-reduce task from a separate jvm
@@ -165,8 +164,9 @@ public class MapRedTask extends ExecDriv
           libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
         }
       }
+
       // Generate the hiveConfArgs after potentially adding the jars
-      String hiveConfArgs = generateCmdLine(conf);
+      String hiveConfArgs = generateCmdLine(conf, ctx);
 
       // write out the plan to a local file
       Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Thu Jun 28 00:28:12 2012
@@ -129,8 +129,7 @@ public class MapredLocalTask extends Tas
       String jarCmd;
 
       jarCmd = hiveJar + " " + ExecDriver.class.getName();
-
-      String hiveConfArgs = ExecDriver.generateCmdLine(conf);
+      String hiveConfArgs = ExecDriver.generateCmdLine(conf, ctx);
       String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan " + planPath.toString()
           + " " + isSilent + " " + hiveConfArgs;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Jun 28 00:28:12 2012
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.io.rcf
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -258,18 +256,16 @@ public class BlockMergeTask extends Task
   public static String INPUT_SEPERATOR = ":";
 
   public static void main(String[] args) {
-
-    ArrayList<String> jobConfArgs = new ArrayList<String>();
-
     String inputPathStr = null;
     String outputDir = null;
+    String jobConfFileName = null;
 
     try {
       for (int i = 0; i < args.length; i++) {
         if (args[i].equals("-input")) {
           inputPathStr = args[++i];
-        } else if (args[i].equals("-jobconf")) {
-          jobConfArgs.add(args[++i]);
+        } else if (args[i].equals("-jobconffile")) {
+          jobConfFileName = args[++i];
         } else if (args[i].equals("-outputDir")) {
           outputDir = args[++i];
         }
@@ -312,22 +308,8 @@ public class BlockMergeTask extends Task
       }
     }
 
-    StringBuilder sb = new StringBuilder("JobConf:\n");
-
-    for (String one : jobConfArgs) {
-      int eqIndex = one.indexOf('=');
-      if (eqIndex != -1) {
-        try {
-          String key = one.substring(0, eqIndex);
-          String value = URLDecoder.decode(one.substring(eqIndex + 1), "UTF-8");
-          conf.set(key, value);
-          sb.append(key).append("=").append(value).append("\n");
-        } catch (UnsupportedEncodingException e) {
-          System.err.println("Unexpected error " + e.getMessage()
-              + " while encoding " + one.substring(eqIndex + 1));
-          System.exit(3);
-        }
-      }
+    if (jobConfFileName != null) {
+      conf.addResource(new Path(jobConfFileName));
     }
     HiveConf hiveConf = new HiveConf(conf, BlockMergeTask.class);
 
@@ -347,9 +329,6 @@ public class BlockMergeTask extends Task
       }
     }
 
-    // log the list of job conf parameters for reference
-    LOG.info(sb.toString());
-
     MergeWork mergeWork = new MergeWork(inputPaths, outputDir);
     DriverContext driverCxt = new DriverContext();
     BlockMergeTask taskExec = new BlockMergeTask();
@@ -365,7 +344,7 @@ public class BlockMergeTask extends Task
 
   private static void printUsage() {
     System.err.println("BlockMergeTask -input <colon seperated input paths>  "
-        + "-outputDir outputDir [-jobconf k1=v1 [-jobconf k2=v2] ...] ");
+        + "-outputDir outputDir [-jobconffile <job conf file>] ");
     System.exit(1);
   }