Posted to commits@hive.apache.org by rm...@apache.org on 2009/09/29 03:25:30 UTC
svn commit: r819792 [2/24] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/
contrib/src/test/results/clientnegative/
contrib/src/test/results/clientpositive/ data/conf/
ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apac...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Sep 29 01:25:15 2009
@@ -44,6 +44,8 @@
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
+import org.apache.hadoop.hive.ql.hooks.PostExecute;
+
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -110,7 +112,7 @@
LOG.info("Returning cluster status: " + cs.toString());
return cs;
}
-
+
/**
* Get a Schema with fields represented with native Hive types
*/
@@ -139,7 +141,7 @@
if (td == null) {
throw new Exception("No table description found for fetch task: " + ft);
}
-
+
String tableName = "result";
List<FieldSchema> lst = MetaStoreUtils.getFieldsFromDeserializer(
tableName, td.getDeserializer());
@@ -156,12 +158,12 @@
LOG.info("Returning Hive schema: " + schema);
return schema;
}
-
+
/**
* Get a Schema with fields represented with Thrift DDL types
*/
public Schema getThriftSchema() throws Exception {
- Schema schema;
+ Schema schema;
try {
schema = this.getSchema();
if (schema != null) {
@@ -169,8 +171,8 @@
// Go over the schema and convert type to thrift type
if (lst != null) {
for (FieldSchema f : lst) {
- f.setType(MetaStoreUtils.typeToThriftType(f.getType()));
- }
+ f.setType(MetaStoreUtils.typeToThriftType(f.getType()));
+ }
}
}
}
@@ -251,7 +253,7 @@
// Do semantic analysis and plan generation
sem.analyze(tree, ctx);
LOG.info("Semantic Analysis Completed");
-
+
// validate the plan
sem.validate();
@@ -338,7 +340,7 @@
return pehooks;
String[] peClasses = pestr.split(",");
-
+
for(String peClass: peClasses) {
try {
pehooks.add((PreExecute)Class.forName(peClass.trim(), true, JavaUtils.getClassLoader()).newInstance());
@@ -347,10 +349,31 @@
throw e;
}
}
-
+
return pehooks;
}
-
+
+ private List<PostExecute> getPostExecHooks() throws Exception {
+ ArrayList<PostExecute> pehooks = new ArrayList<PostExecute>();
+ String pestr = conf.getVar(HiveConf.ConfVars.POSTEXECHOOKS);
+ pestr = pestr.trim();
+ if (pestr.equals(""))
+ return pehooks;
+
+ String[] peClasses = pestr.split(",");
+
+ for(String peClass: peClasses) {
+ try {
+ pehooks.add((PostExecute)Class.forName(peClass.trim(), true, JavaUtils.getClassLoader()).newInstance());
+ } catch (ClassNotFoundException e) {
+ console.printError("Post Exec Hook Class not found:" + e.getMessage());
+ throw e;
+ }
+ }
+
+ return pehooks;
+ }
+
public int execute() {
boolean noName = StringUtils.isEmpty(conf
.getVar(HiveConf.ConfVars.HADOOPJOBNAME));
@@ -366,7 +389,7 @@
LOG.info("Starting command: " + queryStr);
plan.setStarted();
-
+
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().startQuery(queryStr, conf.getVar(HiveConf.ConfVars.HIVEQUERYID) );
SessionState.get().getHiveHistory().logPlanProgress(plan);
@@ -377,11 +400,11 @@
// Get all the pre execution hooks and execute them.
for(PreExecute peh: getPreExecHooks()) {
- peh.run(SessionState.get(),
+ peh.run(SessionState.get(),
sem.getInputs(), sem.getOutputs(),
- UserGroupInformation.getCurrentUGI());
+ UserGroupInformation.getCurrentUGI());
}
-
+
int jobs = countJobs(sem.getRootTasks());
if (jobs > 0) {
console.printInfo("Total MapReduce jobs = " + jobs);
@@ -441,7 +464,7 @@
continue;
}
- for (Task<? extends Serializable> child : tsk.getChildTasks()) {
+ for (Task<? extends Serializable> child : tsk.getChildTasks()) {
// Check if the child is runnable
if (!child.isRunnable()) {
continue;
@@ -452,6 +475,14 @@
}
}
}
+
+ // Get all the post execution hooks and execute them.
+ for(PostExecute peh: getPostExecHooks()) {
+ peh.run(SessionState.get(),
+ sem.getInputs(), sem.getOutputs(),
+ UserGroupInformation.getCurrentUGI());
+ }
+
if (SessionState.get() != null){
SessionState.get().getHiveHistory().setQueryProperty(queryId,
Keys.QUERY_RET_CODE, String.valueOf(0));
@@ -476,6 +507,7 @@
}
}
plan.setDone();
+
if (SessionState.get() != null) {
try {
SessionState.get().getHiveHistory().logPlanProgress(plan);
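
Taken together with the new entry in HiveConf (changed under common/ in this same commit but not shown in this part of the diff), the Driver changes above mean that a comma-separated list of class names is read from ConfVars.POSTEXECHOOKS, each class is instantiated as a PostExecute hook, and every hook is invoked once all tasks of the query have completed successfully. A rough sketch of wiring such a hook programmatically is shown below; the hook class name is hypothetical (a sketch of one follows the new PostExecute.java further down), and the exact Driver/SessionState bootstrap calls vary slightly between Hive versions, so treat this as illustrative only:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class PostExecHookExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    // Comma-separated list of classes implementing PostExecute;
    // the class named here is hypothetical.
    conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS,
        "org.example.hooks.LineageLoggerHook");

    SessionState.start(new SessionState(conf));
    Driver driver = new Driver(conf);

    // Pre-execute hooks fire before the first task is launched; the new
    // post-execute hooks fire after the last task completes successfully.
    int ret = driver.run("select count(1) from src");
    System.exit(ret);
  }
}
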
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Sep 29 01:25:15 2009
@@ -81,10 +81,12 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
/**
* DDLTask implementation
- *
+ *
**/
public class DDLTask extends Task<DDLWork> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -93,16 +95,16 @@
transient HiveConf conf;
static final private int separator = Utilities.tabCode;
static final private int terminator = Utilities.newLineCode;
-
+
public DDLTask() {
super();
}
-
+
public void initialize(HiveConf conf, QueryPlan queryPlan) {
super.initialize(conf, queryPlan);
this.conf = conf;
}
-
+
public int execute() {
// Create the db
@@ -129,22 +131,22 @@
if (alterTbl != null) {
return alterTable(db, alterTbl);
}
-
+
AddPartitionDesc addPartitionDesc = work.getAddPartitionDesc();
if (addPartitionDesc != null) {
return addPartition(db, addPartitionDesc);
- }
-
+ }
+
MsckDesc msckDesc = work.getMsckDesc();
if (msckDesc != null) {
return msck(db, msckDesc);
- }
+ }
descTableDesc descTbl = work.getDescTblDesc();
if (descTbl != null) {
return describeTable(db, descTbl);
}
-
+
descFunctionDesc descFunc = work.getDescFunctionDesc();
if (descFunc != null) {
return describeFunction(descFunc);
@@ -154,7 +156,7 @@
if (showTbls != null) {
return showTables(db, showTbls);
}
-
+
showTableStatusDesc showTblStatus = work.getShowTblStatusDesc();
if (showTblStatus != null) {
return showTableStatus(db, showTblStatus);
@@ -191,22 +193,25 @@
* @param db Database to add the partition to.
* @param addPartitionDesc Add this partition.
* @return Returns 0 when execution succeeds and above 0 if it fails.
- * @throws HiveException
+ * @throws HiveException
*/
- private int addPartition(Hive db, AddPartitionDesc addPartitionDesc)
+ private int addPartition(Hive db, AddPartitionDesc addPartitionDesc)
throws HiveException {
-
- Table tbl = db.getTable(addPartitionDesc.getDbName(),
+
+ Table tbl = db.getTable(addPartitionDesc.getDbName(),
addPartitionDesc.getTableName());
-
+
if(addPartitionDesc.getLocation() == null) {
db.createPartition(tbl, addPartitionDesc.getPartSpec());
} else {
//set partition path relative to table
- db.createPartition(tbl, addPartitionDesc.getPartSpec(),
+ db.createPartition(tbl, addPartitionDesc.getPartSpec(),
new Path(tbl.getPath(), addPartitionDesc.getLocation()));
}
-
+
+ Partition part = db.getPartition(tbl, addPartitionDesc.getPartSpec(), false);
+ work.getOutputs().add(new WriteEntity(part));
+
return 0;
}
@@ -215,19 +220,19 @@
* what is on the dfs.
* Current version checks for tables and partitions that
are either missing on disk or in the metastore.
- *
+ *
* @param db The database in question.
* @param msckDesc Information about the tables and partitions
* we want to check for.
* @return Returns 0 when execution succeeds and above 0 if it fails.
*/
private int msck(Hive db, MsckDesc msckDesc) {
-
+
CheckResult result = new CheckResult();
try {
HiveMetaStoreChecker checker = new HiveMetaStoreChecker(db);
checker.checkMetastore(
- MetaStoreUtils.DEFAULT_DATABASE_NAME, msckDesc.getTableName(),
+ MetaStoreUtils.DEFAULT_DATABASE_NAME, msckDesc.getTableName(),
msckDesc.getPartitionSpec(),
result);
} catch (HiveException e) {
@@ -237,22 +242,22 @@
LOG.warn("Failed to run metacheck: ", e);
return 1;
} finally {
-
+
BufferedWriter resultOut = null;
try {
FileSystem fs = msckDesc.getResFile().getFileSystem(conf);
resultOut = new BufferedWriter(
new OutputStreamWriter(fs.create(msckDesc.getResFile())));
-
+
boolean firstWritten = false;
- firstWritten |= writeMsckResult(result.getTablesNotInMs(),
+ firstWritten |= writeMsckResult(result.getTablesNotInMs(),
"Tables not in metastore:", resultOut, firstWritten);
- firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
- "Tables missing on filesystem:", resultOut, firstWritten);
- firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
+ firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
+ "Tables missing on filesystem:", resultOut, firstWritten);
+ firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
"Partitions not in metastore:", resultOut, firstWritten);
- firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
- "Partitions missing from filesystem:", resultOut, firstWritten);
+ firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
+ "Partitions missing from filesystem:", resultOut, firstWritten);
} catch (IOException e) {
LOG.warn("Failed to save metacheck output: ", e);
return 1;
@@ -267,7 +272,7 @@
}
}
}
-
+
return 0;
}
@@ -280,14 +285,14 @@
* @return true if something was written
* @throws IOException In case the writing fails
*/
- private boolean writeMsckResult(List<? extends Object> result, String msg,
+ private boolean writeMsckResult(List<? extends Object> result, String msg,
Writer out, boolean wrote) throws IOException {
-
- if(!result.isEmpty()) {
+
+ if(!result.isEmpty()) {
if(wrote) {
out.write(terminator);
}
-
+
out.write(msg);
for (Object entry : result) {
out.write(separator);
@@ -295,13 +300,13 @@
}
return true;
}
-
+
return false;
}
/**
* Write a list of partitions to a file.
- *
+ *
* @param db The database in question.
* @param showParts These are the partitions we're interested in.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -351,7 +356,7 @@
/**
* Write a list of the tables in the database to a file.
- *
+ *
* @param db The database in question.
* @param showTbls These are the tables we're interested in.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -395,7 +400,7 @@
/**
* Write a list of the user defined functions to a file.
- *
+ *
* @param showFuncs are the functions we're interested in.
* @return Returns 0 when execution succeeds and above 0 if it fails.
* @throws HiveException Throws this exception if an unexpected error occurs.
@@ -438,14 +443,14 @@
/**
* Shows a description of a function.
- *
+ *
* @param descFunc is the function we are describing
* @throws HiveException
*/
private int describeFunction(descFunctionDesc descFunc)
throws HiveException {
String name = descFunc.getName();
-
+
// write the results in the file
try {
FileSystem fs = descFunc.getResFile().getFileSystem(conf);
@@ -454,18 +459,18 @@
// get the function documentation
description desc = null;
FunctionInfo fi = FunctionRegistry.getFunctionInfo(name);
-
+
Class<?> funcClass = null;
GenericUDF udf = fi.getGenericUDF();
if (udf != null) {
- // If it's a GenericUDFBridge, then let's use the
+ // If it's a GenericUDFBridge, then let's use the
if (udf instanceof GenericUDFBridge) {
funcClass = ((GenericUDFBridge)udf).getUdfClass();
} else {
funcClass = udf.getClass();
}
}
-
+
if (funcClass != null) {
desc = funcClass.getAnnotation(description.class);
}
@@ -478,9 +483,9 @@
outStream.writeBytes("Function " + name + " does not exist or cannot" +
" find documentation for it.");
}
-
+
outStream.write(terminator);
-
+
((FSDataOutputStream)outStream).close();
} catch (FileNotFoundException e) {
LOG.warn("describe function: " + StringUtils.stringifyException(e));
@@ -493,11 +498,11 @@
}
return 0;
}
-
-
+
+
/**
* Write the status of tables to a file.
- *
+ *
* @param db The database in question.
* @param showTblStatus tables we are interested in
* @return Return 0 when execution succeeds and above 0 if it fails.
@@ -597,10 +602,10 @@
}
return 0;
}
-
+
/**
* Write the description of a table to a file.
- *
+ *
* @param db The database in question.
* @param descTbl This is the table we're interested in.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -725,7 +730,7 @@
return 0;
}
-
+
private void writeFileSystemStats(DataOutput outStream, List<Path> locations,
Path tabLoc, boolean partSpecified, int indent) throws IOException {
long totalFileSize = 0;
@@ -750,7 +755,7 @@
LOG.warn("Cannot access File System. File System status will be unknown: ", e);
unknown = true;
}
-
+
if (!unknown) {
for (Path loc : locations) {
try {
@@ -789,43 +794,43 @@
}
}
String unknownString = "unknown";
-
+
for (int k = 0; k < indent; k++)
outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("totalNumberFiles:");
outStream.writeBytes(unknown ? unknownString : "" + numOfFiles);
outStream.write(terminator);
-
+
for (int k = 0; k < indent; k++)
outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("totalFileSize:");
outStream.writeBytes(unknown ? unknownString : "" + totalFileSize);
outStream.write(terminator);
-
+
for (int k = 0; k < indent; k++)
outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("maxFileSize:");
outStream.writeBytes(unknown ? unknownString : "" + maxFileSize);
outStream.write(terminator);
-
+
for (int k = 0; k < indent; k++)
- outStream.writeBytes(Utilities.INDENT);
+ outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("minFileSize:");
if (numOfFiles > 0)
outStream.writeBytes(unknown ? unknownString : "" + minFileSize);
else
outStream.writeBytes(unknown ? unknownString : "" + 0);
outStream.write(terminator);
-
+
for (int k = 0; k < indent; k++)
- outStream.writeBytes(Utilities.INDENT);
+ outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("lastAccessTime:");
outStream.writeBytes((unknown || lastAccessTime < 0) ? unknownString : ""
+ lastAccessTime);
outStream.write(terminator);
-
+
for (int k = 0; k < indent; k++)
- outStream.writeBytes(Utilities.INDENT);
+ outStream.writeBytes(Utilities.INDENT);
outStream.writeBytes("lastUpdateTime:");
outStream.writeBytes(unknown ? unknownString : "" + lastUpdateTime);
outStream.write(terminator);
@@ -834,7 +839,7 @@
/**
* Alter a given table.
- *
+ *
* @param db The database in question.
* @param alterTbl This is the table we're altering.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -843,8 +848,11 @@
private int alterTable(Hive db, alterTableDesc alterTbl) throws HiveException {
// alter the table
Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, alterTbl.getOldName());
- if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
+ Table oldTbl = tbl.copy();
+
+ if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME) {
tbl.getTTable().setTableName(alterTbl.getNewName());
+ }
else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
List<FieldSchema> newCols = alterTbl.getNewCols();
List<FieldSchema> oldCols = tbl.getCols();
@@ -936,24 +944,40 @@
} catch (HiveException e) {
return 1;
}
+
+ // This is kind of hacky - the read entity contains the old table, whereas the write entity
+ // contains the new table. This is needed for rename - both the old and the new table names are
+ // passed
+ work.getInputs().add(new ReadEntity(oldTbl));
+ work.getOutputs().add(new WriteEntity(tbl));
return 0;
}
/**
* Drop a given table.
- *
+ *
* @param db The database in question.
* @param dropTbl This is the table we're dropping.
* @return Returns 0 when execution succeeds and above 0 if it fails.
* @throws HiveException Throws this exception if an unexpected error occurs.
*/
private int dropTable(Hive db, dropTableDesc dropTbl) throws HiveException {
+ // We need to fetch the table before it is dropped so that it can be passed to
+ // post-execution hook
+ Table tbl = null;
+ try {
+ tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl.getTableName());
+ } catch (InvalidTableException e) {
+ // drop table is idempotent
+ }
+
if (dropTbl.getPartSpecs() == null) {
// drop the table
db.dropTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl.getTableName());
+ if (tbl != null)
+ work.getOutputs().add(new WriteEntity(tbl));
} else {
// drop partitions in the list
- Table tbl = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl.getTableName());
List<Partition> parts = new ArrayList<Partition>();
for (Map<String, String> partSpec : dropTbl.getPartSpecs()) {
Partition part = db.getPartition(tbl, partSpec, false);
@@ -969,12 +993,14 @@
db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl
.getTableName(), partition.getValues(), true); // drop data for the
// partition
+ work.getOutputs().add(new WriteEntity(partition));
}
}
+
return 0;
}
-
+
/**
* Check if the given serde is valid
*/
@@ -993,7 +1019,7 @@
/**
* Create a new table.
- *
+ *
* @param db The database in question.
* @param crtTbl This is the table we're creating.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -1033,7 +1059,7 @@
}
if (crtTbl.getCollItemDelim() != null)
- tbl.setSerdeParam(Constants.COLLECTION_DELIM,
+ tbl.setSerdeParam(Constants.COLLECTION_DELIM,
crtTbl.getCollItemDelim());
if (crtTbl.getMapKeyDelim() != null)
tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
@@ -1043,8 +1069,8 @@
/**
* We use LazySimpleSerDe by default.
- *
- * If the user didn't specify a SerDe, and any of the columns are not simple types,
+ *
+ * If the user didn't specify a SerDe, and any of the columns are not simple types,
* we will have to use DynamicSerDe instead.
*/
if (crtTbl.getSerName() == null) {
@@ -1113,13 +1139,14 @@
// create the table
db.createTable(tbl, crtTbl.getIfNotExists());
+ work.getOutputs().add(new WriteEntity(tbl));
return 0;
}
-
-
+
+
/**
* Create a new table like an existing table.
- *
+ *
* @param db The database in question.
* @param crtTbl This is the table we're creating.
* @return Returns 0 when execution succeeds and above 0 if it fails.
@@ -1131,27 +1158,28 @@
StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
tbl.getTTable().setTableName(crtTbl.getTableName());
-
+
if (crtTbl.isExternal()) {
tbl.setProperty("EXTERNAL", "TRUE");
} else {
tbl.setProperty("EXTERNAL", "FALSE");
}
-
+
if (crtTbl.getLocation() != null) {
tblStorDesc.setLocation(crtTbl.getLocation());
} else {
tblStorDesc.setLocation(null);
tblStorDesc.unsetLocation();
}
-
+
// create the table
db.createTable(tbl, crtTbl.getIfNotExists());
+ work.getOutputs().add(new WriteEntity(tbl));
return 0;
}
-
+
public int getType() {
return StageType.DDL;
}
-
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Sep 29 01:25:15 2009
@@ -29,8 +29,11 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.loadFileDesc;
import org.apache.hadoop.hive.ql.plan.loadTableDesc;
import org.apache.hadoop.hive.ql.plan.moveWork;
@@ -47,7 +50,7 @@
public MoveTask() {
super();
}
-
+
public int execute() {
try {
@@ -89,7 +92,7 @@
if (fs.exists(sourcePath))
fs.copyToLocalFile(sourcePath, targetPath);
else {
- if (!dstFs.mkdirs(targetPath))
+ if (!dstFs.mkdirs(targetPath))
throw new HiveException ("Unable to make local directory: " + targetPath);
}
} else {
@@ -102,20 +105,19 @@
loadTableDesc tbd = work.getLoadTableWork();
if (tbd != null) {
String mesg = "Loading data to table " + tbd.getTable().getTableName() +
- ((tbd.getPartitionSpec().size() > 0) ?
+ ((tbd.getPartitionSpec().size() > 0) ?
" partition " + tbd.getPartitionSpec().toString() : "");
String mesg_detail = " from " + tbd.getSourceDir();
console.printInfo(mesg, mesg_detail);
+ Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd.getTable().getTableName());
if (work.getCheckFileFormat()) {
-
// Get all files from the src directory
FileStatus [] dirs;
ArrayList<FileStatus> files;
FileSystem fs;
try {
- fs = FileSystem.get
- (db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd.getTable().getTableName()).getDataLocation(),conf);
+ fs = FileSystem.get(table.getDataLocation(),conf);
dirs = fs.globStatus(new Path(tbd.getSourceDir()));
files = new ArrayList<FileStatus>();
for (int i=0; (dirs != null && i<dirs.length); i++) {
@@ -135,10 +137,15 @@
if(tbd.getPartitionSpec().size() == 0) {
db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
+ if (work.getOutputs() != null)
+ work.getOutputs().add(new WriteEntity(table));
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+ if (work.getOutputs() != null)
+ work.getOutputs().add(new WriteEntity(partn));
}
}
@@ -149,7 +156,7 @@
return (1);
}
}
-
+
/*
* Does the move task involve moving to a local file system
*/
@@ -157,7 +164,7 @@
loadTableDesc tbd = work.getLoadTableWork();
if (tbd != null)
return false;
-
+
loadFileDesc lfd = work.getLoadFileWork();
if (lfd != null) {
if (lfd.getIsDfsDir()) {
@@ -166,10 +173,10 @@
else
return true;
}
-
+
return false;
}
-
+
public int getType() {
return StageType.MOVE;
}
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java?rev=819792&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java Tue Sep 29 01:25:15 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Set;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * The post execute hook interface. A list of such hooks can
+ * be configured to be called after the query has finished
+ * executing.
+ */
+public interface PostExecute {
+
+ /**
+   * The run command that is called just after the execution of the
+   * query.
+ *
+ * @param sess The session state.
+ * @param inputs The set of input tables and partitions.
+ * @param outputs The set of output tables, partitions, local and hdfs directories.
+ * @param ugi The user group security information.
+ */
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, UserGroupInformation ugi)
+ throws Exception;
+
+}
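
For illustration, a minimal implementation of the interface added above might look like the following; the package and class names are hypothetical and the commons-logging output is just one possible choice, so this is a sketch rather than anything shipped with this commit:

package org.example.hooks;

import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Hypothetical example hook: logs the lineage information gathered for a
 * query once execution has finished.
 */
public class LineageLoggerHook implements PostExecute {

  private static final Log LOG = LogFactory.getLog(LineageLoggerHook.class);

  public void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception {
    String user = (ugi == null) ? "unknown" : ugi.getUserName();
    // ReadEntity/WriteEntity.toString() renders db@table or db@table@partition
    // after the separator change made elsewhere in this commit.
    LOG.info("query finished: user=" + user
        + " inputs=" + inputs + " outputs=" + outputs);
  }
}
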
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java Tue Sep 29 01:25:15 2009
@@ -28,12 +28,12 @@
* tables that are read by the query.
*/
public class ReadEntity {
-
+
/**
* The partition. This is null for a non partitioned table.
*/
private Partition p;
-
+
/**
* The table.
*/
@@ -41,17 +41,17 @@
/**
* Constructor.
- *
+ *
* @param t The Table that the query reads from.
*/
public ReadEntity(Table t) {
this.t = t;
this.p = null;
}
-
+
/**
* Constructor given a partition.
- *
+ *
* @param p The partition that the query reads from.
*/
public ReadEntity(Partition p) {
@@ -62,14 +62,14 @@
* Enum that tells what type of read entity this is.
*/
public static enum Type {TABLE, PARTITION};
-
+
/**
* Get the type.
*/
public Type getType() {
return p == null ? Type.TABLE : Type.PARTITION;
}
-
+
/**
* Get the parameter map of the Entity.
*/
@@ -81,7 +81,7 @@
return t.getTTable().getParameters();
}
}
-
+
/**
* Get the location of the entity.
*/
@@ -107,20 +107,20 @@
public Table getTable() {
return t;
}
-
+
/**
* toString function.
*/
@Override
public String toString() {
if (p != null) {
- return p.getTable().getDbName() + "/" + p.getTable().getName() + "/" + p.getName();
+ return p.getTable().getDbName() + "@" + p.getTable().getName() + "@" + p.getName();
}
else {
- return t.getDbName() + "/" + t.getName();
+ return t.getDbName() + "@" + t.getName();
}
}
-
+
/**
* Equals function.
*/
@@ -128,7 +128,7 @@
public boolean equals(Object o) {
if (o == null)
return false;
-
+
if (o instanceof ReadEntity) {
ReadEntity ore = (ReadEntity)o;
return (toString().equalsIgnoreCase(ore.toString()));
@@ -136,7 +136,7 @@
else
return false;
}
-
+
/**
* Hashcode function.
*/
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Tue Sep 29 01:25:15 2009
@@ -33,12 +33,12 @@
* The type of the write entity.
*/
public static enum Type {TABLE, PARTITION, DFS_DIR, LOCAL_DIR};
-
+
/**
* The type.
*/
private Type typ;
-
+
/**
* The table. This is null if this is a directory.
*/
@@ -48,15 +48,15 @@
* The partition.This is null if this object is not a partition.
*/
private Partition p;
-
+
/**
* The directory if this is a directory.
*/
private String d;
-
+
/**
* Constructor for a table.
- *
+ *
* @param t Table that is written to.
*/
public WriteEntity(Table t) {
@@ -65,10 +65,10 @@
this.t = t;
this.typ = Type.TABLE;
}
-
+
/**
* Constructor for a partition.
- *
+ *
* @param p Partition that is written to.
*/
public WriteEntity(Partition p) {
@@ -77,10 +77,10 @@
this.t = p.getTable();
this.typ = Type.PARTITION;
}
-
+
/**
* Constructor for a file.
- *
+ *
* @param d The name of the directory that is being written to.
* @param islocal Flag to decide whether this directory is local or in dfs.
*/
@@ -95,30 +95,30 @@
this.typ = Type.DFS_DIR;
}
}
-
+
/**
* Get the type of the entity.
*/
public Type getType() {
return typ;
}
-
+
/**
* Get the location of the entity.
*/
public URI getLocation() throws Exception {
if (typ == Type.TABLE)
return t.getDataLocation();
-
+
if (typ == Type.PARTITION)
return p.getDataLocation();
-
+
if (typ == Type.DFS_DIR || typ == Type.LOCAL_DIR)
return new URI(d);
-
+
return null;
}
-
+
/**
* Get the partition associated with the entity.
*/
@@ -139,14 +139,14 @@
public String toString() {
switch(typ) {
case TABLE:
- return t.getDbName() + "/" + t.getName();
+ return t.getDbName() + "@" + t.getName();
case PARTITION:
- return t.getDbName() + "/" + t.getName() + "/" + p.getName();
+ return t.getDbName() + "@" + t.getName() + "@" + p.getName();
default:
return d;
}
}
-
+
/**
* Equals function.
*/
@@ -154,7 +154,7 @@
public boolean equals(Object o) {
if (o == null)
return false;
-
+
if (o instanceof WriteEntity) {
WriteEntity ore = (WriteEntity)o;
return (toString().equalsIgnoreCase(ore.toString()));
@@ -162,7 +162,7 @@
else
return false;
}
-
+
/**
* Hashcode function.
*/
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Sep 29 01:25:15 2009
@@ -70,7 +70,7 @@
/**
* Table (only used internally)
- * @throws HiveException
+ * @throws HiveException
*
*/
protected Table() throws HiveException {
@@ -88,7 +88,7 @@
*
* @exception HiveException on internal error. Note not possible now, but in the future reserve the right to throw an exception
*/
- public Table(String name, Properties schema, Deserializer deserializer,
+ public Table(String name, Properties schema, Deserializer deserializer,
Class<? extends InputFormat<?, ?>> inputFormatClass,
Class<?> outputFormatClass,
URI dataLocation, Hive hive) throws HiveException {
@@ -102,18 +102,18 @@
setOutputFormatClass(HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClass));
setDataLocation(dataLocation);
}
-
+
public Table(String name) {
// fill in defaults
initEmpty();
getTTable().setTableName(name);
getTTable().setDbName(MetaStoreUtils.DEFAULT_DATABASE_NAME);
- // We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does not
+ // We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does not
// support a table with no columns.
getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1");
}
-
+
void initEmpty() {
setTTable(new org.apache.hadoop.hive.metastore.api.Table());
getTTable().setSd(new StorageDescriptor());
@@ -127,10 +127,10 @@
sd.setCols(new ArrayList<FieldSchema>());
sd.setParameters(new HashMap<String, String>());
sd.setSortCols(new ArrayList<Order>());
-
+
sd.getSerdeInfo().setParameters(new HashMap<String, String>());
}
-
+
public void reinitSerDe() throws HiveException {
try {
deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), this.getTTable());
@@ -138,7 +138,7 @@
throw new HiveException(e);
}
}
-
+
protected void initSerDe() throws HiveException {
if (deserializer == null) {
try {
@@ -148,7 +148,7 @@
}
}
}
-
+
public void checkValidity() throws HiveException {
// check for validity
String name = getTTable().getTableName();
@@ -167,7 +167,7 @@
if (null == getOutputFormatClass()) {
throw new HiveException("must specify an OutputFormat class");
}
-
+
Iterator<FieldSchema> iterCols = getCols().iterator();
List<String> colNames = new ArrayList<String>();
while (iterCols.hasNext()) {
@@ -175,7 +175,7 @@
Iterator<String> iter = colNames.iterator();
while (iter.hasNext()) {
String oldColName = iter.next();
- if (colName.equalsIgnoreCase(oldColName))
+ if (colName.equalsIgnoreCase(oldColName))
throw new HiveException("Duplicate column name " + colName + " in the table definition.");
}
colNames.add(colName.toLowerCase());
@@ -195,7 +195,7 @@
}
/**
- * @param inputFormatClass
+ * @param inputFormatClass
*/
public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
this.inputFormatClass = inputFormatClass;
@@ -203,7 +203,7 @@
}
/**
- * @param class1
+ * @param class1
*/
public void setOutputFormatClass(Class<?> class1) {
this.outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(class1);
@@ -255,11 +255,11 @@
else
return true;
}
-
+
if((spec == null) || (spec.size() != partCols.size())) {
throw new HiveException("table is partitioned but partition spec is not specified or tab: " + spec);
}
-
+
for (FieldSchema field : partCols) {
if(spec.get(field.getName()) == null) {
throw new HiveException(field.getName() + " not found in table's partition spec: " + spec);
@@ -268,7 +268,7 @@
return true;
}
-
+
public void setProperty(String name, String value) {
getTTable().getParameters().put(name, value);
}
@@ -308,7 +308,7 @@
throw new RuntimeException(e);
}
}
-
+
/**
* @param schema the schema to set
*/
@@ -323,7 +323,7 @@
this.deserializer = deserializer;
}
- public String toString() {
+ public String toString() {
return getTTable().getTableName();
}
@@ -335,7 +335,7 @@
}
return partKeys;
}
-
+
public boolean isPartitionKey(String colName) {
for (FieldSchema key : getPartCols()) {
if(key.getName().toLowerCase().equals(colName)) {
@@ -351,11 +351,11 @@
if(bcols == null || bcols.size() == 0) {
return null;
}
-
+
if(bcols.size() > 1) {
LOG.warn(this + " table has more than one dimensions which aren't supported yet");
}
-
+
return bcols.get(0);
}
@@ -385,7 +385,7 @@
for (String col : bucketCols) {
if(!isField(col))
- throw new HiveException("Bucket columns " + col + " is not part of the table columns" );
+ throw new HiveException("Bucket columns " + col + " is not part of the table columns" );
}
getTTable().getSd().setBucketCols(bucketCols);
}
@@ -400,7 +400,7 @@
return true;
}
}
- return false;
+ return false;
}
public List<FieldSchema> getCols() {
@@ -419,7 +419,7 @@
/**
* Returns a list of all the columns of the table (data columns + partition columns, in that order).
- *
+ *
* @return List<FieldSchema>
*/
public List<FieldSchema> getAllCols() {
@@ -439,7 +439,7 @@
public int getNumBuckets() {
return getTTable().getSd().getNumBuckets();
}
-
+
/**
* Replaces files in the partition with new data set specified by srcf. Works by moving files
* @param srcf Files to be replaced. Leaf directories or globbed file paths
@@ -487,7 +487,7 @@
}
}
-
+
public boolean isPartitioned() {
if(getPartCols() == null) {
return false;
@@ -554,11 +554,11 @@
public String getSerializationLib() {
return getSerdeInfo().getSerializationLib();
}
-
+
public String getSerdeParam(String param) {
return getSerdeInfo().getParameters().get(param);
}
-
+
public String setSerdeParam(String param, String value) {
return getSerdeInfo().getParameters().put(param, value);
}
@@ -570,7 +570,7 @@
public List<Order> getSortCols() {
return getTTable().getSd().getSortCols();
}
-
+
/**
* Creates a partition name -> value spec map object
* @param tp Use the information from this partition.
@@ -578,7 +578,7 @@
*/
public LinkedHashMap<String, String> createSpec(
org.apache.hadoop.hive.metastore.api.Partition tp) {
-
+
List<FieldSchema> fsl = getPartCols();
List<String> tpl = tp.getValues();
LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
@@ -589,5 +589,17 @@
}
return spec;
}
-
+
+ public Table copy() throws HiveException {
+ Table newTbl = new Table();
+
+ newTbl.schema = this.schema;
+ newTbl.deserializer = this.deserializer; //TODO: convert to SerDeInfo format
+
+ newTbl.setTTable(getTTable().clone());
+ newTbl.uri = this.uri;
+ newTbl.inputFormatClass = this.inputFormatClass;
+ newTbl.outputFormatClass = this.outputFormatClass;
+ return newTbl;
+ }
};
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Tue Sep 29 01:25:15 2009
@@ -183,7 +183,7 @@
cplan.getPathToPartitionInfo().put(fsConf.getDirName(), new partitionDesc(fsConf.getTableInfo(), null));
cplan.setNumReduceTasks(-1);
- moveWork dummyMv = new moveWork(null, new loadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false);
+ moveWork dummyMv = new moveWork(null, null, null, new loadFileDesc(fsOp.getConf().getDirName(), finalName, true, null, null), false);
Task<? extends Serializable> dummyMergeTask = TaskFactory.get(dummyMv, ctx.getConf());
List<Serializable> listWorks = new ArrayList<Serializable>();
listWorks.add(dummyMv);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Tue Sep 29 01:25:15 2009
@@ -49,6 +49,15 @@
protected Context ctx;
protected HashMap<String, String> idToTableNameMap;
+ /**
+ * ReadEntitites that are passed to the hooks.
+ */
+ protected Set<ReadEntity> inputs;
+ /**
+ * List of WriteEntities that are passed to the hooks.
+ */
+ protected Set<WriteEntity> outputs;
+
public BaseSemanticAnalyzer(HiveConf conf) throws SemanticException {
try {
@@ -58,17 +67,19 @@
LOG = LogFactory.getLog(this.getClass().getName());
console = new LogHelper(LOG);
this.idToTableNameMap = new HashMap<String, String>();
+ inputs = new LinkedHashSet<ReadEntity>();
+ outputs = new LinkedHashSet<WriteEntity>();
} catch (Exception e) {
throw new SemanticException (e);
}
}
-
+
public HashMap<String, String> getIdToTableNameMap() {
return idToTableNameMap;
}
-
+
public abstract void analyzeInternal(ASTNode ast) throws SemanticException;
public void analyze(ASTNode ast, Context ctx) throws SemanticException {
@@ -79,7 +90,7 @@
public void validate() throws SemanticException {
// Implementations may choose to override this
}
-
+
public List<Task<? extends Serializable>> getRootTasks() {
return rootTasks;
}
@@ -114,11 +125,11 @@
if ((val.charAt(0) == '\'' && val.charAt(val.length() - 1) == '\'')
|| (val.charAt(0) == '\"' && val.charAt(val.length() - 1) == '\"')) {
val = val.substring(1, val.length() - 1);
- }
+ }
return val;
}
- public static String charSetString(String charSetName, String charSetString)
+ public static String charSetString(String charSetName, String charSetString)
throws SemanticException {
try
{
@@ -131,7 +142,7 @@
assert charSetString.charAt(0) == '0';
assert charSetString.charAt(1) == 'x';
charSetString = charSetString.substring(2);
-
+
byte[] bArray = new byte[charSetString.length()/2];
int j = 0;
for (int i = 0; i < charSetString.length(); i += 2)
@@ -144,7 +155,7 @@
String res = new String(bArray, charSetName);
return res;
- }
+ }
} catch (UnsupportedEncodingException e) {
throw new SemanticException(e);
}
@@ -162,7 +173,7 @@
}
if (val.charAt(0) == '`' && val.charAt(val.length() - 1) == '`') {
val = val.substring(1, val.length() - 1);
- }
+ }
return val;
}
@@ -172,11 +183,11 @@
Character enclosure = null;
// Some of the strings can be passed in as unicode. For example, the
- // delimiter can be passed in as \002 - So, we first check if the
+ // delimiter can be passed in as \002 - So, we first check if the
// string is a unicode number, else go back to the old behavior
StringBuilder sb = new StringBuilder(b.length());
for (int i=0; i < b.length(); i++) {
-
+
char currentChar = b.charAt(i);
if (enclosure == null) {
if (currentChar == '\'' || b.charAt(i) == '\"') {
@@ -185,12 +196,12 @@
// ignore all other chars outside the enclosure
continue;
}
-
+
if (enclosure.equals(currentChar)) {
enclosure = null;
continue;
}
-
+
if (currentChar == '\\' && (i+4 < b.length())) {
char i1 = b.charAt(i+1);
char i2 = b.charAt(i+2);
@@ -233,15 +244,15 @@
}
return sb.toString();
}
-
+
public Set<ReadEntity> getInputs() {
- return new LinkedHashSet<ReadEntity>();
+ return inputs;
}
-
+
public Set<WriteEntity> getOutputs() {
- return new LinkedHashSet<WriteEntity>();
+ return outputs;
}
-
+
public static class tableSpec {
public String tableName;
public Table tableHandle;
@@ -257,7 +268,7 @@
// get table metadata
tableName = unescapeIdentifier(ast.getChild(0).getText());
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
- if (testMode)
+ if (testMode)
tableName = conf.getVar(HiveConf.ConfVars.HIVETESTMODEPREFIX) + tableName;
tableHandle = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
@@ -287,9 +298,9 @@
public String toString() {
- if(partHandle != null)
+ if(partHandle != null)
return partHandle.toString();
- else
+ else
return tableHandle.toString();
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Tue Sep 29 01:25:15 2009
@@ -93,7 +93,7 @@
private static final String RCFILE_OUTPUT = RCFileOutputFormat.class.getName();
private static final String COLUMNAR_SERDE = ColumnarSerDe.class.getName();
-
+
public static String getTypeName(int token) {
return TokenToTypeName.get(token);
}
@@ -128,7 +128,7 @@
analyzeDescFunction(ast);
} else if (ast.getToken().getType() == HiveParser.TOK_MSCK) {
ctx.setResFile(new Path(ctx.getLocalTmpFileURI()));
- analyzeMetastoreCheck(ast);
+ analyzeMetastoreCheck(ast);
} else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_RENAME)
analyzeAlterTableRename(ast);
else if (ast.getToken().getType() == HiveParser.TOK_ALTERTABLE_ADDCOLS)
@@ -155,7 +155,7 @@
}
}
- private void analyzeCreateTable(ASTNode ast)
+ private void analyzeCreateTable(ASTNode ast)
throws SemanticException {
String tableName = unescapeIdentifier(ast.getChild(0).getText());
String likeTableName = null;
@@ -183,7 +183,7 @@
outputFormat = SEQUENCEFILE_OUTPUT;
}
- LOG.info("Creating table" + tableName);
+ LOG.info("Creating table" + tableName);
int numCh = ast.getChildCount();
for (int num = 1; num < numCh; num++)
{
@@ -247,7 +247,7 @@
}
break;
case HiveParser.TOK_TABLESERIALIZER:
-
+
child = (ASTNode)child.getChild(0);
serde = unescapeSQLString(child.getChild(0).getText());
if (child.getChildCount() == 2) {
@@ -284,29 +284,29 @@
}
}
if (likeTableName == null) {
- createTableDesc crtTblDesc =
- new createTableDesc(tableName, isExt, cols, partCols, bucketCols,
+ createTableDesc crtTblDesc =
+ new createTableDesc(tableName, isExt, cols, partCols, bucketCols,
sortCols, numBuckets,
fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim,
- comment, inputFormat, outputFormat, location, serde,
+ comment, inputFormat, outputFormat, location, serde,
mapProp, ifNotExists);
-
+
validateCreateTable(crtTblDesc);
- rootTasks.add(TaskFactory.get(new DDLWork(crtTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), crtTblDesc), conf));
} else {
- createTableLikeDesc crtTblLikeDesc =
+ createTableLikeDesc crtTblLikeDesc =
new createTableLikeDesc(tableName, isExt, location, ifNotExists, likeTableName);
- rootTasks.add(TaskFactory.get(new DDLWork(crtTblLikeDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), crtTblLikeDesc), conf));
}
-
+
}
private void validateCreateTable(createTableDesc crtTblDesc) throws SemanticException {
// no duplicate column names
// currently, it is a simple n*n algorithm - this can be optimized later if need be
// but it should not be a major bottleneck as the number of columns are anyway not so big
-
+
if((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) {
// for now make sure that serde exists
if(StringUtils.isEmpty(crtTblDesc.getSerName()) || SerDeUtils.isNativeSerDe(crtTblDesc.getSerName())) {
@@ -314,7 +314,7 @@
}
return;
}
-
+
try {
Class<?> origin = Class.forName(crtTblDesc.getOutputFormat(), true, JavaUtils.getClassLoader());
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils.getOutputFormatSubstitute(origin);
@@ -323,7 +323,7 @@
} catch (ClassNotFoundException e) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg());
}
-
+
Iterator<FieldSchema> iterCols = crtTblDesc.getCols().iterator();
List<String> colNames = new ArrayList<String>();
while (iterCols.hasNext()) {
@@ -331,14 +331,14 @@
Iterator<String> iter = colNames.iterator();
while (iter.hasNext()) {
String oldColName = iter.next();
- if (colName.equalsIgnoreCase(oldColName))
+ if (colName.equalsIgnoreCase(oldColName))
throw new SemanticException(ErrorMsg.DUPLICATE_COLUMN_NAMES.getMsg());
}
colNames.add(colName);
}
if (crtTblDesc.getBucketCols() != null)
- {
+ {
// all columns in cluster and sort are valid columns
Iterator<String> bucketCols = crtTblDesc.getBucketCols().iterator();
while (bucketCols.hasNext()) {
@@ -376,7 +376,7 @@
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
}
}
-
+
if (crtTblDesc.getPartCols() != null)
{
// there is no overlap between columns and partitioning columns
@@ -386,40 +386,40 @@
Iterator<String> colNamesIter = colNames.iterator();
while (colNamesIter.hasNext()) {
String colName = unescapeIdentifier(colNamesIter.next());
- if (partCol.equalsIgnoreCase(colName))
+ if (partCol.equalsIgnoreCase(colName))
throw new SemanticException(ErrorMsg.COLUMN_REPEATED_IN_PARTITIONING_COLS.getMsg());
}
}
}
}
-
- private void analyzeDropTable(ASTNode ast)
+
+ private void analyzeDropTable(ASTNode ast)
throws SemanticException {
- String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
dropTableDesc dropTblDesc = new dropTableDesc(tableName);
- rootTasks.add(TaskFactory.get(new DDLWork(dropTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropTblDesc), conf));
}
- private void analyzeAlterTableProps(ASTNode ast) throws SemanticException {
- String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ private void analyzeAlterTableProps(ASTNode ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
HashMap<String, String> mapProp = getProps((ASTNode)(ast.getChild(1)).getChild(0));
alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDPROPS);
alterTblDesc.setProps(mapProp);
alterTblDesc.setOldName(tableName);
- rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
}
- private void analyzeAlterTableSerdeProps(ASTNode ast) throws SemanticException {
- String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ private void analyzeAlterTableSerdeProps(ASTNode ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
HashMap<String, String> mapProp = getProps((ASTNode)(ast.getChild(1)).getChild(0));
alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDSERDEPROPS);
alterTblDesc.setProps(mapProp);
alterTblDesc.setOldName(tableName);
- rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
}
- private void analyzeAlterTableSerde(ASTNode ast) throws SemanticException {
- String tableName = unescapeIdentifier(ast.getChild(0).getText());
+ private void analyzeAlterTableSerde(ASTNode ast) throws SemanticException {
+ String tableName = unescapeIdentifier(ast.getChild(0).getText());
String serdeName = unescapeSQLString(ast.getChild(1).getText());
alterTableDesc alterTblDesc = new alterTableDesc(alterTableTypes.ADDSERDE);
if(ast.getChildCount() > 2) {
@@ -428,7 +428,7 @@
}
alterTblDesc.setOldName(tableName);
alterTblDesc.setSerdeName(serdeName);
- rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
}
private HashMap<String, String> getProps(ASTNode prop) {
@@ -456,7 +456,7 @@
return getTypeName(typeNode.getType());
}
}
-
+
private static String getStructTypeStringFromAST(ASTNode typeNode)
throws SemanticException {
String typeStr = Constants.STRUCT_TYPE_NAME + "<";
@@ -471,11 +471,11 @@
if (i < children - 1)
typeStr += ",";
}
-
+
typeStr += ">";
return typeStr;
}
-
+
private List<FieldSchema> getColumns(ASTNode ast) throws SemanticException
{
List<FieldSchema> colList = new ArrayList<FieldSchema>();
@@ -486,14 +486,14 @@
col.setName(unescapeIdentifier(child.getChild(0).getText()));
ASTNode typeChild = (ASTNode)(child.getChild(1));
col.setType(getTypeStringFromAST(typeChild));
-
+
if (child.getChildCount() == 3)
col.setComment(unescapeSQLString(child.getChild(2).getText()));
colList.add(col);
}
return colList;
}
-
+
private List<String> getColumnNames(ASTNode ast)
{
List<String> colList = new ArrayList<String>();
@@ -520,7 +520,7 @@
}
/**
- * Get the fully qualified name in the ast. e.g. the ast of the form ^(DOT ^(DOT a b) c)
+ * Get the fully qualified name in the ast. e.g. the ast of the form ^(DOT ^(DOT a b) c)
* will generate a name of the form a.b.c
*
* @param ast The AST from which the qualified name has to be extracted
@@ -542,7 +542,7 @@
*/
private Task<? extends Serializable> createFetchTask(String schema) {
Properties prop = new Properties();
-
+
prop.setProperty(Constants.SERIALIZATION_FORMAT, "9");
prop.setProperty(Constants.SERIALIZATION_NULL_FORMAT, " ");
String[] colTypes = schema.split("#");
@@ -553,7 +553,7 @@
ctx.getResFile().toString(),
new tableDesc(LazySimpleSerDe.class, TextInputFormat.class, IgnoreKeyTextOutputFormat.class, prop),
-1
- );
+ );
fetch.setSerializationNullFormat(" ");
return TaskFactory.get(fetch, this.conf);
}
@@ -569,15 +569,15 @@
ASTNode partspec = (ASTNode) tableTypeExpr.getChild(1);
partSpec = getPartSpec(partspec);
}
-
+
boolean isExt = ast.getChildCount() > 1;
descTableDesc descTblDesc = new descTableDesc(ctx.getResFile(), tableName, partSpec, isExt);
- rootTasks.add(TaskFactory.get(new DDLWork(descTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), descTblDesc), conf));
setFetchTask(createFetchTask(descTblDesc.getSchema()));
LOG.info("analyzeDescribeTable done");
}
-
-
+
+
private HashMap<String, String> getPartSpec(ASTNode partspec)
throws SemanticException {
HashMap<String, String> partSpec = new LinkedHashMap<String, String>();
@@ -588,37 +588,37 @@
}
return partSpec;
}
-
- private void analyzeShowPartitions(ASTNode ast)
+
+ private void analyzeShowPartitions(ASTNode ast)
throws SemanticException {
showPartitionsDesc showPartsDesc;
String tableName = unescapeIdentifier(ast.getChild(0).getText());
showPartsDesc = new showPartitionsDesc(tableName, ctx.getResFile());
- rootTasks.add(TaskFactory.get(new DDLWork(showPartsDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), showPartsDesc), conf));
setFetchTask(createFetchTask(showPartsDesc.getSchema()));
}
-
- private void analyzeShowTables(ASTNode ast)
+
+ private void analyzeShowTables(ASTNode ast)
throws SemanticException {
showTablesDesc showTblsDesc;
if (ast.getChildCount() == 1)
{
- String tableNames = unescapeSQLString(ast.getChild(0).getText());
+ String tableNames = unescapeSQLString(ast.getChild(0).getText());
showTblsDesc = new showTablesDesc(ctx.getResFile(), tableNames);
}
else {
showTblsDesc = new showTablesDesc(ctx.getResFile());
}
- rootTasks.add(TaskFactory.get(new DDLWork(showTblsDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), showTblsDesc), conf));
setFetchTask(createFetchTask(showTblsDesc.getSchema()));
}
-
- private void analyzeShowTableStatus(ASTNode ast)
+
+ private void analyzeShowTableStatus(ASTNode ast)
throws SemanticException {
showTableStatusDesc showTblStatusDesc;
String tableNames = unescapeIdentifier(ast.getChild(0).getText());
String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
- int children = ast.getChildCount();
+ int children = ast.getChildCount();
HashMap<String, String> partSpec = null;
if (children >= 2) {
if(children > 3)
@@ -634,7 +634,7 @@
}
}
showTblStatusDesc = new showTableStatusDesc(ctx.getResFile(), dbName, tableNames, partSpec);
- rootTasks.add(TaskFactory.get(new DDLWork(showTblStatusDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), showTblStatusDesc), conf));
setFetchTask(createFetchTask(showTblStatusDesc.getSchema()));
}
@@ -654,10 +654,10 @@
else {
showFuncsDesc = new showFunctionsDesc(ctx.getResFile());
}
- rootTasks.add(TaskFactory.get(new DDLWork(showFuncsDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), showFuncsDesc), conf));
setFetchTask(createFetchTask(showFuncsDesc.getSchema()));
}
-
+
/**
* Add the task according to the parsed command tree.
* This is used for the CLI command "DESCRIBE FUNCTION;".
@@ -668,7 +668,7 @@
throws SemanticException {
String funcName;
boolean isExtended;
-
+
if(ast.getChildCount() == 1) {
funcName = ast.getChild(0).getText();
isExtended = false;
@@ -681,34 +681,34 @@
descFunctionDesc descFuncDesc = new descFunctionDesc(ctx.getResFile(),
funcName, isExtended);
- rootTasks.add(TaskFactory.get(new DDLWork(descFuncDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), descFuncDesc), conf));
setFetchTask(createFetchTask(descFuncDesc.getSchema()));
}
- private void analyzeAlterTableRename(ASTNode ast)
+ private void analyzeAlterTableRename(ASTNode ast)
throws SemanticException {
alterTableDesc alterTblDesc = new alterTableDesc(
unescapeIdentifier(ast.getChild(0).getText()),
unescapeIdentifier(ast.getChild(1).getText()));
- rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
}
- private void analyzeAlterTableModifyCols(ASTNode ast, alterTableTypes alterType)
+ private void analyzeAlterTableModifyCols(ASTNode ast, alterTableTypes alterType)
throws SemanticException {
String tblName = unescapeIdentifier(ast.getChild(0).getText());
List<FieldSchema> newCols = getColumns((ASTNode)ast.getChild(1));
alterTableDesc alterTblDesc = new alterTableDesc(tblName, newCols, alterType);
- rootTasks.add(TaskFactory.get(new DDLWork(alterTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf));
}
-
+
private void analyzeAlterTableDropParts(ASTNode ast) throws SemanticException {
String tblName = unescapeIdentifier(ast.getChild(0).getText());
// get table metadata
List<Map<String, String>> partSpecs = getPartitionSpecs(ast);
dropTableDesc dropTblDesc = new dropTableDesc(tblName, partSpecs);
- rootTasks.add(TaskFactory.get(new DDLWork(dropTblDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), dropTblDesc), conf));
}
-
+
/**
* Add one or more partitions to a table. Useful
* when the data has been copied to the right location
@@ -716,28 +716,28 @@
* @param ast The parsed command tree.
* @throws SemanticException Parsing failed
*/
- private void analyzeAlterTableAddParts(CommonTree ast)
+ private void analyzeAlterTableAddParts(CommonTree ast)
throws SemanticException {
-
+
String tblName = unescapeIdentifier(ast.getChild(0).getText());;
//partition name to value
List<Map<String, String>> partSpecs = getPartitionSpecs(ast);
-
+
Iterator<Map<String, String>> partIter = partSpecs.iterator();
-
+
String currentLocation = null;
Map<String, String> currentPart = null;
-
+
int numCh = ast.getChildCount();
for (int num = 1; num < numCh; num++) {
CommonTree child = (CommonTree)ast.getChild(num);
switch (child.getToken().getType()) {
case HiveParser.TOK_PARTSPEC:
if(currentPart != null) {
- AddPartitionDesc addPartitionDesc =
- new AddPartitionDesc(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ AddPartitionDesc addPartitionDesc =
+ new AddPartitionDesc(MetaStoreUtils.DEFAULT_DATABASE_NAME,
tblName, currentPart, currentLocation);
- rootTasks.add(TaskFactory.get(new DDLWork(addPartitionDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf));
}
//create new partition, set values
currentLocation = null;
@@ -751,16 +751,16 @@
throw new SemanticException("Unknown child: " + child);
}
}
-
+
//add the last one
if(currentPart != null) {
- AddPartitionDesc addPartitionDesc =
- new AddPartitionDesc(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ AddPartitionDesc addPartitionDesc =
+ new AddPartitionDesc(MetaStoreUtils.DEFAULT_DATABASE_NAME,
tblName, currentPart, currentLocation);
- rootTasks.add(TaskFactory.get(new DDLWork(addPartitionDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf));
}
- }
-
+ }
+
/**
* Verify that the information in the metastore matches up
* with the data on the fs.
@@ -774,7 +774,7 @@
}
List<Map<String, String>> specs = getPartitionSpecs(ast);
MsckDesc checkDesc = new MsckDesc(tableName, specs, ctx.getResFile());
- rootTasks.add(TaskFactory.get(new DDLWork(checkDesc), conf));
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), checkDesc), conf));
}
/**
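[Editor's note, not part of the commit] The DDLSemanticAnalyzer hunks above all follow one pattern: every DDLWork is now constructed with the analyzer's ReadEntity/WriteEntity sets so that the pre- and post-execution hooks can see which objects a DDL statement touches. A minimal sketch of that wiring, assuming the analyzer-side members (rootTasks, conf) and the getInputs()/getOutputs() accessors provided by the base analyzer:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.dropTableDesc;

// Sketch only -- mirrors the updated call shape used throughout
// DDLSemanticAnalyzer: the entity sets ride along with the work object.
private void analyzeDropPartitionsSketch(String tblName,
    List<Map<String, String>> partSpecs) {
  dropTableDesc dropTblDesc = new dropTableDesc(tblName, partSpecs);
  rootTasks.add(TaskFactory.get(
      new DDLWork(getInputs(), getOutputs(), dropTblDesc), conf));
}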
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Tue Sep 29 01:25:15 2009
@@ -64,7 +64,7 @@
String fromScheme = fromURI.getScheme();
String fromAuthority = fromURI.getAuthority();
String path = fromURI.getPath();
-
+
// generate absolute path relative to current directory or hdfs home directory
if(!path.startsWith("/")) {
if(isLocal) {
@@ -73,7 +73,7 @@
path = new Path(new Path("/user/"+System.getProperty("user.name")), path).toString();
}
}
-
+
// set correct scheme and authority
if(StringUtils.isEmpty(fromScheme)) {
if(isLocal) {
@@ -86,20 +86,20 @@
fromAuthority = defaultURI.getAuthority();
}
}
-
+
// if scheme is specified but not authority then use the default authority
if(fromScheme.equals("hdfs") && StringUtils.isEmpty(fromAuthority)) {
URI defaultURI = FileSystem.get(conf).getUri();
fromAuthority = defaultURI.getAuthority();
}
-
+
LOG.debug(fromScheme + "@" + fromAuthority + "@" + path);
return new URI(fromScheme, fromAuthority, path, null, null);
}
private void applyConstraints(URI fromURI, URI toURI, Tree ast, boolean isLocal) throws SemanticException {
- if(!fromURI.getScheme().equals("file") &&
+ if(!fromURI.getScheme().equals("file") &&
!fromURI.getScheme().equals("hdfs")) {
throw new SemanticException (ErrorMsg.INVALID_PATH.getMsg(ast, "only \"file\" or \"hdfs\" file systems accepted"));
}
@@ -134,7 +134,7 @@
}
- // only in 'local' mode do we copy stuff from one place to another.
+ // only in 'local' mode do we copy stuff from one place to another.
// reject different scheme/authority in other cases.
if(!isLocal && (!StringUtils.equals(fromURI.getScheme(), toURI.getScheme()) ||
!StringUtils.equals(fromURI.getAuthority(), toURI.getAuthority()))) {
@@ -183,7 +183,7 @@
&& (ts.partSpec == null || ts.partSpec.size() == 0)) {
throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
}
-
+
// make sure the arguments make sense
applyConstraints(fromURI, toURI, from_t, isLocal);
@@ -191,14 +191,14 @@
// create copy work
if(isLocal) {
- // if the local keyword is specified - we will always make a copy. this might seem redundant in the case
+ // if the local keyword is specified - we will always make a copy. this might seem redundant in the case
// that the hive warehouse is also located in the local file system - but that's just a test case.
String copyURIStr = ctx.getExternalTmpFileURI(toURI);
URI copyURI = URI.create(copyURIStr);
rTask = TaskFactory.get(new copyWork(fromURI.toString(), copyURIStr), this.conf);
fromURI = copyURI;
}
-
+
// create final load/move work
String loadTmpPath = ctx.getExternalTmpFileURI(toURI);
@@ -209,9 +209,9 @@
isOverWrite);
if(rTask != null) {
- rTask.addDependentTask(TaskFactory.get(new moveWork(loadTableWork, null, true), this.conf));
+ rTask.addDependentTask(TaskFactory.get(new moveWork(getInputs(), getOutputs(), loadTableWork, null, true), this.conf));
} else {
- rTask = TaskFactory.get(new moveWork(loadTableWork, null, true), this.conf);
+ rTask = TaskFactory.get(new moveWork(getInputs(), getOutputs(), loadTableWork, null, true), this.conf);
}
rootTasks.add(rTask);
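[Editor's note, not part of the commit] The load path follows the same idea: the final move task now carries the entity sets. A condensed sketch of the updated call sites above, where loadTableWork, rTask, rootTasks and conf are the analyzer-local variables shown in the diff:

import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.moveWork;

// Sketch only. The five-argument moveWork constructor now leads with the
// ReadEntity/WriteEntity sets, followed by the load descriptors and the
// checkFileFormat flag.
moveWork mvWork = new moveWork(getInputs(), getOutputs(), loadTableWork, null, true);
if (rTask != null) {
  rTask.addDependentTask(TaskFactory.get(mvWork, this.conf));
} else {
  rTask = TaskFactory.get(mvWork, this.conf);
}
rootTasks.add(rTask);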
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Sep 29 01:25:15 2009
@@ -163,15 +163,6 @@
private UnionProcContext uCtx;
List<MapJoinOperator> listMapJoinOpsNoReducer;
- /**
- * ReadEntitites that are passed to the hooks.
- */
- private Set<ReadEntity> inputs;
- /**
- * List of WriteEntities that are passed to the hooks.
- */
- private Set<WriteEntity> outputs;
-
private static class Phase1Ctx {
String dest;
int nextNum;
@@ -193,12 +184,8 @@
this.destTableId = 1;
this.uCtx = null;
this.listMapJoinOpsNoReducer = new ArrayList<MapJoinOperator>();
-
- inputs = new LinkedHashSet<ReadEntity>();
- outputs = new LinkedHashSet<WriteEntity>();
}
-
@Override
protected void reset() {
super.reset();
@@ -4319,9 +4306,9 @@
// the tasks that have a file sink operation
List<moveWork> mv = new ArrayList<moveWork>();
for (loadTableDesc ltd : loadTableWork)
- mvTask.add(TaskFactory.get(new moveWork(ltd, null, false), this.conf));
+ mvTask.add(TaskFactory.get(new moveWork(null, null, ltd, null, false), this.conf));
for (loadFileDesc lfd : loadFileWork)
- mvTask.add(TaskFactory.get(new moveWork(null, lfd, false), this.conf));
+ mvTask.add(TaskFactory.get(new moveWork(null, null, null, lfd, false), this.conf));
}
// generate map reduce plans
@@ -4643,12 +4630,4 @@
validate(childTask);
}
- @Override
- public Set<ReadEntity> getInputs() {
- return inputs;
- }
-
- public Set<WriteEntity> getOutputs() {
- return outputs;
- }
}
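[Editor's note, not part of the commit] Removing the inputs/outputs fields and the getInputs()/getOutputs() override from SemanticAnalyzer implies, though this hunk does not show it, that the sets now live in the shared base analyzer, so the DDL, load and query analyzers all expose the same accessors. A hedged sketch of what that base-class state would look like under that assumption:

import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;

// Assumption: fields and accessors of this shape now sit in the common base
// analyzer rather than in SemanticAnalyzer itself.
protected Set<ReadEntity> inputs = new LinkedHashSet<ReadEntity>();
protected Set<WriteEntity> outputs = new LinkedHashSet<WriteEntity>();

public Set<ReadEntity> getInputs() { return inputs; }
public Set<WriteEntity> getOutputs() { return outputs; }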
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java Tue Sep 29 01:25:15 2009
@@ -18,20 +18,52 @@
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.hive.conf.HiveConf;
+import java.util.HashMap;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
public class SemanticAnalyzerFactory {
+ static HashMap<Integer, String> commandType = new HashMap<Integer, String>();
+
+ static {
+ commandType.put(HiveParser.TOK_EXPLAIN, "EXPLAIN");
+ commandType.put(HiveParser.TOK_LOAD, "LOAD");
+ commandType.put(HiveParser.TOK_CREATETABLE, "CREATETABLE");
+ commandType.put(HiveParser.TOK_DROPTABLE, "DROPTABLE");
+ commandType.put(HiveParser.TOK_DESCTABLE, "DESCTABLE");
+ commandType.put(HiveParser.TOK_DESCFUNCTION, "DESCFUNCTION");
+ commandType.put(HiveParser.TOK_MSCK, "MSCK");
+ commandType.put(HiveParser.TOK_ALTERTABLE_ADDCOLS, "ALTERTABLE_ADDCOLS");
+ commandType.put(HiveParser.TOK_ALTERTABLE_REPLACECOLS, "ALTERTABLE_REPLACECOLS");
+ commandType.put(HiveParser.TOK_ALTERTABLE_RENAME, "ALTERTABLE_RENAME");
+ commandType.put(HiveParser.TOK_ALTERTABLE_DROPPARTS, "ALTERTABLE_DROPPARTS");
+ commandType.put(HiveParser.TOK_ALTERTABLE_ADDPARTS, "ALTERTABLE_ADDPARTS");
+ commandType.put(HiveParser.TOK_ALTERTABLE_PROPERTIES, "ALTERTABLE_PROPERTIES");
+ commandType.put(HiveParser.TOK_ALTERTABLE_SERIALIZER, "ALTERTABLE_SERIALIZER");
+ commandType.put(HiveParser.TOK_ALTERTABLE_SERDEPROPERTIES, "ALTERTABLE_SERDEPROPERTIES");
+ commandType.put(HiveParser.TOK_SHOWTABLES, "SHOWTABLES");
+ commandType.put(HiveParser.TOK_SHOW_TABLESTATUS, "SHOW_TABLESTATUS");
+ commandType.put(HiveParser.TOK_SHOWFUNCTIONS, "SHOWFUNCTIONS");
+ commandType.put(HiveParser.TOK_SHOWPARTITIONS, "SHOWPARTITIONS");
+ commandType.put(HiveParser.TOK_CREATEFUNCTION, "CREATEFUNCTION");
+ commandType.put(HiveParser.TOK_DROPFUNCTION, "DROPFUNCTION");
+ commandType.put(HiveParser.TOK_QUERY, "QUERY");
+ }
+
public static BaseSemanticAnalyzer get(HiveConf conf, ASTNode tree) throws SemanticException {
if(tree.getToken() == null) {
throw new RuntimeException ("Empty Syntax Tree");
} else {
+ if (SessionState.get() != null)
+ SessionState.get().setCommandType(commandType.get(tree.getToken().getType()));
+
switch (tree.getToken().getType()) {
case HiveParser.TOK_EXPLAIN: return new ExplainSemanticAnalyzer(conf);
case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(conf);
- case HiveParser.TOK_CREATETABLE:
- case HiveParser.TOK_DROPTABLE:
+ case HiveParser.TOK_CREATETABLE:
+ case HiveParser.TOK_DROPTABLE:
case HiveParser.TOK_DESCTABLE:
case HiveParser.TOK_DESCFUNCTION:
case HiveParser.TOK_MSCK:
@@ -48,7 +80,7 @@
case HiveParser.TOK_SHOWFUNCTIONS:
case HiveParser.TOK_SHOWPARTITIONS:
return new DDLSemanticAnalyzer(conf);
- case HiveParser.TOK_CREATEFUNCTION:
+ case HiveParser.TOK_CREATEFUNCTION:
case HiveParser.TOK_DROPFUNCTION:
return new FunctionSemanticAnalyzer(conf);
default: return new SemanticAnalyzer(conf);
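[Editor's note, not part of the commit] The new static map gives each top-level parse token a human-readable label, and the factory records it on the current session before choosing an analyzer. A small sketch of what a caller sees right after the factory runs, using only the session API added in this revision:

import org.apache.hadoop.hive.ql.session.SessionState;

// Sketch only: after SemanticAnalyzerFactory.get(conf, tree) has run for,
// say, a SHOW TABLES statement, the session carries the label from the map
// above; a root token not present in the map would leave the type as null.
SessionState ss = SessionState.get();
String label = (ss == null) ? null : ss.getCommandType();  // e.g. "SHOWTABLES"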
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java Tue Sep 29 01:25:15 2009
@@ -18,7 +18,11 @@
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+
import java.io.Serializable;
+import java.util.Set;
public class DDLWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -35,87 +39,125 @@
private MsckDesc msckDesc;
private showTableStatusDesc showTblStatusDesc;
- public DDLWork() { }
+ /**
+ * ReadEntitites that are passed to the hooks.
+ */
+ protected Set<ReadEntity> inputs;
+ /**
+ * List of WriteEntities that are passed to the hooks.
+ */
+ protected Set<WriteEntity> outputs;
+
+ public DDLWork() {
+ }
+
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+ this.inputs = inputs;
+ this.outputs = outputs;
+ }
/**
* @param alterTblDesc alter table descriptor
*/
- public DDLWork(alterTableDesc alterTblDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, alterTableDesc alterTblDesc) {
+ this(inputs, outputs);
this.alterTblDesc = alterTblDesc;
}
/**
* @param createTblDesc create table descriptor
*/
- public DDLWork(createTableDesc createTblDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, createTableDesc createTblDesc) {
+ this(inputs, outputs);
+
this.createTblDesc = createTblDesc;
}
/**
 * @param createTblLikeDesc create table like descriptor
*/
- public DDLWork(createTableLikeDesc createTblLikeDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, createTableLikeDesc createTblLikeDesc) {
+ this(inputs, outputs);
+
this.createTblLikeDesc = createTblLikeDesc;
}
/**
* @param dropTblDesc drop table descriptor
*/
- public DDLWork(dropTableDesc dropTblDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, dropTableDesc dropTblDesc) {
+ this(inputs, outputs);
+
this.dropTblDesc = dropTblDesc;
}
/**
* @param descTblDesc
*/
- public DDLWork(descTableDesc descTblDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, descTableDesc descTblDesc) {
+ this(inputs, outputs);
+
this.descTblDesc = descTblDesc;
}
/**
* @param showTblsDesc
*/
- public DDLWork(showTablesDesc showTblsDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, showTablesDesc showTblsDesc) {
+ this(inputs, outputs);
+
this.showTblsDesc = showTblsDesc;
}
/**
* @param showFuncsDesc
*/
- public DDLWork(showFunctionsDesc showFuncsDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, showFunctionsDesc showFuncsDesc) {
+ this(inputs, outputs);
+
this.showFuncsDesc = showFuncsDesc;
}
-
+
/**
* @param descFuncDesc
*/
- public DDLWork(descFunctionDesc descFuncDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, descFunctionDesc descFuncDesc) {
+ this(inputs, outputs);
+
this.descFunctionDesc = descFuncDesc;
}
/**
* @param showPartsDesc
*/
- public DDLWork(showPartitionsDesc showPartsDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, showPartitionsDesc showPartsDesc) {
+ this(inputs, outputs);
+
this.showPartsDesc = showPartsDesc;
}
-
+
/**
* @param addPartitionDesc information about the partitions
* we want to add.
*/
- public DDLWork(AddPartitionDesc addPartitionDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, AddPartitionDesc addPartitionDesc) {
+ this(inputs, outputs);
+
this.addPartitionDesc = addPartitionDesc;
}
- public DDLWork(MsckDesc checkDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, MsckDesc checkDesc) {
+ this(inputs, outputs);
+
this.msckDesc = checkDesc;
}
/**
* @param showTblStatusDesc show table status descriptor
*/
- public DDLWork(showTableStatusDesc showTblStatusDesc) {
+ public DDLWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs, showTableStatusDesc showTblStatusDesc) {
+ this(inputs, outputs);
+
this.showTblStatusDesc = showTblStatusDesc;
}
@@ -201,7 +243,7 @@
public showFunctionsDesc getShowFuncsDesc() {
return showFuncsDesc;
}
-
+
/**
* @return the descFuncDesc
*/
@@ -216,7 +258,7 @@
public void setShowFuncsDesc(showFunctionsDesc showFuncsDesc) {
this.showFuncsDesc = showFuncsDesc;
}
-
+
/**
* @param descFuncDesc the showFuncsDesc to set
*/
@@ -297,5 +339,21 @@
public void setShowTblStatusDesc(showTableStatusDesc showTblStatusDesc) {
this.showTblStatusDesc = showTblStatusDesc;
}
-
+
+ public Set<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public Set<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public void setInputs(Set<ReadEntity> inputs) {
+ this.inputs = inputs;
+ }
+
+ public void setOutputs(Set<WriteEntity> outputs) {
+ this.outputs = outputs;
+ }
+
}
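[Editor's note, not part of the commit] All of the typed DDLWork constructors now delegate to the new two-argument constructor, so the entity sets are populated uniformly regardless of which descriptor is being wrapped. A sketch of the resulting API from a caller's point of view; the empty LinkedHashSets stand in for whatever the analyzer has accumulated:

import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.DDLWork;

// Sketch only.
Set<ReadEntity> inputs = new LinkedHashSet<ReadEntity>();
Set<WriteEntity> outputs = new LinkedHashSet<WriteEntity>();
DDLWork work = new DDLWork(inputs, outputs);
// The hooks machinery can later retrieve exactly what was passed in:
Set<ReadEntity> seenReads = work.getInputs();
Set<WriteEntity> seenWrites = work.getOutputs();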
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/moveWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/moveWork.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/moveWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/moveWork.java Tue Sep 29 01:25:15 2009
@@ -20,6 +20,11 @@
import java.io.*;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import java.util.Set;
+
+
@explain(displayName="Move Operator")
public class moveWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -28,11 +33,30 @@
private boolean checkFileFormat;
- public moveWork() { }
+ /**
+ * ReadEntitites that are passed to the hooks.
+ */
+ protected Set<ReadEntity> inputs;
+ /**
+ * List of WriteEntities that are passed to the hooks.
+ */
+ protected Set<WriteEntity> outputs;
+
+ public moveWork() {
+ }
+
+ public moveWork(Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+ this.inputs = inputs;
+ this.outputs = outputs;
+ }
+
public moveWork(
+ Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs,
final loadTableDesc loadTableWork,
final loadFileDesc loadFileWork,
boolean checkFileFormat) {
+ this(inputs, outputs);
this.loadTableWork = loadTableWork;
this.loadFileWork = loadFileWork;
this.checkFileFormat = checkFileFormat;
@@ -44,7 +68,7 @@
public void setLoadTableWork(final loadTableDesc loadTableWork) {
this.loadTableWork = loadTableWork;
}
-
+
@explain(displayName="files")
public loadFileDesc getLoadFileWork() {
return this.loadFileWork;
@@ -52,12 +76,28 @@
public void setLoadFileWork(final loadFileDesc loadFileWork) {
this.loadFileWork=loadFileWork;
}
-
+
public boolean getCheckFileFormat() {
return checkFileFormat;
}
public void setCheckFileFormat(boolean checkFileFormat) {
this.checkFileFormat = checkFileFormat;
}
-
+
+ public Set<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public Set<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public void setInputs(Set<ReadEntity> inputs) {
+ this.inputs = inputs;
+ }
+
+ public void setOutputs(Set<WriteEntity> outputs) {
+ this.outputs = outputs;
+ }
+
}
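[Editor's note, not part of the commit] moveWork mirrors DDLWork: a protected pair of entity sets, a delegating constructor, and plain getters/setters, with the no-argument constructor retained. A short sketch of constructing one directly:

import java.util.LinkedHashSet;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.moveWork;

// Sketch only: entity sets first, then the load descriptors and the flag.
moveWork mw = new moveWork(
    new LinkedHashSet<ReadEntity>(),
    new LinkedHashSet<WriteEntity>(),
    null,    // loadTableDesc -- none in this sketch
    null,    // loadFileDesc  -- none in this sketch
    false);  // checkFileFormat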
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=819792&r1=819791&r2=819792&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Tue Sep 29 01:25:15 2009
@@ -37,7 +37,7 @@
/**
* SessionState encapsulates common data associated with a session
- *
+ *
* Also provides support for a thread static session object that can
* be accessed from any point in the code to interact with the user
* and to retrieve configuration information
@@ -46,7 +46,7 @@
/**
* current configuration
- */
+ */
protected HiveConf conf;
/**
@@ -55,7 +55,7 @@
protected boolean isSilent;
/*
- * HiveHistory Object
+ * HiveHistory Object
*/
protected HiveHistory hiveHist;
/**
@@ -65,6 +65,11 @@
public InputStream in;
public PrintStream err;
+ /**
+ * type of the command
+ */
+ private String commandType;
+
public HiveConf getConf() { return conf; }
@@ -96,8 +101,8 @@
public String getCmd() {
return (conf.getVar(HiveConf.ConfVars.HIVEQUERYSTRING));
}
-
-
+
+
public String getQueryId() {
return (conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
}
@@ -129,12 +134,12 @@
* session object when switching from one session to another
*/
public static SessionState start(SessionState startSs) {
-
+
tss.set(startSs);
if(StringUtils.isEmpty(startSs.getConf().getVar(HiveConf.ConfVars.HIVESESSIONID))) {
startSs.getConf().setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
}
-
+
if (startSs.hiveHist == null){
startSs.hiveHist = new HiveHistory(startSs);
}
@@ -148,7 +153,7 @@
return tss.get();
}
-
+
/**
 * get hiveHistory object which does structured logging
* @return The hive history object
@@ -156,8 +161,8 @@
public HiveHistory getHiveHistory(){
return hiveHist;
}
-
-
+
+
private static String makeSessionId() {
GregorianCalendar gc = new GregorianCalendar();
String userid = System.getProperty("user.name");
@@ -186,10 +191,10 @@
/**
* This class provides helper routines to emit informational and error messages to the user
* and log4j files while obeying the current session's verbosity levels.
- *
+ *
* NEVER write directly to the SessionStates standard output other than to emit result data
* DO use printInfo and printError provided by LogHelper to emit non result data strings
- *
+ *
* It is perfectly acceptable to have global static LogHelper objects (for example - once per module)
* LogHelper always emits info/error to current session as required.
*/
@@ -197,7 +202,7 @@
protected Log LOG;
protected boolean isSilent;
-
+
public LogHelper(Log LOG) {
this(LOG, false);
}
@@ -209,7 +214,7 @@
public PrintStream getOutStream() {
SessionState ss = SessionState.get();
- return ((ss != null) && (ss.out != null)) ? ss.out : System.out;
+ return ((ss != null) && (ss.out != null)) ? ss.out : System.out;
}
public PrintStream getErrStream() {
@@ -289,7 +294,7 @@
return false;
}
}
-
+
public static boolean unregisterJar(String jarsToUnregister) {
LogHelper console = getConsole();
try {
@@ -325,7 +330,7 @@
}
public boolean postHook(Set<String> cur, String s) { return unregisterJar(s); }
}),
-
+
ARCHIVE(new ResourceHook () {
public String preHook(Set<String> cur, String s) { return validateFile(cur, s); }
public boolean postHook(Set<String> cur, String s) { return true; }
@@ -339,14 +344,14 @@
};
public static ResourceType find_resource_type(String s) {
-
+
s = s.trim().toUpperCase();
-
+
try {
return ResourceType.valueOf(s);
} catch (IllegalArgumentException e) {
}
-
+
// try singular
if(s.endsWith("S")) {
s = s.substring(0, s.length()-1);
@@ -414,4 +419,12 @@
resource_map.remove (t);
}
}
+
+ public String getCommandType() {
+ return commandType;
+ }
+
+ public void setCommandType(String commandType) {
+ this.commandType = commandType;
+ }
}
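[Editor's note, not part of the commit] With the getter in place, any code running inside the session can read back the command type that SemanticAnalyzerFactory recorded, for example to tag log output. A minimal sketch; the LOG object is assumed and not part of this change:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.session.SessionState;

// Sketch only.
Log LOG = LogFactory.getLog("ExampleConsumer");
SessionState ss = SessionState.get();
if (ss != null && ss.getCommandType() != null) {
  LOG.info("executing command of type " + ss.getCommandType());
}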