You are viewing a plain-text version of this content. The canonical link for it was given here, but the hyperlink is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by ec...@apache.org on 2012/06/28 02:28:14 UTC
svn commit: r1354781 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/ExecDriver.java exec/MapRedTask.java exec/MapredLocalTask.java
io/rcfile/merge/BlockMergeTask.java
Author: ecapriolo
Date: Thu Jun 28 00:28:12 2012
New Revision: 1354781
URL: http://svn.apache.org/viewvc?rev=1354781&view=rev
Log:
HIVE-3127 Pass hconf values as XML instead of command line arguments to child JVM. Kanna Karanam (via egc)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jun 28 00:28:12 2012
@@ -21,13 +21,11 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -97,6 +95,7 @@ import org.apache.log4j.varia.NullAppend
public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
private static final long serialVersionUID = 1L;
+ private static final String JOBCONF_FILENAME = "jobconf.xml";
protected transient JobConf job;
public static MemoryMXBean memoryMXBean;
@@ -533,7 +532,7 @@ public class ExecDriver extends Task<Map
}
private static void printUsage() {
- System.err.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "
+ System.err.println("ExecDriver -plan <plan-file> [-jobconffile <job conf file>]"
+ "[-files <file1>[,<file2>] ...]");
System.exit(1);
}
@@ -570,7 +569,7 @@ public class ExecDriver extends Task<Map
public static void main(String[] args) throws IOException, HiveException {
String planFileName = null;
- ArrayList<String> jobConfArgs = new ArrayList<String>();
+ String jobConfFileName = null;
boolean noLog = false;
String files = null;
boolean localtask = false;
@@ -578,8 +577,8 @@ public class ExecDriver extends Task<Map
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-plan")) {
planFileName = args[++i];
- } else if (args[i].equals("-jobconf")) {
- jobConfArgs.add(args[++i]);
+ } else if (args[i].equals("-jobconffile")) {
+ jobConfFileName = args[++i];
} else if (args[i].equals("-nolog")) {
noLog = true;
} else if (args[i].equals("-files")) {
@@ -599,22 +598,9 @@ public class ExecDriver extends Task<Map
} else {
conf = new JobConf(ExecDriver.class);
}
- StringBuilder sb = new StringBuilder("JobConf:\n");
- for (String one : jobConfArgs) {
- int eqIndex = one.indexOf('=');
- if (eqIndex != -1) {
- try {
- String key = one.substring(0, eqIndex);
- String value = URLDecoder.decode(one.substring(eqIndex + 1), "UTF-8");
- conf.set(key, value);
- sb.append(key).append("=").append(value).append("\n");
- } catch (UnsupportedEncodingException e) {
- System.err.println("Unexpected error " + e.getMessage() + " while encoding "
- + one.substring(eqIndex + 1));
- System.exit(3);
- }
- }
+ if (jobConfFileName != null) {
+ conf.addResource(new Path(jobConfFileName));
}
if (files != null) {
@@ -649,9 +635,6 @@ public class ExecDriver extends Task<Map
}
}
- // log the list of job conf parameters for reference
- LOG.info(sb.toString());
-
// the plan file should always be in local directory
Path p = new Path(planFileName);
FileSystem fs = FileSystem.getLocal(conf);
@@ -703,53 +686,44 @@ public class ExecDriver extends Task<Map
* Given a Hive Configuration object - generate a command line fragment for passing such
* configuration information to ExecDriver.
*/
- public static String generateCmdLine(HiveConf hconf) {
+ public static String generateCmdLine(HiveConf hconf, Context ctx)
+ throws IOException {
+ HiveConf tempConf = new HiveConf();
+ Path hConfFilePath = new Path(ctx.getLocalTmpFileURI(), JOBCONF_FILENAME);
+ OutputStream out = null;
+
+ Properties deltaP = hconf.getChangedProperties();
+ boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+ String hadoopSysDir = "mapred.system.dir";
+ String hadoopWorkDir = "mapred.local.dir";
+
+ for (Object one : deltaP.keySet()) {
+ String oneProp = (String) one;
+
+ if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
+ continue;
+ }
+
+ tempConf.set(oneProp, deltaP.getProperty(oneProp));
+ }
+
+ // Multiple concurrent local mode job submissions can cause collisions in
+ // working dirs and system dirs
+ // Workaround is to rename map red working dir to a temp dir in such cases
+ if (hadoopLocalMode) {
+ tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt());
+ tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt());
+ }
+
try {
- StringBuilder sb = new StringBuilder();
- Properties deltaP = hconf.getChangedProperties();
- boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
- String hadoopSysDir = "mapred.system.dir";
- String hadoopWorkDir = "mapred.local.dir";
-
- for (Object one : deltaP.keySet()) {
- String oneProp = (String) one;
-
- if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
- continue;
- }
-
- String oneValue = deltaP.getProperty(oneProp);
-
- sb.append("-jobconf ");
- sb.append(oneProp);
- sb.append("=");
- sb.append(URLEncoder.encode(oneValue, "UTF-8"));
- sb.append(" ");
- }
-
- // Multiple concurrent local mode job submissions can cause collisions in
- // working dirs
- // Workaround is to rename map red working dir to a temp dir in such cases
-
- if (hadoopLocalMode) {
- sb.append("-jobconf ");
- sb.append(hadoopSysDir);
- sb.append("=");
- sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt(),
- "UTF-8"));
-
- sb.append(" ");
- sb.append("-jobconf ");
- sb.append(hadoopWorkDir);
- sb.append("=");
- sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(),
- "UTF-8"));
- }
-
- return sb.toString();
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
+ out = FileSystem.getLocal(hconf).create(hConfFilePath);
+ tempConf.writeXml(out);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
}
+ return " -jobconffile " + hConfFilePath.toString();
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jun 28 00:28:12 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
-
/**
* Extension of ExecDriver:
* - can optionally spawn a map-reduce task from a separate jvm
@@ -165,8 +164,9 @@ public class MapRedTask extends ExecDriv
libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
}
}
+
// Generate the hiveConfArgs after potentially adding the jars
- String hiveConfArgs = generateCmdLine(conf);
+ String hiveConfArgs = generateCmdLine(conf, ctx);
// write out the plan to a local file
Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Thu Jun 28 00:28:12 2012
@@ -129,8 +129,7 @@ public class MapredLocalTask extends Tas
String jarCmd;
jarCmd = hiveJar + " " + ExecDriver.class.getName();
-
- String hiveConfArgs = ExecDriver.generateCmdLine(conf);
+ String hiveConfArgs = ExecDriver.generateCmdLine(conf, ctx);
String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan " + planPath.toString()
+ " " + isSilent + " " + hiveConfArgs;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1354781&r1=1354780&r2=1354781&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Jun 28 00:28:12 2012
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.io.rcf
import java.io.IOException;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
@@ -258,18 +256,16 @@ public class BlockMergeTask extends Task
public static String INPUT_SEPERATOR = ":";
public static void main(String[] args) {
-
- ArrayList<String> jobConfArgs = new ArrayList<String>();
-
String inputPathStr = null;
String outputDir = null;
+ String jobConfFileName = null;
try {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-input")) {
inputPathStr = args[++i];
- } else if (args[i].equals("-jobconf")) {
- jobConfArgs.add(args[++i]);
+ } else if (args[i].equals("-jobconffile")) {
+ jobConfFileName = args[++i];
} else if (args[i].equals("-outputDir")) {
outputDir = args[++i];
}
@@ -312,22 +308,8 @@ public class BlockMergeTask extends Task
}
}
- StringBuilder sb = new StringBuilder("JobConf:\n");
-
- for (String one : jobConfArgs) {
- int eqIndex = one.indexOf('=');
- if (eqIndex != -1) {
- try {
- String key = one.substring(0, eqIndex);
- String value = URLDecoder.decode(one.substring(eqIndex + 1), "UTF-8");
- conf.set(key, value);
- sb.append(key).append("=").append(value).append("\n");
- } catch (UnsupportedEncodingException e) {
- System.err.println("Unexpected error " + e.getMessage()
- + " while encoding " + one.substring(eqIndex + 1));
- System.exit(3);
- }
- }
+ if (jobConfFileName != null) {
+ conf.addResource(new Path(jobConfFileName));
}
HiveConf hiveConf = new HiveConf(conf, BlockMergeTask.class);
@@ -347,9 +329,6 @@ public class BlockMergeTask extends Task
}
}
- // log the list of job conf parameters for reference
- LOG.info(sb.toString());
-
MergeWork mergeWork = new MergeWork(inputPaths, outputDir);
DriverContext driverCxt = new DriverContext();
BlockMergeTask taskExec = new BlockMergeTask();
@@ -365,7 +344,7 @@ public class BlockMergeTask extends Task
private static void printUsage() {
System.err.println("BlockMergeTask -input <colon seperated input paths> "
- + "-outputDir outputDir [-jobconf k1=v1 [-jobconf k2=v2] ...] ");
+ + "-outputDir outputDir [-jobconffile <job conf file>] ");
System.exit(1);
}