Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/10/21 20:11:18 UTC
svn commit: r706704 [2/23] - in /hadoop/core/trunk: ./ src/contrib/hive/ src/contrib/hive/bin/ src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/ src/contrib/hive/common/src/java/org/apache/hadoop/hive/conf/ src/contrib/hive/conf/ src/contrib/hi...
Modified: hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hadoop/core/trunk/src/contrib/hive/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Tue Oct 21 11:11:05 2008
@@ -528,11 +528,24 @@
try {
openTransaction();
dbName = dbName.toLowerCase();
- pattern = "(?i)" + pattern; // add the case insensitivity
- Query q = pm.newQuery("select tableName from org.apache.hadoop.hive.metastore.model.MTable where database.name == dbName && tableName.matches(pattern)");
- q.declareParameters("java.lang.String dbName, java.lang.String pattern");
+ // Take the pattern and split it on the | to get all the composing patterns
+ String [] subpatterns = pattern.trim().split("\\|");
+ String query = "select tableName from org.apache.hadoop.hive.metastore.model.MTable where database.name == dbName && (";
+ boolean first = true;
+ for(String subpattern: subpatterns) {
+ subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
+ if (!first) {
+ query = query + " || ";
+ }
+ query = query + " tableName.matches(\"" + subpattern + "\")";
+ first = false;
+ }
+ query = query + ")";
+
+ Query q = pm.newQuery(query);
+ q.declareParameters("java.lang.String dbName");
q.setResult("tableName");
- Collection names = (Collection) q.execute(dbName.trim(), pattern.trim());
+ Collection names = (Collection) q.execute(dbName.trim());
tbls = new ArrayList<String>();
for (Iterator i = names.iterator (); i.hasNext ();) {
tbls.add((String) i.next ());
@@ -817,7 +830,7 @@
LOG.debug("Executing getPartitionNames");
dbName = dbName.toLowerCase();
tableName = tableName.toLowerCase();
- Query q = pm.newQuery("select partitionName from org.apache.hadoop.hive.metastore.model.MPartition where table.database.name == t1 && table.tableName == t2");
+ Query q = pm.newQuery("select partitionName from org.apache.hadoop.hive.metastore.model.MPartition where table.database.name == t1 && table.tableName == t2 order by partitionName asc");
q.declareParameters("java.lang.String t1, java.lang.String t2");
q.setResult("partitionName");
Collection names = (Collection) q.execute(dbName.trim(), tableName.trim());
@@ -847,9 +860,10 @@
Query query = pm.newQuery(MPartition.class, "table.tableName == t1 && table.database.name == t2");
query.declareParameters("java.lang.String t1, java.lang.String t2");
mparts = (List<MPartition>) query.execute(tableName.trim(), dbName.trim());
+ LOG.debug("Done executing query for listMPartitions");
pm.retrieveAll(mparts);
success = commitTransaction();
- LOG.debug("Done e xecuting listMPartitions");
+ LOG.debug("Done retrieving all objects for listMPartitions");
} finally {
if(!success) {
rollbackTransaction();
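
For illustration: the getTables filter change above expands a Hive table pattern such as "tab*|page*" into one case-insensitive matches() clause per subpattern, OR-ed together in the JDOQL filter. A minimal standalone sketch of that expansion, using only logic visible in the diff (the class name and demo pattern are illustrative):

public class PatternExpansion {
    public static String buildFilter(String pattern) {
        // Split the pattern on '|' to get all the composing subpatterns.
        String[] subpatterns = pattern.trim().split("\\|");
        StringBuilder query = new StringBuilder(
            "select tableName from org.apache.hadoop.hive.metastore.model.MTable "
            + "where database.name == dbName && (");
        boolean first = true;
        for (String subpattern : subpatterns) {
            // '*' in the Hive pattern becomes the regex '.*'; (?i) makes the match case-insensitive.
            subpattern = "(?i)" + subpattern.replaceAll("\\*", ".*");
            if (!first) {
                query.append(" || ");
            }
            query.append(" tableName.matches(\"").append(subpattern).append("\")");
            first = false;
        }
        return query.append(")").toString();
    }

    public static void main(String[] args) {
        // Prints a filter with two matches() clauses, one per subpattern.
        System.out.println(buildFilter("tab*|page*"));
    }
}
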
Modified: hadoop/core/trunk/src/contrib/hive/ql/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/build.xml?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/build.xml (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/build.xml Tue Oct 21 11:11:05 2008
@@ -64,6 +64,13 @@
resultsDirectory="${ql.test.results.dir}/clientpositive" className="TestCliDriver"
logFile="${test.log.dir}/testclidrivergen.log"/>
+ <qtestgen outputDirectory="${test.build.src}/org/apache/hadoop/hive/cli"
+ templatePath="${ql.test.template.dir}" template="TestNegativeCliDriver.vm"
+ queryDirectory="${ql.test.query.dir}/clientnegative"
+ queryFile="${qfile}"
+ resultsDirectory="${ql.test.results.dir}/clientnegative" className="TestNegativeCliDriver"
+ logFile="${test.log.dir}/testnegclidrivergen.log"/>
+
</target>
<uptodate property="grammarBuild.notRequired">
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Oct 21 11:11:05 2008
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql;
+import java.io.File;
import java.io.DataInput;
import java.io.IOException;
import java.io.FileNotFoundException;
@@ -29,6 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.util.StringUtils;
+import java.util.Random;
public class Context {
private Path resFile;
@@ -38,9 +40,12 @@
private Path[] resDirPaths;
private int resDirFilesNum;
boolean initialized;
+ private String scratchDir;
+ private HiveConf conf;
public Context(HiveConf conf) {
try {
+ this.conf = conf;
fs = FileSystem.get(conf);
initialized = false;
resDir = null;
@@ -50,6 +55,23 @@
}
}
+ public void makeScratchDir() throws Exception {
+ Random rand = new Random();
+ int randomid = Math.abs(rand.nextInt()%rand.nextInt());
+ scratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR) + File.separator + randomid;
+ Path tmpdir = new Path(scratchDir);
+ fs.mkdirs(tmpdir);
+ }
+
+ public String getScratchDir() {
+ return scratchDir;
+ }
+
+ public void removeScratchDir() throws Exception {
+ Path tmpdir = new Path(scratchDir);
+ fs.delete(tmpdir, true);
+ }
+
/**
* @return the resFile
*/
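
The three methods added above give each query its own scratch directory under the configured SCRATCHDIR; the Driver changes below create it when a command starts and remove it in the new close() method. A hedged sketch of that lifecycle against the raw FileSystem API, where "/tmp/hive" is an assumed stand-in for HiveConf.ConfVars.SCRATCHDIR:

import java.io.File;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ScratchDirDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Pick a random subdirectory of the scratch root, as makeScratchDir() does
        // (the patch derives the id from two nextInt() calls; a bounded nextInt is used here).
        int randomid = new Random().nextInt(Integer.MAX_VALUE);
        String scratchDir = "/tmp/hive" + File.separator + randomid;

        Path tmpdir = new Path(scratchDir);
        fs.mkdirs(tmpdir);           // Context.makeScratchDir()
        try {
            // ... the query writes its intermediate data under tmpdir ...
        } finally {
            fs.delete(tmpdir, true); // Context.removeScratchDir(): recursive delete
        }
    }
}
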
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Oct 21 11:11:05 2008
@@ -34,10 +34,8 @@
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.serde.ByteStream;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -55,13 +53,14 @@
private DataInput resStream;
private LogHelper console;
private Context ctx;
+ private BaseSemanticAnalyzer sem;
public int countJobs(List<Task<? extends Serializable>> tasks) {
if (tasks == null)
return 0;
int jobs = 0;
for (Task<? extends Serializable> task: tasks) {
- if ((task instanceof ExecDriver) || (task instanceof MapRedTask)) {
+ if (task.isMapRedTask()) {
jobs++;
}
jobs += countJobs(task.getChildTasks());
@@ -69,6 +68,22 @@
return jobs;
}
+ public boolean hasReduceTasks(List<Task<? extends Serializable>> tasks) {
+ if (tasks == null)
+ return false;
+
+ boolean hasReduce = false;
+ for (Task<? extends Serializable> task: tasks) {
+ if (task.hasReduce()) {
+ return true;
+ }
+
+ hasReduce = (hasReduce || hasReduceTasks(task.getChildTasks()));
+ }
+ return hasReduce;
+ }
+
+
/**
* for backwards compatibility with current tests
*/
@@ -97,11 +112,10 @@
try {
TaskFactory.resetId();
-
- BaseSemanticAnalyzer sem;
LOG.info("Starting command: " + command);
ctx.clear();
+ ctx.makeScratchDir();
resStream = null;
pd = new ParseDriver();
@@ -122,12 +136,18 @@
console.printInfo("Total MapReduce jobs = " + jobs);
}
-
+ boolean hasReduce = hasReduceTasks(sem.getRootTasks());
+ if (hasReduce) {
+ console.printInfo("Number of reducers = " + conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS));
+ console.printInfo("In order to change numer of reducers use:");
+ console.printInfo(" set mapred.reduce.tasks = <number>");
+ }
+
String jobname = Utilities.abbreviate(command, maxlen - 6);
int curJob = 0;
for(Task<? extends Serializable> rootTask: sem.getRootTasks()) {
// assumption that only top level tasks are map-reduce tasks
- if ((rootTask instanceof ExecDriver) || (rootTask instanceof MapRedTask)) {
+ if (rootTask.isMapRedTask()) {
curJob ++;
if(noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + curJob + "/" + jobs + ")");
@@ -175,10 +195,10 @@
}
}
} catch (SemanticException e) {
- console.printError("FAILED: Error in semantic analysis: " + e.getMessage());
+ console.printError("FAILED: Error in semantic analysis: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (10);
} catch (ParseException e) {
- console.printError("FAILED: Parse Error: " + e.getMessage());
+ console.printError("FAILED: Parse Error: " + e.getMessage(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
return (11);
} catch (Exception e) {
// Has to use full name to make sure it does not conflict with org.apache.commons.lang.StringUtils
@@ -196,14 +216,23 @@
}
- public boolean getResults(Vector<Vector<String>> res)
+ public boolean getResults(Vector<String> res)
{
+ if (sem.getFetchTask() != null) {
+ if (!sem.getFetchTaskInit()) {
+ sem.setFetchTaskInit(true);
+ sem.getFetchTask().initialize(conf);
+ }
+ boolean ret = sem.getFetchTask().fetch(res);
+ return ret;
+ }
+
if (resStream == null)
resStream = ctx.getStream();
if (resStream == null) return false;
int numRows = 0;
- Vector<String> row = new Vector<String>();
+ String row = null;
while (numRows < MAX_ROWS)
{
@@ -215,47 +244,45 @@
return false;
}
- String col = null;
bos.reset();
- Utilities.streamStatus ss = Utilities.streamStatus.NORMAL;
+ Utilities.streamStatus ss;
try
{
ss = Utilities.readColumn(resStream, bos);
if (bos.getCount() > 0)
- col = new String(bos.getData(), 0, bos.getCount(), "UTF-8");
- else if (ss == Utilities.streamStatus.NORMAL)
- col = Utilities.NSTR;
+ row = new String(bos.getData(), 0, bos.getCount(), "UTF-8");
+ else if (ss == Utilities.streamStatus.TERMINATED)
+ row = new String();
+
+ if (row != null) {
+ numRows++;
+ res.add(row);
+ }
} catch (IOException e) {
console.printError("FAILED: Unexpected IO exception : " + e.getMessage());
res = null;
return false;
}
-
- if ((ss == Utilities.streamStatus.EOF) ||
- (ss == Utilities.streamStatus.TERMINATED))
- {
- if (col != null)
- row.add(col.equals(Utilities.nullStringStorage) ? null : col);
- else if (row.size() != 0)
- row.add(null);
-
- numRows++;
- res.add(row);
- row = new Vector<String>();
- col = null;
- if (ss == Utilities.streamStatus.EOF)
- resStream = ctx.getStream();
- }
- else if (ss == Utilities.streamStatus.NORMAL)
- {
- row.add(col.equals(Utilities.nullStringStorage) ? null : col);
- col = null;
- }
- else
- assert false;
+ if (ss == Utilities.streamStatus.EOF)
+ resStream = ctx.getStream();
}
return true;
}
+
+ public int close() {
+ try {
+ // Delete the scratch directory from the context
+ ctx.removeScratchDir();
+ ctx.clear();
+ }
+ catch (Exception e) {
+ console.printError("FAILED: Unknown exception : " + e.getMessage(),
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return(13);
+ }
+
+ return(0);
+ }
}
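
Two usage notes on the Driver changes above: getResults() now takes a Vector<String>, each element being one complete row as a single delimited string (previously Vector<Vector<String>> with per-column splitting), and the new close() should be called when a command is finished so the scratch directory gets cleaned up. A sketch of a client loop under those assumptions (the drain() helper is illustrative):

import java.util.Vector;

public class ResultLoop {
    public static void drain(org.apache.hadoop.hive.ql.Driver driver) {
        Vector<String> res = new Vector<String>();
        // getResults() fills up to MAX_ROWS rows per call and returns false once exhausted;
        // each element is now a complete row as one delimited string.
        while (driver.getResults(res)) {
            for (String row : res) {
                System.out.println(row);
            }
            res.clear();
        }
        driver.close(); // new in this patch: deletes the per-query scratch directory
    }
}
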
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Tue Oct 21 11:11:05 2008
@@ -48,16 +48,14 @@
public ColumnInfo() {
}
- public ColumnInfo(String internalName, TypeInfo type, boolean isVirtual) {
+ public ColumnInfo(String internalName, TypeInfo type) {
this.internalName = internalName;
this.type = type;
- this.isVirtual = isVirtual;
}
- public ColumnInfo(String internalName, Class type, boolean isVirtual) {
+ public ColumnInfo(String internalName, Class type) {
this.internalName = internalName;
this.type = TypeInfoFactory.getPrimitiveTypeInfo(type);
- this.isVirtual = isVirtual;
}
public TypeInfo getType() {
@@ -67,10 +65,6 @@
public String getInternalName() {
return internalName;
}
-
- public boolean getIsVirtual() {
- return isVirtual;
- }
public void setType(TypeInfo type) {
this.type = type;
@@ -79,9 +73,4 @@
public void setInternalName(String internalName) {
this.internalName = internalName;
}
-
- public void setIsVirtual(boolean isVirtual) {
- this.isVirtual = isVirtual;
- }
-
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Oct 21 11:11:05 2008
@@ -22,39 +22,46 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.alterTableDesc;
import org.apache.hadoop.hive.ql.plan.createTableDesc;
import org.apache.hadoop.hive.ql.plan.descTableDesc;
import org.apache.hadoop.hive.ql.plan.dropTableDesc;
+import org.apache.hadoop.hive.ql.plan.showPartitionsDesc;
import org.apache.hadoop.hive.ql.plan.showTablesDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde.thrift.columnsetSerDe;
+import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
import com.facebook.thrift.TException;
/**
@@ -66,7 +73,7 @@
static final private Log LOG = LogFactory.getLog("hive.ql.exec.DDLTask");
transient HiveConf conf;
- static final private int separator = Utilities.ctrlaCode;
+ static final private int separator = Utilities.tabCode;
static final private int terminator = Utilities.newLineCode;
public void initialize(HiveConf conf) {
@@ -98,14 +105,45 @@
tbl.setPartCols(crtTbl.getPartCols());
if (crtTbl.getNumBuckets() != -1)
tblStorDesc.setNumBuckets(crtTbl.getNumBuckets());
- if (crtTbl.getFieldDelim() != null)
- tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
- if (crtTbl.getCollItemDelim() != null)
- tbl.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl.getCollItemDelim());
- if (crtTbl.getMapKeyDelim() != null)
- tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
- if (crtTbl.getLineDelim() != null)
- tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
+
+ if (crtTbl.getSerName() != null) {
+ tbl.setSerializationLib(crtTbl.getSerName());
+ if (crtTbl.getMapProp() != null) {
+ Iterator<Map.Entry<String, String>> iter = crtTbl.getMapProp().entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, String> m = (Map.Entry)iter.next();
+ tbl.setSerdeParam(m.getKey(), m.getValue());
+ }
+ }
+ }
+ else
+ {
+ if (crtTbl.getFieldDelim() != null)
+ {
+ tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
+ tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim());
+ }
+
+ if (crtTbl.getCollItemDelim() != null)
+ tbl.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl.getCollItemDelim());
+ if (crtTbl.getMapKeyDelim() != null)
+ tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
+ if (crtTbl.getLineDelim() != null)
+ tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
+ }
+
+ /**
+ * For now, if the user specifies either the map or the collection-item delimiter, we switch
+ * the table's SerDe to DynamicSerDe/TCTLSeparatedProtocol.
+ * In the future we should do this for any delimiters specified, but that would break older
+ * Hive tables, so not for now.
+ */
+ if (crtTbl.getCollItemDelim() != null || crtTbl.getMapKeyDelim() != null) {
+ tbl.setSerializationLib(org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe.class.getName());
+ tbl.setSerdeParam(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class.getName());
+ }
+
+
if (crtTbl.getComment() != null)
tbl.setProperty("comment", crtTbl.getComment());
if (crtTbl.getLocation() != null)
@@ -157,6 +195,11 @@
tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
}
}
+
+ // set owner, create_time etc
+ tbl.setOwner(System.getProperty("user.name"));
+ // set create time
+ tbl.getTTable().setCreateTime((int) (System.currentTimeMillis()/1000));
// create the table
db.createTable(tbl);
@@ -165,8 +208,30 @@
dropTableDesc dropTbl = work.getDropTblDesc();
if (dropTbl != null) {
- // drop the table
- db.dropTable(dropTbl.getTableName());
+ if(dropTbl.getPartSpecs() == null) {
+ // drop the table
+ db.dropTable(dropTbl.getTableName());
+ } else {
+ // drop partitions in the list
+ Table tbl = db.getTable(dropTbl.getTableName());
+ List<Partition> parts = new ArrayList<Partition>();
+ for(HashMap<String, String> partSpec : dropTbl.getPartSpecs()) {
+ Partition part = db.getPartition(tbl, partSpec, false);
+ if(part == null) {
+ console.printInfo("Partition " + partSpec + " does not exist.");
+ } else {
+ parts.add(part);
+ }
+ }
+ // drop all existing partitions from the list
+ for (Partition partition : parts) {
+ console.printInfo("Dropping the partition " + partition.getName());
+ db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME,
+ dropTbl.getTableName(),
+ partition.getValues(),
+ true); //drop data for the partition
+ }
+ }
return 0;
}
@@ -174,10 +239,55 @@
if (alterTbl != null) {
// alter the table
Table tbl = db.getTable(alterTbl.getOldName());
- if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
- tbl.getTTable().setTableName(alterTbl.getNewName());
- else
- tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
+ if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
+ tbl.getTTable().setTableName(alterTbl.getNewName());
+ else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
+ List<FieldSchema> newCols = alterTbl.getNewCols();
+ List<FieldSchema> oldCols = tbl.getCols();
+ if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+ console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+ tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+ tbl.getTTable().getSd().setCols(newCols);
+ }
+ else {
+ // make sure the new columns do not already exist
+ Iterator<FieldSchema> iterNewCols = newCols.iterator();
+ while (iterNewCols.hasNext()) {
+ FieldSchema newCol = iterNewCols.next();
+ String newColName = newCol.getName();
+ Iterator<FieldSchema> iterOldCols = oldCols.iterator();
+ while (iterOldCols.hasNext()) {
+ String oldColName = iterOldCols.next().getName();
+ if (oldColName.equalsIgnoreCase(newColName)) {
+ console.printError("Column '" + newColName + "' exists");
+ return 1;
+ }
+ }
+ oldCols.add(newCol);
+ }
+ tbl.getTTable().getSd().setCols(oldCols);
+ }
+ }
+ else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.REPLACECOLS) {
+ // change SerDe to MetadataTypedColumnsetSerDe if it is columnsetSerDe
+ if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+ console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+ tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+ }
+ else if(!tbl.getSerializationLib().equals(MetadataTypedColumnsetSerDe.class.getName())) {
+ console.printError("Replace columns is not supported for this table. SerDe may be incompatible.");
+ return 1;
+ }
+ tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
+ }
+ else {
+ console.printError("Unsupported Alter commnad");
+ return 1;
+ }
+
+ // set last modified by properties
+ tbl.setProperty("last_modified_by", System.getProperty("user.name"));
+ tbl.setProperty("last_modified_time", Long.toString(System.currentTimeMillis()/1000));
try {
db.alterTable(alterTbl.getOldName(), tbl);
@@ -194,17 +304,46 @@
descTableDesc descTbl = work.getDescTblDesc();
if (descTbl != null) {
- boolean found = true;
-
+ // describe the table - populate the output stream
+ Table tbl = db.getTable(descTbl.getTableName(), false);
+ Partition part = null;
try {
- // describe the table - populate the output stream
- Table tbl = db.getTable(descTbl.getTableName());
-
+ if(tbl == null) {
+ DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
+ String errMsg = "Table " + descTbl.getTableName() + " does not exist";
+ outStream.write(errMsg.getBytes("UTF-8"));
+ ((FSDataOutputStream)outStream).close();
+ return 0;
+ }
+ if(descTbl.getPartSpec() != null) {
+ part = db.getPartition(tbl, descTbl.getPartSpec(), false);
+ if(part == null) {
+ DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
+ String errMsg = "Partition " + descTbl.getPartSpec() + " for table " + descTbl.getTableName() + " does not exist";
+ outStream.write(errMsg.getBytes("UTF-8"));
+ ((FSDataOutputStream)outStream).close();
+ return 0;
+ }
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+ catch (IOException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+
+ try {
+
LOG.info("DDLTask: got data for " + tbl.getName());
// write the results in the file
DataOutput os = (DataOutput)fs.create(descTbl.getResFile());
List<FieldSchema> cols = tbl.getCols();
+ if(part != null) {
+ cols = part.getTPartition().getSd().getCols();
+ }
Iterator<FieldSchema> iterCols = cols.iterator();
boolean firstCol = true;
while (iterCols.hasNext())
@@ -239,6 +378,19 @@
os.write(col.getComment().getBytes("UTF-8"));
}
}
+
+ // if extended desc table then show the complete details of the table
+ if(descTbl.isExt()) {
+ if(part != null) {
+ // show partition information
+ os.write("\n\nDetailed Partition Information:\n".getBytes("UTF-8"));
+ os.write(part.getTPartition().toString().getBytes("UTF-8"));
+ } else {
+ os.write("\nDetailed Table Information:\n".getBytes("UTF-8"));
+ os.write(tbl.getTTable().toString().getBytes("UTF-8"));
+ }
+ }
+
LOG.info("DDLTask: written data for " + tbl.getName());
((FSDataOutputStream)os).close();
@@ -246,30 +398,10 @@
LOG.info("describe table: " + StringUtils.stringifyException(e));
return 1;
}
- catch (InvalidTableException e) {
- found = false;
- }
catch (IOException e) {
LOG.info("describe table: " + StringUtils.stringifyException(e));
return 1;
}
-
- if (!found)
- {
- try {
- DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
- String errMsg = "Table " + descTbl.getTableName() + " does not exist";
- outStream.write(errMsg.getBytes("UTF-8"));
- ((FSDataOutputStream)outStream).close();
- } catch (FileNotFoundException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
- }
- catch (IOException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
- }
- }
return 0;
}
@@ -310,7 +442,52 @@
return 0;
}
- } catch (HiveException e) {
+ showPartitionsDesc showParts = work.getShowPartsDesc();
+ if (showParts != null) {
+ // get the partitions for the table and populate the output
+ String tabName = showParts.getTabName();
+ Table tbl = null;
+ List<String> parts = null;
+
+ tbl = db.getTable(tabName);
+
+ if (!tbl.isPartitioned()) {
+ console.printError("Table " + tabName + " is not a partitioned table");
+ return 1;
+ }
+
+ parts = db.getPartitionNames(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), Short.MAX_VALUE);
+
+ // write the results in the file
+ try {
+ DataOutput outStream = (DataOutput)fs.create(showParts.getResFile());
+ Iterator<String> iterParts = parts.iterator();
+ boolean firstCol = true;
+ while (iterParts.hasNext())
+ {
+ if (!firstCol)
+ outStream.write(terminator);
+ outStream.write(iterParts.next().getBytes("UTF-8"));
+ firstCol = false;
+ }
+ ((FSDataOutputStream)outStream).close();
+ } catch (FileNotFoundException e) {
+ LOG.info("show partitions: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (IOException e) {
+ LOG.info("show partitions: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+ return 0;
+ }
+
+ }
+ catch (InvalidTableException e) {
+ console.printError("Table " + e.getTableName() + " does not exist");
+ LOG.debug(StringUtils.stringifyException(e));
+ return 1;
+ }
+ catch (HiveException e) {
console.printError("FAILED: Error in metadata: " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
LOG.debug(StringUtils.stringifyException(e));
return 1;
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Tue Oct 21 11:11:05 2008
@@ -27,12 +27,13 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -42,6 +43,7 @@
public class ExecDriver extends Task<mapredWork> implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final long LOAD_PER_REDUCER = 1024 * 1024 * 1024;
transient protected JobConf job;
@@ -80,7 +82,48 @@
LOG.warn("Number of reduce tasks not specified. Defaulting to jobconf value of: " + job.getNumReduceTasks());
work.setNumReduceTasks(job.getNumReduceTasks());
}
- }
+ }
+ else
+ LOG.info("Number of reduce tasks determined at compile : " + work.getNumReduceTasks());
+ }
+
+ /**
+ * A map of the currently running jobs (job ID to kill URI) spawned by this Hive instance,
+ * used to kill all running jobs in the event of an unexpected shutdown - i.e., the JVM
+ * shutting down while there are still jobs running.
+ */
+ public static HashMap<String,String> runningJobKillURIs = new HashMap<String, String> ();
+
+
+ /**
+ * In Hive, when the user control-c's the command line, any running jobs spawned from that command
+ * line are best-effort killed.
+ *
+ * This static initializer registers a shutdown hook that iterates over all the
+ * running-job kill URLs and does an HTTP GET on each.
+ *
+ */
+ static {
+ if(new org.apache.hadoop.conf.Configuration().getBoolean("webinterface.private.actions", false)) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ for(Iterator<String> elems = runningJobKillURIs.values().iterator(); elems.hasNext() ; ) {
+ String uri = elems.next();
+ try {
+ System.err.println("killing job with: " + uri);
+ int retCode = ((java.net.HttpURLConnection)new java.net.URL(uri).openConnection()).getResponseCode();
+ if(retCode != 200) {
+ System.err.println("Got an error trying to kill job with URI: " + uri + " = " + retCode);
+ }
+ } catch(Exception e) {
+ System.err.println("trying to kill job, caught: " + e);
+ // do nothing
+ }
+ }
+ }
+ }
+ );
+ }
}
/**
@@ -123,6 +166,33 @@
return rj;
}
+ private void inferNumReducers() throws Exception {
+ FileSystem fs = FileSystem.get(job);
+
+ if ((work.getReducer() != null) && (work.getInferNumReducers() == true)) {
+ long inpSz = 0;
+
+ // based on the input size - estimate the number of reducers
+ Path[] inputPaths = FileInputFormat.getInputPaths(job);
+
+ for (Path inputP : inputPaths) {
+ if (fs.exists(inputP)) {
+ FileStatus[] fStats = fs.listStatus(inputP);
+ for (FileStatus fStat:fStats)
+ inpSz += fStat.getLen();
+ }
+ }
+
+
+ int newRed = (int)(inpSz / LOAD_PER_REDUCER) + 1;
+ if (newRed < work.getNumReduceTasks().intValue())
+ {
+ LOG.warn("Number of reduce tasks inferred based on input size to : " + newRed);
+ work.setNumReduceTasks(Integer.valueOf(newRed));
+ }
+ }
+ }
+
/**
* Execute a query plan using Hadoop
*/
@@ -141,24 +211,24 @@
LOG.info("Adding input file " + onefile);
FileInputFormat.addInputPaths(job, onefile);
}
-
+
String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
String jobScratchDir = hiveScratchDir + Utilities.randGen.nextInt();
FileOutputFormat.setOutputPath(job, new Path(jobScratchDir));
job.setMapperClass(ExecMapper.class);
-
+
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(HiveKey.class);
-
+
job.setNumReduceTasks(work.getNumReduceTasks().intValue());
job.setReducerClass(ExecReducer.class);
-
+
job.setInputFormat(org.apache.hadoop.hive.ql.io.HiveInputFormat.class);
-
+
// No-Op - we don't really write anything here ..
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
-
+
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
if (StringUtils.isNotBlank(auxJars)) {
LOG.info("adding libjars: " + auxJars);
@@ -168,15 +238,41 @@
int returnVal = 0;
FileSystem fs = null;
RunningJob rj = null;
-
+
try {
fs = FileSystem.get(job);
+
+ // if the input is empty exit gracefully
+ Path[] inputPaths = FileInputFormat.getInputPaths(job);
+ boolean emptyInput = true;
+ for (Path inputP : inputPaths) {
+ if(!fs.exists(inputP))
+ continue;
+
+ FileStatus[] fStats = fs.listStatus(inputP);
+ for (FileStatus fStat:fStats) {
+ if (fStat.getLen() > 0) {
+ emptyInput = false;
+ break;
+ }
+ }
+ }
+
+ if (emptyInput) {
+ console.printInfo("Job need not be submitted: no output: Success");
+ return 0;
+ }
+
+ inferNumReducers();
JobClient jc = new JobClient(job);
rj = jc.submitJob(job);
+ // add to the map of running jobs so it can be killed in case of abnormal shutdown.
+ runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+
jobInfo(rj);
rj = jobProgress(jc, rj);
-
+
String statusMesg = "Ended Job = " + rj.getJobID();
if(!rj.isSuccessful()) {
statusMesg += " with errors";
@@ -203,6 +299,7 @@
if(returnVal != 0 && rj != null) {
rj.killJob();
}
+ runningJobKillURIs.remove(rj.getJobID());
} catch (Exception e) {}
}
return (returnVal);
@@ -297,5 +394,16 @@
}
return sb.toString();
}
+
+ @Override
+ public boolean isMapRedTask() {
+ return true;
+ }
+
+ @Override
+ public boolean hasReduce() {
+ mapredWork w = getWork();
+ return w.getReducer() != null;
+ }
}
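
The inferNumReducers() logic above only ever lowers the reducer count chosen at compile time, targeting roughly LOAD_PER_REDUCER (1 GB) of input per reducer. A worked sketch of the arithmetic, with made-up input sizes:

public class ReducerEstimate {
    public static final long LOAD_PER_REDUCER = 1024 * 1024 * 1024; // 1 GB, as in ExecDriver

    public static int infer(long inputBytes, int compiledReducers) {
        int newRed = (int) (inputBytes / LOAD_PER_REDUCER) + 1;
        // The inferred value only ever lowers the count; the compiled setting is an upper bound.
        return Math.min(newRed, compiledReducers);
    }

    public static void main(String[] args) {
        // 2.5 GB of input with 32 compiled reducers -> estimate of 3 reducers.
        System.out.println(infer(5L * 512 * 1024 * 1024, 32));
    }
}
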
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Tue Oct 21 11:11:05 2008
@@ -37,7 +37,8 @@
private boolean abort = false;
private Reporter rp;
public static final Log l4j = LogFactory.getLog("ExecMapper");
-
+ private static boolean done;
+
public void configure(JobConf job) {
jc = job;
mapredWork mrwork = Utilities.getMapRedWork(job);
@@ -63,8 +64,11 @@
}
try {
- // Since there is no concept of a group, we don't invoke startGroup/endGroup for a mapper
- mo.process((Writable)value);
+ if (mo.getDone())
+ done = true;
+ else
+ // Since there is no concept of a group, we don't invoke startGroup/endGroup for a mapper
+ mo.process((Writable)value);
} catch (HiveException e) {
abort = true;
e.printStackTrace();
@@ -73,6 +77,19 @@
}
public void close() {
+ // No row was processed
+ if(oc == null) {
+ try {
+ l4j.trace("Close called no row");
+ mo.initialize(jc);
+ rp = null;
+ } catch (HiveException e) {
+ abort = true;
+ e.printStackTrace();
+ throw new RuntimeException ("Map operator close failed during initialize", e);
+ }
+ }
+
// detecting failed executions by exceptions thrown by the operator tree
// ideally hadoop should let us know whether map execution failed or not
try {
@@ -89,6 +106,10 @@
}
}
+ public static boolean getDone() {
+ return done;
+ }
+
public static class reportStats implements Operator.OperatorFunc {
Reporter rp;
public reportStats (Reporter rp) {
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Tue Oct 21 11:11:05 2008
@@ -183,6 +183,20 @@
}
public void close() {
+
+ // No row was processed
+ if(oc == null) {
+ try {
+ l4j.trace("Close called no row");
+ reducer.initialize(jc);
+ rp = null;
+ } catch (HiveException e) {
+ abort = true;
+ e.printStackTrace();
+ throw new RuntimeException ("Reduce operator close failed during initialize", e);
+ }
+ }
+
try {
if (groupKey != null) {
// If an operator wants to do some work at the end of a group
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Tue Oct 21 11:11:05 2008
@@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Arrays;
+import java.util.Comparator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.plan.explain;
@@ -183,7 +185,10 @@
}
// We look at all methods that generate values for explain
- for(Method m: work.getClass().getMethods()) {
+ Method[] methods = work.getClass().getMethods();
+ Arrays.sort(methods, new MethodComparator());
+
+ for(Method m: methods) {
int prop_indents = indent+2;
note = m.getAnnotation(explain.class);
@@ -330,4 +335,13 @@
new HashSet<Task<? extends Serializable>>(), indent+2);
}
}
+
+ public static class MethodComparator implements Comparator {
+ public int compare(Object o1, Object o2) {
+ Method m1 = (Method)o1;
+ Method m2 = (Method)o2;
+ return m1.getName().compareTo(m2.getName());
+ }
+ }
+
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java Tue Oct 21 11:11:05 2008
@@ -18,11 +18,16 @@
package org.apache.hadoop.hive.ql.exec;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeFieldDesc;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -31,9 +36,10 @@
protected exprNodeFieldDesc desc;
transient ExprNodeEvaluator leftEvaluator;
transient InspectableObject leftInspectableObject;
- transient StructObjectInspector cachedLeftObjectInspector;
+ transient StructObjectInspector structObjectInspector;
transient StructField field;
- transient ObjectInspector fieldObjectInspector;
+ transient ObjectInspector structFieldObjectInspector;
+ transient ObjectInspector resultObjectInspector;
public ExprNodeFieldEvaluator(exprNodeFieldDesc desc) {
this.desc = desc;
@@ -50,14 +56,19 @@
leftEvaluator.evaluate(row, rowInspector, leftInspectableObject);
if (field == null) {
- cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
- field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
- fieldObjectInspector = field.getFieldObjectInspector();
+ evaluateInspector(rowInspector);
+ }
+ result.oi = resultObjectInspector;
+ if (desc.getIsList()) {
+ List<?> list = ((ListObjectInspector)leftInspectableObject.oi).getList(leftInspectableObject.o);
+ List<Object> r = new ArrayList<Object>(list.size());
+ for(int i=0; i<list.size(); i++) {
+ r.add(structObjectInspector.getStructFieldData(list.get(i), field));
+ }
+ result.o = r;
} else {
- assert(cachedLeftObjectInspector == leftInspectableObject.oi);
+ result.o = structObjectInspector.getStructFieldData(leftInspectableObject.o, field);
}
- result.oi = fieldObjectInspector;
- result.o = cachedLeftObjectInspector.getStructFieldData(leftInspectableObject.o, field);
}
public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
@@ -66,13 +77,20 @@
// is different from the previous row
leftInspectableObject.oi = leftEvaluator.evaluateInspector(rowInspector);
if (field == null) {
- cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
- field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
- fieldObjectInspector = field.getFieldObjectInspector();
+ if (desc.getIsList()) {
+ structObjectInspector = (StructObjectInspector)((ListObjectInspector)leftInspectableObject.oi).getListElementObjectInspector();
+ } else {
+ structObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
+ }
+ field = structObjectInspector.getStructFieldRef(desc.getFieldName());
+ structFieldObjectInspector = field.getFieldObjectInspector();
+ }
+ if (desc.getIsList()) {
+ resultObjectInspector = ObjectInspectorFactory.getStandardListObjectInspector(structFieldObjectInspector);
} else {
- assert(cachedLeftObjectInspector == leftInspectableObject.oi);
+ resultObjectInspector = structFieldObjectInspector;
}
- return fieldObjectInspector;
+ return resultObjectInspector;
}
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java Tue Oct 21 11:11:05 2008
@@ -22,7 +22,9 @@
import org.apache.hadoop.hive.ql.plan.exprNodeIndexDesc;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
public class ExprNodeIndexEvaluator extends ExprNodeEvaluator {
@@ -44,16 +46,38 @@
assert(result != null);
mainEvaluator.evaluate(row, rowInspector, mainInspectableObject);
indexEvaluator.evaluate(row, rowInspector, indexInspectableObject);
- int index = ((Number)indexInspectableObject.o).intValue();
+
+ if (mainInspectableObject.oi.getCategory() == Category.LIST) {
+ int index = ((Number)indexInspectableObject.o).intValue();
- ListObjectInspector loi = (ListObjectInspector)mainInspectableObject.oi;
- result.oi = loi.getListElementObjectInspector();
- result.o = loi.getListElement(mainInspectableObject.o, index);
+ ListObjectInspector loi = (ListObjectInspector)mainInspectableObject.oi;
+ result.oi = loi.getListElementObjectInspector();
+ result.o = loi.getListElement(mainInspectableObject.o, index);
+ }
+ else if (mainInspectableObject.oi.getCategory() == Category.MAP) {
+ MapObjectInspector moi = (MapObjectInspector)mainInspectableObject.oi;
+ result.oi = moi.getMapValueObjectInspector();
+ result.o = moi.getMapValueElement(mainInspectableObject.o, indexInspectableObject.o);
+ }
+ else {
+ // Should never happen because we checked this in SemanticAnalyzer.getXpathOrFuncExprNodeDesc
+ throw new RuntimeException("Hive 2 Internal error: cannot evaluate index expression on "
+ + mainInspectableObject.oi.getTypeName());
+ }
}
public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
throws HiveException {
- return ((ListObjectInspector)mainEvaluator.evaluateInspector(rowInspector)).getListElementObjectInspector();
+ ObjectInspector mainInspector = mainEvaluator.evaluateInspector(rowInspector);
+ if (mainInspector.getCategory() == Category.LIST) {
+ return ((ListObjectInspector)mainInspector).getListElementObjectInspector();
+ } else if (mainInspector.getCategory() == Category.MAP) {
+ return ((MapObjectInspector)mainInspector).getMapValueObjectInspector();
+ } else {
+ // Should never happen because we checked this in SemanticAnalyzer.getXpathOrFuncExprNodeDesc
+ throw new RuntimeException("Hive 2 Internal error: cannot evaluate index expression on "
+ + mainInspector.getTypeName());
+ }
}
}
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=706704&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Tue Oct 21 11:11:05 2008
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * FetchTask implementation
+ **/
+public class FetchTask extends Task<fetchWork> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ static final private int MAX_ROWS = 100;
+
+ public void initialize (HiveConf conf) {
+ super.initialize(conf);
+ splitNum = 0;
+ currRecReader = null;
+
+ try {
+ // Create a file system handle
+ fs = FileSystem.get(conf);
+ serde = work.getDeserializerClass().newInstance();
+ serde.initialize(null, work.getSchema());
+ job = new JobConf(conf, ExecDriver.class);
+ Path inputP = work.getSrcDir();
+ if(!fs.exists(inputP)) {
+ empty = true;
+ return;
+ }
+
+ empty = true;
+ FileStatus[] fStats = fs.listStatus(inputP);
+ for (FileStatus fStat:fStats) {
+ if (fStat.getLen() > 0) {
+ empty = false;
+ break;
+ }
+ }
+
+ if (empty)
+ return;
+
+ FileInputFormat.setInputPaths(job, inputP);
+ inputFormat = getInputFormatFromCache(work.getInputFormatClass(), job);
+ inputSplits = inputFormat.getSplits(job, 1);
+ mSerde = new MetadataTypedColumnsetSerDe();
+ Properties mSerdeProp = new Properties();
+ mSerdeProp.put(Constants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
+ mSerdeProp.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+ mSerde.initialize(null, mSerdeProp);
+ totalRows = 0;
+ } catch (Exception e) {
+ // Bail out ungracefully - we should never hit
+ // this here - but would have hit it in SemanticAnalyzer
+ LOG.error(StringUtils.stringifyException(e));
+ throw new RuntimeException (e);
+ }
+ }
+
+ public int execute() {
+ assert false;
+ return 0;
+ }
+
+ /**
+ * A cache of InputFormat instances.
+ */
+ private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats =
+ new HashMap<Class, InputFormat<WritableComparable, Writable>>();
+
+ static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass, Configuration conf) throws IOException {
+ if (!inputFormats.containsKey(inputFormatClass)) {
+ try {
+ InputFormat<WritableComparable, Writable> newInstance =
+ (InputFormat<WritableComparable, Writable>)ReflectionUtils.newInstance(inputFormatClass, conf);
+ inputFormats.put(inputFormatClass, newInstance);
+ } catch (Exception e) {
+ throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName()
+ + " as specified in mapredWork!");
+ }
+ }
+ return inputFormats.get(inputFormatClass);
+ }
+
+ private int splitNum;
+ private FileSystem fs;
+ private RecordReader<WritableComparable, Writable> currRecReader;
+ private InputSplit[] inputSplits;
+ private InputFormat inputFormat;
+ private JobConf job;
+ private WritableComparable key;
+ private Writable value;
+ private Deserializer serde;
+ private MetadataTypedColumnsetSerDe mSerde;
+ private int totalRows;
+ private boolean empty;
+
+ private RecordReader<WritableComparable, Writable> getRecordReader() throws Exception {
+ if (splitNum >= inputSplits.length)
+ return null;
+ currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, Reporter.NULL);
+ key = currRecReader.createKey();
+ value = currRecReader.createValue();
+ return currRecReader;
+ }
+
+ public boolean fetch(Vector<String> res) {
+ try {
+ if (empty)
+ return false;
+
+ int numRows = 0;
+ int rowsRet = MAX_ROWS;
+ if ((work.getLimit() >= 0) && ((work.getLimit() - totalRows) < rowsRet))
+ rowsRet = work.getLimit() - totalRows;
+ if (rowsRet <= 0) {
+ if (currRecReader != null)
+ currRecReader.close();
+ return false;
+ }
+
+ while (numRows < rowsRet) {
+ if (currRecReader == null) {
+ currRecReader = getRecordReader();
+ if (currRecReader == null) {
+ if (numRows == 0)
+ return false;
+ totalRows += numRows;
+ return true;
+ }
+ }
+ boolean ret = currRecReader.next(key, value);
+ if (ret) {
+ Object obj = serde.deserialize(value);
+ res.add(((Text)mSerde.serialize(obj, serde.getObjectInspector())).toString());
+ numRows++;
+ }
+ else {
+ currRecReader.close();
+ currRecReader = getRecordReader();
+ if (currRecReader == null) {
+ if (numRows == 0)
+ return false;
+ totalRows += numRows;
+ return true;
+ }
+ else {
+ key = currRecReader.createKey();
+ value = currRecReader.createValue();
+ }
+ }
+ }
+ totalRows += numRows;
+ return true;
+ }
+ catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ return false;
+ }
+ }
+}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Oct 21 11:11:05 2008
@@ -28,6 +28,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -101,12 +102,27 @@
if(isCompressed) {
finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf) + ".gz");
}
+ String rowSeparatorString = conf.getTableInfo().getProperties().getProperty(Constants.LINE_DELIM, "\n");
+ int rowSeparator = 0;
+ try {
+ rowSeparator = Byte.parseByte(rowSeparatorString);
+ } catch (NumberFormatException e) {
+ rowSeparator = rowSeparatorString.charAt(0);
+ }
+ final int finalRowSeparator = rowSeparator;
final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath));
outWriter = new RecordWriter () {
public void write(Writable r) throws IOException {
- Text tr = (Text)r;
- outStream.write(tr.getBytes(), 0, tr.getLength());
- outStream.write('\n');
+ if (r instanceof Text) {
+ Text tr = (Text)r;
+ outStream.write(tr.getBytes(), 0, tr.getLength());
+ outStream.write(finalRowSeparator);
+ } else {
+ // DynamicSerDe always writes out BytesWritable
+ BytesWritable bw = (BytesWritable)r;
+ outStream.write(bw.get(), 0, bw.getSize());
+ outStream.write(finalRowSeparator);
+ }
}
public void close(boolean abort) throws IOException {
outStream.close();
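
The separator handling added above first tries to interpret LINE_DELIM as a numeric byte value and only then falls back to the first character of the string, so "10" and a literal "\n" both yield the newline byte. A small sketch of that precedence (the example values are ours; the patch's only stated default is "\n"):

public class RowSeparator {
    static int parse(String rowSeparatorString) {
        try {
            // A numeric string such as "10" parses directly as the byte value 10 ...
            return Byte.parseByte(rowSeparatorString);
        } catch (NumberFormatException e) {
            // ... while a literal separator such as "\n" falls back to its first character.
            return rowSeparatorString.charAt(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("10"));  // 10
        System.out.println(parse("\n"));  // also 10: '\n' is character code 10
    }
}
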
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Tue Oct 21 11:11:05 2008
@@ -52,7 +52,15 @@
registerUDF("str_lt", UDFStrLt.class, OperatorType.PREFIX, false);
registerUDF("str_ge", UDFStrGe.class, OperatorType.PREFIX, false);
registerUDF("str_le", UDFStrLe.class, OperatorType.PREFIX, false);
+
+ registerUDF("size", UDFSize.class, OperatorType.PREFIX, false);
+ registerUDF("round", UDFRound.class, OperatorType.PREFIX, false);
+ registerUDF("floor", UDFFloor.class, OperatorType.PREFIX, false);
+ registerUDF("ceil", UDFCeil.class, OperatorType.PREFIX, false);
+ registerUDF("ceiling", UDFCeil.class, OperatorType.PREFIX, false);
+ registerUDF("rand", UDFRand.class, OperatorType.PREFIX, false);
+
registerUDF("upper", UDFUpper.class, OperatorType.PREFIX, false);
registerUDF("lower", UDFLower.class, OperatorType.PREFIX, false);
registerUDF("ucase", UDFUpper.class, OperatorType.PREFIX, false);
@@ -66,6 +74,9 @@
registerUDF("regexp", UDFRegExp.class, OperatorType.INFIX, true);
registerUDF("regexp_replace", UDFRegExpReplace.class, OperatorType.PREFIX, false);
+ registerUDF("positive", UDFOPPositive.class, OperatorType.PREFIX, true, "+");
+ registerUDF("negative", UDFOPNegative.class, OperatorType.PREFIX, true, "-");
+
registerUDF("+", UDFOPPlus.class, OperatorType.INFIX, true);
registerUDF("-", UDFOPMinus.class, OperatorType.INFIX, true);
registerUDF("*", UDFOPMultiply.class, OperatorType.INFIX, true);
@@ -75,7 +86,7 @@
registerUDF("&", UDFOPBitAnd.class, OperatorType.INFIX, true);
registerUDF("|", UDFOPBitOr.class, OperatorType.INFIX, true);
registerUDF("^", UDFOPBitXor.class, OperatorType.INFIX, true);
- registerUDF("~", UDFOPBitNot.class, OperatorType.INFIX, true);
+ registerUDF("~", UDFOPBitNot.class, OperatorType.PREFIX, true);
registerUDF("=", UDFOPEqual.class, OperatorType.INFIX, true);
registerUDF("==", UDFOPEqual.class, OperatorType.INFIX, true, "=");
@@ -89,8 +100,8 @@
registerUDF("&&", UDFOPAnd.class, OperatorType.INFIX, true, "and");
registerUDF("or", UDFOPOr.class, OperatorType.INFIX, true);
registerUDF("||", UDFOPOr.class, OperatorType.INFIX, true, "or");
- registerUDF("not", UDFOPNot.class, OperatorType.INFIX, true);
- registerUDF("!", UDFOPNot.class, OperatorType.INFIX, true, "not");
+ registerUDF("not", UDFOPNot.class, OperatorType.PREFIX, true);
+ registerUDF("!", UDFOPNot.class, OperatorType.PREFIX, true, "not");
registerUDF("isnull", UDFOPNull.class, OperatorType.POSTFIX, true, "is null");
registerUDF("isnotnull", UDFOPNotNull.class, OperatorType.POSTFIX, true, "is not null");
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=706704&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Tue Oct 21 11:11:05 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.*;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.limitDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Limit operator implementation
+ * Forwards each incoming row until the configured limit is reached, then marks itself done.
+ **/
+public class LimitOperator extends Operator<limitDesc> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ transient protected int limit;
+ transient protected int currCount;
+
+ public void initialize(Configuration hconf) throws HiveException {
+ super.initialize(hconf);
+ limit = conf.getLimit();
+ currCount = 0;
+ }
+
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
+ if (currCount < limit) {
+ forward(row, rowInspector);
+ currCount++;
+ }
+ else
+ setDone(true);
+ }
+}
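To make the cut-off concrete, a self-contained sketch of the counting logic outside the operator harness: with limit = 3, rows r1-r3 are forwarded, r4 flips the done flag, and r5 is never examined.

  int limit = 3, currCount = 0;
  boolean done = false;
  for (String row : new String[] {"r1", "r2", "r3", "r4", "r5"}) {
    if (currCount < limit) {
      System.out.println("forward " + row);  // r1, r2, r3
      currCount++;
    } else {
      done = true;                           // mirrors setDone(true)
      break;                                 // no further rows are processed
    }
  }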
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Tue Oct 21 11:11:05 2008
@@ -88,4 +88,15 @@
return (1);
}
}
+
+ @Override
+ public boolean isMapRedTask() {
+ return true;
+ }
+
+ @Override
+ public boolean hasReduce() {
+ mapredWork w = getWork();
+ return w.getReducer() != null;
+ }
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 21 11:11:05 2008
@@ -55,6 +55,7 @@
protected String id;
protected T conf;
+ protected boolean done;
public void setConf(T conf) {
this.conf = conf;
@@ -73,6 +74,14 @@
return id;
}
+ public boolean getDone() {
+ return done;
+ }
+
+ public void setDone(boolean done) {
+ this.done = done;
+ }
+
// non-bean fields needed during compilation
transient private RowSchema rowSchema;
@@ -219,9 +228,24 @@
protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
- if(childOperators == null) {
+ if((childOperators == null) || (getDone())) {
return;
}
+
+ // if all children are done, this operator is also done
+ boolean isDone = true;
+ for(Operator<? extends Serializable> o: childOperators) {
+ if (!o.getDone()) {
+ isDone = false;
+ break;
+ }
+ }
+
+ if (isDone) {
+ setDone(isDone);
+ return;
+ }
+
for(Operator<? extends Serializable> o: childOperators) {
o.process(row, rowInspector);
}
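The flag chains upward: once a LimitOperator (or any descendant) is done, the next forward() in its parent finds every child done and marks itself done too. A rough single-child sketch of that propagation (Node is a hypothetical stand-in for Operator):

  class Node {
    boolean done;
    Node child;                                 // one child, for brevity
    void forward(Object row) {
      if (done || child == null) return;
      if (child.done) { done = true; return; }  // propagate upward, drop row
      child.process(row);
    }
    void process(Object row) { forward(row); }
  }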
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 21 11:11:05 2008
@@ -48,6 +48,7 @@
opvec.add(new opTuple<extractDesc> (extractDesc.class, ExtractOperator.class));
opvec.add(new opTuple<groupByDesc> (groupByDesc.class, GroupByOperator.class));
opvec.add(new opTuple<joinDesc> (joinDesc.class, JoinOperator.class));
+ opvec.add(new opTuple<limitDesc> (limitDesc.class, LimitOperator.class));
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 21 11:11:05 2008
@@ -134,10 +134,10 @@
}
keyWritable.setHashCode(keyHashCode);
- ArrayList<String> values = new ArrayList<String>(valueEval.length);
+ ArrayList<Object> values = new ArrayList<Object>(valueEval.length);
for(ExprNodeEvaluator e: valueEval) {
e.evaluate(row, rowInspector, tempInspectableObject);
- values.add(tempInspectableObject.o == null ? null : tempInspectableObject.o.toString());
+ values.add(tempInspectableObject.o);
if (valueObjectInspector == null) {
valueFieldsObjectInspectors.add(tempInspectableObject.oi);
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Tue Oct 21 11:11:05 2008
@@ -86,6 +86,12 @@
}
public abstract int execute();
+
+ // dummy method - FetchTask overrides this
+ public boolean fetch(Vector<String> res) {
+ assert false;
+ return false;
+ }
public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
this.childTasks = childTasks;
@@ -158,4 +164,11 @@
return id;
}
+ public boolean isMapRedTask() {
+ return false;
+ }
+
+ public boolean hasReduce() {
+ return false;
+ }
}
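A hedged sketch of how a driver might consume this hook; the batch protocol (fetch filling res and returning false when exhausted) is an assumption, not something this hunk pins down:

  Vector<String> res = new Vector<String>();
  while (task.fetch(res)) {                  // hypothetical driver loop
    for (String row : res)
      System.out.println(row);
    res.clear();
  }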
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Tue Oct 21 11:11:05 2008
@@ -44,6 +44,7 @@
id = 0;
taskvec = new ArrayList<taskTuple<? extends Serializable>>();
taskvec.add(new taskTuple<moveWork>(moveWork.class, MoveTask.class));
+ taskvec.add(new taskTuple<fetchWork>(fetchWork.class, FetchTask.class));
taskvec.add(new taskTuple<copyWork>(copyWork.class, CopyTask.class));
taskvec.add(new taskTuple<DDLWork>(DDLWork.class, DDLTask.class));
taskvec.add(new taskTuple<FunctionWork>(FunctionWork.class, FunctionTask.class));
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java Tue Oct 21 11:11:05 2008
@@ -19,30 +19,20 @@
package org.apache.hadoop.hive.ql.exec;
/**
- * A dummy User-defined function (UDF) for the use with Hive.
+ * A User-defined function (UDF) for use with Hive.
*
- * New UDF classes do NOT need to inherit from this UDF class.
+ * New UDF classes need to inherit from this UDF class.
*
* Required for all UDF classes:
- * 1. Implement a single method named "evaluate" which will be called by Hive.
+ * 1. Implement one or more methods named "evaluate" which will be called by Hive.
* The following are some examples:
+ * public int evaluate();
* public int evaluate(int a);
* public double evaluate(int a, double b);
* public String evaluate(String a, int b, String c);
*
- * "evaluate" should neither be a void method, nor should it returns "null" in any case.
- * In both cases, the Hive system will throw an HiveException saying the evaluation of UDF
- * is failed.
+ * "evaluate" should never be a void method. However it can return "null" if needed.
*/
-public class UDF {
+public interface UDF {
- public UDF() { }
-
- /** Evaluate the UDF.
- * @return plain old java object
- **/
- public int evaluate() {
- return 0;
- }
-
}
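A minimal conforming UDF under the new interface, per the contract above (UDFConcat is a hypothetical example; overloads are resolved by argument types, and returning null is now legal):

  public class UDFConcat implements UDF {
    public String evaluate(String a, String b) {
      return (a == null || b == null) ? null : a + b;
    }
    public String evaluate(String a, String b, String c) {
      return evaluate(evaluate(a, b), c);    // reuse the two-arg overload
    }
  }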
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 21 11:11:05 2008
@@ -382,7 +382,7 @@
}
public final static String NSTR = "";
- public static enum streamStatus {EOF, TERMINATED, NORMAL}
+ public static enum streamStatus {EOF, TERMINATED}
public static streamStatus readColumn(DataInput in, OutputStream out) throws IOException {
while (true) {
@@ -397,10 +397,6 @@
return streamStatus.TERMINATED;
}
- if (b == Utilities.ctrlaCode) {
- return streamStatus.NORMAL;
- }
-
out.write(b);
}
// Unreachable
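With NORMAL gone, callers only distinguish "field ended at a terminator" from "input ran out". A rough consumption sketch (in is a DataInput from the surrounding context; handleField is a hypothetical callback):

  ByteArrayOutputStream field = new ByteArrayOutputStream();
  Utilities.streamStatus status;
  do {
    field.reset();
    status = Utilities.readColumn(in, field);  // copies one column into field
    handleField(field.toByteArray());
  } while (status != Utilities.streamStatus.EOF);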
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Oct 21 11:11:05 2008
@@ -179,8 +179,8 @@
}
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass);
-
- return inputFormat.getRecordReader(inputSplit, job, reporter);
+
+ return new HiveRecordReader(inputFormat.getRecordReader(inputSplit, job, reporter));
}
@@ -219,6 +219,7 @@
return result.toArray(new HiveInputSplit[result.size()]);
}
+
private tableDesc getTableDescFromPath(Path dir) throws IOException {
partitionDesc partDesc = pathToPartitionInfo.get(dir.toString());
Added: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java?rev=706704&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (added)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java Tue Oct 21 11:11:05 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.ql.exec.ExecMapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import java.io.IOException;
+
+public class HiveRecordReader<K extends WritableComparable, V extends Writable>
+ implements RecordReader<K, V> {
+
+ private RecordReader recordReader;
+ public HiveRecordReader(RecordReader recordReader) {
+ this.recordReader = recordReader;
+ }
+
+ public void close() throws IOException {
+ recordReader.close();
+ }
+
+ public K createKey() {
+ return (K)recordReader.createKey();
+ }
+
+ public V createValue() {
+ return (V)recordReader.createValue();
+ }
+
+ public long getPos() throws IOException {
+ return recordReader.getPos();
+ }
+
+ public float getProgress() throws IOException {
+ return recordReader.getProgress();
+ }
+
+ public boolean next(K key, V value) throws IOException {
+ if (ExecMapper.getDone())
+ return false;
+ return recordReader.next(key, value);
+ }
+}
+
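This wrapper is the input-side half of the done mechanism: once the map-side operator tree finishes (e.g. a LIMIT is satisfied), ExecMapper's done flag makes every subsequent next() return false, so the rest of the split is skipped. A hedged sketch of the resulting loop (how ExecMapper's flag gets set is assumed from context, not shown in this hunk):

  while (reader.next(key, value)) {          // HiveRecordReader.next(...)
    mapper.map(key, value, output, reporter);
    // if map() drove the operator chain to completion, the next
    // reader.next(...) call returns false immediately
  }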
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Oct 21 11:11:05 2008
@@ -105,7 +105,6 @@
this.conf = c;
try {
msc = this.createMetaStoreClient();
- //msc = new HiveMetaStoreClient(this.conf);
} catch (MetaException e) {
throw new HiveException("Unable to open connection to metastore", e);
}
@@ -169,7 +168,7 @@
tbl.getPartCols().add(part);
}
}
- tbl.setSerializationLib(org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.shortName());
+ tbl.setSerializationLib(org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.class.getName());
tbl.setNumBuckets(bucketCount);
createTable(tbl);
}
@@ -269,7 +268,7 @@
} catch (NoSuchObjectException e) {
if(throwException) {
LOG.error(StringUtils.stringifyException(e));
- throw new InvalidTableException("Table not found " + tableName);
+ throw new InvalidTableException("Table not found ", tableName);
}
return null;
} catch (Exception e) {
@@ -463,6 +462,17 @@
return new Partition(tbl, tpart);
}
+ public boolean dropPartition(String db_name, String tbl_name, List<String> part_vals,
+ boolean deleteData) throws HiveException {
+ try {
+ return msc.dropPartition(db_name, tbl_name, part_vals, deleteData);
+ } catch (NoSuchObjectException e) {
+ throw new HiveException("Partition or table doesn't exist.", e);
+ } catch (Exception e) {
+ throw new HiveException("Unknow error. Please check logs.", e);
+ }
+ }
+
public List<String> getPartitionNames(String dbName, String tblName, short max) throws HiveException {
List names = null;
try {
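A hedged usage sketch for the new method (database, table, and value literals are illustrative only; deleteData == true also removes the partition's data files):

  Hive db = Hive.get();                      // needs java.util.Arrays imported
  boolean dropped = db.dropPartition("default", "page_view",
      Arrays.asList("2008-10-21"), true);    // drop ds='2008-10-21', delete files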
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Tue Oct 21 11:11:05 2008
@@ -58,6 +58,14 @@
private LinkedHashMap<String, String> spec;
+ /**
+ * @return the partition's value list, delegated to the underlying metastore partition
+ * @see org.apache.hadoop.hive.metastore.api.Partition#getValues()
+ */
+ public List<String> getValues() {
+ return tPartition.getValues();
+ }
+
private Path partPath;
private URI partURI;
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Oct 21 11:11:05 2008
@@ -95,7 +95,7 @@
initEmpty();
this.schema = schema;
this.deserializer = deserializer; //TODO: convert to SerDeInfo format
- this.getTTable().getSd().getSerdeInfo().setSerializationLib(deserializer.getShortName());
+ this.getTTable().getSd().getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
getTTable().setTableName(name);
getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
setInputFormatClass(inputFormatClass);
@@ -108,7 +108,7 @@
initEmpty();
getTTable().setTableName(name);
getTTable().setDbName(MetaStoreUtils.DEFAULT_DATABASE_NAME);
- getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.shortName());
+ getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1");
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=706704&r1=706703&r2=706704&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Tue Oct 21 11:11:05 2008
@@ -31,7 +31,6 @@
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -43,6 +42,8 @@
protected final Hive db;
protected final HiveConf conf;
protected List<Task<? extends Serializable>> rootTasks;
+ protected Task<? extends Serializable> fetchTask;
+ protected boolean fetchTaskInit;
protected final Log LOG;
protected final LogHelper console;
@@ -65,13 +66,40 @@
}
}
- public abstract void analyze(CommonTree ast, Context ctx) throws SemanticException;
+ public abstract void analyzeInternal(CommonTree ast, Context ctx) throws SemanticException;
+
+ public void analyze(CommonTree ast, Context ctx) throws SemanticException {
+ scratchDir = ctx.getScratchDir();
+ analyzeInternal(ast, ctx);
+ }
public List<Task<? extends Serializable>> getRootTasks() {
return rootTasks;
}
- protected void reset() {
+ /**
+ * @return the fetchTask
+ */
+ public Task<? extends Serializable> getFetchTask() {
+ return fetchTask;
+ }
+
+ /**
+ * @param fetchTask the fetchTask to set
+ */
+ public void setFetchTask(Task<? extends Serializable> fetchTask) {
+ this.fetchTask = fetchTask;
+ }
+
+ public boolean getFetchTaskInit() {
+ return fetchTaskInit;
+ }
+
+ public void setFetchTaskInit(boolean fetchTaskInit) {
+ this.fetchTaskInit = fetchTaskInit;
+ }
+
+ protected void reset() {
rootTasks = new ArrayList<Task<? extends Serializable>>();
}
@@ -118,9 +146,33 @@
public static String unescapeSQLString(String b) {
assert(b.charAt(0) == '\'');
assert(b.charAt(b.length()-1) == '\'');
+
+ // Some strings can contain octal escape sequences. For example, the
+ // delimiter can be passed in as \002, so we first check whether the
+ // next characters form an octal escape, else fall back to the old behavior
StringBuilder sb = new StringBuilder(b.length());
- for(int i=1; i+1<b.length(); i++) {
- if (b.charAt(i) == '\\' && i+2<b.length()) {
+ int i = 1;
+ while (i < (b.length()-1)) {
+
+ if (b.charAt(i) == '\\' && (i+4 < b.length())) {
+ char i1 = b.charAt(i+1);
+ char i2 = b.charAt(i+2);
+ char i3 = b.charAt(i+3);
+ if ((i1 >= '0' && i1 <= '1') &&
+ (i2 >= '0' && i2 <= '7') &&
+ (i3 >= '0' && i3 <= '7'))
+ {
+ byte bVal = (byte)((i3 - '0') + ((i2 - '0') * 8 ) + ((i1 - '0') * 8 * 8));
+ byte[] bValArr = new byte[1];
+ bValArr[0] = bVal;
+ String tmp = new String(bValArr);
+ sb.append(tmp);
+ i += 4;
+ continue;
+ }
+ }
+
+ if (b.charAt(i) == '\\' && (i+2 < b.length())) {
char n=b.charAt(i+1);
switch(n) {
case '0': sb.append("\0"); break;
@@ -141,6 +193,7 @@
} else {
sb.append(b.charAt(i));
}
+ i++;
}
return sb.toString();
}
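Worked example of the octal branch above: for the literal '\002', i1 = '0', i2 = '0', i3 = '2', so bVal = 2 + 0*8 + 0*64 = 2, i.e. the Ctrl-B field delimiter.

  char i1 = '0', i2 = '0', i3 = '2';
  byte bVal = (byte)((i3 - '0') + ((i2 - '0') * 8) + ((i1 - '0') * 64));
  // bVal == 2, the ^B (Ctrl-B) delimiter byte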
@@ -159,7 +212,7 @@
public HashMap<String, String> partSpec;
public Partition partHandle;
- public tableSpec(Hive db, CommonTree ast) throws SemanticException {
+ public tableSpec(Hive db, CommonTree ast, boolean forceCreatePartition) throws SemanticException {
assert(ast.getToken().getType() == HiveParser.TOK_TAB);
int childIndex = 0;
@@ -179,7 +232,10 @@
String val = stripQuotes(partspec_val.getChild(1).getText());
partSpec.put(partspec_val.getChild(0).getText(), val);
}
- partHandle = Hive.get().getPartition(tableHandle, partSpec, true);
+ partHandle = Hive.get().getPartition(tableHandle, partSpec, forceCreatePartition);
+ if(partHandle == null) {
+ throw new SemanticException(ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
+ }
}
} catch (InvalidTableException ite) {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(ast.getChild(0)), ite);