Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [4/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jan 21 10:37:58 2010
@@ -18,48 +18,62 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URLEncoder;
import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;
-
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.io.*;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.varia.NullAppender;
-import java.lang.ClassNotFoundException;
-import org.apache.hadoop.hive.common.FileUtils;
public class ExecDriver extends Task<mapredWork> implements Serializable {
@@ -69,8 +83,9 @@
transient protected int mapProgress = 0;
transient protected int reduceProgress = 0;
transient protected boolean success = false; // if job execution is successful
-
+
public static Random randGen = new Random();
+
/**
* Constructor when invoked from QL
*/
@@ -78,7 +93,8 @@
super();
}
- public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
+ public static String getResourceFiles(Configuration conf,
+ SessionState.ResourceType t) {
// fill in local files to be added to the task environment
SessionState ss = SessionState.get();
Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
@@ -109,7 +125,8 @@
* Initialization when invoked from QL
*/
@Override
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+ public void initialize(HiveConf conf, QueryPlan queryPlan,
+ DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
job = new JobConf(conf, ExecDriver.class);
// NOTE: initialize is only called if it is in non-local mode.
@@ -127,7 +144,8 @@
if (StringUtils.isNotBlank(addedJars)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDJARS, addedJars);
}
- String addedArchives = getResourceFiles(job, SessionState.ResourceType.ARCHIVE);
+ String addedArchives = getResourceFiles(job,
+ SessionState.ResourceType.ARCHIVE);
if (StringUtils.isNotBlank(addedArchives)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
}
@@ -149,35 +167,35 @@
* used to kill all running jobs in the event of an unexpected shutdown -
* i.e., the JVM shuts down while there are still jobs running.
*/
- public static Map<String, String> runningJobKillURIs
- = Collections.synchronizedMap(new HashMap<String, String>());
+ public static Map<String, String> runningJobKillURIs = Collections
+ .synchronizedMap(new HashMap<String, String>());
/**
* In Hive, when the user control-c's the command line, any running jobs
* spawned from that command line are best-effort killed.
- *
+ *
* This static constructor registers a shutdown thread to iterate over all the
* running job kill URLs and do a get on them.
- *
+ *
*/
static {
if (new org.apache.hadoop.conf.Configuration().getBoolean(
"webinterface.private.actions", false)) {
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
- synchronized(runningJobKillURIs) {
- for (Iterator<String> elems = runningJobKillURIs.values().iterator(); elems
- .hasNext();) {
- String uri = elems.next();
+ synchronized (runningJobKillURIs) {
+ for (String uri : runningJobKillURIs.values()) {
try {
System.err.println("killing job with: " + uri);
- java.net.HttpURLConnection conn = (java.net.HttpURLConnection)
- new java.net.URL(uri).openConnection();
+ java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(
+ uri).openConnection();
conn.setRequestMethod("POST");
int retCode = conn.getResponseCode();
if (retCode != 200) {
- System.err.println("Got an error trying to kill job with URI: "
- + uri + " = " + retCode);
+ System.err
+ .println("Got an error trying to kill job with URI: "
+ + uri + " = " + retCode);
}
} catch (Exception e) {
System.err.println("trying to kill job, caught: " + e);
@@ -200,11 +218,11 @@
String hp = job.get("mapred.job.tracker");
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setTaskProperty(
- SessionState.get().getQueryId(), getId(),
- Keys.TASK_HADOOP_ID, rj.getJobID());
+ SessionState.get().getQueryId(), getId(), Keys.TASK_HADOOP_ID,
+ rj.getJobID());
}
- console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID()) + ", Tracking URL = "
- + rj.getTrackingURL());
+ console.printInfo(ExecDriver.getJobEndMsg(rj.getJobID())
+ + ", Tracking URL = " + rj.getTrackingURL());
console.printInfo("Kill Command = "
+ HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
+ " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
@@ -212,66 +230,73 @@
}
/**
- * This class contains the state of the running task
- * Going forward, we will return this handle from execute
- * and Driver can split execute into start, monitorProgess and postProcess
+ * This class contains the state of the running task Going forward, we will
+ * return this handle from execute and Driver can split execute into start,
+ * monitorProgess and postProcess
*/
public static class ExecDriverTaskHandle extends TaskHandle {
JobClient jc;
RunningJob rj;
+
JobClient getJobClient() {
return jc;
}
+
RunningJob getRunningJob() {
return rj;
}
+
public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
this.jc = jc;
this.rj = rj;
}
+
public void setRunningJob(RunningJob job) {
- this.rj = job;
+ rj = job;
}
+
public Counters getCounters() throws IOException {
return rj.getCounters();
}
}
-
+
/**
* Fatal errors are those errors that cannot be recovered by retries. These
- * are application dependent. Examples of fatal errors include:
- * - the small table in the map-side joins is too large to be feasible to be
- * handled by one mapper. The job should fail and the user should be warned
- * to use regular joins rather than map-side joins.
- * Fatal errors are indicated by counters that are set at execution time.
- * If the counter is non-zero, a fatal error occurred. The value of the counter
- * indicates the error type.
- * @return true if fatal errors happened during job execution, false otherwise.
+ * are application dependent. Examples of fatal errors include: - the small
+ * table in the map-side joins is too large to be feasible to be handled by
+ * one mapper. The job should fail and the user should be warned to use
+ * regular joins rather than map-side joins. Fatal errors are indicated by
+ * counters that are set at execution time. If the counter is non-zero, a
+ * fatal error occurred. The value of the counter indicates the error type.
+ *
+ * @return true if fatal errors happened during job execution, false
+ * otherwise.
*/
protected boolean checkFatalErrors(TaskHandle t, StringBuffer errMsg) {
ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
RunningJob rj = th.getRunningJob();
try {
Counters ctrs = th.getCounters();
- for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
- if (op.checkFatalErrors(ctrs, errMsg))
+ for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
+ }
}
return false;
- } catch (IOException e) {
+ } catch (IOException e) {
// this exception can be tolerated
e.printStackTrace();
return false;
}
}
-
+
public void progress(TaskHandle taskHandle) throws IOException {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
+ ExecDriverTaskHandle th = (ExecDriverTaskHandle) taskHandle;
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
- SimpleDateFormat dateFormat
- = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ SimpleDateFormat dateFormat = new SimpleDateFormat(
+ "yyyy-MM-dd HH:mm:ss,SSS");
long reportTime = System.currentTimeMillis();
long maxReportInterval = 60 * 1000; // One minute
boolean fatal = false;
@@ -282,38 +307,41 @@
} catch (InterruptedException e) {
}
th.setRunningJob(jc.getJob(rj.getJobID()));
-
+
// If fatal errors happen we should kill the job immediately rather than
// let the job retry several times, which eventually lead to failure.
- if (fatal)
- continue; // wait until rj.isComplete
- if ( fatal = checkFatalErrors(th, errMsg)) {
+ if (fatal) {
+ continue; // wait until rj.isComplete
+ }
+ if (fatal = checkFatalErrors(th, errMsg)) {
success = false;
- console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
+ console.printError("[Fatal Error] " + errMsg.toString()
+ + ". Killing the job.");
rj.killJob();
continue;
}
errMsg.setLength(0);
-
+
updateCounters(th);
- String report = " "+getId()+" map = " + this.mapProgress + "%, reduce = " + this.reduceProgress + "%";
-
+ String report = " " + getId() + " map = " + mapProgress + "%, reduce = "
+ + reduceProgress + "%";
+
if (!report.equals(lastReport)
|| System.currentTimeMillis() >= reportTime + maxReportInterval) {
// write out serialized plan with counters to log file
// LOG.info(queryPlan);
- String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
+ String output = dateFormat.format(Calendar.getInstance().getTime())
+ + report;
SessionState ss = SessionState.get();
if (ss != null) {
- ss.getHiveHistory().setTaskCounters(
- SessionState.get().getQueryId(), getId(), rj);
- ss.getHiveHistory().setTaskProperty(
- SessionState.get().getQueryId(), getId(),
- Keys.TASK_HADOOP_PROGRESS, output);
- ss.getHiveHistory().progressTask(
- SessionState.get().getQueryId(), this);
+ ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(),
+ getId(), rj);
+ ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
+ getId(), Keys.TASK_HADOOP_PROGRESS, output);
+ ss.getHiveHistory().progressTask(SessionState.get().getQueryId(),
+ this);
ss.getHiveHistory().logPlanProgress(queryPlan);
}
console.printInfo(output);
@@ -321,13 +349,15 @@
reportTime = System.currentTimeMillis();
}
}
- // check for fatal error again in case it occurred after the last check before the job is completed
- if ( !fatal && (fatal = checkFatalErrors(th, errMsg))) {
+ // check for fatal error again in case it occurred after the last check
+ // before the job is completed
+ if (!fatal && (fatal = checkFatalErrors(th, errMsg))) {
console.printError("[Fatal Error] " + errMsg.toString());
success = false;
- } else
+ } else {
success = rj.isSuccessful();
-
+ }
+
setDone();
th.setRunningJob(jc.getJob(rj.getJobID()));
updateCounters(th);
@@ -335,15 +365,17 @@
if (ss != null) {
ss.getHiveHistory().logPlanProgress(queryPlan);
}
- //LOG.info(queryPlan);
+ // LOG.info(queryPlan);
}
/**
* Estimate the number of reducers needed for this job, based on job input,
* and configuration parameters.
+ *
* @return the number of reducers.
*/
- public int estimateNumberOfReducers(HiveConf hive, JobConf job, mapredWork work) throws IOException {
+ public int estimateNumberOfReducers(HiveConf hive, JobConf job,
+ mapredWork work) throws IOException {
if (hive == null) {
hive = new HiveConf();
}
@@ -351,10 +383,10 @@
int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
long totalInputFileSize = getTotalInputFileSize(job, work);
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
- + " totalInputFileSize=" + totalInputFileSize);
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
- int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+ int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
return reducers;
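To make the estimate above concrete, a small self-contained example with made-up sizes (10.5 GB of input, 1 GB per reducer, cap of 999) works out to 11 reducers:

    // Standalone sketch of the ceiling-division-plus-clamp logic above; the
    // numbers are illustrative, not taken from any real job.
    public class ReducerEstimateExample {
      public static void main(String[] args) {
        long bytesPerReducer = 1000000000L;      // average load per reducer (1 GB)
        int maxReducers = 999;                   // upper bound on reducer count
        long totalInputFileSize = 10500000000L;  // 10.5 GB of input

        int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
        reducers = Math.max(1, reducers);        // at least one reducer
        reducers = Math.min(maxReducers, reducers);
        System.out.println("estimated reducers = " + reducers); // prints 11
      }
    }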
@@ -367,41 +399,54 @@
// this is a temporary hack to fix things that are not fixed in the compiler
Integer numReducersFromWork = work.getNumReduceTasks();
- if(work.getReducer() == null) {
- console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+ if (work.getReducer() == null) {
+ console
+ .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
work.setNumReduceTasks(Integer.valueOf(0));
} else {
if (numReducersFromWork >= 0) {
- console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
+ console.printInfo("Number of reduce tasks determined at compile time: "
+ + work.getNumReduceTasks());
} else if (job.getNumReduceTasks() > 0) {
int reducers = job.getNumReduceTasks();
work.setNumReduceTasks(reducers);
- console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: " + reducers);
+ console
+ .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ + reducers);
} else {
int reducers = estimateNumberOfReducers(conf, job, work);
work.setNumReduceTasks(reducers);
- console.printInfo("Number of reduce tasks not specified. Estimated from input data size: " + reducers);
+ console
+ .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+ + reducers);
}
- console.printInfo("In order to change the average load for a reducer (in bytes):");
- console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
+ console
+ .printInfo("In order to change the average load for a reducer (in bytes):");
+ console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+ + "=<number>");
console.printInfo("In order to limit the maximum number of reducers:");
- console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
+ console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname
+ + "=<number>");
console.printInfo("In order to set a constant number of reducers:");
- console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
+ console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+ + "=<number>");
}
}
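The three hints printed above correspond to configuration properties; a hedged example of setting them on a JobConf follows, where the string names are assumptions based on HiveConf.ConfVars of this release rather than values taken from this diff:

    // Illustrative only: the knobs the printed hints refer to. Property names
    // are assumed from HiveConf.ConfVars of this era.
    JobConf job = new JobConf();
    job.set("hive.exec.reducers.bytes.per.reducer", "1000000000"); // average load per reducer
    job.set("hive.exec.reducers.max", "999");                      // cap on the reducer count
    job.set("mapred.reduce.tasks", "10");                          // force a constant reducer count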
/**
* Calculate the total size of input files.
- * @param job the hadoop job conf.
+ *
+ * @param job
+ * the hadoop job conf.
* @return the total size in bytes.
* @throws IOException
*/
- public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
+ public long getTotalInputFileSize(JobConf job, mapredWork work)
+ throws IOException {
long r = 0;
// For each input path, calculate the total size.
- for (String path: work.getPathToAliases().keySet()) {
+ for (String path : work.getPathToAliases().keySet()) {
try {
Path p = new Path(path);
FileSystem fs = p.getFileSystem(job);
@@ -419,14 +464,16 @@
*/
@Override
public void updateCounters(TaskHandle t) throws IOException {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle)t;
+ ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
RunningJob rj = th.getRunningJob();
- this.mapProgress = Math.round(rj.mapProgress() * 100);
- this.reduceProgress = Math.round(rj.reduceProgress() * 100);
- taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(this.mapProgress));
- taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(this.reduceProgress));
+ mapProgress = Math.round(rj.mapProgress() * 100);
+ reduceProgress = Math.round(rj.reduceProgress() * 100);
+ taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long
+ .valueOf(mapProgress));
+ taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long
+ .valueOf(reduceProgress));
Counters ctrs = th.getCounters();
- for (Operator<? extends Serializable> op: work.getAliasToWork().values()) {
+ for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
op.updateCounters(ctrs);
}
if (work.getReducer() != null) {
@@ -450,21 +497,20 @@
return reduceProgress == 100;
}
-
/**
* Execute a query plan using Hadoop
*/
public int execute() {
-
+
success = true;
-
+
try {
setNumberOfReducers();
- } catch(IOException e) {
+ } catch (IOException e) {
String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
- + e.getMessage();
+ + e.getMessage();
console.printError(statusMesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return 1;
}
@@ -473,16 +519,17 @@
throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
}
-
String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
- String jobScratchDirStr = hiveScratchDir + File.separator+ Utilities.randGen.nextInt();
- Path jobScratchDir = new Path(jobScratchDirStr);
+ String jobScratchDirStr = hiveScratchDir + File.separator
+ + Utilities.randGen.nextInt();
+ Path jobScratchDir = new Path(jobScratchDirStr);
String emptyScratchDirStr = null;
- Path emptyScratchDir = null;
+ Path emptyScratchDir = null;
int numTries = 3;
while (numTries > 0) {
- emptyScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+ emptyScratchDirStr = hiveScratchDir + File.separator
+ + Utilities.randGen.nextInt();
emptyScratchDir = new Path(emptyScratchDirStr);
try {
@@ -490,10 +537,12 @@
fs.mkdirs(emptyScratchDir);
break;
} catch (Exception e) {
- if (numTries > 0)
+ if (numTries > 0) {
numTries--;
- else
- throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage());
+ } else {
+ throw new RuntimeException("Failed to make dir "
+ + emptyScratchDir.toString() + " : " + e.getMessage());
+ }
}
}
@@ -507,17 +556,19 @@
job.setReducerClass(ExecReducer.class);
// Turn on speculative execution for reducers
- HiveConf.setVar(job,HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
- HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+ HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
- if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat)))
+ if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) {
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
LOG.info("Using " + inpFormat);
try {
- job.setInputFormat((Class<? extends InputFormat>)(Class.forName(inpFormat)));
+ job.setInputFormat((Class<? extends InputFormat>) (Class
+ .forName(inpFormat)));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getMessage());
}
@@ -526,14 +577,14 @@
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
- // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands it
+ // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands
+ // it
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
- String allJars =
- StringUtils.isNotBlank(auxJars)
- ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
- : addedJars;
+ String allJars = StringUtils.isNotBlank(auxJars) ? (StringUtils
+ .isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
+ : addedJars;
LOG.info("adding libjars: " + allJars);
initializeFiles("tmpjars", allJars);
}
@@ -544,7 +595,8 @@
initializeFiles("tmpfiles", addedFiles);
}
// Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it
- String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES);
+ String addedArchives = HiveConf.getVar(job,
+ HiveConf.ConfVars.HIVEADDEDARCHIVES);
if (StringUtils.isNotBlank(addedArchives)) {
initializeFiles("tmparchives", addedArchives);
}
@@ -552,12 +604,13 @@
int returnVal = 0;
RunningJob rj = null, orig_rj = null;
- boolean noName = StringUtils.isEmpty(HiveConf.
- getVar(job,HiveConf.ConfVars.HADOOPJOBNAME));
+ boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
+ HiveConf.ConfVars.HADOOPJOBNAME));
- if(noName) {
+ if (noName) {
// This is for a special case to ensure unit tests pass
- HiveConf.setVar(job,HiveConf.ConfVars.HADOOPJOBNAME, "JOB"+randGen.nextInt());
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB"
+ + randGen.nextInt());
}
try {
@@ -565,20 +618,22 @@
Utilities.setMapRedWork(job, work);
- // remove the pwd from conf file so that job tracker doesn't show this logs
+ // remove the pwd from conf file so that job tracker doesn't show this
+ // logs
String pwd = job.get(HiveConf.ConfVars.METASTOREPWD.varname);
- if (pwd != null)
+ if (pwd != null) {
job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
+ }
JobClient jc = new JobClient(job);
-
// make this client wait if job trcker is not behaving well.
Throttle.checkJobTracker(job, LOG);
orig_rj = rj = jc.submitJob(job);
// replace it back
- if (pwd != null)
+ if (pwd != null) {
job.set(HiveConf.ConfVars.METASTOREPWD.varname, pwd);
+ }
// add to list of running jobs so in case of abnormal shutdown can kill
// it.
@@ -590,11 +645,12 @@
progress(th); // success status will be setup inside progress
if (rj == null) {
- // in the corner case where the running job has disappeared from JT memory
+ // in the corner case where the running job has disappeared from JT
+ // memory
// remember that we did actually submit the job.
rj = orig_rj;
success = false;
- }
+ }
String statusMesg = getJobEndMsg(rj.getJobID());
if (!success) {
@@ -636,41 +692,44 @@
try {
if (rj != null) {
- if(work.getAliasToWork() != null) {
- for(Operator<? extends Serializable> op:
- work.getAliasToWork().values()) {
+ if (work.getAliasToWork() != null) {
+ for (Operator<? extends Serializable> op : work.getAliasToWork()
+ .values()) {
op.jobClose(job, success);
}
}
- if(work.getReducer() != null) {
+ if (work.getReducer() != null) {
work.getReducer().jobClose(job, success);
}
}
} catch (Exception e) {
// jobClose needs to execute successfully otherwise fail task
- if(success) {
+ if (success) {
success = false;
returnVal = 3;
- String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
+ String mesg = "Job Commit failed with exception '"
+ + Utilities.getNameMessage(e) + "'";
console.printError(mesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
}
}
return (returnVal);
}
-
+
/**
* this msg pattern is used to track when a job is started
+ *
* @param jobId
* @return
*/
public static String getJobStartMsg(String jobId) {
return "Starting Job = " + jobId;
}
-
+
/**
* this msg pattern is used to track when a job is successfully done.
+ *
* @param jobId
* @return
*/
@@ -678,29 +737,33 @@
return "Ended Job = " + jobId;
}
- private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
+ private void showJobFailDebugInfo(JobConf conf, RunningJob rj)
+ throws IOException {
Map<String, Integer> failures = new HashMap<String, Integer>();
- Set<String> successes = new HashSet<String> ();
- Map<String, String> taskToJob = new HashMap<String,String>();
+ Set<String> successes = new HashSet<String>();
+ Map<String, String> taskToJob = new HashMap<String, String>();
int startIndex = 0;
- while(true) {
- TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
+ while (true) {
+ TaskCompletionEvent[] taskCompletions = rj
+ .getTaskCompletionEvents(startIndex);
- if(taskCompletions == null || taskCompletions.length == 0) {
+ if (taskCompletions == null || taskCompletions.length == 0) {
break;
}
boolean more = true;
- for(TaskCompletionEvent t : taskCompletions) {
- // getTaskJobIDs return Strings for compatibility with Hadoop version without
+ for (TaskCompletionEvent t : taskCompletions) {
+ // getTaskJobIDs return Strings for compatibility with Hadoop version
+ // without
// TaskID or TaskAttemptID
- String [] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
+ String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
- if(taskJobIds == null) {
- console.printError("Task attempt info is unavailable in this Hadoop version");
+ if (taskJobIds == null) {
+ console
+ .printError("Task attempt info is unavailable in this Hadoop version");
more = false;
break;
}
@@ -709,9 +772,9 @@
String jobId = taskJobIds[1];
taskToJob.put(taskId, jobId);
- if(t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
+ if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
Integer failAttempts = failures.get(taskId);
- if(failAttempts == null) {
+ if (failAttempts == null) {
failAttempts = Integer.valueOf(0);
}
failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
@@ -720,36 +783,39 @@
successes.add(taskId);
}
}
- if(!more) {
+ if (!more) {
break;
}
startIndex += taskCompletions.length;
}
// Remove failures for tasks that succeeded
- for(String task : successes) {
+ for (String task : successes) {
failures.remove(task);
}
- if(failures.keySet().size() == 0) {
+ if (failures.keySet().size() == 0) {
return;
}
// Find the highest failure count
int maxFailures = 0;
- for(Integer failCount : failures.values()) {
- if(maxFailures < failCount.intValue())
+ for (Integer failCount : failures.values()) {
+ if (maxFailures < failCount.intValue()) {
maxFailures = failCount.intValue();
+ }
}
// Display Error Message for tasks with the highest failure count
- console.printError("\nFailed tasks with most" + "(" + maxFailures + ")" + " failures " + ": ");
+ console.printError("\nFailed tasks with most" + "(" + maxFailures + ")"
+ + " failures " + ": ");
String jtUrl = JobTrackerURLResolver.getURL(conf);
- for(String task : failures.keySet()) {
- if(failures.get(task).intValue() == maxFailures) {
+ for (String task : failures.keySet()) {
+ if (failures.get(task).intValue() == maxFailures) {
String jobId = taskToJob.get(task);
- String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
- console.printError("Task URL: " + taskUrl +"\n");
+ String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid="
+ + task.toString();
+ console.printError("Task URL: " + taskUrl + "\n");
// Only print out one task because that's good enough for debugging.
break;
}
@@ -845,14 +911,18 @@
// see also - code in CliDriver.java
ClassLoader loader = conf.getClassLoader();
if (StringUtils.isNotBlank(auxJars)) {
- loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
+ loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars,
+ ","));
}
if (StringUtils.isNotBlank(addedJars)) {
- loader = Utilities.addToClassPath(loader, StringUtils.split(addedJars, ","));
+ loader = Utilities.addToClassPath(loader, StringUtils.split(
+ addedJars, ","));
}
conf.setClassLoader(loader);
- // Also set this to the Thread ContextClassLoader, so new threads will inherit
- // this class loader, and propagate into newly created Configurations by those
+ // Also set this to the Thread ContextClassLoader, so new threads will
+ // inherit
+ // this class loader, and propagate into newly created Configurations by
+ // those
// new threads.
Thread.currentThread().setContextClassLoader(loader);
} catch (Exception e) {
@@ -887,8 +957,9 @@
String oneProp = (String) one;
if (localMode
- && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
+ && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
continue;
+ }
String oneValue = deltaP.getProperty(oneProp);
@@ -941,8 +1012,9 @@
if (inpFs.exists(dirPath)) {
FileStatus[] fStats = inpFs.listStatus(dirPath);
- if (fStats.length > 0)
+ if (fStats.length > 0) {
return false;
+ }
}
return true;
}
@@ -950,18 +1022,22 @@
/**
* Handle a empty/null path for a given alias
*/
- private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths,
- boolean isEmptyPath, String alias) throws Exception {
+ private int addInputPath(String path, JobConf job, mapredWork work,
+ String hiveScratchDir, int numEmptyPaths, boolean isEmptyPath,
+ String alias) throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
// The input file does not exist, replace it by a empty file
Class<? extends HiveOutputFormat> outFileFormat = null;
- if (isEmptyPath)
- outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
- else
- outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc().getOutputFileFormatClass();
+ if (isEmptyPath) {
+ outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc()
+ .getOutputFileFormatClass();
+ } else {
+ outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc()
+ .getOutputFileFormatClass();
+ }
// create a dummy empty file in a new directory
String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
@@ -974,13 +1050,13 @@
LOG.info("Changed input file to " + newPath.toString());
// toggle the work
- LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work
+ .getPathToAliases();
if (isEmptyPath) {
assert path != null;
pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
pathToAliases.remove(path);
- }
- else {
+ } else {
assert path == null;
ArrayList<String> newList = new ArrayList<String>();
newList.add(alias);
@@ -989,25 +1065,28 @@
work.setPathToAliases(pathToAliases);
- LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ LinkedHashMap<String, partitionDesc> pathToPartitionInfo = work
+ .getPathToPartitionInfo();
if (isEmptyPath) {
- pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
+ pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo
+ .get(path));
pathToPartitionInfo.remove(path);
- }
- else {
+ } else {
partitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
}
work.setPathToPartitionInfo(pathToPartitionInfo);
String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, new Properties(), null);
+ RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(
+ job, newFilePath, Text.class, false, new Properties(), null);
recWriter.close(false);
FileInputFormat.addInputPaths(job, onefile);
return numEmptyPaths;
}
- private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir) throws Exception {
+ private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir)
+ throws Exception {
int numEmptyPaths = 0;
List<String> pathsProcessed = new ArrayList<String>();
@@ -1015,7 +1094,7 @@
// AliasToWork contains all the aliases
for (String oneAlias : work.getAliasToWork().keySet()) {
LOG.info("Processing alias " + oneAlias);
- List<String> emptyPaths = new ArrayList<String>();
+ List<String> emptyPaths = new ArrayList<String>();
// The alias may not have any path
String path = null;
@@ -1024,31 +1103,41 @@
if (aliases.contains(oneAlias)) {
path = onefile;
- // Multiple aliases can point to the same path - it should be processed only once
- if (pathsProcessed.contains(path))
+ // Multiple aliases can point to the same path - it should be
+ // processed only once
+ if (pathsProcessed.contains(path)) {
continue;
+ }
pathsProcessed.add(path);
LOG.info("Adding input file " + path);
- if (!isEmptyPath(job, path))
+ if (!isEmptyPath(job, path)) {
FileInputFormat.addInputPaths(job, path);
- else
+ } else {
emptyPaths.add(path);
+ }
}
}
// Create a empty file if the directory is empty
- for (String emptyPath : emptyPaths)
- numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, oneAlias);
+ for (String emptyPath : emptyPaths) {
+ numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir,
+ numEmptyPaths, true, oneAlias);
+ }
// If the query references non-existent partitions
- // We need to add a empty file, it is not acceptable to change the operator tree
+ // We need to add a empty file, it is not acceptable to change the
+ // operator tree
// Consider the query:
- // select * from (select count(1) from T union all select count(1) from T2) x;
- // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows)
- if (path == null)
- numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias);
+ // select * from (select count(1) from T union all select count(1) from
+ // T2) x;
+ // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+ // rows)
+ if (path == null) {
+ numEmptyPaths = addInputPath(null, job, work, hiveScratchDir,
+ numEmptyPaths, false, oneAlias);
+ }
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Thu Jan 21 10:37:58 2010
@@ -56,17 +56,19 @@
private MemoryMXBean memoryMXBean;
private long numRows = 0;
private long nextCntr = 1;
-
+
+ @Override
public void configure(JobConf job) {
- // Allocate the bean at the beginning -
+ // Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
+
try {
- l4j.info("conf classpath = "
- + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
- l4j.info("thread classpath = "
- + Arrays.asList(((URLClassLoader)Thread.currentThread().getContextClassLoader()).getURLs()));
+ l4j.info("conf classpath = "
+ + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+ l4j.info("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
} catch (Exception e) {
l4j.info("cannot get classpath: " + e.getMessage());
}
@@ -88,35 +90,39 @@
}
fetchOperators = new HashMap<String, FetchOperator>();
// create map local operators
- for (Map.Entry<String, fetchWork> entry : localWork.getAliasToFetchWork().entrySet()) {
- fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(), job));
+ for (Map.Entry<String, fetchWork> entry : localWork.getAliasToFetchWork()
+ .entrySet()) {
+ fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(),
+ job));
l4j.info("fetchoperator for " + entry.getKey() + " created");
}
// initialize map local operators
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(entry.getKey());
+ Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+ .get(entry.getKey());
// All the operators need to be initialized before process
- forwardOp.initialize(jc, new ObjectInspector[]{entry.getValue().getOutputObjectInspector()});
+ forwardOp.initialize(jc, new ObjectInspector[] { entry.getValue()
+ .getOutputObjectInspector() });
l4j.info("fetchoperator for " + entry.getKey() + " initialized");
}
- // defer processing of map local operators to first row if in case there is no input (??)
+ // defer processing of map local operators to first row if in case there
+ // is no input (??)
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
// will this be true here?
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- throw new RuntimeException ("Map operator initialization failed", e);
+ throw new RuntimeException("Map operator initialization failed", e);
}
}
}
- public void map(Object key, Object value,
- OutputCollector output,
- Reporter reporter) throws IOException {
- if(oc == null) {
+ public void map(Object key, Object value, OutputCollector output,
+ Reporter reporter) throws IOException {
+ if (oc == null) {
oc = output;
rp = reporter;
mo.setOutputCollector(oc);
@@ -126,11 +132,13 @@
try {
mapredLocalWork localWork = mo.getConf().getMapLocalWork();
int fetchOpNum = 0;
- for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+ for (Map.Entry<String, FetchOperator> entry : fetchOperators
+ .entrySet()) {
int fetchOpRows = 0;
String alias = entry.getKey();
FetchOperator fetchOp = entry.getValue();
- Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(alias);
+ Operator<? extends Serializable> forwardOp = localWork
+ .getAliasToWork().get(alias);
while (true) {
InspectableObject row = fetchOp.getNextRow();
@@ -140,40 +148,46 @@
}
fetchOpRows++;
forwardOp.process(row.o, 0);
- // check if any operator had a fatal error or early exit during execution
- if ( forwardOp.getDone() ) {
+ // check if any operator had a fatal error or early exit during
+ // execution
+ if (forwardOp.getDone()) {
done = true;
break;
}
}
-
+
if (l4j.isInfoEnabled()) {
- l4j.info("fetch " + fetchOpNum++ + " processed " + fetchOpRows + " used mem: " + memoryMXBean.getHeapMemoryUsage().getUsed());
+ l4j
+ .info("fetch " + fetchOpNum++ + " processed " + fetchOpRows
+ + " used mem: "
+ + memoryMXBean.getHeapMemoryUsage().getUsed());
}
}
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- throw new RuntimeException ("Map local work failed", e);
+ throw new RuntimeException("Map local work failed", e);
}
}
}
}
try {
- if (mo.getDone())
+ if (mo.getDone()) {
done = true;
- else {
- // Since there is no concept of a group, we don't invoke startGroup/endGroup for a mapper
- mo.process((Writable)value);
+ } else {
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mo.process((Writable) value);
if (l4j.isInfoEnabled()) {
numRows++;
if (numRows == nextCntr) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processing " + numRows + " rows: used memory = " + used_memory);
+ l4j.info("ExecMapper: processing " + numRows
+ + " rows: used memory = " + used_memory);
nextCntr = getNextCntr(numRows);
}
}
@@ -182,26 +196,29 @@
abort = true;
e.printStackTrace();
if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- throw new RuntimeException (e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
}
}
}
private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the reducer. It dumps
+ // A very simple counter to keep track of number of rows processed by the
+ // reducer. It dumps
// every 1 million times, and quickly before that
- if (cntr >= 1000000)
+ if (cntr >= 1000000) {
return cntr + 1000000;
-
+ }
+
return 10 * cntr;
}
+ @Override
public void close() {
// No row was processed
- if(oc == null) {
+ if (oc == null) {
l4j.trace("Close called. no row processed by map.");
}
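The logging cadence produced by getNextCntr() above grows by a factor of ten up to one million rows and then advances in one-million-row steps; a small self-contained illustration:

    // Mirrors the getNextCntr() logic above to show the thresholds it
    // produces: 1, 10, 100, ..., 1000000, 2000000, 3000000, ...
    public class NextCntrExample {
      static long getNextCntr(long cntr) {
        if (cntr >= 1000000) {
          return cntr + 1000000;
        }
        return 10 * cntr;
      }

      public static void main(String[] args) {
        long c = 1;
        for (int i = 0; i < 10; i++) {
          System.out.println(c);
          c = getNextCntr(c);
        }
      }
    }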
@@ -212,24 +229,26 @@
if (fetchOperators != null) {
mapredLocalWork localWork = mo.getConf().getMapLocalWork();
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
- Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(entry.getKey());
+ Operator<? extends Serializable> forwardOp = localWork
+ .getAliasToWork().get(entry.getKey());
forwardOp.close(abort);
}
}
-
+
if (l4j.isInfoEnabled()) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " + used_memory);
+ l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+ + used_memory);
}
-
- reportStats rps = new reportStats (rp);
+
+ reportStats rps = new reportStats(rp);
mo.preorderMap(rps);
return;
} catch (Exception e) {
- if(!abort) {
+ if (!abort) {
// signal new failure to map-reduce
l4j.error("Hit error while closing operators - failing tree");
- throw new RuntimeException ("Error while closing operators", e);
+ throw new RuntimeException("Error while closing operators", e);
}
}
}
@@ -240,15 +259,17 @@
public static class reportStats implements Operator.OperatorFunc {
Reporter rp;
- public reportStats (Reporter rp) {
+
+ public reportStats(Reporter rp) {
this.rp = rp;
}
+
public void func(Operator op) {
Map<Enum, Long> opStats = op.getStats();
- for(Map.Entry<Enum, Long> e: opStats.entrySet()) {
- if(this.rp != null) {
- rp.incrCounter(e.getKey(), e.getValue());
- }
+ for (Map.Entry<Enum, Long> e : opStats.entrySet()) {
+ if (rp != null) {
+ rp.incrCounter(e.getKey(), e.getValue());
+ }
}
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Thu Jan 21 10:37:58 2010
@@ -18,23 +18,20 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
import java.net.URLClassLoader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
+import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -43,12 +40,17 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
public class ExecReducer extends MapReduceBase implements Reducer {
private JobConf jc;
- private OutputCollector<?,?> oc;
+ private OutputCollector<?, ?> oc;
private Operator<?> reducer;
private Reporter rp;
private boolean abort = false;
@@ -56,101 +58,108 @@
private long cntr = 0;
private long nextCntr = 1;
- private static String [] fieldNames;
+ private static String[] fieldNames;
public static final Log l4j = LogFactory.getLog("ExecReducer");
-
+
// used to log memory usage periodically
private MemoryMXBean memoryMXBean;
-
+
// TODO: move to DynamicSerDe when it's ready
private Deserializer inputKeyDeserializer;
- // Input value serde needs to be an array to support different SerDe
+ // Input value serde needs to be an array to support different SerDe
// for different tags
- private SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+ private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
static {
- ArrayList<String> fieldNameArray = new ArrayList<String> ();
- for(Utilities.ReduceField r: Utilities.ReduceField.values()) {
+ ArrayList<String> fieldNameArray = new ArrayList<String>();
+ for (Utilities.ReduceField r : Utilities.ReduceField.values()) {
fieldNameArray.add(r.toString());
}
- fieldNames = fieldNameArray.toArray(new String [0]);
+ fieldNames = fieldNameArray.toArray(new String[0]);
}
tableDesc keyTableDesc;
tableDesc[] valueTableDesc;
-
+
+ @Override
public void configure(JobConf job) {
ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
- // Allocate the bean at the beginning -
+ // Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
+
try {
- l4j.info("conf classpath = "
- + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
- l4j.info("thread classpath = "
- + Arrays.asList(((URLClassLoader)Thread.currentThread().getContextClassLoader()).getURLs()));
+ l4j.info("conf classpath = "
+ + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+ l4j.info("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
} catch (Exception e) {
l4j.info("cannot get classpath: " + e.getMessage());
}
jc = job;
mapredWork gWork = Utilities.getMapRedWork(job);
reducer = gWork.getReducer();
- reducer.setParentOperators(null); // clear out any parents as reducer is the root
+ reducer.setParentOperators(null); // clear out any parents as reducer is the
+ // root
isTagged = gWork.getNeedsTagging();
try {
keyTableDesc = gWork.getKeyDesc();
- inputKeyDeserializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+ inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
keyObjectInspector = inputKeyDeserializer.getObjectInspector();
- valueTableDesc = new tableDesc[gWork.getTagToValueDesc().size()];
- for(int tag=0; tag<gWork.getTagToValueDesc().size(); tag++) {
+ valueTableDesc = new tableDesc[gWork.getTagToValueDesc().size()];
+ for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
// We should initialize the SerDe with the TypeInfo when available.
valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
- inputValueDeserializer[tag] = (SerDe)ReflectionUtils.newInstance(valueTableDesc[tag].getDeserializerClass(), null);
- inputValueDeserializer[tag].initialize(null, valueTableDesc[tag].getProperties());
- valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
-
+ inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+ valueTableDesc[tag].getDeserializerClass(), null);
+ inputValueDeserializer[tag].initialize(null, valueTableDesc[tag]
+ .getProperties());
+ valueObjectInspector[tag] = inputValueDeserializer[tag]
+ .getObjectInspector();
+
ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
ois.add(keyObjectInspector);
ois.add(valueObjectInspector[tag]);
ois.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
- rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
- Arrays.asList(fieldNames), ois);
+ rowObjectInspector[tag] = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Arrays.asList(fieldNames), ois);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
-
- //initialize reduce operator tree
+
+ // initialize reduce operator tree
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- throw new RuntimeException ("Reduce operator initialization failed", e);
+ throw new RuntimeException("Reduce operator initialization failed", e);
}
}
}
private Object keyObject;
- private Object[] valueObject = new Object[Byte.MAX_VALUE];
-
+ private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+
private BytesWritable groupKey;
-
+
ArrayList<Object> row = new ArrayList<Object>(3);
ByteWritable tag = new ByteWritable();
- public void reduce(Object key, Iterator values,
- OutputCollector output,
- Reporter reporter) throws IOException {
- if(oc == null) {
+ public void reduce(Object key, Iterator values, OutputCollector output,
+ Reporter reporter) throws IOException {
+
+ if (oc == null) {
// propagete reporter and output collector to all operators
oc = output;
rp = reporter;
@@ -159,34 +168,35 @@
}
try {
- BytesWritable keyWritable = (BytesWritable)key;
- tag.set((byte)0);
+ BytesWritable keyWritable = (BytesWritable) key;
+ tag.set((byte) 0);
if (isTagged) {
// remove the tag
int size = keyWritable.getSize() - 1;
- tag.set(keyWritable.get()[size]);
+ tag.set(keyWritable.get()[size]);
keyWritable.setSize(size);
}
-
+
if (!keyWritable.equals(groupKey)) {
// If a operator wants to do some work at the beginning of a group
- if (groupKey == null) { //the first group
+ if (groupKey == null) { // the first group
groupKey = new BytesWritable();
} else {
// If a operator wants to do some work at the end of a group
l4j.trace("End Group");
reducer.endGroup();
}
-
+
try {
keyObject = inputKeyDeserializer.deserialize(keyWritable);
} catch (Exception e) {
- throw new HiveException("Unable to deserialize reduce input key from " +
- Utilities.formatBinaryString(keyWritable.get(), 0, keyWritable.getSize())
- + " with properties " + keyTableDesc.getProperties(),
- e);
+ throw new HiveException(
+ "Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
}
-
+
groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
l4j.trace("Start Group");
reducer.startGroup();
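For orientation, the tagged-key layout that reduce() unpacks above is the serialized group key with one trailing tag byte appended when tagging is enabled; a minimal sketch under that assumption, using illustrative bytes:

    // Sketch only: peel the trailing tag byte off a reduce key, as reduce()
    // does above, so the matching inputValueDeserializer[tag] can be chosen.
    BytesWritable keyWritable = new BytesWritable(new byte[] { 0x01, 0x02, 0x03, 0x00 });
    int size = keyWritable.getSize() - 1;
    byte tagByte = keyWritable.get()[size];  // 0x00 selects inputValueDeserializer[0]
    keyWritable.setSize(size);               // the key shrinks back to {0x01, 0x02, 0x03}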
@@ -195,15 +205,18 @@
// System.err.print(keyObject.toString());
while (values.hasNext()) {
BytesWritable valueWritable = (BytesWritable) values.next();
- //System.err.print(who.getHo().toString());
+ // System.err.print(who.getHo().toString());
try {
- valueObject[tag.get()] = inputValueDeserializer[tag.get()].deserialize(valueWritable);
+ valueObject[tag.get()] = inputValueDeserializer[tag.get()]
+ .deserialize(valueWritable);
} catch (SerDeException e) {
- throw new HiveException("Unable to deserialize reduce input value (tag=" + tag.get()
- + ") from " +
- Utilities.formatBinaryString(valueWritable.get(), 0, valueWritable.getSize())
- + " with properties " + valueTableDesc[tag.get()].getProperties(),
- e);
+ throw new HiveException(
+ "Unable to deserialize reduce input value (tag="
+ + tag.get()
+ + ") from "
+ + Utilities.formatBinaryString(valueWritable.get(), 0,
+ valueWritable.getSize()) + " with properties "
+ + valueTableDesc[tag.get()].getProperties(), e);
}
row.clear();
row.add(keyObject);
@@ -214,7 +227,8 @@
cntr++;
if (cntr == nextCntr) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecReducer: processing " + cntr + " rows: used memory = " + used_memory);
+ l4j.info("ExecReducer: processing " + cntr
+ + " rows: used memory = " + used_memory);
nextCntr = getNextCntr(cntr);
}
}
@@ -224,27 +238,30 @@
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
} else {
- throw new IOException (e);
+ throw new IOException(e);
}
}
}
private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the reducer. It dumps
+ // A very simple counter to keep track of number of rows processed by the
+ // reducer. It dumps
// every 1 million times, and quickly before that
- if (cntr >= 1000000)
+ if (cntr >= 1000000) {
return cntr + 1000000;
-
+ }
+
return 10 * cntr;
}
+ @Override
public void close() {
// No row was processed
- if(oc == null) {
+ if (oc == null) {
l4j.trace("Close called no row");
}
@@ -255,18 +272,20 @@
reducer.endGroup();
}
if (l4j.isInfoEnabled()) {
- l4j.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed());
+ l4j.info("ExecReducer: processed " + cntr + " rows: used memory = "
+ + memoryMXBean.getHeapMemoryUsage().getUsed());
}
-
+
reducer.close(abort);
- reportStats rps = new reportStats (rp);
+ reportStats rps = new reportStats(rp);
reducer.preorderMap(rps);
return;
} catch (Exception e) {
- if(!abort) {
+ if (!abort) {
// signal new failure to map-reduce
l4j.error("Hit error while closing operators - failing tree");
- throw new RuntimeException ("Error while closing operators: " + e.getMessage(), e);
+ throw new RuntimeException("Error while closing operators: "
+ + e.getMessage(), e);
}
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Thu Jan 21 10:37:58 2010
@@ -36,7 +36,6 @@
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.util.StringUtils;
-
/**
* ExplainTask implementation
*
@@ -47,52 +46,53 @@
public ExplainTask() {
super();
}
-
+
+ @Override
public int execute() {
-
+
try {
- OutputStream outS = work.getResFile().getFileSystem(conf).create(work.getResFile());
+ OutputStream outS = work.getResFile().getFileSystem(conf).create(
+ work.getResFile());
PrintStream out = new PrintStream(outS);
-
+
// Print out the parse AST
outputAST(work.getAstStringTree(), out, 0);
out.println();
-
+
outputDependencies(out, work.getRootTasks(), 0);
out.println();
-
+
// Go over all the tasks and dump out the plans
outputStagePlans(out, work.getRootTasks(), 0);
out.close();
-
+
return (0);
- }
- catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
return (1);
}
}
private String indentString(int indent) {
StringBuilder sb = new StringBuilder();
- for(int i=0; i<indent; ++i) {
+ for (int i = 0; i < indent; ++i) {
sb.append(" ");
}
-
+
return sb.toString();
}
- private void outputMap(Map<?, ?> mp, String header,
- PrintStream out, boolean extended, int indent)
- throws Exception {
+ private void outputMap(Map<?, ?> mp, String header, PrintStream out,
+ boolean extended, int indent) throws Exception {
boolean first_el = true;
- for(Entry<?,?> ent: mp.entrySet()) {
+ for (Entry<?, ?> ent : mp.entrySet()) {
if (first_el) {
out.println(header);
}
first_el = false;
-
+
// Print the key
out.print(indentString(indent));
out.printf("%s ", ent.getKey().toString());
@@ -105,78 +105,74 @@
out.println();
} else if (ent.getValue() instanceof Serializable) {
out.println();
- outputPlan((Serializable)ent.getValue(), out, extended, indent+2);
- }
+ outputPlan((Serializable) ent.getValue(), out, extended, indent + 2);
+ }
}
}
- private void outputList(List<?> l, String header,
- PrintStream out, boolean extended, int indent)
- throws Exception {
-
+ private void outputList(List<?> l, String header, PrintStream out,
+ boolean extended, int indent) throws Exception {
+
boolean first_el = true;
boolean nl = false;
- for(Object o: l) {
+ for (Object o : l) {
if (first_el) {
out.print(header);
}
-
+
if (isPrintable(o)) {
if (!first_el) {
out.print(", ");
} else {
out.print(" ");
}
-
+
out.print(o);
nl = true;
- }
- else if (o instanceof Serializable) {
+ } else if (o instanceof Serializable) {
if (first_el) {
out.println();
}
- outputPlan((Serializable)o, out, extended, indent+2);
+ outputPlan((Serializable) o, out, extended, indent + 2);
}
-
+
first_el = false;
}
-
+
if (nl) {
out.println();
}
}
private boolean isPrintable(Object val) {
- if (val instanceof Boolean ||
- val instanceof String ||
- val instanceof Integer ||
- val instanceof Byte ||
- val instanceof Float ||
- val instanceof Double) {
+ if (val instanceof Boolean || val instanceof String
+ || val instanceof Integer || val instanceof Byte
+ || val instanceof Float || val instanceof Double) {
return true;
}
if (val.getClass().isPrimitive()) {
return true;
}
-
+
return false;
}
- private void outputPlan(Serializable work, PrintStream out, boolean extended, int indent)
- throws Exception {
+ private void outputPlan(Serializable work, PrintStream out, boolean extended,
+ int indent) throws Exception {
// Check if work has an explain annotation
Annotation note = work.getClass().getAnnotation(explain.class);
-
+
if (note instanceof explain) {
- explain xpl_note = (explain)note;
+ explain xpl_note = (explain) note;
if (extended || xpl_note.normalExplain()) {
out.print(indentString(indent));
out.println(xpl_note.displayName());
}
}
- // If this is an operator then we need to call the plan generation on the conf and then
+ // If this is an operator then we need to call the plan generation on the
+ // conf and then
// the children
if (work instanceof Operator) {
Operator<? extends Serializable> operator = (Operator<? extends Serializable>) work;
@@ -184,42 +180,42 @@
outputPlan(operator.getConf(), out, extended, indent);
}
if (operator.getChildOperators() != null) {
- for(Operator<? extends Serializable> op: operator.getChildOperators()) {
- outputPlan(op, out, extended, indent+2);
+ for (Operator<? extends Serializable> op : operator.getChildOperators()) {
+ outputPlan(op, out, extended, indent + 2);
}
}
return;
}
-
+
// We look at all methods that generate values for explain
Method[] methods = work.getClass().getMethods();
Arrays.sort(methods, new MethodComparator());
- for(Method m: methods) {
- int prop_indents = indent+2;
+ for (Method m : methods) {
+ int prop_indents = indent + 2;
note = m.getAnnotation(explain.class);
if (note instanceof explain) {
- explain xpl_note = (explain)note;
+ explain xpl_note = (explain) note;
if (extended || xpl_note.normalExplain()) {
-
+
Object val = m.invoke(work);
if (val == null) {
continue;
}
-
+
String header = null;
- if (!xpl_note.displayName().equals("")){
- header = indentString(prop_indents) + xpl_note.displayName() +":";
+ if (!xpl_note.displayName().equals("")) {
+ header = indentString(prop_indents) + xpl_note.displayName() + ":";
} else {
prop_indents = indent;
header = indentString(prop_indents);
}
if (isPrintable(val)) {
-
+
out.printf("%s ", header);
out.println(val);
continue;
@@ -227,98 +223,100 @@
// Try this as a map
try {
// Go through the map and print out the stuff
- Map<?,?> mp = (Map<?,?>)val;
- outputMap(mp, header, out, extended, prop_indents+2);
+ Map<?, ?> mp = (Map<?, ?>) val;
+ outputMap(mp, header, out, extended, prop_indents + 2);
continue;
- }
- catch (ClassCastException ce) {
+ } catch (ClassCastException ce) {
// Ignore - all this means is that this is not a map
}
// Try this as a list
try {
- List<?> l = (List<?>)val;
- outputList(l, header, out, extended, prop_indents+2);
-
+ List<?> l = (List<?>) val;
+ outputList(l, header, out, extended, prop_indents + 2);
+
continue;
- }
- catch (ClassCastException ce) {
+ } catch (ClassCastException ce) {
// Ignore
}
-
// Finally check if it is serializable
try {
- Serializable s = (Serializable)val;
+ Serializable s = (Serializable) val;
out.println(header);
- outputPlan(s, out, extended, prop_indents+2);
-
+ outputPlan(s, out, extended, prop_indents + 2);
+
continue;
- }
- catch (ClassCastException ce) {
+ } catch (ClassCastException ce) {
// Ignore
}
}
}
}
}
-
- private void outputPlan(Task<? extends Serializable> task, PrintStream out,
- boolean extended, HashSet<Task<? extends Serializable>> displayedSet,
- int indent)
- throws Exception {
-
+
+ private void outputPlan(Task<? extends Serializable> task, PrintStream out,
+ boolean extended, HashSet<Task<? extends Serializable>> displayedSet,
+ int indent) throws Exception {
+
if (displayedSet.contains(task)) {
return;
}
displayedSet.add(task);
-
+
out.print(indentString(indent));
out.printf("Stage: %s\n", task.getId());
- // Start by getting the work part of the task and call the output plan for the work
- outputPlan(task.getWork(), out, extended, indent+2);
+ // Start by getting the work part of the task and call the output plan for
+ // the work
+ outputPlan(task.getWork(), out, extended, indent + 2);
out.println();
- if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
- for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+ if (task instanceof ConditionalTask
+ && ((ConditionalTask) task).getListTasks() != null) {
+ for (Task<? extends Serializable> con : ((ConditionalTask) task)
+ .getListTasks()) {
outputPlan(con, out, extended, displayedSet, indent);
}
}
if (task.getChildTasks() != null) {
- for(Task<? extends Serializable> child: task.getChildTasks()) {
+ for (Task<? extends Serializable> child : task.getChildTasks()) {
outputPlan(child, out, extended, displayedSet, indent);
}
}
}
- private Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
- private void outputDependencies(Task<? extends Serializable> task, PrintStream out, int indent, boolean rootTskCandidate)
- throws Exception {
-
- if(dependeciesTaskSet.contains(task))
+ private final Set<Task<? extends Serializable>> dependeciesTaskSet = new HashSet<Task<? extends Serializable>>();
+
+ private void outputDependencies(Task<? extends Serializable> task,
+ PrintStream out, int indent, boolean rootTskCandidate) throws Exception {
+
+ if (dependeciesTaskSet.contains(task)) {
return;
+ }
dependeciesTaskSet.add(task);
-
+
out.print(indentString(indent));
out.printf("%s", task.getId());
if ((task.getParentTasks() == null || task.getParentTasks().isEmpty())) {
- if(rootTskCandidate)
+ if (rootTskCandidate) {
out.print(" is a root stage");
- }
- else {
+ }
+ } else {
out.print(" depends on stages: ");
boolean first = true;
- for(Task<? extends Serializable> parent: task.getParentTasks()) {
+ for (Task<? extends Serializable> parent : task.getParentTasks()) {
if (!first) {
out.print(", ");
}
first = false;
out.print(parent.getId());
}
-
- if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
+
+ if (task instanceof ConditionalTask
+ && ((ConditionalTask) task).getListTasks() != null) {
out.print(" , consists of ");
first = true;
- for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+ for (Task<? extends Serializable> con : ((ConditionalTask) task)
+ .getListTasks()) {
if (!first) {
out.print(", ");
}
@@ -326,63 +324,63 @@
out.print(con.getId());
}
}
-
+
}
out.println();
-
- if(task instanceof ConditionalTask && ((ConditionalTask)task).getListTasks() != null) {
- for(Task<? extends Serializable> con: ((ConditionalTask)task).getListTasks()) {
+
+ if (task instanceof ConditionalTask
+ && ((ConditionalTask) task).getListTasks() != null) {
+ for (Task<? extends Serializable> con : ((ConditionalTask) task)
+ .getListTasks()) {
outputDependencies(con, out, indent, false);
}
}
-
+
if (task.getChildTasks() != null) {
- for(Task<? extends Serializable> child: task.getChildTasks()) {
+ for (Task<? extends Serializable> child : task.getChildTasks()) {
outputDependencies(child, out, indent, true);
}
}
-
+
}
public void outputAST(String treeString, PrintStream out, int indent) {
out.print(indentString(indent));
out.println("ABSTRACT SYNTAX TREE:");
- out.print(indentString(indent+2));
- out.println(treeString);
+ out.print(indentString(indent + 2));
+ out.println(treeString);
}
- public void outputDependencies(PrintStream out,
- List<Task<? extends Serializable>> rootTasks,
- int indent)
- throws Exception {
+ public void outputDependencies(PrintStream out,
+ List<Task<? extends Serializable>> rootTasks, int indent)
+ throws Exception {
out.print(indentString(indent));
out.println("STAGE DEPENDENCIES:");
- for(Task<? extends Serializable> rootTask: rootTasks) {
- outputDependencies(rootTask, out, indent+2, true);
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ outputDependencies(rootTask, out, indent + 2, true);
}
}
- public void outputStagePlans(PrintStream out,
- List<Task<? extends Serializable>> rootTasks,
- int indent)
- throws Exception {
+ public void outputStagePlans(PrintStream out,
+ List<Task<? extends Serializable>> rootTasks, int indent)
+ throws Exception {
out.print(indentString(indent));
out.println("STAGE PLANS:");
HashSet<Task<? extends Serializable>> displayedSet = new HashSet<Task<? extends Serializable>>();
- for(Task<? extends Serializable> rootTask: rootTasks) {
- outputPlan(rootTask, out, work.getExtended(),
- displayedSet, indent+2);
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ outputPlan(rootTask, out, work.getExtended(), displayedSet, indent + 2);
}
}
public static class MethodComparator implements Comparator {
public int compare(Object o1, Object o2) {
- Method m1 = (Method)o1;
- Method m2 = (Method)o2;
+ Method m1 = (Method) o1;
+ Method m2 = (Method) o2;
return m1.getName().compareTo(m2.getName());
}
}
+ @Override
public int getType() {
return StageType.EXPLAIN;
}
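ExplainTask's plan output above is driven by reflection: every getter carrying the explain annotation is discovered, the methods are sorted by name (MethodComparator) so the output is deterministic, and each annotated value is printed under its displayName header. A toy sketch of that pattern, using a stand-in annotation and descriptor class (the names ExplainSketch, FilterDesc and "predicate" are illustrative, not Hive's):

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Comparator;

public class ExplainSketch {
  @Retention(RetentionPolicy.RUNTIME)
  @interface Explain { String displayName() default ""; }

  static class FilterDesc {
    @Explain(displayName = "predicate")
    public String getPredicate() { return "(key > 10)"; }
    @Explain(displayName = "isSamplingPred")
    public Boolean getSamplingPred() { return false; }
    public String getInternalState() { return "never printed"; } // no annotation, skipped
  }

  public static void main(String[] args) throws Exception {
    Object work = new FilterDesc();
    Method[] methods = work.getClass().getMethods();
    // sort by name for stable, repeatable explain output
    Arrays.sort(methods, new Comparator<Method>() {
      public int compare(Method a, Method b) { return a.getName().compareTo(b.getName()); }
    });
    for (Method m : methods) {
      Explain note = m.getAnnotation(Explain.class);
      if (note != null && m.getParameterTypes().length == 0) {
        System.out.println("  " + note.displayName() + ": " + m.invoke(work));
      }
    }
  }
}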
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,10 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.Arrays;
-
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
@@ -33,10 +30,10 @@
public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
protected exprNodeColumnDesc expr;
-
+
transient StructObjectInspector[] inspectors;
transient StructField[] fields;
-
+
public ExprNodeColumnEvaluator(exprNodeColumnDesc expr) {
this.expr = expr;
}
@@ -45,27 +42,28 @@
public ObjectInspector initialize(ObjectInspector rowInspector)
throws HiveException {
- // We need to support field names like KEY.0, VALUE.1 between
+ // We need to support field names like KEY.0, VALUE.1 between
// map-reduce boundary.
String[] names = expr.getColumn().split("\\.");
inspectors = new StructObjectInspector[names.length];
fields = new StructField[names.length];
-
- for(int i=0; i<names.length; i++) {
- if (i==0) {
+
+ for (int i = 0; i < names.length; i++) {
+ if (i == 0) {
inspectors[0] = (StructObjectInspector) rowInspector;
} else {
- inspectors[i] = (StructObjectInspector) fields[i-1].getFieldObjectInspector();
+ inspectors[i] = (StructObjectInspector) fields[i - 1]
+ .getFieldObjectInspector();
}
fields[i] = inspectors[i].getStructFieldRef(names[i]);
}
- return fields[names.length-1].getFieldObjectInspector();
+ return fields[names.length - 1].getFieldObjectInspector();
}
-
+
@Override
public Object evaluate(Object row) throws HiveException {
Object o = row;
- for(int i=0; i<fields.length; i++) {
+ for (int i = 0; i < fields.length; i++) {
o = inspectors[i].getStructFieldData(o, fields[i]);
}
return o;
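ExprNodeColumnEvaluator above resolves column references that cross the map-reduce boundary, such as KEY.0 or VALUE.1, by splitting the name on "." and descending one struct level per segment; initialize() caches a StructField and inspector per level so that evaluate() is just a tight lookup loop. A minimal illustration of the same dotted-path walk, using plain maps in place of Hive's ObjectInspectors (purely illustrative, not Hive API):

import java.util.HashMap;
import java.util.Map;

public class DottedColumnLookup {
  @SuppressWarnings("unchecked")
  static Object resolve(Map<String, Object> row, String column) {
    Object o = row;
    // one lookup per dotted segment, e.g. "VALUE.0" -> "VALUE" then "0"
    for (String name : column.split("\\.")) {
      o = ((Map<String, Object>) o).get(name);
    }
    return o;
  }

  public static void main(String[] args) {
    Map<String, Object> value = new HashMap<String, Object>();
    value.put("0", "hello");
    Map<String, Object> row = new HashMap<String, Object>();
    row.put("VALUE", value);
    System.out.println(resolve(row, "VALUE.0")); // prints: hello
  }
}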
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java Thu Jan 21 10:37:58 2010
@@ -20,7 +20,6 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -31,25 +30,25 @@
protected exprNodeConstantDesc expr;
transient ObjectInspector writableObjectInspector;
transient Object writableValue;
-
+
public ExprNodeConstantEvaluator(exprNodeConstantDesc expr) {
this.expr = expr;
- PrimitiveCategory pc = ((PrimitiveTypeInfo)expr.getTypeInfo())
+ PrimitiveCategory pc = ((PrimitiveTypeInfo) expr.getTypeInfo())
.getPrimitiveCategory();
writableObjectInspector = PrimitiveObjectInspectorFactory
.getPrimitiveWritableObjectInspector(pc);
- // Convert from Java to Writable
+ // Convert from Java to Writable
writableValue = PrimitiveObjectInspectorFactory
- .getPrimitiveJavaObjectInspector(pc)
- .getPrimitiveWritableObject(expr.getValue());
+ .getPrimitiveJavaObjectInspector(pc).getPrimitiveWritableObject(
+ expr.getValue());
}
@Override
public ObjectInspector initialize(ObjectInspector rowInspector)
- throws HiveException {
+ throws HiveException {
return writableObjectInspector;
}
-
+
@Override
public Object evaluate(Object row) throws HiveException {
return writableValue;
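The constructor reformatting above preserves the existing behaviour: the constant's Java value is converted to its Writable representation once, through the primitive object inspector factory, so evaluate() can hand back the same cached object for every row. A simplified sketch of that caching idea using Hadoop's Writable types directly (the direct IntWritable/Text construction is only for illustration; Hive goes through PrimitiveObjectInspectorFactory as shown in the diff):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class CachedConstantSketch {
  private final Object writableValue;

  public CachedConstantSketch(Object javaValue) {
    // convert once, up front, instead of on every row
    if (javaValue instanceof Integer) {
      writableValue = new IntWritable((Integer) javaValue);
    } else if (javaValue instanceof String) {
      writableValue = new Text((String) javaValue);
    } else {
      throw new IllegalArgumentException("unsupported constant: " + javaValue);
    }
  }

  public Object evaluate(Object row) {
    return writableValue; // the row is ignored; a constant never changes
  }

  public static void main(String[] args) {
    CachedConstantSketch c = new CachedConstantSketch(42);
    System.out.println(c.evaluate(null)); // prints: 42
  }
}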
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java Thu Jan 21 10:37:58 2010
@@ -24,16 +24,16 @@
public abstract class ExprNodeEvaluator {
/**
- * Initialize should be called once and only once.
- * Return the ObjectInspector for the return value, given the rowInspector.
+ * Initialize should be called once and only once. Return the ObjectInspector
+ * for the return value, given the rowInspector.
*/
- public abstract ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException;
+ public abstract ObjectInspector initialize(ObjectInspector rowInspector)
+ throws HiveException;
/**
- * Evaluate the expression given the row.
- * This method should use the rowInspector passed in from initialize to
- * inspect the row object.
- * The return value will be inspected by the return value of initialize.
+ * Evaluate the expression given the row. This method should use the
+ * rowInspector passed in from initialize to inspect the row object. The
+ * return value will be inspected by the return value of initialize.
*/
public abstract Object evaluate(Object row) throws HiveException;
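The rewrapped javadoc above states the evaluator contract: initialize() is called exactly once with the row's inspector and returns the inspector that describes the result, and evaluate() is then called once per row, with its return value read through that inspector. A minimal standalone illustration of the initialize-once / evaluate-per-row pattern (generic Java stand-ins, not the Hive classes):

public class EvaluatorContractSketch {
  interface Evaluator {
    String initialize(String rowShape); // stands in for ObjectInspector
    Object evaluate(Object row);
  }

  static class LengthEvaluator implements Evaluator {
    public String initialize(String rowShape) {
      // called once, before any rows are seen
      return "int";
    }
    public Object evaluate(Object row) {
      return ((String) row).length();
    }
  }

  public static void main(String[] args) {
    Evaluator e = new LengthEvaluator();
    System.out.println(e.initialize("string row")); // int
    System.out.println(e.evaluate("hive"));         // 4
  }
}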