Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/08 07:01:20 UTC

svn commit: r724266 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java

Author: zshao
Date: Sun Dec  7 22:01:20 2008
New Revision: 724266

URL: http://svn.apache.org/viewvc?rev=724266&view=rev
Log:
HIVE-102. Refactor DDLTask. (Johan Oskarsson through zshao)
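
The refactor extracts each DDL operation out of DDLTask.execute() into a private helper (createTable, dropTable, alterTable, describeTable, showTables, showPartitions), each returning 0 on success and a non-zero code on failure, so execute() reduces to a dispatch over the work descriptors. Below is a minimal, self-contained Java sketch of that shape; all names in it (DispatchSketch, DdlWork, the field names, the helper bodies) are illustrative stand-ins, not Hive's actual API.

    public class DispatchSketch {
      static class DdlWork {
        String createTableName; // set when the work item is a CREATE TABLE
        String dropTableName;   // set when the work item is a DROP TABLE
      }

      public int execute(DdlWork work) {
        try {
          // check each descriptor in turn and delegate to the matching helper
          if (work.createTableName != null) {
            return createTable(work.createTableName);
          }
          if (work.dropTableName != null) {
            return dropTable(work.dropTableName);
          }
        } catch (Exception e) {
          // errors are reported once, here, rather than in every branch
          System.err.println("FAILED: " + e.getMessage());
          return 1;
        }
        return 0;
      }

      private int createTable(String name) {
        System.out.println("creating table " + name);
        return 0; // 0 == success, matching DDLTask's convention
      }

      private int dropTable(String name) {
        System.out.println("dropping table " + name);
        return 0;
      }

      public static void main(String[] args) {
        DdlWork work = new DdlWork();
        work.createTableName = "t1";
        System.exit(new DispatchSketch().execute(work));
      }
    }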

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=724266&r1=724265&r2=724266&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sun Dec  7 22:01:20 2008
@@ -17,6 +17,8 @@
 
   IMPROVEMENTS
 
+    HIVE-102. Refactor DDLTask. (Johan Oskarsson through zshao)
+
     HIVE-85. New compression options for Hive. (Joydeep Sarma through zshao)
 
     HIVE-69. genMapRedTasks uses tree walker. (Namit through zshao)

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=724266&r1=724265&r2=724266&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sun Dec  7 22:01:20 2008
@@ -93,444 +93,511 @@
 
       createTableDesc crtTbl = work.getCreateTblDesc();
       if (crtTbl != null) {
+        return createTable(db, crtTbl);
+      }
 
-        // create the table
-        Table tbl = new Table(crtTbl.getTableName());
-        StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
-        if (crtTbl.getBucketCols() != null)
-          tblStorDesc.setBucketCols(crtTbl.getBucketCols());
-        if (crtTbl.getSortCols() != null)
-          tbl.setSortCols(crtTbl.getSortCols());
-        if (crtTbl.getPartCols() != null)
-          tbl.setPartCols(crtTbl.getPartCols());
-        if (crtTbl.getNumBuckets() != -1)
-          tblStorDesc.setNumBuckets(crtTbl.getNumBuckets());
-
-        if (crtTbl.getSerName() != null) {
-        	tbl.setSerializationLib(crtTbl.getSerName());
-          if (crtTbl.getMapProp() != null) {
-            Iterator<Map.Entry<String, String>> iter = crtTbl.getMapProp().entrySet().iterator();
-            while (iter.hasNext()) {
-              Map.Entry<String, String> m = (Map.Entry)iter.next();
-              tbl.setSerdeParam(m.getKey(), m.getValue());
-            }
-          }
-        } 
-        else
-        {
-          if (crtTbl.getFieldDelim() != null)
-          {
-            tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
-            tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim());
-          }
-        
-          if (crtTbl.getCollItemDelim() != null)
-            tbl.setSerdeParam(Constants.COLLECTION_DELIM, crtTbl.getCollItemDelim());
-          if (crtTbl.getMapKeyDelim() != null)
-            tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
-          if (crtTbl.getLineDelim() != null)
-            tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
-        }
-        
-        /**
-         * For now, if the user specifies either the map or the collections delimiter, we infer the 
-         * table to DynamicSerDe/TCTLSeparatedProtocol.
-         * In the future, we should infer this for any delimiters specified, but this will break older
-         * hive tables, so not for now. 
-         */
-        if (crtTbl.getCollItemDelim() != null || crtTbl.getMapKeyDelim() != null) {
-          tbl.setSerializationLib(org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe.class.getName());
-          tbl.setSerdeParam(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class.getName());
-        }
+      dropTableDesc dropTbl = work.getDropTblDesc();
+      if (dropTbl != null) {
+        return dropTable(db, dropTbl);
+      }
 
+      alterTableDesc alterTbl = work.getAlterTblDesc();
+      if (alterTbl != null) {
+        return alterTable(db, alterTbl);
+      }
 
-        if (crtTbl.getComment() != null)
-          tbl.setProperty("comment", crtTbl.getComment());
-        if (crtTbl.getLocation() != null)
-          tblStorDesc.setLocation(crtTbl.getLocation());
-
-        if (crtTbl.isSequenceFile()) {
-          tbl.setInputFormatClass(SequenceFileInputFormat.class);
-          tbl.setOutputFormatClass(SequenceFileOutputFormat.class);
-        }
-        else {
-          tbl.setOutputFormatClass(IgnoreKeyTextOutputFormat.class);
-          tbl.setInputFormatClass(TextInputFormat.class);
-        } 
-
-        if (crtTbl.isExternal())
-          tbl.setProperty("EXTERNAL", "TRUE");
-
-        // If the sorted columns is a superset of bucketed columns, store this fact. It can be later used to
-        // optimize some group-by queries. Note that, the order does not matter as long as it in the first
-        // 'n' columns where 'n' is the length of the bucketed columns.
-        if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null))
-        {
-          List<String> bucketCols = tbl.getBucketCols();
-          List<Order> sortCols = tbl.getSortCols();
-
-          if ( (sortCols.size() > 0) && (sortCols.size() >= bucketCols.size()))
-          {
-            boolean found = true;
-
-            Iterator<String> iterBucketCols = bucketCols.iterator();
-            while (iterBucketCols.hasNext())
-            {
-              String bucketCol = iterBucketCols.next();
-              boolean colFound = false;
-              for (int i = 0; i < bucketCols.size(); i++)
-              {
-                if (bucketCol.equals(sortCols.get(i).getCol())) {
-                  colFound = true;
-                  break;
-                }
-              }
-              if (colFound == false)
-              {
-                found = false;
-                break;
-              }
-            }
-            if (found)
-              tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
-          }
-        }
-        
-        // set owner, create_time etc
-        tbl.setOwner(System.getProperty("user.name"));
-        // set create time
-        tbl.getTTable().setCreateTime((int) (System.currentTimeMillis()/1000));
+      descTableDesc descTbl = work.getDescTblDesc();
+      if (descTbl != null) {
+        return describeTable(db, fs, descTbl);
+      }
 
-        if(crtTbl.getCols() != null) {
-          tbl.setFields(crtTbl.getCols());
-        }
+      showTablesDesc showTbls = work.getShowTblsDesc();
+      if (showTbls != null) {
+        return showTables(db, fs, showTbls);
+      }
 
-        // create the table
-        db.createTable(tbl);
-        return 0;
+      showPartitionsDesc showParts = work.getShowPartsDesc();
+      if (showParts != null) {
+        return showPartitions(db, fs, showParts);
       }
 
-      dropTableDesc dropTbl = work.getDropTblDesc();
-      if (dropTbl != null) {
-        if(dropTbl.getPartSpecs() == null) {
-          // drop the table
-          db.dropTable(dropTbl.getTableName());
-        } else {
-          // drop partitions in the list
-          Table tbl  = db.getTable(dropTbl.getTableName());
-          List<Partition> parts = new ArrayList<Partition>();
-          for(HashMap<String, String> partSpec : dropTbl.getPartSpecs()) {
-            Partition part = db.getPartition(tbl, partSpec, false);
-            if(part == null) {
-              console.printInfo("Partition " + partSpec + " does not exist.");
-            } else {
-              parts.add(part);
-            }
-          }
-          // drop all existing partitions from the list
-          for (Partition partition : parts) {
-            console.printInfo("Dropping the partition " + partition.getName());
-            db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, 
-                dropTbl.getTableName(), 
-                partition.getValues(), 
-                true); //drop data for the partition
-          }
-        }
-        return 0;
+    } catch (InvalidTableException e) {
+      console.printError("Table " + e.getTableName() + " does not exist");
+      LOG.debug(StringUtils.stringifyException(e));
+      return 1;
+    } catch (HiveException e) {
+      console.printError("FAILED: Error in metadata: " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+      LOG.debug(StringUtils.stringifyException(e));
+      return 1;
+    } catch (Exception e) {
+      console.printError("Failed with exception " +   e.getMessage(), "\n" + StringUtils.stringifyException(e));
+      return (1);
+    }
+    assert false;
+    return 0;
+  }
+
+  /**
+   * Write a list of partitions to a file.
+   * 
+   * @param db The database in question.
+   * @param fs FileSystem that will contain the file written.
+   * @param showParts These are the partitions we're interested in.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int showPartitions(Hive db, FileSystem fs,
+      showPartitionsDesc showParts) throws HiveException {
+    // get the partitions for the table and populate the output
+    String tabName = showParts.getTabName();
+    Table tbl = null;
+    List<String> parts = null;
+
+    tbl = db.getTable(tabName);
+
+    if (!tbl.isPartitioned()) {
+      console.printError("Table " + tabName + " is not a partitioned table");
+      return 1;
+    }
+
+    parts = db.getPartitionNames(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl
+        .getName(), Short.MAX_VALUE);
+
+    // write the results in the file
+    try {
+      DataOutput outStream = (DataOutput) fs.create(showParts.getResFile());
+      Iterator<String> iterParts = parts.iterator();
+      boolean firstCol = true;
+      while (iterParts.hasNext()) {
+        if (!firstCol)
+          outStream.write(terminator);
+        outStream.write(iterParts.next().getBytes("UTF-8"));
+        firstCol = false;
       }
+      ((FSDataOutputStream) outStream).close();
+    } catch (FileNotFoundException e) {
+      LOG.info("show partitions: " + StringUtils.stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.info("show partitions: " + StringUtils.stringifyException(e));
+      return 1;
+    }
+    return 0;
+  }
 
-      alterTableDesc alterTbl = work.getAlterTblDesc();
-      if (alterTbl != null) {
-        // alter the table
-        Table tbl = db.getTable(alterTbl.getOldName());
-        if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
-          tbl.getTTable().setTableName(alterTbl.getNewName());
-        else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
-          List<FieldSchema> newCols = alterTbl.getNewCols();
-          List<FieldSchema> oldCols = tbl.getCols();
-          if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
-            console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
-            tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
-            tbl.getTTable().getSd().setCols(newCols);
-          } 
-          else { 
-            // make sure the columns does not already exist
-            Iterator<FieldSchema> iterNewCols = newCols.iterator();
-            while (iterNewCols.hasNext()) {
-              FieldSchema newCol = iterNewCols.next();
-              String newColName  = newCol.getName();
-              Iterator<FieldSchema> iterOldCols = oldCols.iterator();
-              while (iterOldCols.hasNext()) {
-                String oldColName = iterOldCols.next().getName();
-                if (oldColName.equalsIgnoreCase(newColName)) { 
-                  console.printError("Column '" + newColName + "' exists");
-                  return 1;
-                }
-              }
-              oldCols.add(newCol);
-            }
-            tbl.getTTable().getSd().setCols(oldCols);
-          }
-        } 
-        else if(alterTbl.getOp() == alterTableDesc.alterTableTypes.REPLACECOLS) {
-          // change SerDe to MetadataTypedColumnsetSerDe if it is columnsetSerDe
-          if(tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
-            console.printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
-            tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
-          }
-          else if(!tbl.getSerializationLib().equals(MetadataTypedColumnsetSerDe.class.getName())) {
-            console.printError("Replace columns is not supported for this table. SerDe may be incompatible.");
-            return 1;
-          }
-          tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
-        }
-        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDPROPS) {
-          tbl.getTTable().getParameters().putAll(alterTbl.getProps());
-        }
-        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDEPROPS) {
-          tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
-        }
-        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDE) {
-          tbl.setSerializationLib(alterTbl.getSerdeName());
-          if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0))
-            tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
-          // since serde is modified then do the appropriate things to reset columns etc
-          tbl.reinitSerDe();
-          tbl.setFields(Hive.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
-        }
-        else {
-          console.printError("Unsupported Alter commnad");
-          return 1;
-        }
+  /**
+   * Write a list of the tables in the database to a file.
+   * 
+   * @param db The database in question.
+   * @param fs FileSystem that will contain the file written.
+   * @param showTbls These are the tables we're interested in.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int showTables(Hive db, FileSystem fs, showTablesDesc showTbls)
+      throws HiveException {
+    // get the tables for the desired pattern and populate the output stream
+    List<String> tbls = null;
+    if (showTbls.getPattern() != null) {
+      LOG.info("pattern: " + showTbls.getPattern());
+      tbls = db.getTablesByPattern(showTbls.getPattern());
+      LOG.info("results : " + tbls.size());
+    } else
+      tbls = db.getAllTables();
+
+    // write the results in the file
+    try {
+      DataOutput outStream = (DataOutput) fs.create(showTbls.getResFile());
+      SortedSet<String> sortedTbls = new TreeSet<String>(tbls);
+      Iterator<String> iterTbls = sortedTbls.iterator();
+      boolean firstCol = true;
+      while (iterTbls.hasNext()) {
+        if (!firstCol)
+          outStream.write(separator);
+        outStream.write(iterTbls.next().getBytes("UTF-8"));
+        firstCol = false;
+      }
+      ((FSDataOutputStream) outStream).close();
+    } catch (FileNotFoundException e) {
+      LOG.info("show table: " + StringUtils.stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.info("show table: " + StringUtils.stringifyException(e));
+      return 1;
+    }
+    return 0;
+  }
 
-        // set last modified by properties
-        tbl.setProperty("last_modified_by", System.getProperty("user.name"));
-        tbl.setProperty("last_modified_time", Long.toString(System.currentTimeMillis()/1000));
-
-        try {
-          db.alterTable(alterTbl.getOldName(), tbl);
-        } catch (InvalidOperationException e) {
-          LOG.info("alter table: " + StringUtils.stringifyException(e));
-          return 1;
-        } catch (MetaException e) {
-          return 1;
-        } catch (TException e) {
-          return 1;
-        }
+  /**
+   * Write the description of a table to a file.
+   * 
+   * @param db The database in question.
+   * @param fs FileSystem that will contain the file written.
+   * @param descTbl This is the table we're interested in.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int describeTable(Hive db, FileSystem fs, descTableDesc descTbl)
+      throws HiveException {
+    String colPath = descTbl.getTableName();
+    String tableName = colPath.substring(0,
+        colPath.indexOf('.') == -1 ? colPath.length() : colPath.indexOf('.'));
+
+    // describe the table - populate the output stream
+    Table tbl = db.getTable(tableName, false);
+    Partition part = null;
+    try {
+      if (tbl == null) {
+        DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile());
+        String errMsg = "Table " + tableName + " does not exist";
+        outStream.write(errMsg.getBytes("UTF-8"));
+        ((FSDataOutputStream) outStream).close();
         return 0;
       }
+      if (descTbl.getPartSpec() != null) {
+        part = db.getPartition(tbl, descTbl.getPartSpec(), false);
+        if (part == null) {
+          DataOutput outStream = (DataOutput) fs.open(descTbl.getResFile());
+          String errMsg = "Partition " + descTbl.getPartSpec() + " for table "
+              + tableName + " does not exist";
+          outStream.write(errMsg.getBytes("UTF-8"));
+          ((FSDataOutputStream) outStream).close();
+          return 0;
+        }
+        tbl = part.getTable();
+      }
+    } catch (FileNotFoundException e) {
+      LOG.info("describe table: " + StringUtils.stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.info("describe table: " + StringUtils.stringifyException(e));
+      return 1;
+    }
 
-      descTableDesc descTbl = work.getDescTblDesc();
-
-      if (descTbl != null) {
+    try {
 
-        String colPath = descTbl.getTableName();
-        String tableName = colPath.substring(0, colPath.indexOf('.') == -1 ? colPath.length() : colPath.indexOf('.'));
+      LOG.info("DDLTask: got data for " + tbl.getName());
 
-        // describe the table - populate the output stream
-        Table tbl = db.getTable(tableName, false);
-        Partition part = null;
-        try {
-          if(tbl == null) {
-            DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
-            String errMsg = "Table " + tableName + " does not exist";
-            outStream.write(errMsg.getBytes("UTF-8"));
-            ((FSDataOutputStream)outStream).close();
-            return 0;
-          }
-          if(descTbl.getPartSpec() != null) {
-            part = db.getPartition(tbl, descTbl.getPartSpec(), false);
-            if(part == null) {
-              DataOutput outStream = (DataOutput)fs.open(descTbl.getResFile());
-              String errMsg = "Partition " + descTbl.getPartSpec() + " for table " + tableName + " does not exist";
-              outStream.write(errMsg.getBytes("UTF-8"));
-              ((FSDataOutputStream)outStream).close();
-              return 0;
-            }
-            tbl = part.getTable();
-          }
-        } catch (FileNotFoundException e) {
-          LOG.info("describe table: " + StringUtils.stringifyException(e));
-          return 1;
+      // write the results in the file
+      DataOutput os = (DataOutput) fs.create(descTbl.getResFile());
+      List<FieldSchema> cols = null;
+      if (colPath.equals(tableName)) {
+        cols = tbl.getCols();
+        if (part != null) {
+          cols = part.getTPartition().getSd().getCols();
         }
-        catch (IOException e) {
-          LOG.info("describe table: " + StringUtils.stringifyException(e));
-          return 1;
+      } else {
+        cols = db.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+      }
+
+      Iterator<FieldSchema> iterCols = cols.iterator();
+      boolean firstCol = true;
+      while (iterCols.hasNext()) {
+        if (!firstCol)
+          os.write(terminator);
+        FieldSchema col = iterCols.next();
+        os.write(col.getName().getBytes("UTF-8"));
+        os.write(separator);
+        os.write(col.getType().getBytes("UTF-8"));
+        if (col.getComment() != null) {
+          os.write(separator);
+          os.write(singleQuote);
+          os.write(col.getComment().getBytes("UTF-8"));
+          os.write(singleQuote);
         }
-        
-        try {
+        firstCol = false;
+      }
 
-          LOG.info("DDLTask: got data for " +  tbl.getName());
-          
-          // write the results in the file
-          DataOutput os = (DataOutput)fs.create(descTbl.getResFile());
-          List<FieldSchema> cols = null;
-          if (colPath.equals(tableName)) {
-            cols = tbl.getCols();
-            if (part != null) {
-              cols = part.getTPartition().getSd().getCols();
-            }
-          }
-          else {
-            cols = db.getFieldsFromDeserializer(colPath, tbl.getDeserializer());
+      if (tableName.equals(colPath)) {
+        // also return the partitioning columns
+        List<FieldSchema> partCols = tbl.getPartCols();
+        Iterator<FieldSchema> iterPartCols = partCols.iterator();
+        while (iterPartCols.hasNext()) {
+          os.write(terminator);
+          FieldSchema col = iterPartCols.next();
+          os.write(col.getName().getBytes("UTF-8"));
+          os.write(separator);
+          os.write(col.getType().getBytes("UTF-8"));
+          if (col.getComment() != null) {
+            os.write(separator);
+            os.write(col.getComment().getBytes("UTF-8"));
           }
+        }
 
-          Iterator<FieldSchema> iterCols = cols.iterator();
-          boolean firstCol = true;
-          while (iterCols.hasNext())
-          {
-            if (!firstCol)
-              os.write(terminator);
-            FieldSchema col = iterCols.next();
-            os.write(col.getName().getBytes("UTF-8"));
-            os.write(separator);
-            os.write(col.getType().getBytes("UTF-8"));
-            if (col.getComment() != null)
-            {
-              os.write(separator);
-              os.write(singleQuote);
-              os.write(col.getComment().getBytes("UTF-8"));
-              os.write(singleQuote);
-            }
-            firstCol = false;
+        // if this is an extended describe, show the complete details of the table
+        if (descTbl.isExt()) {
+          if (part != null) {
+            // show partition information
+            os.write("\n\nDetailed Partition Information:\n".getBytes("UTF-8"));
+            os.write(part.getTPartition().toString().getBytes("UTF-8"));
+          } else {
+            os.write("\nDetailed Table Information:\n".getBytes("UTF-8"));
+            os.write(tbl.getTTable().toString().getBytes("UTF-8"));
           }
+        }
+      }
 
-          if (tableName.equals(colPath)) {
-            // also return the partitioning columns
-            List<FieldSchema> partCols = tbl.getPartCols();
-            Iterator<FieldSchema> iterPartCols = partCols.iterator();
-            while (iterPartCols.hasNext())
-            {
-              os.write(terminator);
-              FieldSchema col = iterPartCols.next();
-              os.write(col.getName().getBytes("UTF-8"));
-              os.write(separator);
-              os.write(col.getType().getBytes("UTF-8"));
-              if (col.getComment() != null)
-              {
-                os.write(separator);
-                os.write(col.getComment().getBytes("UTF-8"));
-              }
-            }
-          
-            // if extended desc table then show the complete details of the table
-            if(descTbl.isExt()) {
-              if(part != null) {
-                // show partition informatio
-                os.write("\n\nDetailed Partition Information:\n".getBytes("UTF-8"));
-                os.write(part.getTPartition().toString().getBytes("UTF-8"));
-              } else {
-                os.write("\nDetailed Table Information:\n".getBytes("UTF-8"));
-                os.write(tbl.getTTable().toString().getBytes("UTF-8"));
-              }
+      LOG.info("DDLTask: written data for " + tbl.getName());
+      ((FSDataOutputStream) os).close();
+
+    } catch (FileNotFoundException e) {
+      LOG.info("describe table: " + StringUtils.stringifyException(e));
+      return 1;
+    } catch (IOException e) {
+      LOG.info("describe table: " + StringUtils.stringifyException(e));
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Alter a given table.
+   * 
+   * @param db The database in question.
+   * @param alterTbl This is the table we're altering.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int alterTable(Hive db, alterTableDesc alterTbl) throws HiveException {
+    // alter the table
+    Table tbl = db.getTable(alterTbl.getOldName());
+    if (alterTbl.getOp() == alterTableDesc.alterTableTypes.RENAME)
+      tbl.getTTable().setTableName(alterTbl.getNewName());
+    else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDCOLS) {
+      List<FieldSchema> newCols = alterTbl.getNewCols();
+      List<FieldSchema> oldCols = tbl.getCols();
+      if (tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+        console
+            .printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+        tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+        tbl.getTTable().getSd().setCols(newCols);
+      } else {
+        // make sure the columns do not already exist
+        Iterator<FieldSchema> iterNewCols = newCols.iterator();
+        while (iterNewCols.hasNext()) {
+          FieldSchema newCol = iterNewCols.next();
+          String newColName = newCol.getName();
+          Iterator<FieldSchema> iterOldCols = oldCols.iterator();
+          while (iterOldCols.hasNext()) {
+            String oldColName = iterOldCols.next().getName();
+            if (oldColName.equalsIgnoreCase(newColName)) {
+              console.printError("Column '" + newColName + "' exists");
+              return 1;
             }
           }
-
-          LOG.info("DDLTask: written data for " +  tbl.getName());
-          ((FSDataOutputStream)os).close();
-          
-        } catch (FileNotFoundException e) {
-          LOG.info("describe table: " + StringUtils.stringifyException(e));
-          return 1;
-        }
-        catch (IOException e) {
-          LOG.info("describe table: " + StringUtils.stringifyException(e));
-          return 1;
+          oldCols.add(newCol);
         }
-        return 0;
+        tbl.getTTable().getSd().setCols(oldCols);
+      }
+    } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.REPLACECOLS) {
+      // change SerDe to MetadataTypedColumnsetSerDe if it is columnsetSerDe
+      if (tbl.getSerializationLib().equals(columnsetSerDe.class.getName())) {
+        console
+            .printInfo("Replacing columns for columnsetSerDe and changing to typed SerDe");
+        tbl.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+      } else if (!tbl.getSerializationLib().equals(
+          MetadataTypedColumnsetSerDe.class.getName())) {
+        console
+            .printError("Replace columns is not supported for this table. SerDe may be incompatible.");
+        return 1;
       }
+      tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
+    } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDPROPS) {
+      tbl.getTTable().getParameters().putAll(alterTbl.getProps());
+    } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDEPROPS) {
+      tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(
+          alterTbl.getProps());
+    } else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDE) {
+      tbl.setSerializationLib(alterTbl.getSerdeName());
+      if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0))
+        tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(
+            alterTbl.getProps());
+      // since the serde has been modified, reset the columns and other
+      // derived metadata
+      tbl.reinitSerDe();
+      tbl.setFields(Hive.getFieldsFromDeserializer(tbl.getName(), tbl
+          .getDeserializer()));
+    } else {
+      console.printError("Unsupported Alter commnad");
+      return 1;
+    }
 
-      showTablesDesc showTbls = work.getShowTblsDesc();
-      if (showTbls != null) {
-        // get the tables for the desired pattenn - populate the output stream
-        List<String> tbls = null;
-        if (showTbls.getPattern() != null)
-        {
-          LOG.info("pattern: " + showTbls.getPattern());
-          tbls = db.getTablesByPattern(showTbls.getPattern());
-          LOG.info("results : " + tbls.size());
-        }
-        else
-          tbls = db.getAllTables();
-        
-        // write the results in the file
-        try {
-          DataOutput outStream = (DataOutput)fs.create(showTbls.getResFile());
-          SortedSet<String> sortedTbls = new TreeSet<String>(tbls);
-          Iterator<String> iterTbls = sortedTbls.iterator();
-          boolean firstCol = true;
-          while (iterTbls.hasNext())
-          {
-            if (!firstCol)
-              outStream.write(separator);
-            outStream.write(iterTbls.next().getBytes("UTF-8"));
-            firstCol = false;
-          }
-          ((FSDataOutputStream)outStream).close();
-        } catch (FileNotFoundException e) {
-          LOG.info("show table: " + StringUtils.stringifyException(e));
-          return 1;
-        } catch (IOException e) {
-          LOG.info("show table: " + StringUtils.stringifyException(e));
-          return 1;
+    // set last modified by properties
+    tbl.setProperty("last_modified_by", System.getProperty("user.name"));
+    tbl.setProperty("last_modified_time", Long.toString(System
+        .currentTimeMillis() / 1000));
+
+    try {
+      db.alterTable(alterTbl.getOldName(), tbl);
+    } catch (InvalidOperationException e) {
+      LOG.info("alter table: " + StringUtils.stringifyException(e));
+      return 1;
+    } catch (MetaException e) {
+      return 1;
+    } catch (TException e) {
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Drop a given table.
+   * 
+   * @param db The database in question.
+   * @param dropTbl This is the table we're dropping.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int dropTable(Hive db, dropTableDesc dropTbl) throws HiveException {
+    if (dropTbl.getPartSpecs() == null) {
+      // drop the table
+      db.dropTable(dropTbl.getTableName());
+    } else {
+      // drop partitions in the list
+      Table tbl = db.getTable(dropTbl.getTableName());
+      List<Partition> parts = new ArrayList<Partition>();
+      for (HashMap<String, String> partSpec : dropTbl.getPartSpecs()) {
+        Partition part = db.getPartition(tbl, partSpec, false);
+        if (part == null) {
+          console.printInfo("Partition " + partSpec + " does not exist.");
+        } else {
+          parts.add(part);
         }
-        return 0;
       }
+      // drop all existing partitions from the list
+      for (Partition partition : parts) {
+        console.printInfo("Dropping the partition " + partition.getName());
+        db.dropPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropTbl
+            .getTableName(), partition.getValues(), true); // drop data for the
+                                                           // partition
+      }
+    }
+    return 0;
+  }
 
-      showPartitionsDesc showParts = work.getShowPartsDesc();
-      if (showParts != null) {
-        // get the partitions for the table and populate the output
-        String tabName = showParts.getTabName();
-        Table tbl = null;
-        List<String> parts = null;
-
-        tbl = db.getTable(tabName);
-
-        if (!tbl.isPartitioned()) {
-          console.printError("Table " + tabName + " is not a partitioned table");
-          return 1;
+  /**
+   * Create a new table.
+   * 
+   * @param db The database in question.
+   * @param crtTbl This is the table we're creating.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   * @throws HiveException Throws this exception if an unexpected error occurs.
+   */
+  private int createTable(Hive db, createTableDesc crtTbl) throws HiveException {
+    // create the table
+    Table tbl = new Table(crtTbl.getTableName());
+    StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
+    if (crtTbl.getBucketCols() != null)
+      tblStorDesc.setBucketCols(crtTbl.getBucketCols());
+    if (crtTbl.getSortCols() != null)
+      tbl.setSortCols(crtTbl.getSortCols());
+    if (crtTbl.getPartCols() != null)
+      tbl.setPartCols(crtTbl.getPartCols());
+    if (crtTbl.getNumBuckets() != -1)
+      tblStorDesc.setNumBuckets(crtTbl.getNumBuckets());
+
+    if (crtTbl.getSerName() != null) {
+      tbl.setSerializationLib(crtTbl.getSerName());
+      if (crtTbl.getMapProp() != null) {
+        Iterator<Map.Entry<String, String>> iter = crtTbl.getMapProp()
+            .entrySet().iterator();
+        while (iter.hasNext()) {
+          Map.Entry<String, String> m = (Map.Entry) iter.next();
+          tbl.setSerdeParam(m.getKey(), m.getValue());
         }
+      }
+    } else {
+      if (crtTbl.getFieldDelim() != null) {
+        tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim());
+        tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl
+            .getFieldDelim());
+      }
 
-        parts = db.getPartitionNames(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbl.getName(), Short.MAX_VALUE);
+      if (crtTbl.getCollItemDelim() != null)
+        tbl
+            .setSerdeParam(Constants.COLLECTION_DELIM, crtTbl
+                .getCollItemDelim());
+      if (crtTbl.getMapKeyDelim() != null)
+        tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim());
+      if (crtTbl.getLineDelim() != null)
+        tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim());
+    }
+
+    /**
+     * For now, if the user specifies either the map or the collection item
+     * delimiter, we default the table to DynamicSerDe/TCTLSeparatedProtocol.
+     * In the future, we should do this for any delimiters specified, but that
+     * would break older hive tables, so not for now.
+     */
+    if (crtTbl.getCollItemDelim() != null || crtTbl.getMapKeyDelim() != null) {
+      tbl
+          .setSerializationLib(org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe.class
+              .getName());
+      tbl.setSerdeParam(
+          org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT,
+          org.apache.hadoop.hive.serde2.thrift.TCTLSeparatedProtocol.class
+              .getName());
+    }
+
+    if (crtTbl.getComment() != null)
+      tbl.setProperty("comment", crtTbl.getComment());
+    if (crtTbl.getLocation() != null)
+      tblStorDesc.setLocation(crtTbl.getLocation());
+
+    if (crtTbl.isSequenceFile()) {
+      tbl.setInputFormatClass(SequenceFileInputFormat.class);
+      tbl.setOutputFormatClass(SequenceFileOutputFormat.class);
+    } else {
+      tbl.setOutputFormatClass(IgnoreKeyTextOutputFormat.class);
+      tbl.setInputFormatClass(TextInputFormat.class);
+    }
 
-        // write the results in the file
-        try {
-          DataOutput outStream = (DataOutput)fs.create(showParts.getResFile());
-          Iterator<String> iterParts = parts.iterator();
-          boolean firstCol = true;
-          while (iterParts.hasNext())
-          {
-            if (!firstCol)
-              outStream.write(terminator);
-            outStream.write(iterParts.next().getBytes("UTF-8"));
-            firstCol = false;
+    if (crtTbl.isExternal())
+      tbl.setProperty("EXTERNAL", "TRUE");
+
+    // If the sort columns are a superset of the bucketed columns, store this
+    // fact; it can later be used to optimize some group-by queries. The order
+    // does not matter as long as each bucketed column appears among the first
+    // 'n' sort columns, where 'n' is the number of bucketed columns.
+    if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) {
+      List<String> bucketCols = tbl.getBucketCols();
+      List<Order> sortCols = tbl.getSortCols();
+
+      if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) {
+        boolean found = true;
+
+        Iterator<String> iterBucketCols = bucketCols.iterator();
+        while (iterBucketCols.hasNext()) {
+          String bucketCol = iterBucketCols.next();
+          boolean colFound = false;
+          for (int i = 0; i < bucketCols.size(); i++) {
+            if (bucketCol.equals(sortCols.get(i).getCol())) {
+              colFound = true;
+              break;
+            }
+          }
+          if (colFound == false) {
+            found = false;
+            break;
           }
-          ((FSDataOutputStream)outStream).close();
-        } catch (FileNotFoundException e) {
-          LOG.info("show partitions: " + StringUtils.stringifyException(e));
-          return 1;
-        } catch (IOException e) {
-          LOG.info("show partitions: " + StringUtils.stringifyException(e));
-          return 1;
         }
-        return 0;
+        if (found)
+          tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE");
       }
-
     }
-    catch (InvalidTableException e) {
-      console.printError("Table " + e.getTableName() + " does not exist");
-      LOG.debug(StringUtils.stringifyException(e));
-      return 1;
-    }
-    catch (HiveException e) {
-      console.printError("FAILED: Error in metadata: " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
-      LOG.debug(StringUtils.stringifyException(e));
-      return 1;
-    } catch (Exception e) {
-      console.printError("Failed with exception " +   e.getMessage(), "\n" + StringUtils.stringifyException(e));
-      return (1);
+
+    // set owner, create_time etc
+    tbl.setOwner(System.getProperty("user.name"));
+    // set create time
+    tbl.getTTable().setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+    if (crtTbl.getCols() != null) {
+      tbl.setFields(crtTbl.getCols());
     }
-    assert false;
+
+    // create the table
+    db.createTable(tbl);
     return 0;
   }
 }
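
For reference, the new show* helpers share one result-writing pattern: open the result file, write the names separated by the terminator byte, close the stream, and return 0 on success or 1 on any IO failure. Below is a runnable, simplified sketch of that pattern using plain java.io in place of the Hadoop FileSystem API; the class name, the output file name, and the '\n' terminator are illustrative stand-ins.

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class ShowTablesSketch {
      // stand-in for DDLTask's terminator byte
      static final byte terminator = '\n';

      // write each name to resFile, separated by the terminator;
      // return 0 on success and 1 on failure, as DDLTask's helpers do
      static int writeNames(SortedSet<String> names, String resFile) {
        try (OutputStream out = new FileOutputStream(resFile)) {
          boolean first = true;
          for (String name : names) {
            if (!first) {
              out.write(terminator);
            }
            out.write(name.getBytes("UTF-8"));
            first = false;
          }
          return 0;
        } catch (IOException e) {
          System.err.println("show tables: " + e);
          return 1;
        }
      }

      public static void main(String[] args) {
        // sort the table names first, as showTables does with a TreeSet
        SortedSet<String> tbls = new TreeSet<String>();
        tbls.add("pokes");
        tbls.add("invites");
        System.exit(writeNames(tbls, "show_tables.out"));
      }
    }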