Posted to commits@hive.apache.org by nz...@apache.org on 2011/02/24 00:58:42 UTC
svn commit: r1073992 [1/3] - in /hive/trunk:
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/
ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/ ql/src/java/org/a...
Author: nzhang
Date: Wed Feb 23 23:58:41 2011
New Revision: 1073992
URL: http://svn.apache.org/viewvc?rev=1073992&view=rev
Log:
HIVE-1950. Block merge for RCFile (Yongqiang He via Ning Zhang)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
hive/trunk/ql/src/test/queries/clientnegative/merge_negative_1.q
hive/trunk/ql/src/test/queries/clientnegative/merge_negative_2.q
hive/trunk/ql/src/test/queries/clientpositive/alter_merge.q
hive/trunk/ql/src/test/queries/clientpositive/alter_merge_stats.q
hive/trunk/ql/src/test/results/clientnegative/merge_negative_1.q.out
hive/trunk/ql/src/test/results/clientnegative/merge_negative_2.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_merge.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_merge_stats.q.out
hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/CombineHiveKey.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Wed Feb 23 23:58:41 2011
@@ -84,7 +84,7 @@ public class ConditionalTask extends Tas
for (Task<? extends Serializable> tsk : getListTasks()) {
if (!resTasks.contains(tsk)) {
driverContext.getRunnable().remove(tsk);
- console.printInfo(ExecDriver.getJobEndMsg("" + Utilities.randGen.nextInt())
+ console.printInfo(HadoopJobExecHelper.getJobEndMsg("" + Utilities.randGen.nextInt())
+ ", job is filtered out (removed at runtime).");
if (tsk.isMapRedTask()) {
driverContext.incCurJobNo(1);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Feb 23 23:58:41 2011
@@ -75,6 +75,9 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -89,6 +92,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.MetaDataFormatUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
@@ -138,6 +142,7 @@ import org.apache.hadoop.hive.serde2.dyn
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -350,6 +355,11 @@ public class DDLTask extends Task<DDLWor
if (showIndexes != null) {
return showIndexes(db, showIndexes);
}
+
+ AlterTablePartMergeFilesDesc mergeFilesDesc = work.getMergeFilesDesc();
+ if(mergeFilesDesc != null) {
+ return mergeFiles(db, mergeFilesDesc);
+ }
} catch (InvalidTableException e) {
console.printError("Table " + e.getTableName() + " does not exist");
@@ -369,6 +379,33 @@ public class DDLTask extends Task<DDLWor
return 0;
}
+ /**
+ * First, make sure the source table/partition is not archived, indexed, or
+ * stored in a non-RCFile format. If any of these conditions holds, throw an
+ * exception.
+ *
+ * The merge itself is performed by creating a BlockMergeTask from the
+ * mergeFilesDesc.
+ *
+ * @param db
+ * @param mergeFilesDesc
+ * @return
+ * @throws HiveException
+ */
+ private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
+ throws HiveException {
+ // merge work only needs input and output.
+ MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
+ mergeFilesDesc.getOutputDir());
+ DriverContext driverCxt = new DriverContext();
+ BlockMergeTask taskExec = new BlockMergeTask();
+ taskExec.initialize(db.getConf(), null, driverCxt);
+ taskExec.setWork(mergeWork);
+ int ret = taskExec.execute(driverCxt);
+
+ return ret;
+ }
+
private int grantOrRevokeRole(GrantRevokeRoleDDL grantOrRevokeRoleDDL)
throws HiveException {
try {
@@ -894,23 +931,6 @@ public class DDLTask extends Task<DDLWor
return 0;
}
- /**
- * Determines whether a partition has been archived
- *
- * @param p
- * @return
- */
-
- private boolean isArchived(Partition p) {
- Map<String, String> params = p.getParameters();
- if ("true".equalsIgnoreCase(params.get(
- org.apache.hadoop.hive.metastore.api.Constants.IS_ARCHIVED))) {
- return true;
- } else {
- return false;
- }
- }
-
private void setIsArchived(Partition p, boolean state) {
Map<String, String> params = p.getParameters();
if (state) {
@@ -958,7 +978,7 @@ public class DDLTask extends Task<DDLWor
*/
private void setArchived(Partition p, Path parentDir, String dirInArchive, String archiveName)
throws URISyntaxException {
- assert(isArchived(p) == false);
+ assert(Utilities.isArchived(p) == false);
Map<String, String> params = p.getParameters();
URI parentUri = parentDir.toUri();
@@ -996,7 +1016,7 @@ public class DDLTask extends Task<DDLWor
* @param p - the partition to modify
*/
private void setUnArchived(Partition p) {
- assert(isArchived(p) == true);
+ assert(Utilities.isArchived(p) == true);
String parentDir = getOriginalLocation(p);
setIsArchived(p, false);
setOriginalLocation(p, null);
@@ -1051,7 +1071,7 @@ public class DDLTask extends Task<DDLWor
throw new HiveException("Specified partition does not exist");
}
- if (isArchived(p)) {
+ if (Utilities.isArchived(p)) {
// If there were a failure right after the metadata was updated in an
// archiving operation, it's possible that the original, unarchived files
// weren't deleted.
@@ -1236,7 +1256,7 @@ public class DDLTask extends Task<DDLWor
throw new HiveException("Specified partition does not exist");
}
- if (!isArchived(p)) {
+ if (!Utilities.isArchived(p)) {
Path location = new Path(p.getLocation());
Path leftOverArchiveDir = new Path(location.getParent(),
location.getName() + INTERMEDIATE_ARCHIVED_DIR_SUFFIX);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Feb 23 23:58:41 2011
@@ -28,13 +28,9 @@ import java.lang.management.MemoryMXBean
import java.net.URL;
import java.net.URLDecoder;
import java.net.URLEncoder;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -56,10 +52,6 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
-import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
-import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
@@ -85,7 +77,6 @@ import org.apache.hadoop.mapred.JobClien
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
@@ -98,30 +89,22 @@ import org.apache.log4j.varia.NullAppend
* ExecDriver.
*
*/
-public class ExecDriver extends Task<MapredWork> implements Serializable {
+public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {
private static final long serialVersionUID = 1L;
protected transient JobConf job;
- protected transient int mapProgress = 0;
- protected transient int reduceProgress = 0;
- public transient String jobId;
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
public static MemoryMXBean memoryMXBean;
+ protected HadoopJobExecHelper jobExecHelper;
/**
* Constructor when invoked from QL.
*/
public ExecDriver() {
super();
+ LOG = LogFactory.getLog(this.getClass().getName());
+ console = new LogHelper(LOG);
+ this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
protected static String getResourceFiles(Configuration conf, SessionState.ResourceType t) {
@@ -179,6 +162,7 @@ public class ExecDriver extends Task<Map
if (StringUtils.isNotBlank(addedArchives)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
}
+ this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
/**
@@ -189,101 +173,7 @@ public class ExecDriver extends Task<Map
this.job = job;
LOG = LogFactory.getLog(this.getClass().getName());
console = new LogHelper(LOG, isSilent);
- }
-
- /**
- * A list of the currently running jobs spawned in this Hive instance that is used to kill all
- * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
- * still jobs running.
- */
- private static Map<String, String> runningJobKillURIs = Collections
- .synchronizedMap(new HashMap<String, String>());
-
- /**
- * In Hive, when the user control-c's the command line, any running jobs spawned from that command
- * line are best-effort killed.
- *
- * This static constructor registers a shutdown thread to iterate over all the running job kill
- * URLs and do a get on them.
- *
- */
- static {
- if (new org.apache.hadoop.conf.Configuration()
- .getBoolean("webinterface.private.actions", false)) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- synchronized (runningJobKillURIs) {
- for (String uri : runningJobKillURIs.values()) {
- try {
- System.err.println("killing job with: " + uri);
- java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
- .openConnection();
- conn.setRequestMethod("POST");
- int retCode = conn.getResponseCode();
- if (retCode != 200) {
- System.err.println("Got an error trying to kill job with URI: " + uri + " = "
- + retCode);
- }
- } catch (Exception e) {
- System.err.println("trying to kill job, caught: " + e);
- // do nothing
- }
- }
- }
- }
- });
- }
- }
-
- /**
- * from StreamJob.java.
- */
- private void jobInfo(RunningJob rj) {
- if (job.get("mapred.job.tracker", "local").equals("local")) {
- console.printInfo("Job running in-process (local Hadoop)");
- } else {
- String hp = job.get("mapred.job.tracker");
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
- getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
- }
- console.printInfo(ExecDriver.getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
- + rj.getTrackingURL());
- console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
- + " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
- }
- }
-
- /**
- * This class contains the state of the running task Going forward, we will return this handle
- * from execute and Driver can split execute into start, monitorProgess and postProcess.
- */
- private static class ExecDriverTaskHandle extends TaskHandle {
- JobClient jc;
- RunningJob rj;
-
- JobClient getJobClient() {
- return jc;
- }
-
- RunningJob getRunningJob() {
- return rj;
- }
-
- public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
- this.jc = jc;
- this.rj = rj;
- }
-
- public void setRunningJob(RunningJob job) {
- rj = job;
- }
-
- @Override
- public Counters getCounters() throws IOException {
- return rj.getCounters();
- }
+ this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
/**
@@ -296,20 +186,7 @@ public class ExecDriver extends Task<Map
*
* @return true if fatal errors happened during job execution, false otherwise.
*/
- private boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- if (ctrs == null) {
- // hadoop might return null if it cannot locate the job.
- // we may still be able to retrieve the job status - so ignore
- return false;
- }
- // check for number of created files
- long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
- long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
- if (numFiles > upperLimit) {
- errMsg.append("total number of created files exceeds ").append(upperLimit);
- return true;
- }
-
+ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
@@ -323,186 +200,7 @@ public class ExecDriver extends Task<Map
return false;
}
- private boolean progress(ExecDriverTaskHandle th) throws IOException {
- JobClient jc = th.getJobClient();
- RunningJob rj = th.getRunningJob();
- String lastReport = "";
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- long reportTime = System.currentTimeMillis();
- long maxReportInterval = 60 * 1000; // One minute
- boolean fatal = false;
- StringBuilder errMsg = new StringBuilder();
- long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
- boolean initializing = true;
- boolean initOutputPrinted = false;
- while (!rj.isComplete()) {
- try {
- Thread.sleep(pullInterval);
- } catch (InterruptedException e) {
- }
-
- if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
- // No reason to poll untill the job is initialized
- continue;
- } else {
- // By now the job is initialized so no reason to do
- // rj.getJobState() again and we do not want to do an extra RPC call
- initializing = false;
- }
-
- if (!initOutputPrinted) {
- SessionState ss = SessionState.get();
-
- String logMapper;
- String logReducer;
-
- TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
- if (mappers == null) {
- logMapper = "no information for number of mappers; ";
- } else {
- int numMap = mappers.length;
- if (ss != null) {
- ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
- Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
- }
- logMapper = "number of mappers: " + numMap + "; ";
- }
-
- TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
- if (reducers == null) {
- logReducer = "no information for number of reducers. ";
- } else {
- int numReduce = reducers.length;
- if (ss != null) {
- ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
- Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
- }
- logReducer = "number of reducers: " + numReduce;
- }
-
- console
- .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
- initOutputPrinted = true;
- }
-
- RunningJob newRj = jc.getJob(rj.getJobID());
- if (newRj == null) {
- // under exceptional load, hadoop may not be able to look up status
- // of finished jobs (because it has purged them from memory). From
- // hive's perspective - it's equivalent to the job having failed.
- // So raise a meaningful exception
- throw new IOException("Could not find status of job: + rj.getJobID()");
- } else {
- th.setRunningJob(newRj);
- rj = newRj;
- }
-
- // If fatal errors happen we should kill the job immediately rather than
- // let the job retry several times, which eventually lead to failure.
- if (fatal) {
- continue; // wait until rj.isComplete
- }
-
- Counters ctrs = th.getCounters();
-
- if (fatal = checkFatalErrors(ctrs, errMsg)) {
- console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
- rj.killJob();
- continue;
- }
- errMsg.setLength(0);
-
- updateCounters(ctrs, rj);
-
- String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress
- + "%";
-
- if (!report.equals(lastReport)
- || System.currentTimeMillis() >= reportTime + maxReportInterval) {
-
- // write out serialized plan with counters to log file
- // LOG.info(queryPlan);
- String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
- SessionState ss = SessionState.get();
- if (ss != null) {
- ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs);
- ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
- Keys.TASK_HADOOP_PROGRESS, output);
- ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this);
- ss.getHiveHistory().logPlanProgress(queryPlan);
- }
- console.printInfo(output);
- lastReport = report;
- reportTime = System.currentTimeMillis();
- }
- }
-
- boolean success;
- Counters ctrs = th.getCounters();
-
- if (fatal) {
- success = false;
- } else {
- // check for fatal error again in case it occurred after
- // the last check before the job is completed
- if (checkFatalErrors(ctrs, errMsg)) {
- console.printError("[Fatal Error] " + errMsg.toString());
- success = false;
- } else {
- success = rj.isSuccessful();
- }
- }
-
- setDone();
- // update based on the final value of the counters
- updateCounters(ctrs, rj);
-
- SessionState ss = SessionState.get();
- if (ss != null) {
- ss.getHiveHistory().logPlanProgress(queryPlan);
- }
- // LOG.info(queryPlan);
- return (success);
- }
-
- /**
- * Update counters relevant to this task.
- */
- private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
- mapProgress = Math.round(rj.mapProgress() * 100);
- reduceProgress = Math.round(rj.reduceProgress() * 100);
- taskCounters.put("CNTR_NAME_" + getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
- taskCounters.put("CNTR_NAME_" + getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
- if (ctrs == null) {
- // hadoop might return null if it cannot locate the job.
- // we may still be able to retrieve the job status - so ignore
- return;
- }
- for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
- op.updateCounters(ctrs);
- }
- if (work.getReducer() != null) {
- work.getReducer().updateCounters(ctrs);
- }
- }
-
- public boolean mapStarted() {
- return mapProgress > 0;
- }
-
- public boolean reduceStarted() {
- return reduceProgress > 0;
- }
-
- public boolean mapDone() {
- return mapProgress == 100;
- }
-
- public boolean reduceDone() {
- return reduceProgress == 100;
- }
-
- /**
+ /**
* Execute a query plan using Hadoop.
*/
@Override
@@ -676,7 +374,6 @@ public class ExecDriver extends Task<Map
HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
}
JobClient jc = new JobClient(job);
-
// make this client wait if job trcker is not behaving well.
Throttle.checkJobTracker(job, LOG);
@@ -692,35 +389,13 @@ public class ExecDriver extends Task<Map
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
-
- jobId = rj.getJobID();
-
// replace it back
if (pwd != null) {
HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
}
-
- // add to list of running jobs to kill in case of abnormal shutdown
-
- runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
-
- ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
- jobInfo(rj);
- success = progress(th);
-
- String statusMesg = getJobEndMsg(rj.getJobID());
- if (!success) {
- statusMesg += " with errors";
- returnVal = 2;
- console.printError(statusMesg);
- if (HiveConf.getBoolVar(job, HiveConf.ConfVars.SHOW_JOB_FAIL_DEBUG_INFO)) {
- showJobFailDebugInfo(job, rj);
- }
- } else {
- console.printInfo(statusMesg);
- }
-
-
+
+ returnVal = jobExecHelper.progress(rj, jc);
+ success = (returnVal == 0);
} catch (Exception e) {
e.printStackTrace();
String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
@@ -747,7 +422,7 @@ public class ExecDriver extends Task<Map
if (returnVal != 0) {
rj.killJob();
}
- runningJobKillURIs.remove(rj.getJobID());
+ HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
}
} catch (Exception e) {
}
@@ -778,179 +453,21 @@ public class ExecDriver extends Task<Map
return (returnVal);
}
-
- /**
- * This msg pattern is used to track when a job is started.
- *
- * @param jobId
- * @return
- */
- private static String getJobStartMsg(String jobId) {
- return "Starting Job = " + jobId;
- }
-
- /**
- * this msg pattern is used to track when a job is successfully done.
- *
- * @param jobId
- * @return
- */
- public static String getJobEndMsg(String jobId) {
- return "Ended Job = " + jobId;
+
+ public boolean mapStarted() {
+ return this.jobExecHelper.mapStarted();
}
- private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
- return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&all=true";
+ public boolean reduceStarted() {
+ return this.jobExecHelper.reduceStarted();
}
- // Used for showJobFailDebugInfo
- private static class TaskInfo {
- String jobId;
- HashSet<String> logUrls;
-
- public TaskInfo(String jobId) {
- this.jobId = jobId;
- logUrls = new HashSet<String>();
- }
-
- public void addLogUrl(String logUrl) {
- logUrls.add(logUrl);
- }
-
- public HashSet<String> getLogUrls() {
- return logUrls;
- }
-
- public String getJobId() {
- return jobId;
- }
+ public boolean mapDone() {
+ return this.jobExecHelper.mapDone();
}
- @SuppressWarnings("deprecation")
- private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
- // Mapping from task ID to the number of failures
- Map<String, Integer> failures = new HashMap<String, Integer>();
- // Successful task ID's
- Set<String> successes = new HashSet<String>();
-
- Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
-
- int startIndex = 0;
-
- // Loop to get all task completion events because getTaskCompletionEvents
- // only returns a subset per call
- while (true) {
- TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
-
- if (taskCompletions == null || taskCompletions.length == 0) {
- break;
- }
-
- boolean more = true;
- for (TaskCompletionEvent t : taskCompletions) {
- // getTaskJobIDs returns Strings for compatibility with Hadoop versions
- // without TaskID or TaskAttemptID
- String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
-
- if (taskJobIds == null) {
- console.printError("Task attempt info is unavailable in this Hadoop version");
- more = false;
- break;
- }
-
- // For each task completion event, get the associated task id, job id
- // and the logs
- String taskId = taskJobIds[0];
- String jobId = taskJobIds[1];
-
- TaskInfo ti = taskIdToInfo.get(taskId);
- if (ti == null) {
- ti = new TaskInfo(jobId);
- taskIdToInfo.put(taskId, ti);
- }
- // These tasks should have come from the same job.
- assert (ti.getJobId() == jobId);
- ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
-
- // If a task failed, then keep track of the total number of failures
- // for that task (typically, a task gets re-run up to 4 times if it
- // fails
-
- if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
- Integer failAttempts = failures.get(taskId);
- if (failAttempts == null) {
- failAttempts = Integer.valueOf(0);
- }
- failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
- failures.put(taskId, failAttempts);
- } else {
- successes.add(taskId);
- }
- }
- if (!more) {
- break;
- }
- startIndex += taskCompletions.length;
- }
- // Remove failures for tasks that succeeded
- for (String task : successes) {
- failures.remove(task);
- }
-
- if (failures.keySet().size() == 0) {
- return;
- }
-
- // Find the highest failure count
- int maxFailures = 0;
- for (Integer failCount : failures.values()) {
- if (maxFailures < failCount.intValue()) {
- maxFailures = failCount.intValue();
- }
- }
-
- // Display Error Message for tasks with the highest failure count
- String jtUrl = JobTrackerURLResolver.getURL(conf);
-
- for (String task : failures.keySet()) {
- if (failures.get(task).intValue() == maxFailures) {
- TaskInfo ti = taskIdToInfo.get(task);
- String jobId = ti.getJobId();
- String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
-
- TaskLogProcessor tlp = new TaskLogProcessor(conf);
- for (String logUrl : ti.getLogUrls()) {
- tlp.addTaskAttemptLogUrl(logUrl);
- }
-
- List<ErrorAndSolution> errors = tlp.getErrors();
-
- StringBuilder sb = new StringBuilder();
- // We use a StringBuilder and then call printError only once as
- // printError will write to both stderr and the error log file. In
- // situations where both the stderr and the log file output is
- // simultaneously output to a single stream, this will look cleaner.
- sb.append("\n");
- sb.append("Task with the most failures(" + maxFailures + "): \n");
- sb.append("-----\n");
- sb.append("Task ID:\n " + task + "\n\n");
- sb.append("URL:\n " + taskUrl + "\n");
-
- for (ErrorAndSolution e : errors) {
- sb.append("\n");
- sb.append("Possible error:\n " + e.getError() + "\n\n");
- sb.append("Solution:\n " + e.getSolution() + "\n");
- }
- sb.append("-----\n");
-
- console.printError(sb.toString());
-
- // Only print out one task because that's good enough for debugging.
- break;
- }
- }
- return;
-
+ public boolean reduceDone() {
+ return this.jobExecHelper.reduceDone();
}
private static void printUsage() {
@@ -1375,4 +892,19 @@ public class ExecDriver extends Task<Map
}
}
}
+
+ @Override
+ public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
+ for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
+ op.updateCounters(ctrs);
+ }
+ if (work.getReducer() != null) {
+ work.getReducer().updateCounters(ctrs);
+ }
+ }
+
+ @Override
+ public void logPlanProgress(SessionState ss) throws IOException {
+ ss.getHiveHistory().logPlanProgress(queryPlan);
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Feb 23 23:58:41 2011
@@ -673,7 +673,7 @@ public class FileSinkOperator extends Te
if ((conf != null) && isNativeTable) {
String specPath = conf.getDirName();
DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
- mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx);
+ Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf);
}
} catch (IOException e) {
throw new HiveException(e);
@@ -681,83 +681,6 @@ public class FileSinkOperator extends Te
super.jobClose(hconf, success, feedBack);
}
- public void mvFileToFinalPath(String specPath, Configuration hconf,
- boolean success, Log log, DynamicPartitionCtx dpCtx) throws IOException, HiveException {
-
- FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
- Path tmpPath = Utilities.toTempPath(specPath);
- Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
- + ".intermediate");
- Path finalPath = new Path(specPath);
- if (success) {
- if (fs.exists(tmpPath)) {
- // Step1: rename tmp output folder to intermediate path. After this
- // point, updates from speculative tasks still writing to tmpPath
- // will not appear in finalPath.
- log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
- Utilities.rename(fs, tmpPath, intermediatePath);
- // Step2: remove any tmp file or double-committed output files
- ArrayList<String> emptyBuckets =
- Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
- // create empty buckets if necessary
- if (emptyBuckets.size() > 0) {
- createEmptyBuckets(hconf, emptyBuckets);
- }
-
- // Step3: move to the file destination
- log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
- Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
- }
- } else {
- fs.delete(tmpPath, true);
- }
- }
-
- /**
- * Check the existence of buckets according to bucket specification. Create empty buckets if
- * needed.
- * @param specPath The final path where the dynamic partitions should be in.
- * @param conf FileSinkDesc.
- * @param dpCtx dynamic partition context.
- * @throws HiveException
- * @throws IOException
- */
- private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
- throws HiveException, IOException {
-
- JobConf jc;
- if (hconf instanceof JobConf) {
- jc = new JobConf(hconf);
- } else {
- // test code path
- jc = new JobConf(hconf, ExecDriver.class);
- }
- HiveOutputFormat<?, ?> hiveOutputFormat = null;
- Class<? extends Writable> outputClass = null;
- boolean isCompressed = conf.getCompressed();
- TableDesc tableInfo = conf.getTableInfo();
- try {
- Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
- serializer.initialize(null, tableInfo.getProperties());
- outputClass = serializer.getSerializedClass();
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (InstantiationException e) {
- throw new HiveException(e);
- } catch (IllegalAccessException e) {
- throw new HiveException(e);
- }
-
- for (String p: paths) {
- Path path = new Path(p);
- RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
- jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
- writer.close(false);
- LOG.info("created empty bucket for enforcing bucketing at " + path);
- }
- }
-
@Override
public OperatorType getType() {
return OperatorType.FILESINK;
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1073992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Wed Feb 23 23:58:41 2011
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
+import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
+import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+public class HadoopJobExecHelper {
+
+ protected transient JobConf job;
+ protected Task<? extends Serializable> task;
+
+ protected transient int mapProgress = 0;
+ protected transient int reduceProgress = 0;
+ public transient String jobId;
+ private LogHelper console;
+ private HadoopJobExecHook callBackObj;
+
+ /**
+ * Update counters relevant to this task.
+ */
+ private void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
+ mapProgress = Math.round(rj.mapProgress() * 100);
+ reduceProgress = Math.round(rj.reduceProgress() * 100);
+ task.taskCounters.put("CNTR_NAME_" + task.getId() + "_MAP_PROGRESS", Long.valueOf(mapProgress));
+ task.taskCounters.put("CNTR_NAME_" + task.getId() + "_REDUCE_PROGRESS", Long.valueOf(reduceProgress));
+ if (ctrs == null) {
+ // hadoop might return null if it cannot locate the job.
+ // we may still be able to retrieve the job status - so ignore
+ return;
+ }
+ if(callBackObj != null) {
+ callBackObj.updateCounters(ctrs, rj);
+ }
+ }
+
+ /**
+ * This msg pattern is used to track when a job is started.
+ *
+ * @param jobId
+ * @return
+ */
+ private static String getJobStartMsg(String jobId) {
+ return "Starting Job = " + jobId;
+ }
+
+ /**
+ * this msg pattern is used to track when a job is successfully done.
+ *
+ * @param jobId
+ * @return
+ */
+ public static String getJobEndMsg(String jobId) {
+ return "Ended Job = " + jobId;
+ }
+
+ private String getTaskAttemptLogUrl(String taskTrackerHttpAddress, String taskAttemptId) {
+ return taskTrackerHttpAddress + "/tasklog?taskid=" + taskAttemptId + "&all=true";
+ }
+
+ public boolean mapStarted() {
+ return mapProgress > 0;
+ }
+
+ public boolean reduceStarted() {
+ return reduceProgress > 0;
+ }
+
+ public boolean mapDone() {
+ return mapProgress == 100;
+ }
+
+ public boolean reduceDone() {
+ return reduceProgress == 100;
+ }
+
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+
+ public HadoopJobExecHelper() {
+ }
+
+ public HadoopJobExecHelper(JobConf job, LogHelper console,
+ Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
+ this.job = job;
+ this.console = console;
+ this.task = task;
+ this.callBackObj = hookCallBack;
+ }
+
+
+ /**
+ * A list of the currently running jobs spawned in this Hive instance that is used to kill all
+ * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
+ * still jobs running.
+ */
+ public static Map<String, String> runningJobKillURIs = Collections
+ .synchronizedMap(new HashMap<String, String>());
+
+
+ /**
+ * In Hive, when the user control-c's the command line, any running jobs spawned from that command
+ * line are best-effort killed.
+ *
+ * This static initializer block registers a shutdown hook that iterates over all the
+ * running-job kill URLs and issues an HTTP POST to each of them.
+ *
+ */
+ static {
+ if (new org.apache.hadoop.conf.Configuration()
+ .getBoolean("webinterface.private.actions", false)) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ synchronized (runningJobKillURIs) {
+ for (String uri : runningJobKillURIs.values()) {
+ try {
+ System.err.println("killing job with: " + uri);
+ java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
+ .openConnection();
+ conn.setRequestMethod("POST");
+ int retCode = conn.getResponseCode();
+ if (retCode != 200) {
+ System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+ + retCode);
+ }
+ } catch (Exception e) {
+ System.err.println("trying to kill job, caught: " + e);
+ // do nothing
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+
+ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+ if (ctrs == null) {
+ // hadoop might return null if it cannot locate the job.
+ // we may still be able to retrieve the job status - so ignore
+ return false;
+ }
+ // check for number of created files
+ long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+ long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+ if (numFiles > upperLimit) {
+ errMsg.append("total number of created files exceeds ").append(upperLimit);
+ return true;
+ }
+ return this.callBackObj.checkFatalErrors(ctrs, errMsg);
+ }
+
+ private boolean progress(ExecDriverTaskHandle th) throws IOException {
+ JobClient jc = th.getJobClient();
+ RunningJob rj = th.getRunningJob();
+ String lastReport = "";
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ long reportTime = System.currentTimeMillis();
+ long maxReportInterval = 60 * 1000; // One minute
+ boolean fatal = false;
+ StringBuilder errMsg = new StringBuilder();
+ long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
+ boolean initializing = true;
+ while (!rj.isComplete()) {
+ try {
+ Thread.sleep(pullInterval);
+ } catch (InterruptedException e) {
+ }
+
+ if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
+ // No reason to poll until the job is initialized
+ continue;
+ } else {
+ // By now the job is initialized so no reason to do
+ // rj.getJobState() again and we do not want to do an extra RPC call
+ initializing = false;
+ }
+
+ RunningJob newRj = jc.getJob(rj.getJobID());
+ if (newRj == null) {
+ // under exceptional load, hadoop may not be able to look up status
+ // of finished jobs (because it has purged them from memory). From
+ // hive's perspective - it's equivalent to the job having failed.
+ // So raise a meaningful exception
+ throw new IOException("Could not find status of job: + rj.getJobID()");
+ } else {
+ th.setRunningJob(newRj);
+ rj = newRj;
+ }
+
+ // If fatal errors happen we should kill the job immediately rather than
+ // let the job retry several times, which eventually lead to failure.
+ if (fatal) {
+ continue; // wait until rj.isComplete
+ }
+
+ Counters ctrs = th.getCounters();
+
+ if (fatal = this.callBackObj.checkFatalErrors(ctrs, errMsg)) {
+ console.printError("[Fatal Error] " + errMsg.toString() + ". Killing the job.");
+ rj.killJob();
+ continue;
+ }
+ errMsg.setLength(0);
+
+ updateCounters(ctrs, rj);
+
+ String report = " " + getId() + " map = " + mapProgress + "%, reduce = " + reduceProgress
+ + "%";
+
+ if (!report.equals(lastReport)
+ || System.currentTimeMillis() >= reportTime + maxReportInterval) {
+
+ // write out serialized plan with counters to log file
+ // LOG.info(queryPlan);
+ String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
+ SessionState ss = SessionState.get();
+ if (ss != null) {
+ ss.getHiveHistory().setTaskCounters(SessionState.get().getQueryId(), getId(), ctrs);
+ ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+ Keys.TASK_HADOOP_PROGRESS, output);
+ ss.getHiveHistory().progressTask(SessionState.get().getQueryId(), this.task);
+ this.callBackObj.logPlanProgress(ss);
+ }
+ console.printInfo(output);
+ lastReport = report;
+ reportTime = System.currentTimeMillis();
+ }
+ }
+
+ boolean success;
+ Counters ctrs = th.getCounters();
+
+ if (fatal) {
+ success = false;
+ } else {
+ // check for fatal error again in case it occurred after
+ // the last check before the job is completed
+ if (checkFatalErrors(ctrs, errMsg)) {
+ console.printError("[Fatal Error] " + errMsg.toString());
+ success = false;
+ } else {
+ success = rj.isSuccessful();
+ }
+ }
+
+ this.task.setDone();
+ // update based on the final value of the counters
+ updateCounters(ctrs, rj);
+
+ SessionState ss = SessionState.get();
+ if (ss != null) {
+ this.callBackObj.logPlanProgress(ss);
+ }
+ // LOG.info(queryPlan);
+ return (success);
+ }
+
+ private String getId() {
+ return this.task.getId();
+ }
+
+ /**
+ * from StreamJob.java.
+ */
+ public void jobInfo(RunningJob rj) {
+ if (job.get("mapred.job.tracker", "local").equals("local")) {
+ console.printInfo("Job running in-process (local Hadoop)");
+ } else {
+ String hp = job.get("mapred.job.tracker");
+ if (SessionState.get() != null) {
+ SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
+ getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+ }
+ console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+ + rj.getTrackingURL());
+ console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
+ + " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
+ }
+ }
+
+ /**
+ * This class contains the state of the running task. Going forward, we will return this handle
+ * from execute, and Driver can split execute into start, monitorProgress and postProcess.
+ */
+ private static class ExecDriverTaskHandle extends TaskHandle {
+ JobClient jc;
+ RunningJob rj;
+
+ JobClient getJobClient() {
+ return jc;
+ }
+
+ RunningJob getRunningJob() {
+ return rj;
+ }
+
+ public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+ this.jc = jc;
+ this.rj = rj;
+ }
+
+ public void setRunningJob(RunningJob job) {
+ rj = job;
+ }
+
+ @Override
+ public Counters getCounters() throws IOException {
+ return rj.getCounters();
+ }
+ }
+
+ // Used for showJobFailDebugInfo
+ private static class TaskInfo {
+ String jobId;
+ HashSet<String> logUrls;
+
+ public TaskInfo(String jobId) {
+ this.jobId = jobId;
+ logUrls = new HashSet<String>();
+ }
+
+ public void addLogUrl(String logUrl) {
+ logUrls.add(logUrl);
+ }
+
+ public HashSet<String> getLogUrls() {
+ return logUrls;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
+ // Mapping from task ID to the number of failures
+ Map<String, Integer> failures = new HashMap<String, Integer>();
+ // Successful task ID's
+ Set<String> successes = new HashSet<String>();
+
+ Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
+
+ int startIndex = 0;
+
+ // Loop to get all task completion events because getTaskCompletionEvents
+ // only returns a subset per call
+ while (true) {
+ TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
+
+ if (taskCompletions == null || taskCompletions.length == 0) {
+ break;
+ }
+
+ boolean more = true;
+ for (TaskCompletionEvent t : taskCompletions) {
+ // getTaskJobIDs returns Strings for compatibility with Hadoop versions
+ // without TaskID or TaskAttemptID
+ String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
+
+ if (taskJobIds == null) {
+ console.printError("Task attempt info is unavailable in this Hadoop version");
+ more = false;
+ break;
+ }
+
+ // For each task completion event, get the associated task id, job id
+ // and the logs
+ String taskId = taskJobIds[0];
+ String jobId = taskJobIds[1];
+
+ TaskInfo ti = taskIdToInfo.get(taskId);
+ if (ti == null) {
+ ti = new TaskInfo(jobId);
+ taskIdToInfo.put(taskId, ti);
+ }
+ // These tasks should have come from the same job.
+ assert (ti.getJobId() == jobId);
+ ti.getLogUrls().add(getTaskAttemptLogUrl(t.getTaskTrackerHttp(), t.getTaskId()));
+
+ // If a task failed, then keep track of the total number of failures
+ // for that task (typically, a task gets re-run up to 4 times if it
+ // fails).
+
+ if (t.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) {
+ Integer failAttempts = failures.get(taskId);
+ if (failAttempts == null) {
+ failAttempts = Integer.valueOf(0);
+ }
+ failAttempts = Integer.valueOf(failAttempts.intValue() + 1);
+ failures.put(taskId, failAttempts);
+ } else {
+ successes.add(taskId);
+ }
+ }
+ if (!more) {
+ break;
+ }
+ startIndex += taskCompletions.length;
+ }
+ // Remove failures for tasks that succeeded
+ for (String task : successes) {
+ failures.remove(task);
+ }
+
+ if (failures.keySet().size() == 0) {
+ return;
+ }
+
+ // Find the highest failure count
+ int maxFailures = 0;
+ for (Integer failCount : failures.values()) {
+ if (maxFailures < failCount.intValue()) {
+ maxFailures = failCount.intValue();
+ }
+ }
+
+ // Display Error Message for tasks with the highest failure count
+ String jtUrl = JobTrackerURLResolver.getURL(conf);
+
+ for (String task : failures.keySet()) {
+ if (failures.get(task).intValue() == maxFailures) {
+ TaskInfo ti = taskIdToInfo.get(task);
+ String jobId = ti.getJobId();
+ String taskUrl = jtUrl + "/taskdetails.jsp?jobid=" + jobId + "&tipid=" + task.toString();
+
+ TaskLogProcessor tlp = new TaskLogProcessor(conf);
+ for (String logUrl : ti.getLogUrls()) {
+ tlp.addTaskAttemptLogUrl(logUrl);
+ }
+
+ List<ErrorAndSolution> errors = tlp.getErrors();
+
+ StringBuilder sb = new StringBuilder();
+ // We use a StringBuilder and then call printError only once as
+ // printError will write to both stderr and the error log file. In
+ // situations where both the stderr and the log file output is
+ // simultaneously output to a single stream, this will look cleaner.
+ sb.append("\n");
+ sb.append("Task with the most failures(" + maxFailures + "): \n");
+ sb.append("-----\n");
+ sb.append("Task ID:\n " + task + "\n\n");
+ sb.append("URL:\n " + taskUrl + "\n");
+
+ for (ErrorAndSolution e : errors) {
+ sb.append("\n");
+ sb.append("Possible error:\n " + e.getError() + "\n\n");
+ sb.append("Solution:\n " + e.getSolution() + "\n");
+ }
+ sb.append("-----\n");
+
+ console.printError(sb.toString());
+
+ // Only print out one task because that's good enough for debugging.
+ break;
+ }
+ }
+ return;
+
+ }
+
+ public int progress(RunningJob rj, JobClient jc) throws IOException {
+ jobId = rj.getJobID();
+
+ int returnVal = 0;
+
+ // remove the pwd from the conf file so that the job tracker doesn't show
+ // it in its logs
+ String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
+ if (pwd != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
+ }
+
+ // replace it back
+ if (pwd != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
+ }
+
+ // add to list of running jobs to kill in case of abnormal shutdown
+
+ runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+
+ ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+ jobInfo(rj);
+ boolean success = progress(th);
+
+ String statusMesg = getJobEndMsg(rj.getJobID());
+ if (!success) {
+ statusMesg += " with errors";
+ returnVal = 2;
+ console.printError(statusMesg);
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.SHOW_JOB_FAIL_DEBUG_INFO)) {
+ showJobFailDebugInfo(job, rj);
+ }
+ } else {
+ console.printInfo(statusMesg);
+ }
+
+ return returnVal;
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java?rev=1073992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHook.java Wed Feb 23 23:58:41 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.RunningJob;
+
+@SuppressWarnings("deprecation")
+public interface HadoopJobExecHook {
+
+ public void updateCounters(Counters ctrs, RunningJob rj) throws IOException;
+ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg);
+ public void logPlanProgress(SessionState ss) throws IOException;
+
+}
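The two files added above define the contract that lets tasks other than ExecDriver reuse the Hadoop job monitoring logic: a task hands HadoopJobExecHelper its JobConf, console, itself, and a HadoopJobExecHook callback, submits the job, and then calls progress(). The sketch below illustrates that wiring for a hypothetical task. It is not code from this commit: the class name, the no-op callbacks, and the omission of Task's remaining abstract members are assumptions made for brevity; the new BlockMergeTask in this change presumably follows a similar pattern.

package org.apache.hadoop.hive.ql.exec;

import java.io.IOException;

import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

// Hypothetical task, not part of r1073992; it only shows the helper/hook wiring.
public class ExampleMapRedTask extends Task<MapredWork> implements HadoopJobExecHook {

  private transient JobConf job;
  private transient HadoopJobExecHelper jobExecHelper;

  @Override
  public int execute(DriverContext driverContext) {
    try {
      job = new JobConf(conf, ExampleMapRedTask.class);
      // The helper owns progress polling, fatal-error checks, kill-URL
      // bookkeeping, and the job start/end console messages.
      jobExecHelper = new HadoopJobExecHelper(job, console, this, this);

      JobClient jc = new JobClient(job);
      RunningJob rj = jc.submitJob(job);

      // progress() blocks until the job finishes; it returns 0 on success, 2 on failure.
      return jobExecHelper.progress(rj, jc);
    } catch (IOException e) {
      console.printError("Job failed: " + e.getMessage());
      return 1;
    }
  }

  // Hook callbacks: a task without an operator tree has nothing extra to report.
  @Override
  public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
  }

  @Override
  public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
    return false;
  }

  @Override
  public void logPlanProgress(SessionState ss) throws IOException {
    ss.getHiveHistory().logPlanProgress(queryPlan);
  }

  // Remaining abstract members of Task (e.g. getType(), getName()) are omitted for brevity.
}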
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Wed Feb 23 23:58:41 2011
@@ -207,6 +207,7 @@ public class StatsTask extends Task<Stat
LOG.error("Cannot get table " + tableName, e);
console.printError("Cannot get table " + tableName, e.toString());
}
+
return aggregateStats();
}
@@ -227,9 +228,7 @@ public class StatsTask extends Task<Stat
private int aggregateStats() {
- String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- StatsFactory.setImplementation(statsImplementationClass, conf);
- StatsAggregator statsAggregator = StatsFactory.getStatsAggregator();
+ StatsAggregator statsAggregator = null;
try {
// Stats setup:
@@ -237,38 +236,44 @@ public class StatsTask extends Task<Stat
FileSystem fileSys;
FileStatus[] fileStatus;
- // manufacture a StatsAggregator
- if (!statsAggregator.connect(conf)) {
- throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+ if(!this.getWork().getNoStatsAggregator()) {
+ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ StatsFactory.setImplementation(statsImplementationClass, conf);
+ statsAggregator = StatsFactory.getStatsAggregator();
+ // manufacture a StatsAggregator
+ if (!statsAggregator.connect(conf)) {
+ throw new HiveException("StatsAggregator connect failed " + statsImplementationClass);
+ }
}
TableStatistics tblStats = new TableStatistics();
- //
- // For partitioned table get the old table statistics for incremental update
- //
- if (table.isPartitioned()) {
- org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
- Map<String, String> parameters = tTable.getParameters();
- if (parameters.containsKey(StatsSetupConst.ROW_COUNT)) {
- tblStats.setNumRows(Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)));
- }
- if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
- tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
- }
- if (parameters.containsKey(StatsSetupConst.NUM_FILES)) {
- tblStats.setNumFiles(Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)));
- }
- if (parameters.containsKey(StatsSetupConst.TOTAL_SIZE)) {
- tblStats.setSize(Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)));
- }
+ org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
+ Map<String, String> parameters = tTable.getParameters();
+
+ boolean tableStatsExist = this.existStats(parameters);
+
+ if (parameters.containsKey(StatsSetupConst.ROW_COUNT)) {
+ tblStats.setNumRows(Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)));
+ }
+ if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
+ tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
+ }
+ if (parameters.containsKey(StatsSetupConst.NUM_FILES)) {
+ tblStats.setNumFiles(Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)));
+ }
+ if (parameters.containsKey(StatsSetupConst.TOTAL_SIZE)) {
+ tblStats.setSize(Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)));
}
List<Partition> partitions = getPartitionsList();
-
+ boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+
if (partitions == null) {
// non-partitioned tables:
-
+ if (!tableStatsExist && atomic) {
+ return 0;
+ }
Path tablePath = wh.getDefaultTablePath(table.getDbName(), table.getTableName());
fileSys = tablePath.getFileSystem(conf);
fileStatus = Utilities.getFileStatusRecurse(tablePath, 1, fileSys);
@@ -280,12 +285,14 @@ public class StatsTask extends Task<Stat
tblStats.setSize(tableSize);
// In case of a non-partitioned table, the key for stats temporary store is "rootDir"
- String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
- if (rows != null) {
- tblStats.setNumRows(Long.parseLong(rows));
- } else {
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
- throw new HiveException("StatsAggregator failed to get numRows.");
+ if(statsAggregator != null) {
+ String rows = statsAggregator.aggregateStats(work.getAggKey(), StatsSetupConst.ROW_COUNT);
+ if (rows != null) {
+ tblStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (atomic) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
}
} else {
@@ -294,20 +301,45 @@ public class StatsTask extends Task<Stat
// and update the table stats based on the old and new stats.
for (Partition partn : partitions) {
//
+ // get the old partition stats
+ //
+ org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+ parameters = tPart.getParameters();
+
+ boolean hasStats = this.existStats(parameters);
+ if(!hasStats && atomic) {
+ continue;
+ }
+
+ int nf = parameters.containsKey(StatsSetupConst.NUM_FILES) ?
+ Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)) :
+ 0;
+ long nr = parameters.containsKey(StatsSetupConst.ROW_COUNT) ?
+ Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)) :
+ 0L;
+ long sz = parameters.containsKey(StatsSetupConst.TOTAL_SIZE) ?
+ Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)) :
+ 0L;
+
+ //
// get the new partition stats
//
PartitionStatistics newPartStats = new PartitionStatistics();
// In the case of a partition, the key for stats temporary store is "rootDir/[dynamic_partition_specs/]%"
String partitionID = work.getAggKey() + Warehouse.makePartPath(partn.getSpec());
-
- String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
- if (rows != null) {
- newPartStats.setNumRows(Long.parseLong(rows));
- } else {
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC)) {
- throw new HiveException("StatsAggregator failed to get numRows.");
+
+ if (statsAggregator != null) {
+ String rows = statsAggregator.aggregateStats(partitionID, StatsSetupConst.ROW_COUNT);
+ if (rows != null) {
+ newPartStats.setNumRows(Long.parseLong(rows));
+ } else {
+ if (atomic) {
+ throw new HiveException("StatsAggregator failed to get numRows.");
+ }
}
+ } else {
+ newPartStats.setNumRows(nr);
}
fileSys = partn.getPartitionPath().getFileSystem(conf);
@@ -320,26 +352,6 @@ public class StatsTask extends Task<Stat
}
newPartStats.setSize(partitionSize);
- //
- // get the old partition stats
- //
- org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
- Map<String, String> parameters = tPart.getParameters();
-
- boolean hasStats =
- parameters.containsKey(StatsSetupConst.NUM_FILES) ||
- parameters.containsKey(StatsSetupConst.ROW_COUNT) ||
- parameters.containsKey(StatsSetupConst.TOTAL_SIZE);
-
- int nf = parameters.containsKey(StatsSetupConst.NUM_FILES) ?
- Integer.parseInt(parameters.get(StatsSetupConst.NUM_FILES)) :
- 0;
- long nr = parameters.containsKey(StatsSetupConst.ROW_COUNT) ?
- Long.parseLong(parameters.get(StatsSetupConst.ROW_COUNT)) :
- 0L;
- long sz = parameters.containsKey(StatsSetupConst.TOTAL_SIZE) ?
- Long.parseLong(parameters.get(StatsSetupConst.TOTAL_SIZE)) :
- 0L;
if (hasStats) {
PartitionStatistics oldPartStats = new PartitionStatistics(nf, nr, sz);
tblStats.updateStats(oldPartStats, newPartStats);
@@ -363,12 +375,10 @@ public class StatsTask extends Task<Stat
}
}
-
//
// write table stats to metastore
//
- org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
- Map<String, String> parameters = tTable.getParameters();
+ parameters = tTable.getParameters();
parameters.put(StatsSetupConst.ROW_COUNT, Long.toString(tblStats.getNumRows()));
parameters.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(tblStats.getNumPartitions()));
parameters.put(StatsSetupConst.NUM_FILES, Integer.toString(tblStats.getNumFiles()));
@@ -387,11 +397,20 @@ public class StatsTask extends Task<Stat
"Failed with exception " + e.getMessage() + "\n"
+ StringUtils.stringifyException(e));
} finally {
- statsAggregator.closeConnection();
+ if(statsAggregator != null) {
+ statsAggregator.closeConnection();
+ }
}
// StatsTask always return 0 so that the whole job won't fail
return 0;
}
+
+ private boolean existStats(Map<String, String> parameters) {
+ return parameters.containsKey(StatsSetupConst.ROW_COUNT)
+ || parameters.containsKey(StatsSetupConst.NUM_FILES)
+ || parameters.containsKey(StatsSetupConst.TOTAL_SIZE)
+ || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
+ }
/**
* Get the list of partitions that need to update statistics.
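The rewritten aggregateStats() only builds a StatsAggregator when the work does not request the
no-aggregator path, and, when hive.stats.atomic is set, it skips tables and partitions that carry
no prior stats. A minimal sketch of how a metadata-only caller (for example, the ALTER TABLE ...
CONCATENATE path added by this commit) might request that mode; only getNoStatsAggregator() is
visible in this diff, so the setter, constructor and task wiring below are assumptions made for
illustration:

    // Hedged sketch: schedule a stats refresh that recounts files/sizes from HDFS but never
    // touches the stats DB. setNoStatsAggregator() and the StatsWork constructor are assumed.
    StatsWork statsWork = new StatsWork(tableSpec);        // hypothetical: describe the merged table
    statsWork.setNoStatsAggregator(true);                  // assumed setter for the flag read above
    Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hiveConf);
    mergeTask.addDependentTask(statsTask);                 // run the stats update after the merge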
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java Wed Feb 23 23:58:41 2011
@@ -43,7 +43,7 @@ public final class Throttle {
/**
* Fetch http://tracker.om:/gc.jsp?threshold=period.
*/
- static void checkJobTracker(JobConf conf, Log LOG) {
+ public static void checkJobTracker(JobConf conf, Log LOG) {
try {
byte[] buffer = new byte[1024];
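checkJobTracker() goes from package-private to public, which suggests job drivers outside
ExecDriver (such as the new BlockMergeTask) throttle against the JobTracker's GC page before
submitting. A minimal usage sketch under that assumption; the submission code around it is
illustrative:

    // Hedged sketch: throttle before launching the merge job, mirroring ExecDriver's usage.
    JobConf job = new JobConf(conf, BlockMergeTask.class);  // BlockMergeTask is added by this commit
    Throttle.checkJobTracker(job, LOG);                     // newly callable from other drivers
    RunningJob runningJob = JobClient.runJob(job);          // assumed submission path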
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Feb 23 23:58:41 2011
@@ -80,6 +80,9 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
@@ -91,6 +94,7 @@ import org.apache.hadoop.hive.ql.parse.E
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -101,9 +105,12 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
@@ -1133,6 +1140,83 @@ public final class Utilities {
Path pathPattern = new Path(path, sb.toString());
return fs.globStatus(pathPattern);
}
+
+ public static void mvFileToFinalPath(String specPath, Configuration hconf,
+ boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) throws IOException, HiveException {
+
+ FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+ Path tmpPath = Utilities.toTempPath(specPath);
+ Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+ + ".intermediate");
+ Path finalPath = new Path(specPath);
+ if (success) {
+ if (fs.exists(tmpPath)) {
+ // Step1: rename tmp output folder to intermediate path. After this
+ // point, updates from speculative tasks still writing to tmpPath
+ // will not appear in finalPath.
+ log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+ Utilities.rename(fs, tmpPath, intermediatePath);
+ // Step2: remove any tmp file or double-committed output files
+ ArrayList<String> emptyBuckets =
+ Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+ // create empty buckets if necessary
+ if (emptyBuckets.size() > 0) {
+ createEmptyBuckets(hconf, emptyBuckets, conf);
+ }
+
+ // Step3: move to the file destination
+ log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
+ Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+ }
+ } else {
+ fs.delete(tmpPath, true);
+ }
+ }
+
+ /**
+ * Check the existence of buckets according to the bucket specification, and create empty
+ * buckets if needed.
+ * @param hconf the job configuration.
+ * @param paths the bucket paths that are missing and must be created as empty files.
+ * @param conf the FileSinkDesc describing the output table.
+ * @throws HiveException
+ * @throws IOException
+ */
+ private static void createEmptyBuckets(Configuration hconf, ArrayList<String> paths, FileSinkDesc conf)
+ throws HiveException, IOException {
+
+ JobConf jc;
+ if (hconf instanceof JobConf) {
+ jc = new JobConf(hconf);
+ } else {
+ // test code path
+ jc = new JobConf(hconf, ExecDriver.class);
+ }
+ HiveOutputFormat<?, ?> hiveOutputFormat = null;
+ Class<? extends Writable> outputClass = null;
+ boolean isCompressed = conf.getCompressed();
+ TableDesc tableInfo = conf.getTableInfo();
+ try {
+ Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
+ serializer.initialize(null, tableInfo.getProperties());
+ outputClass = serializer.getSerializedClass();
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ } catch (InstantiationException e) {
+ throw new HiveException(e);
+ } catch (IllegalAccessException e) {
+ throw new HiveException(e);
+ }
+
+ for (String p: paths) {
+ Path path = new Path(p);
+ RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+ jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
+ writer.close(false);
+ LOG.info("created empty bucket for enforcing bucketing at " + path);
+ }
+ }
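mvFileToFinalPath() factors the commit dance out of FileSinkOperator (rename the tmp dir to an
.intermediate dir so speculative tasks cannot pollute the output, drop duplicate or temporary
files, create any missing empty buckets, then move everything to the final path) so that other
writers, such as the merge job's close path, can reuse it. A hedged sketch of a caller; the
surrounding method and variable names are illustrative, not taken from this diff:

    // Hedged sketch of a jobClose()-style caller.
    public void commitOutput(Configuration hconf, boolean success, FileSinkDesc desc,
        DynamicPartitionCtx dpCtx, Log log) throws HiveException {
      try {
        String specPath = desc.getDirName();               // final output directory of the sink
        Utilities.mvFileToFinalPath(specPath, hconf, success, log, dpCtx, desc);
      } catch (IOException e) {
        throw new HiveException(e);
      }
    }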
/**
* Remove all temporary files and duplicate (double-committed) files from a given directory.
@@ -1644,4 +1728,20 @@ public final class Utilities {
double result = (double) time / (double)1000;
return result;
}
+
+ /**
+ * Determines whether a partition has been archived.
+ *
+ * @param p the partition to check.
+ * @return true if the partition's parameters mark it as archived, false otherwise.
+ */
+ public static boolean isArchived(Partition p) {
+ Map<String, String> params = p.getParameters();
+ if ("true".equalsIgnoreCase(params.get(
+ org.apache.hadoop.hive.metastore.api.Constants.IS_ARCHIVED))) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
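isArchived() reads the metastore flag that marks a partition as HAR-archived; callers such as the
new merge path need it because archived partitions must not be rewritten in place. A small hedged
sketch of that check; the surrounding loop is illustrative:

    // Hedged sketch: refuse to merge archived partitions.
    for (Partition partn : partitions) {
      if (Utilities.isArchived(partn)) {
        throw new SemanticException("Cannot merge files of archived partition " + partn.getName());
      }
    }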
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Feb 23 23:58:41 2011
@@ -387,10 +387,12 @@ public final class HiveFileFormatUtils {
public static List<Operator<? extends Serializable>> doGetAliasesFromPath(
Map<String, ArrayList<String>> pathToAliases,
Map<String, Operator<? extends Serializable>> aliasToWork, Path dir) {
-
- String path = getMatchingPath(pathToAliases, dir);
List<Operator<? extends Serializable>> opList =
new ArrayList<Operator<? extends Serializable>>();
+ if (pathToAliases == null) {
+ return opList;
+ }
+ String path = getMatchingPath(pathToAliases, dir);
List<String> aliases = pathToAliases.get(path);
for (String alias : aliases) {
opList.add(aliasToWork.get(alias));
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Feb 23 23:58:41 2011
@@ -374,7 +374,11 @@ public class HiveInputFormat<K extends W
if (this.mrwork == null) {
init(job);
}
-
+
+ if(this.mrwork.getPathToAliases() == null) {
+ return;
+ }
+
ArrayList<String> aliases = new ArrayList<String>();
Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
.getPathToAliases().entrySet().iterator();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1073992&r1=1073991&r2=1073992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Wed Feb 23 23:58:41 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -190,7 +191,7 @@ public class RCFile {
* <li>{the end of the key part}</li>
* </ul>
*/
- static class KeyBuffer implements Writable {
+ public static class KeyBuffer implements WritableComparable {
// each column's value length in a split
private int[] eachColumnValueLen = null;
private int[] eachColumnUncompressedValueLen = null;
@@ -200,6 +201,14 @@ public class RCFile {
private int numberRows = 0;
// how many columns
private int columnNumber = 0;
+
+ // return the number of columns recorded in this file's header
+ public int getColumnNumber() {
+ return columnNumber;
+ }
+
+ public KeyBuffer(){
+ }
KeyBuffer(int columnNumber) {
this(0, columnNumber);
@@ -281,6 +290,12 @@ public class RCFile {
return ret;
}
+
+ @Override
+ public int compareTo(Object arg0) {
+ throw new RuntimeException("compareTo not supported in class "
+ + this.getClass().getName());
+ }
}
/**
@@ -293,7 +308,7 @@ public class RCFile {
* column_2_row_2_value,....]</li>
* </ul>
*/
- static class ValueBuffer implements Writable {
+ public static class ValueBuffer implements WritableComparable {
class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
@@ -347,6 +362,9 @@ public class RCFile {
Decompressor valDecompressor = null;
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
CompressionInputStream deflatFilter = null;
+
+ public ValueBuffer() throws IOException {
+ }
public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
this(keyBuffer, null);
@@ -464,6 +482,12 @@ public class RCFile {
CodecPool.returnDecompressor(valDecompressor);
}
}
+
+ @Override
+ public int compareTo(Object arg0) {
+ throw new RuntimeException("compareTo not supported in class "
+ + this.getClass().getName());
+ }
}
/**
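KeyBuffer and ValueBuffer become public WritableComparables with no-arg constructors so the new
merge input format can hand whole RCFile blocks to a mapper as MapReduce key/value types;
compareTo() deliberately throws because merge jobs never sort these blocks. The
RCFileKeyBufferWrapper added by this commit is not reproduced in this hunk; the shape below is a
guess, purely for illustration:

    // Hedged sketch only: the real RCFileKeyBufferWrapper is an added file not shown here.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
    import org.apache.hadoop.io.WritableComparable;

    public class KeyBufferWrapperSketch implements WritableComparable<KeyBufferWrapperSketch> {
      protected KeyBuffer keyBuffer;          // the per-block key object reused by RCFile.Reader
      protected int recordLength;             // lengths the writer needs for flushBlock()
      protected int keyLength;
      protected int compressedKeyLength;

      public void write(DataOutput out) throws IOException {
        throw new RuntimeException("not designed to be serialized");
      }

      public void readFields(DataInput in) throws IOException {
        throw new RuntimeException("not designed to be deserialized");
      }

      public int compareTo(KeyBufferWrapperSketch other) {
        throw new RuntimeException("compareTo not supported");  // blocks are copied, never sorted
      }
    }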
@@ -872,6 +896,33 @@ public class RCFile {
bufferedRecords = 0;
columnBufferSize = 0;
}
+
+ /**
+ * Flush a block out as-is, compressing only the key part; the value buffer is written verbatim.
+ */
+ public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
+ int recordLen, int keyLength, int compressedKeyLen) throws IOException {
+ checkAndWriteSync(); // sync
+ out.writeInt(recordLen); // total record length
+ out.writeInt(keyLength); // key portion length
+
+ if(this.isCompressed()) {
+ //compress key and write key out
+ keyCompressionBuffer.reset();
+ keyDeflateFilter.resetState();
+ keyBuffer.write(keyDeflateOut);
+ keyDeflateOut.flush();
+ keyDeflateFilter.finish();
+ compressedKeyLen = keyCompressionBuffer.getLength();
+ out.writeInt(compressedKeyLen);
+ out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+ } else {
+ out.writeInt(compressedKeyLen);
+ keyBuffer.write(out);
+ }
+
+ valueBuffer.write(out); // value
+ }
private void clearColumnBuffers() throws IOException {
for (int i = 0; i < columnNumber; i++) {
@@ -1304,6 +1355,15 @@ public class RCFile {
currentValue.readFields(in);
currentValue.inited = true;
}
+
+ public boolean nextBlock() throws IOException {
+ int keyLength = nextKeyBuffer();
+ if(keyLength > 0) {
+ currentValueBuffer();
+ return true;
+ }
+ return false;
+ }
private boolean rowFetched = false;
@@ -1500,5 +1560,44 @@ public class RCFile {
CodecPool.returnDecompressor(keyDecompressor);
}
}
+
+ /**
+ * return the KeyBuffer object used in the reader. Internally in each
+ * reader, there is only one KeyBuffer object, which gets reused for every
+ * block.
+ */
+ public KeyBuffer getCurrentKeyBufferObj() {
+ return this.currentKey;
+ }
+
+ /**
+ * return the ValueBuffer object used in the reader. Internally in each
+ * reader, there is only one ValueBuffer object, which gets reused for every
+ * block.
+ */
+ public ValueBuffer getCurrentValueBufferObj() {
+ return this.currentValue;
+ }
+
+ //return the current block's length
+ public int getCurrentBlockLength() {
+ return this.currentRecordLength;
+ }
+
+ //return the current block's key length
+ public int getCurrentKeyLength() {
+ return this.currentKeyLength;
+ }
+
+ //return the current block's compressed key length
+ public int getCurrentCompressedKeyLen() {
+ return this.compressedKeyLen;
+ }
+
+ //return the CompressionCodec used for this file
+ public CompressionCodec getCompressionCodec() {
+ return this.codec;
+ }
+
}
}
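Taken together, the new Reader block API (nextBlock(), getCurrentKeyBufferObj(),
getCurrentValueBufferObj() and the length getters) and Writer.flushBlock() let an RCFile be copied
block by block without decompressing column values, which is the core of the RCFileMergeMapper
added by this commit. A hedged sketch of that copy loop; the mapper itself is not reproduced in
this diff, and the Writer constructor arguments are assumed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RCFile;

    public class RCFileBlockCopy {
      // Copy src to dst block by block; a hedged sketch of what RCFileMergeMapper does.
      public static void copy(Configuration conf, Path src, Path dst) throws Exception {
        FileSystem fs = src.getFileSystem(conf);
        RCFile.Reader reader = new RCFile.Reader(fs, src, conf);
        RCFile.Writer writer = new RCFile.Writer(fs, conf, dst, null,
            reader.getCompressionCodec());        // reuse the source codec so value bytes stay valid
        try {
          while (reader.nextBlock()) {            // positions the reused key and value buffers
            writer.flushBlock(reader.getCurrentKeyBufferObj(),
                reader.getCurrentValueBufferObj(),
                reader.getCurrentBlockLength(),
                reader.getCurrentKeyLength(),
                reader.getCurrentCompressedKeyLen());   // values are copied without recompression
          }
        } finally {
          reader.close();
          writer.close();
        }
      }
    }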