You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/08 07:01:20 UTC
svn commit: r724266 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
Author: zshao
Date: Sun Dec 7 22:01:20 2008
New Revision: 724266
URL: http://svn.apache.org/viewvc?rev=724266&view=rev
Log:
HIVE-102. Refactor DDLTask. (Johan Oskarsson through zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=724266&r1=724265&r2=724266&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sun Dec 7 22:01:20 2008
@@ -17,6 +17,8 @@
IMPROVEMENTS
+ HIVE-102. Refactor DDLTask. (Johan Oskarsson through zshao)
+
HIVE-85. New compression options for Hive. (Joydeep Sarma through zshao)
HIVE-69. genMapRedTasks uses tree walker. (Namit through zshao)
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=724266&r1=724265&r2=724266&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sun Dec 7 22:01:20 2008
@@ -93,444 +93,511 @@
createTableDesc crtTbl = work.getCreateTblDesc();
if (crtTbl != null) {
+ return createTable(db, crtTbl);
+ }
- // create the table
- Table tbl = new Table(crtTbl.getTableName());
- StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
- if (crtTbl.getBucketCols() != null)
- tblStorDesc.setBucketCols(crtTbl.getBucketCols());
- if (crtTbl.getSortCols() != null)
- tbl.setSortCols(crtTbl.getSortCols());
- if (crtTbl.getPartCols() != null)
- tbl.setPartCols(crtTbl.getPartCols());
- if (crtTbl.getNumBuckets() != -1)
- tblStorDesc.setNumBuckets(crtTbl.getNumBuckets());
-
- if (crtTbl.getSerName() != null) {
- tbl.setSerializationLib(crtTbl.getSerName());
- if (crtTbl.getMapProp() != null) {
- Iterator<Map.Entry<String, String>> iter = crtTbl.getMapProp().entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, String> m = (Map.Entry)iter.next();
- tbl.setSerdeParam(m.getKey(), m.getValue());
- }
- }
- }
- else
- {
- if (crtTbl.getFieldDelim() != null)
- {
- tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
- tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim());
- }
-
- if (crtTbl.getCollItemDelim() != null)
- tbl.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl.getCollItemDelim());
- if (crtTbl.getMapKeyDelim() != null)
- tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
- if (crtTbl.getLineDelim() != null)
- tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
- }
-
- /**
- * For now, if the user specifies either the map or the collections delimiter, we infer the
- * table to DynamicSerDe/TCTLSeparatedProtocol.
- * In the future, we should infer this for any delimiters specified, but this will break older
- * hive tables, so not for now.
- */
- if (crtTbl.getCollItemDelim() != null || crtTbl.getMapKeyDelim() != null) {
- tbl.setSerializationLib(org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe.class.getName());
- tbl.setSerdeParam(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class.getName());
- }
+ dropTableDesc dropTbl = work.getDropTblDesc();
+ if (dropTbl != null) {
+ return dropTable(db, dropTbl);
+ }
+ alterTableDesc alterTbl = work.getAlterTblDesc();
+ if (alterTbl != null) {
+ return alterTable(db, alterTbl);
+ }
- if (crtTbl.getComment() != null)
- tbl.setProperty("comment", crtTbl.getComment());
- if (crtTbl.getLocation() != null)
- tblStorDesc.setLocation(crtTbl.getLocation());
-
- if (crtTbl.isSequenceFile()) {
- tbl.setInputFormatClass(SequenceFileInputFormat.class);
- tbl.setOutputFormatClass(SequenceFileOutputFormat.class);
- }
- else {
- tbl.setOutputFormatClass(IgnoreKeyTextOutputFormat.class);
- tbl.setInputFormatClass(TextInputFormat.class);
- }
-
- if (crtTbl.isExternal())
- tbl.setProperty("EXTERNAL", "TRUE");
-
- // If the sorted columns is a superset of bucketed columns, store this fact. It can be later used to
- // optimize some group-by queries. Note that, the order does not matter as long as it in the first
- // 'n' columns where 'n' is the length of the bucketed columns.
- if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null))
- {
- List<String> bucketCols = tbl.getBucketCols();
- List<Order> sortCols = tbl.getSortCols();
-
- if ( (sortCols.size() > 0) && (sortCols.size() >= bucketCols.size()))
- {
- boolean found = true;
-
- Iterator<String> iterBucketCols = bucketCols.iterator();
- while (iterBucketCols.hasNext())
- {
- String bucketCol = iterBucketCols.next();
- boolean colFound = false;
- for (int i = 0; i < bucketCols.size(); i++)
- {
- if (bucketCol.equals(sortCols.get(i).getCol())) {
- colFound = true;
- break;
- }
- }
- if (colFound == false)
- {
- found = false;
- break;
- }
- }
- if (found)
- tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
- }
- }
-
- // set owner, create_time etc
- tbl.setOwner(System.getProperty("user.name"));
- // set create time
- tbl.getTTable().setCreateTime((int) (System.currentTimeMillis()/1000));
+ descTableDesc descTbl = work.getDescTblDesc();
+ if (descTbl != null) {
+ return describeTable(db, fs, descTbl);
+ }
- if(crtTbl.getCols() != null) {
- tbl.setFields(crtTbl.getCols());
- }
+ showTablesDesc showTbls = work.getShowTblsDesc();
+ if (showTbls != null) {
+ return showTables(db, fs, showTbls);
+ }
- // create the table
- db.createTable(tbl);
- return 0;
+ showPartitionsDesc showParts = work.getShowPartsDesc();
+ if (showParts != null) {
+ return showPartitions(db, fs, showParts);
}
- dropTableDesc dropTbl = work.getDropTblDesc();
- if (dropTbl != null) {
- if(dropTbl.getPartSpecs() == null) {
- // drop the table
- db.dropTable(dropTbl.getTableName());
- } else {
- // drop partitions in the list
- Table tbl = db.getTable(dropTbl.getTableName());
- List<Partition> parts = new ArrayList<Partition>();
- for(HashMap<String, String> partSpec : dropTbl.getPartSpecs()) {
- Partition part = db.getPartition(tbl, partSpec, false);
- if(part == null) {
- console.printInfo("Partition " + partSpec + " does not exist.");
- } else {
- parts.add(part);
- }
- }
- // drop all existing partitions from the list
- for (Partition partition : parts) {
- console.printInfo("Dropping the partition " + partition.getName());
- db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME,
- dropTbl.getTableName(),
- partition.getValues(),
- true); //drop data for the partition
- }
- }
- return 0;
+ } catch (InvalidTableException e) {
+ console.printError("Table " + e.getTableName() + " does not exist");
+ LOG.debug(StringUtils.stringifyException(e));
+ return 1;
+ } catch (HiveException e) {
+ console.printError("FAILED: Error in metadata: " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ LOG.debug(StringUtils.stringifyException(e));
+ return 1;
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ return (1);
+ }
+ assert false;
+ return 0;
+ }
+
+ /**
+ * Write a list of partitions to a file.
+ *
+ * @param db The database in question.
+ * @param fs FileSystem that will contain the file written.
+ * @param showParts These are the partitions we're interested in.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int showPartitions(Hive db, FileSystem fs,
+ showPartitionsDesc showParts) throws HiveException {
+ // get the partitions for the table and populate the output
+ String tabName = showParts.getTabName();
+ Table tbl = null;
+ List<String> parts = null;
+
+ tbl = db.getTable(tabName);
+
+ if (!tbl.isPartitioned()) {
+ console.printError("Table " + tabName + " is not a partitioned table");
+ return 1;
+ }
+
+ parts = db.getPartitionNames(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl
+ .getName(), Short.MAX_VALUE);
+
+ // write the results in the file
+ try {
+ DataOutput outStream = (DataOutput) fs.create(showParts.getResFile());
+ Iterator<String> iterParts = parts.iterator();
+ boolean firstCol = true;
+ while (iterParts.hasNext()) {
+ if (!firstCol)
+ outStream.write(terminator);
+ outStream.write(iterParts.next().getBytes("UTF-8"));
+ firstCol = false;
}
+ ((FSDataOutputStream) outStream).close();
+ } catch (FileNotFoundException e) {
+ LOG.info("show partitions: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (IOException e) {
+ LOG.info("show partitions: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+ return 0;
+ }
- alterTableDesc alterTbl = work.getAlterTblDesc();
- if (alterTbl != null) {
- // alter the table
- Table tbl = db.getTable(alterTbl.getOldName());
- if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
- tbl.getTTable().setTableName(alterTbl.getNewName());
- else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
- List<FieldSchema> newCols = alterTbl.getNewCols();
- List<FieldSchema> oldCols = tbl.getCols();
- if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
- console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
- tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
- tbl.getTTable().getSd().setCols(newCols);
- }
- else {
- // make sure the columns does not already exist
- Iterator<FieldSchema> iterNewCols = newCols.iterator();
- while (iterNewCols.hasNext()) {
- FieldSchema newCol = iterNewCols.next();
- String newColName = newCol.getName();
- Iterator<FieldSchema> iterOldCols = oldCols.iterator();
- while (iterOldCols.hasNext()) {
- String oldColName = iterOldCols.next().getName();
- if (oldColName.equalsIgnoreCase(newColName)) {
- console.printError("Column '" + newColName + "' exists");
- return 1;
- }
- }
- oldCols.add(newCol);
- }
- tbl.getTTable().getSd().setCols(oldCols);
- }
- }
- else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.REPLACECOLS) {
- // change SerDe to MetadataTypedColumnsetSerDe if it is columnsetSerDe
- if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
- console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
- tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
- }
- else if(!tbl.getSerializationLib().equals(MetadataTypedColumnsetSerDe.class.getName())) {
- console.printError("Replace columns is not supported for this table. SerDe may be incompatible.");
- return 1;
- }
- tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
- }
- else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDPROPS) {
- tbl.getTTable().getParameters().putAll(alterTbl.getProps());
- }
- else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDEPROPS) {
- tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
- }
- else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDE) {
- tbl.setSerializationLib(alterTbl.getSerdeName());
- if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0))
- tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
- // since serde is modified then do the appropriate things to reset columns etc
- tbl.reinitSerDe();
- tbl.setFields(Hive.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
- }
- else {
- console.printError("Unsupported Alter commnad");
- return 1;
- }
+ /**
+ * Write a list of the tables in the database to a file.
+ *
+ * @param db The database in question.
+ * @param fs FileSystem that will contain the file written.
+ * @param showTbls These are the tables we're interested in.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int showTables(Hive db, FileSystem fs, showTablesDesc showTbls)
+ throws HiveException {
+ // get the tables for the desired pattenn - populate the output stream
+ List<String> tbls = null;
+ if (showTbls.getPattern() != null) {
+ LOG.info("pattern: " + showTbls.getPattern());
+ tbls = db.getTablesByPattern(showTbls.getPattern());
+ LOG.info("results : " + tbls.size());
+ } else
+ tbls = db.getAllTables();
+
+ // write the results in the file
+ try {
+ DataOutput outStream = (DataOutput) fs.create(showTbls.getResFile());
+ SortedSet<String> sortedTbls = new TreeSet<String>(tbls);
+ Iterator<String> iterTbls = sortedTbls.iterator();
+ boolean firstCol = true;
+ while (iterTbls.hasNext()) {
+ if (!firstCol)
+ outStream.write(separator);
+ outStream.write(iterTbls.next().getBytes("UTF-8"));
+ firstCol = false;
+ }
+ ((FSDataOutputStream) outStream).close();
+ } catch (FileNotFoundException e) {
+ LOG.info("show table: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (IOException e) {
+ LOG.info("show table: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+ return 0;
+ }
- // set last modified by properties
- tbl.setProperty("last_modified_by", System.getProperty("user.name"));
- tbl.setProperty("last_modified_time", Long.toString(System.currentTimeMillis()/1000));
-
- try {
- db.alterTable(alterTbl.getOldName(), tbl);
- } catch (InvalidOperationException e) {
- LOG.info("alter table: " + StringUtils.stringifyException(e));
- return 1;
- } catch (MetaException e) {
- return 1;
- } catch (TException e) {
- return 1;
- }
+ /**
+ * Write the description of a table to a file.
+ *
+ * @param db The database in question.
+ * @param fs FileSystem that will contain the file written.
+ * @param descTbl This is the table we're interested in.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int describeTable(Hive db, FileSystem fs, descTableDesc descTbl)
+ throws HiveException {
+ String colPath = descTbl.getTableName();
+ String tableName = colPath.substring(0,
+ colPath.indexOf('.') == -1 ? colPath.length() : colPath.indexOf('.'));
+
+ // describe the table - populate the output stream
+ Table tbl = db.getTable(tableName, false);
+ Partition part = null;
+ try {
+ if (tbl == null) {
+ DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile());
+ String errMsg = "Table " + tableName + " does not exist";
+ outStream.write(errMsg.getBytes("UTF-8"));
+ ((FSDataOutputStream) outStream).close();
return 0;
}
+ if (descTbl.getPartSpec() != null) {
+ part = db.getPartition(tbl, descTbl.getPartSpec(), false);
+ if (part == null) {
+ DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile());
+ String errMsg = "Partition " + descTbl.getPartSpec() + " for table "
+ + tableName + " does not exist";
+ outStream.write(errMsg.getBytes("UTF-8"));
+ ((FSDataOutputStream) outStream).close();
+ return 0;
+ }
+ tbl = part.getTable();
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (IOException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ }
- descTableDesc descTbl = work.getDescTblDesc();
-
- if (descTbl != null) {
+ try {
- String colPath = descTbl.getTableName();
- String tableName = colPath.substring(0, colPath.indexOf('.') == -1 ? colPath.length() : colPath.indexOf('.'));
+ LOG.info("DDLTask: got data for " + tbl.getName());
- // describe the table - populate the output stream
- Table tbl = db.getTable(tableName, false);
- Partition part = null;
- try {
- if(tbl == null) {
- DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
- String errMsg = "Table " + tableName + " does not exist";
- outStream.write(errMsg.getBytes("UTF-8"));
- ((FSDataOutputStream)outStream).close();
- return 0;
- }
- if(descTbl.getPartSpec() != null) {
- part = db.getPartition(tbl, descTbl.getPartSpec(), false);
- if(part == null) {
- DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
- String errMsg = "Partition " + descTbl.getPartSpec() + " for table " + tableName + " does not exist";
- outStream.write(errMsg.getBytes("UTF-8"));
- ((FSDataOutputStream)outStream).close();
- return 0;
- }
- tbl = part.getTable();
- }
- } catch (FileNotFoundException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
+ // write the results in the file
+ DataOutput os = (DataOutput) fs.create(descTbl.getResFile());
+ List<FieldSchema> cols = null;
+ if (colPath.equals(tableName)) {
+ cols = tbl.getCols();
+ if (part != null) {
+ cols = part.getTPartition().getSd().getCols();
}
- catch (IOException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
+ } else {
+ cols = db.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+ }
+
+ Iterator<FieldSchema> iterCols = cols.iterator();
+ boolean firstCol = true;
+ while (iterCols.hasNext()) {
+ if (!firstCol)
+ os.write(terminator);
+ FieldSchema col = iterCols.next();
+ os.write(col.getName().getBytes("UTF-8"));
+ os.write(separator);
+ os.write(col.getType().getBytes("UTF-8"));
+ if (col.getComment() != null) {
+ os.write(separator);
+ os.write(singleQuote);
+ os.write(col.getComment().getBytes("UTF-8"));
+ os.write(singleQuote);
}
-
- try {
+ firstCol = false;
+ }
- LOG.info("DDLTask: got data for " + tbl.getName());
-
- // write the results in the file
- DataOutput os = (DataOutput)fs.create(descTbl.getResFile());
- List<FieldSchema> cols = null;
- if (colPath.equals(tableName)) {
- cols = tbl.getCols();
- if (part != null) {
- cols = part.getTPartition().getSd().getCols();
- }
- }
- else {
- cols = db.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+ if (tableName.equals(colPath)) {
+ // also return the partitioning columns
+ List<FieldSchema> partCols = tbl.getPartCols();
+ Iterator<FieldSchema> iterPartCols = partCols.iterator();
+ while (iterPartCols.hasNext()) {
+ os.write(terminator);
+ FieldSchema col = iterPartCols.next();
+ os.write(col.getName().getBytes("UTF-8"));
+ os.write(separator);
+ os.write(col.getType().getBytes("UTF-8"));
+ if (col.getComment() != null) {
+ os.write(separator);
+ os.write(col.getComment().getBytes("UTF-8"));
}
+ }
- Iterator<FieldSchema> iterCols = cols.iterator();
- boolean firstCol = true;
- while (iterCols.hasNext())
- {
- if (!firstCol)
- os.write(terminator);
- FieldSchema col = iterCols.next();
- os.write(col.getName().getBytes("UTF-8"));
- os.write(separator);
- os.write(col.getType().getBytes("UTF-8"));
- if (col.getComment() != null)
- {
- os.write(separator);
- os.write(singleQuote);
- os.write(col.getComment().getBytes("UTF-8"));
- os.write(singleQuote);
- }
- firstCol = false;
+ // if extended desc table then show the complete details of the table
+ if (descTbl.isExt()) {
+ if (part != null) {
+ // show partition informatio
+ os.write("\n\nDetailed Partition Information:\n".getBytes("UTF-8"));
+ os.write(part.getTPartition().toString().getBytes("UTF-8"));
+ } else {
+ os.write("\nDetailed Table Information:\n".getBytes("UTF-8"));
+ os.write(tbl.getTTable().toString().getBytes("UTF-8"));
}
+ }
+ }
- if (tableName.equals(colPath)) {
- // also return the partitioning columns
- List<FieldSchema> partCols = tbl.getPartCols();
- Iterator<FieldSchema> iterPartCols = partCols.iterator();
- while (iterPartCols.hasNext())
- {
- os.write(terminator);
- FieldSchema col = iterPartCols.next();
- os.write(col.getName().getBytes("UTF-8"));
- os.write(separator);
- os.write(col.getType().getBytes("UTF-8"));
- if (col.getComment() != null)
- {
- os.write(separator);
- os.write(col.getComment().getBytes("UTF-8"));
- }
- }
-
- // if extended desc table then show the complete details of the table
- if(descTbl.isExt()) {
- if(part != null) {
- // show partition informatio
- os.write("\n\nDetailed Partition Information:\n".getBytes("UTF-8"));
- os.write(part.getTPartition().toString().getBytes("UTF-8"));
- } else {
- os.write("\nDetailed Table Information:\n".getBytes("UTF-8"));
- os.write(tbl.getTTable().toString().getBytes("UTF-8"));
- }
+ LOG.info("DDLTask: written data for " + tbl.getName());
+ ((FSDataOutputStream) os).close();
+
+ } catch (FileNotFoundException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (IOException e) {
+ LOG.info("describe table: " + StringUtils.stringifyException(e));
+ return 1;
+ }
+ return 0;
+ }
+
+ /**
+ * Alter a given table.
+ *
+ * @param db The database in question.
+ * @param alterTbl This is the table we're altering.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int alterTable(Hive db, alterTableDesc alterTbl) throws HiveException {
+ // alter the table
+ Table tbl = db.getTable(alterTbl.getOldName());
+ if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
+ tbl.getTTable().setTableName(alterTbl.getNewName());
+ else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
+ List<FieldSchema> newCols = alterTbl.getNewCols();
+ List<FieldSchema> oldCols = tbl.getCols();
+ if (tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+ console
+ .printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+ tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+ tbl.getTTable().getSd().setCols(newCols);
+ } else {
+ // make sure the columns does not already exist
+ Iterator<FieldSchema> iterNewCols = newCols.iterator();
+ while (iterNewCols.hasNext()) {
+ FieldSchema newCol = iterNewCols.next();
+ String newColName = newCol.getName();
+ Iterator<FieldSchema> iterOldCols = oldCols.iterator();
+ while (iterOldCols.hasNext()) {
+ String oldColName = iterOldCols.next().getName();
+ if (oldColName.equalsIgnoreCase(newColName)) {
+ console.printError("Column '" + newColName + "' exists");
+ return 1;
}
}
-
- LOG.info("DDLTask: written data for " + tbl.getName());
- ((FSDataOutputStream)os).close();
-
- } catch (FileNotFoundException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
- }
- catch (IOException e) {
- LOG.info("describe table: " + StringUtils.stringifyException(e));
- return 1;
+ oldCols.add(newCol);
}
- return 0;
+ tbl.getTTable().getSd().setCols(oldCols);
+ }
+ } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.REPLACECOLS) {
+ // change SerDe to MetadataTypedColumnsetSerDe if it is columnsetSerDe
+ if (tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+ console
+ .printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+ tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+ } else if (!tbl.getSerializationLib().equals(
+ MetadataTypedColumnsetSerDe.class.getName())) {
+ console
+ .printError("Replace columns is not supported for this table. SerDe may be incompatible.");
+ return 1;
}
+ tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
+ } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDPROPS) {
+ tbl.getTTable().getParameters().putAll(alterTbl.getProps());
+ } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDEPROPS) {
+ tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(
+ alterTbl.getProps());
+ } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDE) {
+ tbl.setSerializationLib(alterTbl.getSerdeName());
+ if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0))
+ tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(
+ alterTbl.getProps());
+ // since serde is modified then do the appropriate things to reset columns
+ // etc
+ tbl.reinitSerDe();
+ tbl.setFields(Hive.getFieldsFromDeserializer(tbl.getName(), tbl
+ .getDeserializer()));
+ } else {
+ console.printError("Unsupported Alter commnad");
+ return 1;
+ }
- showTablesDesc showTbls = work.getShowTblsDesc();
- if (showTbls != null) {
- // get the tables for the desired pattenn - populate the output stream
- List<String> tbls = null;
- if (showTbls.getPattern() != null)
- {
- LOG.info("pattern: " + showTbls.getPattern());
- tbls = db.getTablesByPattern(showTbls.getPattern());
- LOG.info("results : " + tbls.size());
- }
- else
- tbls = db.getAllTables();
-
- // write the results in the file
- try {
- DataOutput outStream = (DataOutput)fs.create(showTbls.getResFile());
- SortedSet<String> sortedTbls = new TreeSet<String>(tbls);
- Iterator<String> iterTbls = sortedTbls.iterator();
- boolean firstCol = true;
- while (iterTbls.hasNext())
- {
- if (!firstCol)
- outStream.write(separator);
- outStream.write(iterTbls.next().getBytes("UTF-8"));
- firstCol = false;
- }
- ((FSDataOutputStream)outStream).close();
- } catch (FileNotFoundException e) {
- LOG.info("show table: " + StringUtils.stringifyException(e));
- return 1;
- } catch (IOException e) {
- LOG.info("show table: " + StringUtils.stringifyException(e));
- return 1;
+ // set last modified by properties
+ tbl.setProperty("last_modified_by", System.getProperty("user.name"));
+ tbl.setProperty("last_modified_time", Long.toString(System
+ .currentTimeMillis() / 1000));
+
+ try {
+ db.alterTable(alterTbl.getOldName(), tbl);
+ } catch (InvalidOperationException e) {
+ LOG.info("alter table: " + StringUtils.stringifyException(e));
+ return 1;
+ } catch (MetaException e) {
+ return 1;
+ } catch (TException e) {
+ return 1;
+ }
+ return 0;
+ }
+
+ /**
+ * Drop a given table.
+ *
+ * @param db The database in question.
+ * @param dropTbl This is the table we're dropping.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int dropTable(Hive db, dropTableDesc dropTbl) throws HiveException {
+ if (dropTbl.getPartSpecs() == null) {
+ // drop the table
+ db.dropTable(dropTbl.getTableName());
+ } else {
+ // drop partitions in the list
+ Table tbl = db.getTable(dropTbl.getTableName());
+ List<Partition> parts = new ArrayList<Partition>();
+ for (HashMap<String, String> partSpec : dropTbl.getPartSpecs()) {
+ Partition part = db.getPartition(tbl, partSpec, false);
+ if (part == null) {
+ console.printInfo("Partition " + partSpec + " does not exist.");
+ } else {
+ parts.add(part);
}
- return 0;
}
+ // drop all existing partitions from the list
+ for (Partition partition : parts) {
+ console.printInfo("Dropping the partition " + partition.getName());
+ db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl
+ .getTableName(), partition.getValues(), true); // drop data for the
+ // partition
+ }
+ }
+ return 0;
+ }
- showPartitionsDesc showParts = work.getShowPartsDesc();
- if (showParts != null) {
- // get the partitions for the table and populate the output
- String tabName = showParts.getTabName();
- Table tbl = null;
- List<String> parts = null;
-
- tbl = db.getTable(tabName);
-
- if (!tbl.isPartitioned()) {
- console.printError("Table " + tabName + " is not a partitioned table");
- return 1;
+ /**
+ * Create a new table.
+ *
+ * @param db The database in question.
+ * @param crtTbl This is the table we're creating.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException Throws this exception if an unexpected error occurs.
+ */
+ private int createTable(Hive db, createTableDesc crtTbl) throws HiveException {
+ // create the table
+ Table tbl = new Table(crtTbl.getTableName());
+ StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
+ if (crtTbl.getBucketCols() != null)
+ tblStorDesc.setBucketCols(crtTbl.getBucketCols());
+ if (crtTbl.getSortCols() != null)
+ tbl.setSortCols(crtTbl.getSortCols());
+ if (crtTbl.getPartCols() != null)
+ tbl.setPartCols(crtTbl.getPartCols());
+ if (crtTbl.getNumBuckets() != -1)
+ tblStorDesc.setNumBuckets(crtTbl.getNumBuckets());
+
+ if (crtTbl.getSerName() != null) {
+ tbl.setSerializationLib(crtTbl.getSerName());
+ if (crtTbl.getMapProp() != null) {
+ Iterator<Map.Entry<String, String>> iter = crtTbl.getMapProp()
+ .entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, String> m = (Map.Entry) iter.next();
+ tbl.setSerdeParam(m.getKey(), m.getValue());
}
+ }
+ } else {
+ if (crtTbl.getFieldDelim() != null) {
+ tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
+ tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl
+ .getFieldDelim());
+ }
- parts = db.getPartitionNames(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), Short.MAX_VALUE);
+ if (crtTbl.getCollItemDelim() != null)
+ tbl
+ .setSerdeParam(Constants.COLLECTION_DELIM, crtTbl
+ .getCollItemDelim());
+ if (crtTbl.getMapKeyDelim() != null)
+ tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
+ if (crtTbl.getLineDelim() != null)
+ tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
+ }
+
+ /**
+ * For now, if the user specifies either the map or the collections
+ * delimiter, we infer the table to DynamicSerDe/TCTLSeparatedProtocol. In
+ * the future, we should infer this for any delimiters specified, but this
+ * will break older hive tables, so not for now.
+ */
+ if (crtTbl.getCollItemDelim() != null || crtTbl.getMapKeyDelim() != null) {
+ tbl
+ .setSerializationLib(org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe.class
+ .getName());
+ tbl.setSerdeParam(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT,
+ org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class
+ .getName());
+ }
+
+ if (crtTbl.getComment() != null)
+ tbl.setProperty("comment", crtTbl.getComment());
+ if (crtTbl.getLocation() != null)
+ tblStorDesc.setLocation(crtTbl.getLocation());
+
+ if (crtTbl.isSequenceFile()) {
+ tbl.setInputFormatClass(SequenceFileInputFormat.class);
+ tbl.setOutputFormatClass(SequenceFileOutputFormat.class);
+ } else {
+ tbl.setOutputFormatClass(IgnoreKeyTextOutputFormat.class);
+ tbl.setInputFormatClass(TextInputFormat.class);
+ }
- // write the results in the file
- try {
- DataOutput outStream = (DataOutput)fs.create(showParts.getResFile());
- Iterator<String> iterParts = parts.iterator();
- boolean firstCol = true;
- while (iterParts.hasNext())
- {
- if (!firstCol)
- outStream.write(terminator);
- outStream.write(iterParts.next().getBytes("UTF-8"));
- firstCol = false;
+ if (crtTbl.isExternal())
+ tbl.setProperty("EXTERNAL", "TRUE");
+
+ // If the sorted columns is a superset of bucketed columns, store this fact.
+ // It can be later used to
+ // optimize some group-by queries. Note that, the order does not matter as
+ // long as it in the first
+ // 'n' columns where 'n' is the length of the bucketed columns.
+ if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) {
+ List<String> bucketCols = tbl.getBucketCols();
+ List<Order> sortCols = tbl.getSortCols();
+
+ if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) {
+ boolean found = true;
+
+ Iterator<String> iterBucketCols = bucketCols.iterator();
+ while (iterBucketCols.hasNext()) {
+ String bucketCol = iterBucketCols.next();
+ boolean colFound = false;
+ for (int i = 0; i < bucketCols.size(); i++) {
+ if (bucketCol.equals(sortCols.get(i).getCol())) {
+ colFound = true;
+ break;
+ }
+ }
+ if (colFound == false) {
+ found = false;
+ break;
}
- ((FSDataOutputStream)outStream).close();
- } catch (FileNotFoundException e) {
- LOG.info("show partitions: " + StringUtils.stringifyException(e));
- return 1;
- } catch (IOException e) {
- LOG.info("show partitions: " + StringUtils.stringifyException(e));
- return 1;
}
- return 0;
+ if (found)
+ tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
}
-
}
- catch (InvalidTableException e) {
- console.printError("Table " + e.getTableName() + " does not exist");
- LOG.debug(StringUtils.stringifyException(e));
- return 1;
- }
- catch (HiveException e) {
- console.printError("FAILED: Error in metadata: " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
- LOG.debug(StringUtils.stringifyException(e));
- return 1;
- } catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
- return (1);
+
+ // set owner, create_time etc
+ tbl.setOwner(System.getProperty("user.name"));
+ // set create time
+ tbl.getTTable().setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+ if (crtTbl.getCols() != null) {
+ tbl.setFields(crtTbl.getCols());
}
- assert false;
+
+ // create the table
+ db.createTable(tbl);
return 0;
}
}