Posted to commits@hive.apache.org by at...@apache.org on 2009/01/28 22:15:25 UTC
svn commit: r738626 - in /hadoop/hive/trunk: ./ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/...
Author: athusoo
Date: Wed Jan 28 21:15:25 2009
New Revision: 738626
URL: http://svn.apache.org/viewvc?rev=738626&view=rev
Log:
HIVE-176. Added a history log for Hive. (Suresh Anthony via athusoo)
Added:
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jan 28 21:15:25 2009
@@ -9,6 +9,8 @@
NEW FEATURES
+ HIVE-176. Added a history log for Hive. (Suresh Anthony via athusoo)
+
HIVE-244. Add SQRT() UDF. (Jeff Hammerbacher via zshao)
HIVE-216. Generate ruby bindings for service. (Raghotham Murthy via zshao)
Modified: hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java (original)
+++ hadoop/hive/trunk/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java Wed Jan 28 21:15:25 2009
@@ -250,7 +250,6 @@
System.exit(3);
}
- SessionState.start(ss);
if(! oproc.process_stage2(ss)) {
System.exit(2);
@@ -261,6 +260,8 @@
for(Map.Entry<Object, Object> item: ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
}
+
+ SessionState.start(ss);
CliDriver cli = new CliDriver ();
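
Note on the CliDriver change: SessionState.start(ss) now runs only after the command-line properties (ss.cmdProperties) have been copied into the conf. Since start() constructs the per-session HiveHistory object from that conf (see the SessionState.java change below), settings such as hive.querylog.location supplied on the command line are visible before the history file is opened. A minimal sketch of the required ordering, with a hypothetical log directory:

    import org.apache.hadoop.hive.cli.CliSessionState;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class StartSessionSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(SessionState.class);
        // Must be set before start(), otherwise the default /tmp/<user.name> is used.
        conf.setVar(HiveConf.ConfVars.HIVEHISTORYFILELOC, "/var/log/hive"); // hypothetical path
        CliSessionState ss = new CliSessionState(conf);
        SessionState.start(ss); // HiveHistory opens the log file here
      }
    }
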
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 28 21:15:25 2009
@@ -82,8 +82,13 @@
// session identifier
HIVESESSIONID("hive.session.id", ""),
+
// query being executed (multiple per session)
- HIVEQUERYID("hive.query.string", ""),
+ HIVEQUERYSTRING("hive.query.string", ""),
+
+ // id of query being executed (multiple per session)
+ HIVEQUERYID("hive.query.id", ""),
+
// id of the mapred plan being executed (multiple per query)
HIVEPLANID("hive.query.planid", ""),
// max jobname length
@@ -104,7 +109,11 @@
// Default file format for CREATE TABLE statement
// Options: TextFile, SequenceFile
- HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile");
+ HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"),
+
+ //Location of Hive run time structured log file
+ HIVEHISTORYFILELOC("hive.querylog.location", "/tmp/"+System.getProperty("user.name"));
+
public final String varname;
public final String defaultVal;
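
The new hive.querylog.location variable controls where the structured history files are written; it defaults to /tmp/<user.name>, and HiveHistory (added below) appends hive_job_log_<sessionid>.txt to that directory. A small sketch that prints the effective directory, assuming only the HiveConf change in this commit:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class QueryLogLocationSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(QueryLogLocationSketch.class);
        // Default value is "/tmp/" + System.getProperty("user.name").
        String dir = conf.getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC);
        System.out.println("Hive history files are written under " + dir);
      }
    }
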
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jan 28 21:15:25 2009
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.commons.lang.StringUtils;
@@ -39,6 +40,8 @@
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde.ByteStream;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,21 +52,21 @@
public class Driver implements CommandProcessor {
static final private Log LOG = LogFactory.getLog("hive.ql.Driver");
- private int maxRows = 100;
+ private int maxRows = 100;
ByteStream.Output bos = new ByteStream.Output();
-
- private ParseDriver pd;
- private HiveConf conf;
- private DataInput resStream;
- private LogHelper console;
- private Context ctx;
+
+ private ParseDriver pd;
+ private HiveConf conf;
+ private DataInput resStream;
+ private LogHelper console;
+ private Context ctx;
private BaseSemanticAnalyzer sem;
-
+
public int countJobs(List<Task<? extends Serializable>> tasks) {
if (tasks == null)
return 0;
int jobs = 0;
- for (Task<? extends Serializable> task: tasks) {
+ for (Task<? extends Serializable> task : tasks) {
if (task.isMapRedTask()) {
jobs++;
}
@@ -81,11 +84,12 @@
sem.setFetchTaskInit(true);
sem.getFetchTask().initialize(conf);
}
- FetchTask ft = (FetchTask)sem.getFetchTask();
+ FetchTask ft = (FetchTask) sem.getFetchTask();
tableDesc td = ft.getTblDesc();
String tableName = "result";
- List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+ List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
+ tableName, td.getDeserializer());
String schema = MetaStoreUtils.getDDLFromFieldSchema(tableName, lst);
return schema;
}
@@ -111,7 +115,7 @@
return false;
boolean hasReduce = false;
- for (Task<? extends Serializable> task: tasks) {
+ for (Task<? extends Serializable> task : tasks) {
if (task.hasReduce()) {
return true;
}
@@ -121,10 +125,9 @@
return hasReduce;
}
-
/**
* for backwards compatibility with current tests
- */
+ */
public Driver(HiveConf conf) {
console = new LogHelper(LOG);
this.conf = conf;
@@ -133,34 +136,55 @@
public Driver() {
console = new LogHelper(LOG);
- if(SessionState.get() != null) {
+ if (SessionState.get() != null) {
conf = SessionState.get().getConf();
ctx = new Context(conf);
}
}
+ private String makeQueryId() {
+ GregorianCalendar gc = new GregorianCalendar();
+ String userid = System.getProperty("user.name");
+
+ return userid + "_" +
+ String.format("%1$4d%2$02d%3$02d%4$02d%5$02d%5$02d", gc.get(Calendar.YEAR),
+ gc.get(Calendar.MONTH) + 1,
+ gc.get(Calendar.DAY_OF_MONTH),
+ gc.get(Calendar.HOUR_OF_DAY),
+ gc.get(Calendar.MINUTE), gc.get(Calendar.SECOND));
+ }
+
+
public int run(String command) {
- boolean noName = StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME));
+ boolean noName = StringUtils.isEmpty(conf
+ .getVar(HiveConf.ConfVars.HADOOPJOBNAME));
int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
int jobs = 0;
- conf.setVar(HiveConf.ConfVars.HIVEQUERYID, command);
+ conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, command);
+
+ String queryId = makeQueryId();
+ conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
try {
-
+
TaskFactory.resetId();
LOG.info("Starting command: " + command);
ctx.clear();
ctx.makeScratchDir();
+
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().startQuery(command, conf.getVar(HiveConf.ConfVars.HIVEQUERYID) );
+
resStream = null;
-
+
pd = new ParseDriver();
ASTNode tree = pd.parse(command);
- while((tree.getToken() == null) && (tree.getChildCount() > 0)) {
- tree = (ASTNode)tree.getChild(0);
+ while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
+ tree = (ASTNode) tree.getChild(0);
}
sem = SemanticAnalyzerFactory.get(conf, tree);
@@ -173,55 +197,71 @@
if (jobs > 0) {
console.printInfo("Total MapReduce jobs = " + jobs);
}
-
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().setQueryProperty(queryId,
+ Keys.QUERY_NUM_TASKS, String.valueOf(jobs));
+
boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+
if (hasReduce) {
- console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
+ console.printInfo("Number of reducers = "
+ + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
console.printInfo("In order to change numer of reducers use:");
console.printInfo(" set mapred.reduce.tasks = <number>");
}
String jobname = Utilities.abbreviate(command, maxlen - 6);
int curJob = 0;
- for(Task<? extends Serializable> rootTask: sem.getRootTasks()) {
+ for (Task<? extends Serializable> rootTask : sem.getRootTasks()) {
// assumption that only top level tasks are map-reduce tasks
if (rootTask.isMapRedTask()) {
- curJob ++;
- if(noName) {
- conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob + "/" + jobs + ")");
+ curJob++;
+ if (noName) {
+ conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob
+ + "/" + jobs + ")");
}
}
rootTask.initialize(conf);
}
// A very simple runtime that keeps putting runnable takss
- // on a list and when a job completes, it puts the children at the back of the list
+ // on a list and when a job completes, it puts the children at the back of
+ // the list
// while taking the job to run from the front of the list
Queue<Task<? extends Serializable>> runnable = new LinkedList<Task<? extends Serializable>>();
- for(Task<? extends Serializable> rootTask:sem.getRootTasks()) {
+ for (Task<? extends Serializable> rootTask : sem.getRootTasks()) {
if (runnable.offer(rootTask) == false) {
LOG.error("Could not insert the first task into the queue");
return (1);
}
}
- while(runnable.peek() != null) {
+ while (runnable.peek() != null) {
Task<? extends Serializable> tsk = runnable.remove();
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().startTask(queryId, tsk,
+ tsk.getClass().getName());
+
int exitVal = tsk.execute();
+ if (SessionState.get() != null) {
+ SessionState.get().getHiveHistory().setTaskProperty(queryId,
+ tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal));
+ SessionState.get().getHiveHistory().endTask(queryId, tsk);
+ }
if (exitVal != 0) {
- console.printError("FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName());
+ console.printError("FAILED: Execution Error, return code " + exitVal
+ + " from " + tsk.getClass().getName());
return 9;
}
-
tsk.setDone();
if (tsk.getChildTasks() == null) {
continue;
}
- for(Task<? extends Serializable> child: tsk.getChildTasks()) {
+ for (Task<? extends Serializable> child : tsk.getChildTasks()) {
// Check if the child is runnable
if (!child.isRunnable()) {
continue;
@@ -232,51 +272,66 @@
}
}
}
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().setQueryProperty(queryId,
+ Keys.QUERY_RET_CODE, String.valueOf(0));
} catch (SemanticException e) {
- console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().setQueryProperty(queryId,
+ Keys.QUERY_RET_CODE, String.valueOf(10));
+ console.printError("FAILED: Error in semantic analysis: "
+ + e.getMessage(), "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (10);
} catch (ParseException e) {
- console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().setQueryProperty(queryId,
+ Keys.QUERY_RET_CODE, String.valueOf(11));
+ console.printError("FAILED: Parse Error: " + e.getMessage(), "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (11);
} catch (Exception e) {
- // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
- console.printError("FAILED: Unknown exception : " + e.getMessage(),
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().setQueryProperty(queryId,
+ Keys.QUERY_RET_CODE, String.valueOf(12));
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (12);
} finally {
- if(noName) {
+ if (SessionState.get() != null)
+ SessionState.get().getHiveHistory().endQuery(queryId);
+ if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
- }
+ }
}
console.printInfo("OK");
return (0);
}
-
-
- public boolean getResults(Vector<String> res)
- {
+
+ public boolean getResults(Vector<String> res) {
if (sem != null && sem.getFetchTask() != null) {
if (!sem.getFetchTaskInit()) {
sem.setFetchTaskInit(true);
sem.getFetchTask().initialize(conf);
}
- FetchTask ft = (FetchTask)sem.getFetchTask();
+ FetchTask ft = (FetchTask) sem.getFetchTask();
ft.setMaxRows(maxRows);
return ft.fetch(res);
}
if (resStream == null)
resStream = ctx.getStream();
- if (resStream == null) return false;
-
+ if (resStream == null)
+ return false;
+
int numRows = 0;
String row = null;
- while (numRows < maxRows)
- {
- if (resStream == null)
- {
+ while (numRows < maxRows) {
+ if (resStream == null) {
if (numRows > 0)
return true;
else
@@ -285,8 +340,7 @@
bos.reset();
Utilities.streamStatus ss;
- try
- {
+ try {
ss = Utilities.readColumn(resStream, bos);
if (bos.getCount() > 0)
row = new String(bos.getData(), 0, bos.getCount(), "UTF-8");
@@ -298,12 +352,13 @@
res.add(row);
}
} catch (IOException e) {
- console.printError("FAILED: Unexpected IO exception : " + e.getMessage());
+ console.printError("FAILED: Unexpected IO exception : "
+ + e.getMessage());
res = null;
return false;
}
- if (ss == Utilities.streamStatus.EOF)
+ if (ss == Utilities.streamStatus.EOF)
resStream = ctx.getStream();
}
return true;
@@ -314,14 +369,12 @@
// Delete the scratch directory from the context
ctx.removeScratchDir();
ctx.clear();
+ } catch (Exception e) {
+ console.printError("FAILED: Unknown exception : " + e.getMessage(), "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return (13);
}
- catch (Exception e) {
- console.printError("FAILED: Unknown exception : " + e.getMessage(),
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
- return(13);
- }
-
- return(0);
+
+ return (0);
}
}
-
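
Summary of the Driver changes: each run() call now receives a query id of the form <user>_<timestamp> from makeQueryId(), stored under hive.query.id, and the query lifecycle (QueryStart, per-task start/progress/end, QueryEnd) is written through the session's HiveHistory. The value logged under QUERY_RET_CODE mirrors run()'s return value: 0 on success, 10 for semantic errors, 11 for parse errors, 12 for any other exception. A rough usage sketch, assuming a session has already been started (as in TestHiveHistory further down):

    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class RunQuerySketch {
      public static void main(String[] args) {
        // Assumes SessionState.start(...) has already been called.
        Driver d = new Driver();
        int ret = d.run("select a.key from src a"); // 0, 10, 11 or 12 as described above
        String histFile = SessionState.get().getHiveHistory().getHistFileName();
        System.out.println("return code " + ret + ", history written to " + histFile);
      }
    }
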
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Jan 28 21:15:25 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.io.*;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -61,15 +62,16 @@
public static String getRealFiles(Configuration conf) {
// fill in local files to be added to the task environment
SessionState ss = SessionState.get();
- Set<String> files = (ss == null) ? null : ss.list_resource(SessionState.ResourceType.FILE, null);
- if(files != null) {
- ArrayList<String> realFiles = new ArrayList<String> (files.size());
- for(String one: files) {
+ Set<String> files = (ss == null) ? null : ss.list_resource(
+ SessionState.ResourceType.FILE, null);
+ if (files != null) {
+ ArrayList<String> realFiles = new ArrayList<String>(files.size());
+ for (String one : files) {
try {
realFiles.add(Utilities.realFile(one, conf));
} catch (IOException e) {
- throw new RuntimeException ("Cannot validate file " + one +
- "due to exception: " + e.getMessage(), e);
+ throw new RuntimeException("Cannot validate file " + one
+ + "due to exception: " + e.getMessage(), e);
}
}
return StringUtils.join(realFiles, ",");
@@ -78,11 +80,10 @@
}
}
-
/**
* Initialization when invoked from QL
*/
- public void initialize (HiveConf conf) {
+ public void initialize(HiveConf conf) {
super.initialize(conf);
job = new JobConf(conf, ExecDriver.class);
String realFiles = getRealFiles(job);
@@ -91,7 +92,7 @@
// workaround for hadoop-17 - jobclient only looks at commandlineconfig
Configuration commandConf = JobClient.getCommandLineConfig();
- if(commandConf != null) {
+ if (commandConf != null) {
commandConf.set("tmpfiles", realFiles);
}
}
@@ -100,63 +101,70 @@
/**
* Constructor/Initialization for invocation as independent utility
*/
- public ExecDriver(mapredWork plan, JobConf job, boolean isSilent) throws HiveException {
+ public ExecDriver(mapredWork plan, JobConf job, boolean isSilent)
+ throws HiveException {
setWork(plan);
this.job = job;
LOG = LogFactory.getLog(this.getClass().getName());
- console = new LogHelper(LOG, isSilent);
+ console = new LogHelper(LOG, isSilent);
}
protected void fillInDefaults() {
// this is a temporary hack to fix things that are not fixed in the compiler
- if(work.getNumReduceTasks() == null) {
- if(work.getReducer() == null) {
- LOG.warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");
+ if (work.getNumReduceTasks() == null) {
+ if (work.getReducer() == null) {
+ LOG
+ .warn("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");
work.setNumReduceTasks(Integer.valueOf(0));
} else {
- LOG.warn("Number of reduce tasks not specified. Defaulting to jobconf value of: " + job.getNumReduceTasks());
+ LOG
+ .warn("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ + job.getNumReduceTasks());
work.setNumReduceTasks(job.getNumReduceTasks());
}
- }
- else
- LOG.info("Number of reduce tasks determined at compile : " + work.getNumReduceTasks());
+ } else
+ LOG.info("Number of reduce tasks determined at compile : "
+ + work.getNumReduceTasks());
}
/**
- * A list of the currently running jobs spawned in this Hive instance that is used
- * to kill all running jobs in the event of an unexpected shutdown - i.e., the JVM shuts
- * down while there are still jobs running.
+ * A list of the currently running jobs spawned in this Hive instance that is
+ * used to kill all running jobs in the event of an unexpected shutdown -
+ * i.e., the JVM shuts down while there are still jobs running.
*/
- public static HashMap<String,String> runningJobKillURIs = new HashMap<String, String> ();
-
+ public static HashMap<String, String> runningJobKillURIs = new HashMap<String, String>();
/**
- * In Hive, when the user control-c's the command line, any running jobs spawned from that command
- * line are best-effort killed.
- *
- * This static constructor registers a shutdown thread to iterate over all the running job
- * kill URLs and do a get on them.
- *
+ * In Hive, when the user control-c's the command line, any running jobs
+ * spawned from that command line are best-effort killed.
+ *
+ * This static constructor registers a shutdown thread to iterate over all the
+ * running job kill URLs and do a get on them.
+ *
*/
static {
- if(new org.apache.hadoop.conf.Configuration().getBoolean("webinterface.private.actions", false)) {
+ if (new org.apache.hadoop.conf.Configuration().getBoolean(
+ "webinterface.private.actions", false)) {
Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- for(Iterator<String> elems = runningJobKillURIs.values().iterator(); elems.hasNext() ; ) {
- String uri = elems.next();
- try {
- System.err.println("killing job with: " + uri);
- int retCode = ((java.net.HttpURLConnection)new java.net.URL(uri).openConnection()).getResponseCode();
- if(retCode != 200) {
- System.err.println("Got an error trying to kill job with URI: " + uri + " = " + retCode);
- }
- } catch(Exception e) {
- System.err.println("trying to kill job, caught: " + e);
- // do nothing
+ public void run() {
+ for (Iterator<String> elems = runningJobKillURIs.values().iterator(); elems
+ .hasNext();) {
+ String uri = elems.next();
+ try {
+ System.err.println("killing job with: " + uri);
+ int retCode = ((java.net.HttpURLConnection) new java.net.URL(uri)
+ .openConnection()).getResponseCode();
+ if (retCode != 200) {
+ System.err.println("Got an error trying to kill job with URI: "
+ + uri + " = " + retCode);
}
+ } catch (Exception e) {
+ System.err.println("trying to kill job, caught: " + e);
+ // do nothing
}
}
- });
+ }
+ });
}
}
@@ -168,19 +176,23 @@
console.printInfo("Job running in-process (local Hadoop)");
} else {
String hp = job.get("mapred.job.tracker");
- console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = " + rj.getTrackingURL());
- console.printInfo("Kill Command = " +
- HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN) +
- " job -Dmapred.job.tracker=" + hp + " -kill "
- + rj.getJobID());
+ if (SessionState.get() != null) {
+ SessionState.get().getHiveHistory().setTaskProperty(
+ SessionState.get().getQueryId(), getId(),
+ Keys.TASK_HADOOP_ID, rj.getJobID());
+ }
+ console.printInfo("Starting Job = " + rj.getJobID() + ", Tracking URL = "
+ + rj.getTrackingURL());
+ console.printInfo("Kill Command = "
+ + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
+ + " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
}
}
/**
* from StreamJob.java
*/
- public RunningJob jobProgress(JobClient jc, RunningJob rj)
- throws IOException {
+ public RunningJob jobProgress(JobClient jc, RunningJob rj) throws IOException {
String lastReport = "";
while (!rj.isComplete()) {
try {
@@ -190,9 +202,20 @@
rj = jc.getJob(rj.getJobID());
String report = null;
report = " map = " + Math.round(rj.mapProgress() * 100) + "%, reduce ="
- + Math.round(rj.reduceProgress() * 100) + "%";
-
+ + Math.round(rj.reduceProgress() * 100) + "%";
+
if (!report.equals(lastReport)) {
+
+ SessionState ss = SessionState.get();
+ if (ss != null) {
+ ss.getHiveHistory().setTaskCounters(
+ SessionState.get().getQueryId(), getId(), rj);
+ ss.getHiveHistory().setTaskProperty(
+ SessionState.get().getQueryId(), getId(),
+ Keys.TASK_HADOOP_PROGRESS, report);
+ ss.getHiveHistory().progressTask(
+ SessionState.get().getQueryId(), this);
+ }
console.printInfo(report);
lastReport = report;
}
@@ -202,48 +225,51 @@
private void inferNumReducers() throws Exception {
FileSystem fs = FileSystem.get(job);
-
+
if ((work.getReducer() != null) && (work.getInferNumReducers() == true)) {
long inpSz = 0;
-
+
// based on the input size - estimate the number of reducers
Path[] inputPaths = FileInputFormat.getInputPaths(job);
-
+
for (Path inputP : inputPaths) {
if (fs.exists(inputP)) {
FileStatus[] fStats = fs.listStatus(inputP);
- for (FileStatus fStat:fStats)
+ for (FileStatus fStat : fStats)
inpSz += fStat.getLen();
}
}
-
- int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
- if (newRed < work.getNumReduceTasks().intValue())
- {
- LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
+ int newRed = (int) (inpSz / LOAD_PER_REDUCER) + 1;
+ if (newRed < work.getNumReduceTasks().intValue()) {
+
+ LOG.warn("Number of reduce tasks inferred based on input size to : "
+ + newRed);
work.setNumReduceTasks(Integer.valueOf(newRed));
+
}
}
}
/**
* Add new elements to the classpath
- * @param newPaths Array of classpath elements
+ *
+ * @param newPaths
+ * Array of classpath elements
*/
- private static void addToClassPath(String [] newPaths) throws Exception {
+ private static void addToClassPath(String[] newPaths) throws Exception {
Thread curThread = Thread.currentThread();
- URLClassLoader loader = (URLClassLoader)curThread.getContextClassLoader();
+ URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader();
List<URL> curPath = Arrays.asList(loader.getURLs());
- ArrayList<URL> newPath = new ArrayList<URL> ();
+ ArrayList<URL> newPath = new ArrayList<URL>();
- for(String onestr: newPaths) {
+ for (String onestr : newPaths) {
URL oneurl = (new File(onestr)).toURL();
- if(!curPath.contains(oneurl)) {
+ if (!curPath.contains(oneurl)) {
newPath.add(oneurl);
}
}
-
+
loader = new URLClassLoader(newPath.toArray(new URL[0]), loader);
curThread.setContextClassLoader(loader);
}
@@ -256,35 +282,34 @@
fillInDefaults();
String invalidReason = work.isInvalid();
- if(invalidReason != null) {
- throw new RuntimeException("Plan invalid, Reason: "+invalidReason);
+ if (invalidReason != null) {
+ throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
}
Utilities.setMapRedWork(job, work);
-
-
- for(String onefile: work.getPathToAliases().keySet()) {
+
+ for (String onefile : work.getPathToAliases().keySet()) {
LOG.info("Adding input file " + onefile);
FileInputFormat.addInputPaths(job, onefile);
}
-
+
String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
String jobScratchDir = hiveScratchDir + Utilities.randGen.nextInt();
FileOutputFormat.setOutputPath(job, new Path(jobScratchDir));
job.setMapperClass(ExecMapper.class);
-
- job.setMapOutputKeyClass(HiveKey.class);
+
+ job.setMapOutputKeyClass(HiveKey.class);
job.setMapOutputValueClass(BytesWritable.class);
-
+
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
-
+
job.setInputFormat(org.apache.hadoop.hive.ql.io.HiveInputFormat.class);
-
- // No-Op - we don't really write anything here ..
+
+ // No-Op - we don't really write anything here ..
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
-
+
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
if (StringUtils.isNotBlank(auxJars)) {
LOG.info("adding libjars: " + auxJars);
@@ -294,47 +319,62 @@
int returnVal = 0;
FileSystem fs = null;
RunningJob rj = null;
-
+
try {
fs = FileSystem.get(job);
-
+
// if the input is empty exit gracefully
Path[] inputPaths = FileInputFormat.getInputPaths(job);
boolean emptyInput = true;
for (Path inputP : inputPaths) {
- if(!fs.exists(inputP))
+ if (!fs.exists(inputP))
continue;
-
+
FileStatus[] fStats = fs.listStatus(inputP);
- for (FileStatus fStat:fStats) {
- if (fStat.getLen() > 0) {
- emptyInput = false;
- break;
- }
+ for (FileStatus fStat : fStats) {
+ if (fStat.getLen() > 0) {
+ emptyInput = false;
+ break;
+ }
}
}
-
+
if (emptyInput) {
console.printInfo("Job need not be submitted: no output: Success");
- return 0;
+ return 0;
}
-
+
inferNumReducers();
- JobClient jc = new JobClient(job);
+
+ if (SessionState.get() != null) {
+ if (work.getReducer() != null) {
+ SessionState.get().getHiveHistory().setTaskProperty(
+ SessionState.get().getQueryId(), getId(),
+ Keys.TASK_NUM_REDUCERS, String.valueOf(work.getNumReduceTasks()));
+ } else {
+ SessionState.get().getHiveHistory().setTaskProperty(
+ SessionState.get().getQueryId(), getId(),
+ Keys.TASK_NUM_REDUCERS, String.valueOf(0));
+ }
+ }
+ JobClient jc = new JobClient(job);
+
// make this client wait if job trcker is not behaving well.
Throttle.checkJobTracker(job, LOG);
rj = jc.submitJob(job);
- // add to list of running jobs so in case of abnormal shutdown can kill it.
- runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+ // add to list of running jobs so in case of abnormal shutdown can kill
+ // it.
+ runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
+ + "&action=kill");
jobInfo(rj);
rj = jobProgress(jc, rj);
String statusMesg = "Ended Job = " + rj.getJobID();
- if(!rj.isSuccessful()) {
+ if (!rj.isSuccessful()) {
statusMesg += " with errors";
returnVal = 2;
console.printError(statusMesg);
@@ -343,45 +383,49 @@
}
} catch (Exception e) {
String mesg = " with exception '" + e.getMessage() + "'";
- if(rj != null) {
+ if (rj != null) {
mesg = "Ended Job = " + rj.getJobID() + mesg;
} else {
mesg = "Job Submission failed" + mesg;
}
- // Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
- console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ console.printError(mesg, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
returnVal = 1;
} finally {
Utilities.clearMapRedWork(job);
try {
fs.delete(new Path(jobScratchDir), true);
- if(returnVal != 0 && rj != null) {
+ if (returnVal != 0 && rj != null) {
rj.killJob();
}
runningJobKillURIs.remove(rj.getJobID());
- } catch (Exception e) {}
+ } catch (Exception e) {
+ }
}
return (returnVal);
}
-
+
private static void printUsage() {
- System.out.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "+
- "[-files <file1>[,<file2>] ...]");
+ System.out
+ .println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "
+ + "[-files <file1>[,<file2>] ...]");
System.exit(1);
}
public static void main(String[] args) throws IOException, HiveException {
String planFileName = null;
- ArrayList<String> jobConfArgs = new ArrayList<String> ();
+ ArrayList<String> jobConfArgs = new ArrayList<String>();
boolean isSilent = false;
String files = null;
- try{
- for(int i=0; i<args.length; i++) {
- if(args[i].equals("-plan")) {
+ try {
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-plan")) {
planFileName = args[++i];
- System.out.println("plan = "+planFileName);
+ System.out.println("plan = " + planFileName);
} else if (args[i].equals("-jobconf")) {
jobConfArgs.add(args[++i]);
} else if (args[i].equals("-silent")) {
@@ -395,33 +439,33 @@
printUsage();
}
- if(planFileName == null) {
+ if (planFileName == null) {
System.err.println("Must specify Plan File Name");
printUsage();
}
JobConf conf = new JobConf(ExecDriver.class);
- for(String one: jobConfArgs) {
+ for (String one : jobConfArgs) {
int eqIndex = one.indexOf('=');
- if(eqIndex != -1) {
+ if (eqIndex != -1) {
try {
- conf.set(one.substring(0, eqIndex),
- URLDecoder.decode(one.substring(eqIndex+1), "UTF-8"));
+ conf.set(one.substring(0, eqIndex), URLDecoder.decode(one
+ .substring(eqIndex + 1), "UTF-8"));
} catch (UnsupportedEncodingException e) {
- System.err.println("Unexpected error " + e.getMessage() + " while encoding " +
- one.substring(eqIndex+1));
+ System.err.println("Unexpected error " + e.getMessage()
+ + " while encoding " + one.substring(eqIndex + 1));
System.exit(3);
}
}
}
- if(files != null) {
+ if (files != null) {
conf.set("tmpfiles", files);
}
URI pathURI = (new Path(planFileName)).toUri();
InputStream pathData;
- if(StringUtils.isEmpty(pathURI.getScheme())) {
+ if (StringUtils.isEmpty(pathURI.getScheme())) {
// default to local file system
pathData = new FileInputStream(planFileName);
} else {
@@ -429,17 +473,19 @@
FileSystem fs = FileSystem.get(conf);
pathData = fs.open(new Path(planFileName));
}
-
- // workaround for hadoop-17 - libjars are not added to classpath. this affects local
+
+ // workaround for hadoop-17 - libjars are not added to classpath. this
+ // affects local
// mode execution
- boolean localMode = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local");
- if(localMode) {
+ boolean localMode = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT)
+ .equals("local");
+ if (localMode) {
String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
if (StringUtils.isNotBlank(auxJars)) {
try {
addToClassPath(StringUtils.split(auxJars, ","));
} catch (Exception e) {
- throw new HiveException (e.getMessage(), e);
+ throw new HiveException(e.getMessage(), e);
}
}
}
@@ -447,28 +493,30 @@
mapredWork plan = Utilities.deserializeMapRedWork(pathData);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
int ret = ed.execute();
- if(ret != 0) {
+ if (ret != 0) {
System.out.println("Job Failed");
System.exit(2);
}
}
/**
- * Given a Hive Configuration object - generate a command line
- * fragment for passing such configuration information to ExecDriver
+ * Given a Hive Configuration object - generate a command line fragment for
+ * passing such configuration information to ExecDriver
*/
public static String generateCmdLine(HiveConf hconf) {
try {
- StringBuilder sb = new StringBuilder ();
+ StringBuilder sb = new StringBuilder();
Properties deltaP = hconf.getChangedProperties();
- boolean localMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+ boolean localMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals(
+ "local");
String hadoopSysDir = "mapred.system.dir";
String hadoopWorkDir = "mapred.local.dir";
- for(Object one: deltaP.keySet()) {
- String oneProp = (String)one;
-
- if(localMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
+ for (Object one : deltaP.keySet()) {
+ String oneProp = (String) one;
+
+ if (localMode
+ && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir)))
continue;
String oneValue = deltaP.getProperty(oneProp);
@@ -480,19 +528,23 @@
sb.append(" ");
}
- // Multiple concurrent local mode job submissions can cause collisions in working dirs
- // Workaround is to rename map red working dir to a temp dir in such a case
- if(localMode) {
+ // Multiple concurrent local mode job submissions can cause collisions in
+ // working dirs
+ // Workaround is to rename map red working dir to a temp dir in such a
+ // case
+ if (localMode) {
sb.append("-jobconf ");
sb.append(hadoopSysDir);
sb.append("=");
- sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt(), "UTF-8"));
+ sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/"
+ + Utilities.randGen.nextInt(), "UTF-8"));
sb.append(" ");
sb.append("-jobconf ");
sb.append(hadoopWorkDir);
sb.append("=");
- sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(), "UTF-8"));
+ sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
+ + Utilities.randGen.nextInt(), "UTF-8"));
}
return sb.toString();
@@ -512,4 +564,3 @@
return w.getReducer() != null;
}
}
-
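
ExecDriver now reports into the same history stream while a MapReduce job runs: the Hadoop job id is recorded under TASK_HADOOP_ID at submission time, the reducer count under TASK_NUM_REDUCERS, and on every progress change jobProgress() stores the " map = X%, reduce =Y%" string under TASK_HADOOP_PROGRESS along with the job counters. setTaskCounters (see HiveHistory.java below) flattens the counters into one comma-separated TASK_COUNTERS value, roughly of the form (placeholder names, for illustration only):

    <group display name>.<counter display name>:<value>,<group>.<counter>:<value>,...
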
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+
+
+public class HiveHistory {
+
+ PrintWriter histStream; // History File stream
+
+ String histFileName; // History file name
+
+ static final private Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
+
+ private LogHelper console;
+
+ // Job Hash Map
+ private HashMap<String, QueryInfo> queryInfoMap = new HashMap<String, QueryInfo>();
+
+ // Task Hash Map
+ private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+ private static final String DELIMITER = " ";
+
+ public static enum RecordTypes {
+ QueryStart, QueryEnd, TaskStart, TaskEnd, TaskProgress, SessionStart, SessionEnd
+ };
+
+ public static enum Keys {
+ SESSION_ID, QUERY_ID, TASK_ID, QUERY_RET_CODE, QUERY_NUM_TASKS, QUERY_STRING, TIME,
+ TASK_RET_CODE, TASK_NAME, TASK_HADOOP_ID, TASK_HADOOP_PROGRESS, TASK_COUNTERS, TASK_NUM_REDUCERS
+ };
+
+ private static final String KEY = "(\\w+)";
+ private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
+
+ private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
+ + VALUE + "\"");
+
+ // temp buffer for parsed dataa
+ private static Map<String, String> parseBuffer = new HashMap<String, String>();
+
+ /**
+ * Listner interface Parser will call handle function for each record type
+ */
+ public static interface Listener {
+
+ public void handle(RecordTypes recType, Map<String, String> values)
+ throws IOException;
+ }
+
+ /**
+ * Parses history file and calls call back functions
+ *
+ * @param path
+ * @param l
+ * @throws IOException
+ */
+ public static void parseHiveHistory(String path, Listener l)
+ throws IOException {
+ FileInputStream fi = new FileInputStream(path);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fi));
+ try {
+ String line = null;
+ StringBuffer buf = new StringBuffer();
+ while ((line = reader.readLine()) != null) {
+ buf.append(line);
+ //if it does not end with " then it is line continuation
+ if (!line.trim().endsWith("\"")) {
+ continue;
+ }
+ parseLine(buf.toString(), l);
+ buf = new StringBuffer();
+ }
+ } finally {
+ try {
+ reader.close();
+ } catch (IOException ex) {
+ }
+ }
+ }
+
+ /**
+ * Parse a single line of history.
+ *
+ * @param line
+ * @param l
+ * @throws IOException
+ */
+ private static void parseLine(String line, Listener l) throws IOException {
+ // extract the record type
+ int idx = line.indexOf(' ');
+ String recType = line.substring(0, idx);
+ String data = line.substring(idx + 1, line.length());
+
+ Matcher matcher = pattern.matcher(data);
+
+ while (matcher.find()) {
+ String tuple = matcher.group(0);
+ String[] parts = tuple.split("=");
+
+ parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
+ }
+
+ l.handle(RecordTypes.valueOf(recType), parseBuffer);
+
+ parseBuffer.clear();
+ }
+
+ public static class Info {
+
+ }
+
+ public static class SessionInfo extends Info {
+ public String sessionId;
+ };
+
+ public static class QueryInfo extends Info {
+ public Map<String, String> hm = new HashMap<String, String>();
+ };
+
+ public static class TaskInfo extends Info {
+ public Map<String, String> hm = new HashMap<String, String>();
+
+ };
+
+ /**
+ * Construct HiveHistory object an open history log file.
+ *
+ * @param ss
+ */
+ public HiveHistory(SessionState ss) {
+
+ try {
+ console = new LogHelper(LOG);
+ String conf_file_loc = ss.getConf().getVar(
+ HiveConf.ConfVars.HIVEHISTORYFILELOC);
+ if ((conf_file_loc == null) || conf_file_loc.length() == 0)
+ {
+ console.printError("No history file location given");
+ return;
+ }
+
+ //Create directory
+ File f = new File(conf_file_loc);
+ if (!f.exists()){
+ if (!f.mkdir()){
+ console.printError("Unable to create log directory "+conf_file_loc );
+ return;
+ }
+ }
+ histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId()
+ + ".txt";
+ console.printInfo("Hive history file=" + histFileName);
+ histStream = new PrintWriter(histFileName);
+
+ HashMap<String, String> hm = new HashMap<String, String>();
+ hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
+ log(RecordTypes.SessionStart, hm);
+ } catch (FileNotFoundException e) {
+ console.printError("FAILED: Failed to open Query Log : " +histFileName+ " "+ e.getMessage(), "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ }
+
+ /**
+ * @return historyFileName
+ */
+ public String getHistFileName() {
+ return histFileName;
+ }
+
+ /**
+ * Write the a history record to history file
+ *
+ * @param rt
+ * @param keyValMap
+ */
+ void log(RecordTypes rt, Map<String, String> keyValMap) {
+
+ if (histStream == null)
+ return;
+
+ StringBuffer sb = new StringBuffer();
+ sb.append(rt.name());
+
+ for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
+
+ sb.append(DELIMITER);
+ String key = ent.getKey();
+ String val = ent.getValue();
+ val = val.replace('\n', ' ');
+ sb.append(key + "=\"" + val + "\"");
+
+ }
+ sb.append(DELIMITER);
+ sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\"");
+ histStream.println(sb);
+ histStream.flush();
+
+ }
+
+ /**
+ * Called at the start of job Driver.run()
+ */
+ public void startQuery(String cmd, String id) {
+ SessionState ss = SessionState.get();
+ if (ss == null)
+ return;
+ QueryInfo ji = new QueryInfo();
+
+ ji.hm.put(Keys.QUERY_ID.name(), id);
+ ji.hm.put(Keys.QUERY_STRING.name(), cmd);
+
+ queryInfoMap.put(id, ji);
+
+
+
+ log(RecordTypes.QueryStart, ji.hm);
+
+ }
+
+ /**
+ * Used to set job status and other attributes of a job
+ *
+ * @param queryId
+ * @param propName
+ * @param propValue
+ */
+ public void setQueryProperty(String queryId, Keys propName, String propValue) {
+ QueryInfo ji = queryInfoMap.get(queryId);
+ if (ji == null)
+ return;
+ ji.hm.put(propName.name(), propValue);
+ }
+
+ /**
+ * Used to set task properties.
+ *
+ * @param taskId
+ * @param propName
+ * @param propValue
+ */
+ public void setTaskProperty(String queryId, String taskId, Keys propName,
+ String propValue) {
+ String id = queryId + ":" + taskId;
+ TaskInfo ti = taskInfoMap.get(id);
+ if (ti == null)
+ return;
+ ti.hm.put(propName.name(), propValue);
+ }
+
+ /**
+ * Serialize the task counters and set as a task property.
+ *
+ * @param taskId
+ * @param rj
+ */
+ public void setTaskCounters(String queryId, String taskId, RunningJob rj) {
+ String id = queryId + ":" + taskId;
+ TaskInfo ti = taskInfoMap.get(id);
+ if (ti == null)
+ return;
+ StringBuilder sb = new StringBuilder("");
+ try {
+
+ boolean first = true;
+ for (Group group : rj.getCounters()) {
+ for (Counter counter : group) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(',');
+ }
+ sb.append(group.getDisplayName());
+ sb.append('.');
+ sb.append(counter.getDisplayName());
+ sb.append(':');
+ sb.append(counter.getCounter());
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
+ }
+
+ /**
+ * Called at the end of Job. A Job is sql query.
+ *
+ * @param queryId
+ */
+ public void endQuery(String queryId) {
+
+ QueryInfo ji = queryInfoMap.get(queryId);
+ if (ji == null)
+ return;
+ log(RecordTypes.QueryEnd, ji.hm);
+ }
+
+ /**
+ * Called at the start of a task. Called by Driver.run() A Job can have
+ * multiple tasks. Tasks will have multiple operator.
+ *
+ * @param task
+ */
+ public void startTask(String queryId, Task<? extends Serializable> task,
+ String taskName) {
+ SessionState ss = SessionState.get();
+ if (ss == null)
+ return;
+ TaskInfo ti = new TaskInfo();
+
+ ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId());
+ ti.hm.put(Keys.TASK_ID.name(), task.getId());
+ ti.hm.put(Keys.TASK_NAME.name(), taskName);
+
+ String id = queryId + ":" + task.getId();
+ taskInfoMap.put(id, ti);
+
+ log(RecordTypes.TaskStart, ti.hm);
+
+ }
+
+ /**
+ * Called at the end of a task.
+ *
+ * @param task
+ */
+ public void endTask(String queryId, Task<? extends Serializable> task) {
+ String id = queryId + ":" + task.getId();
+ TaskInfo ti = taskInfoMap.get(id);
+
+ if (ti == null)
+ return;
+ log(RecordTypes.TaskEnd, ti.hm);
+ }
+
+ /**
+ * Called at the end of a task.
+ *
+ * @param task
+ */
+ public void progressTask(String queryId, Task<? extends Serializable> task) {
+ String id = queryId + ":" + task.getId();
+ TaskInfo ti = taskInfoMap.get(id);
+ if (ti == null)
+ return;
+ log(RecordTypes.TaskProgress, ti.hm);
+
+ }
+
+}
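
HiveHistory.log() writes one record per line: the record type, then space-separated KEY="value" pairs (newlines in values replaced by spaces), with a TIME="<epoch millis>" field appended. Purely as an illustration derived from the code above (all values are placeholders, not real output), a session that runs a single query with one task produces lines shaped like:

    SessionStart SESSION_ID="<user>_<sessionid>" TIME="<millis>"
    QueryStart QUERY_ID="<user>_<timestamp>" QUERY_STRING="select a.key from src a" TIME="<millis>"
    TaskStart QUERY_ID="<user>_<timestamp>" TASK_ID="<task id>" TASK_NAME="org.apache.hadoop.hive.ql.exec.ExecDriver" TIME="<millis>"
    TaskProgress QUERY_ID="..." TASK_ID="..." TASK_HADOOP_ID="<hadoop job id>" TASK_HADOOP_PROGRESS=" map = 100%, reduce =100%" TIME="<millis>"
    TaskEnd QUERY_ID="..." TASK_ID="..." TASK_RET_CODE="0" TIME="<millis>"
    QueryEnd QUERY_ID="..." QUERY_STRING="..." QUERY_NUM_TASKS="1" QUERY_RET_CODE="0" TIME="<millis>"
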
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.history;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+
+public class HiveHistoryViewer implements
+ org.apache.hadoop.hive.ql.history.HiveHistory.Listener {
+
+ String historyFile;
+
+ String sessionId;
+
+ // Job Hash Map
+ private HashMap<String, QueryInfo> jobInfoMap = new HashMap<String, QueryInfo>();
+
+ // Task Hash Map
+ private HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
+
+ public HiveHistoryViewer(String path) {
+ historyFile = path;
+ init();
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public Map<String, QueryInfo> getJobInfoMap() {
+ return jobInfoMap;
+ }
+
+ public Map<String, TaskInfo> getTaskInfoMap() {
+ return taskInfoMap;
+ }
+
+ /**
+ * parse history files
+ */
+ void init() {
+
+ try {
+ HiveHistory.parseHiveHistory(historyFile, this);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Implementation Listner interface function
+ *
+ * @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes,
+ * java.util.Map)
+ */
+ public void handle(RecordTypes recType, Map<String, String> values) {
+
+ if (recType == RecordTypes.SessionStart) {
+ sessionId = values.get(Keys.SESSION_ID.name());
+ } else if (recType == RecordTypes.QueryStart || recType == RecordTypes.QueryEnd) {
+ String key = values.get(Keys.QUERY_ID.name());
+ QueryInfo ji;
+ if (jobInfoMap.containsKey(key)) {
+ ji = jobInfoMap.get(key);
+
+ ji.hm.putAll(values);
+
+ } else {
+ ji = new QueryInfo();
+ ji.hm = new HashMap<String, String>();
+ ji.hm.putAll(values);
+
+ jobInfoMap.put(key, ji);
+
+ }
+ } else if (recType == RecordTypes.TaskStart
+ || recType == RecordTypes.TaskEnd
+ || recType == RecordTypes.TaskProgress) {
+
+ String jobid = values.get(Keys.QUERY_ID.name());
+ String taskid = values.get(Keys.TASK_ID.name());
+ String key = jobid + ":" + taskid;
+ TaskInfo ti;
+ if (taskInfoMap.containsKey(key)) {
+ ti = taskInfoMap.get(key);
+ ti.hm.putAll(values);
+ } else {
+ ti = new TaskInfo();
+ ti.hm = new HashMap<String, String>();
+ ti.hm.putAll(values);
+ taskInfoMap.put(key, ti);
+
+ }
+
+ }
+
+ }
+
+}
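
HiveHistoryViewer is a ready-made Listener implementation that replays a history file into QueryInfo/TaskInfo maps keyed by query id and by "query id:task id". A minimal reading sketch (the file path is hypothetical):

    import java.util.Map;
    import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
    import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
    import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
    import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;

    public class HistoryDumpSketch {
      public static void main(String[] args) {
        // Hypothetical path; real files follow /tmp/<user>/hive_job_log_<sessionid>.txt.
        HiveHistoryViewer hv = new HiveHistoryViewer("/tmp/someuser/hive_job_log_someuser_200901282115.txt");
        for (Map.Entry<String, QueryInfo> e : hv.getJobInfoMap().entrySet()) {
          System.out.println("query " + e.getKey() + " ret="
              + e.getValue().hm.get(Keys.QUERY_RET_CODE.name()));
        }
        for (Map.Entry<String, TaskInfo> e : hv.getTaskInfoMap().entrySet()) {
          System.out.println("task " + e.getKey() + " ret="
              + e.getValue().hm.get(Keys.TASK_RET_CODE.name()));
        }
      }
    }
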
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Wed Jan 28 21:15:25 2009
@@ -31,6 +31,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.commons.lang.StringUtils;
@@ -58,6 +59,10 @@
*/
protected Hive db;
+ /*
+ * HiveHistory Object
+ */
+ protected HiveHistory hiveHist;
/**
* Streams to read/write from
*/
@@ -121,10 +126,15 @@
}
public void setCmd(String cmdString) {
- conf.setVar(HiveConf.ConfVars.HIVEQUERYID, cmdString);
+ conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, cmdString);
}
public String getCmd() {
+ return (conf.getVar(HiveConf.ConfVars.HIVEQUERYSTRING));
+ }
+
+
+ public String getQueryId() {
return (conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
}
@@ -144,6 +154,7 @@
public static SessionState start(HiveConf conf) {
SessionState ss = new SessionState (conf);
ss.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
+ ss.hiveHist = new HiveHistory(ss);
tss.set(ss);
return (ss);
}
@@ -154,10 +165,15 @@
* session object when switching from one session to another
*/
public static SessionState start(SessionState startSs) {
+
tss.set(startSs);
if(StringUtils.isEmpty(startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID))) {
startSs.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
}
+
+ if (startSs.hiveHist == null){
+ startSs.hiveHist = new HiveHistory(startSs);
+ }
return startSs;
}
@@ -168,7 +184,16 @@
return tss.get();
}
-
+
+ /**
+ * get hiveHitsory object which does structured logging
+ * @return
+ */
+ public HiveHistory getHiveHistory(){
+ return hiveHist;
+ }
+
+
private static String makeSessionId() {
GregorianCalendar gc = new GregorianCalendar();
String userid = System.getProperty("user.name");
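
SessionState now distinguishes the query text (hive.query.string, via getCmd()) from the generated query id (hive.query.id, via the new getQueryId()), and exposes the per-session HiveHistory through getHiveHistory(). A short sketch, assuming a session has been started:

    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SessionInfoSketch {
      public static void main(String[] args) {
        SessionState ss = SessionState.get(); // assumes SessionState.start(...) ran earlier
        System.out.println("query id:     " + ss.getQueryId());
        System.out.println("query string: " + ss.getCmd());
        System.out.println("history file: " + ss.getHiveHistory().getHistFileName());
      }
    }
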
Modified: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java Wed Jan 28 21:15:25 2009
@@ -159,6 +159,8 @@
this.logDir = logDir;
conf = new HiveConf(Driver.class);
+ CliSessionState ss = new CliSessionState(conf);
+ SessionState.start(ss);
// System.out.println(conf.toString());
testFiles = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
@@ -371,8 +373,8 @@
createSources();
}
- CliSessionState ss = new CliSessionState(conf);
-
+ //CliSessionState ss = new CliSessionState(conf);
+ SessionState ss = SessionState.get();
ss.in = System.in;
File qf = new File(outDir, tname);
Added: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java?rev=738626&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java (added)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/history/TestHiveHistory.java Wed Jan 28 21:15:25 2009
@@ -0,0 +1,164 @@
+package org.apache.hadoop.hive.ql.history;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.tools.LineageInfo;
+import org.apache.hadoop.hive.service.HiveInterface;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import junit.framework.TestCase;
+
+public class TestHiveHistory extends TestCase {
+
+ static HiveConf conf;
+
+ static private String tmpdir = "/tmp/" + System.getProperty("user.name")
+ + "/";
+ static private Path tmppath = new Path(tmpdir);
+ static private Hive db;
+ static private FileSystem fs;
+
+ /*
+ * intialize the tables
+ */
+
+ protected void setUp(){
+ try {
+ conf = new HiveConf(HiveHistory.class);
+
+ fs = FileSystem.get(conf);
+ if (fs.exists(tmppath) && !fs.getFileStatus(tmppath).isDir()) {
+ throw new RuntimeException(tmpdir + " exists but is not a directory");
+ }
+
+ if (!fs.exists(tmppath)) {
+ if (!fs.mkdirs(tmppath)) {
+ throw new RuntimeException("Could not make scratch directory "
+ + tmpdir);
+ }
+ }
+
+ // copy the test files into hadoop if required.
+ int i = 0;
+ Path[] hadoopDataFile = new Path[2];
+ String[] testFiles = { "kv1.txt", "kv2.txt" };
+ String testFileDir = "file://"
+ + conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+ for (String oneFile : testFiles) {
+ Path localDataFile = new Path(testFileDir, oneFile);
+ hadoopDataFile[i] = new Path(tmppath, oneFile);
+ fs.copyFromLocalFile(false, true, localDataFile, hadoopDataFile[i]);
+ i++;
+ }
+
+ // load the test files into tables
+ i = 0;
+ db = Hive.get(conf);
+ String[] srctables = { "src", "src2" };
+ LinkedList<String> cols = new LinkedList<String>();
+ cols.add("key");
+ cols.add("value");
+ for (String src : srctables) {
+ db.dropTable(src, true, true);
+ db.createTable(src, cols, null, TextInputFormat.class,
+ IgnoreKeyTextOutputFormat.class);
+ db.loadTable(hadoopDataFile[i], src, false);
+ i++;
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new RuntimeException("Encountered throwable");
+ }
+}
+
+ /**
+ * check history file output for this query.als
+ */
+ public void testSimpleQuery() {
+ LineageInfo lep = new LineageInfo();
+ try {
+
+ // NOTE: It is critical to do this here so that log4j is reinitialized
+ // before
+ // any of the other core hive classes are loaded
+ SessionState.initHiveLog4j();
+
+ CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
+ ss.in = System.in;
+ try {
+ ss.out = new PrintStream(System.out, true, "UTF-8");
+ ss.err = new PrintStream(System.err, true, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ System.exit(3);
+ }
+
+ SessionState.start(ss);
+
+ String cmd = "select a.key from src a";
+ Driver d = new Driver();
+ int ret = d.run(cmd);
+ if (ret != 0) {
+ fail("Failed");
+ }
+ HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+ .getHiveHistory().getHistFileName());
+ Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+ Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+ if (jobInfoMap.size() != 1) {
+ fail("jobInfo Map size not 1");
+ }
+
+ if (taskInfoMap.size() != 1) {
+ fail("jobInfo Map size not 1");
+ }
+
+
+ cmd = (String)jobInfoMap.keySet().toArray()[0];
+ QueryInfo ji = jobInfoMap.get(cmd);
+
+ if (!ji.hm.get(Keys.QUERY_NUM_TASKS.name()).equals("1")) {
+ fail("Wrong number of tasks");
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed");
+ }
+ }
+
+}
Modified: hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm?rev=738626&r1=738625&r2=738626&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm (original)
+++ hadoop/hive/trunk/ql/src/test/templates/TestCliDriver.vm Wed Jan 28 21:15:25 2009
@@ -9,6 +9,13 @@
import org.apache.hadoop.hive.ql.QTestUtil;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.history.HiveHistoryViewer;
+import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
import org.antlr.runtime.*;
import org.antlr.runtime.tree.*;
@@ -62,6 +69,19 @@
if (ecode != 0) {
fail("Client Execution failed with error code = " + ecode);
}
+ if (SessionState.get() != null) {
+ HiveHistoryViewer hv = new HiveHistoryViewer(SessionState.get()
+ .getHiveHistory().getHistFileName());
+ Map<String, QueryInfo> jobInfoMap = hv.getJobInfoMap();
+ Map<String, TaskInfo> taskInfoMap = hv.getTaskInfoMap();
+
+ String cmd = (String)jobInfoMap.keySet().toArray()[0];
+ QueryInfo ji = jobInfoMap.get(cmd);
+
+ if (!ji.hm.get(Keys.QUERY_RET_CODE.name()).equals("0")) {
+ fail("Wrong return code in hive history");
+ }
+ }
ecode = qt.checkCliDriverResults("$fname");
if (ecode != 0) {