Posted to commits@hive.apache.org by jc...@apache.org on 2018/07/31 15:56:17 UTC

[2/2] hive git commit: HIVE-20241: Support partitioning spec in CTAS statements (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-20241: Support partitioning spec in CTAS statements (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
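
For readers unfamiliar with the feature, the change lets a CTAS statement declare its
partition columns by name only, with their types derived from the query schema. A minimal
HiveQL sketch (table and column names are illustrative, not taken from the patch):

  CREATE TABLE ctas_part PARTITIONED BY (ds) AS
  SELECT key, value, ds FROM source_tab;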


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3ed7d6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3ed7d6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3ed7d6f

Branch: refs/heads/master
Commit: a3ed7d6fb2ac131da360fdd99d8361dde5ee5b48
Parents: 48ce7ce
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed Jul 25 17:28:45 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue Jul 31 08:55:33 2018 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   5 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   1 +
 .../hive/ql/optimizer/GenMapRedUtils.java       |   9 +-
 .../apache/hadoop/hive/ql/parse/HiveParser.g    |  26 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 485 ++++++----
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  40 +-
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |  48 +-
 .../hive/ql/plan/DynamicPartitionCtx.java       |  34 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  10 +
 .../hive/ql/exec/TestFileSinkOperator.java      |   2 +-
 .../queries/clientpositive/partition_ctas.q     |  51 +
 .../clientpositive/llap/partition_ctas.q.out    | 946 +++++++++++++++++++
 13 files changed, 1446 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 125ad19..4d2eacb 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -599,6 +599,7 @@ minillaplocal.query.files=\
   orc_ppd_decimal.q,\
   orc_ppd_timestamp.q,\
   order_null.q,\
+  partition_ctas.q,\
   partition_multilevels.q,\
   partition_shared_scan.q,\
   partition_pruning.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 397cee2..939ef36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -48,16 +48,15 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.concurrent.ExecutionException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -289,8 +288,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
 
-import static org.apache.commons.lang.StringUtils.join;
-
 /**
  * DDLTask implementation.
  *

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index a1f5133..d2c04e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -322,6 +322,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           }
         }
       }
+
       // Multi-file load is for dynamic partitions when some partitions do not
       // need to merge and they can simply be moved to the target directory.
       // This is also used for MM table conversion.

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index fa92385..d887124 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1489,10 +1489,15 @@ public final class GenMapRedUtils {
     boolean truncate = false;
     if (mvWork.getLoadTableWork() != null) {
       statsWork = new BasicStatsWork(mvWork.getLoadTableWork());
-      String tableName = mvWork.getLoadTableWork().getTable().getTableName();
       truncate = mvWork.getLoadTableWork().getReplace();
+      String tableName = mvWork.getLoadTableWork().getTable().getTableName();
       try {
-        table = Hive.get().getTable(SessionState.get().getCurrentDatabase(), tableName);
+        // For partitioned CTAS, the table has not been created yet, but we can
+        // retrieve it from the loadTableWork. For all other query types, we
+        // retrieve it from the metastore.
+        table = mvWork.getLoadTableWork().getMdTable() != null ?
+            mvWork.getLoadTableWork().getMdTable() :
+            Hive.get().getTable(SessionState.get().getCurrentDatabase(), tableName);
       } catch (HiveException e) {
         throw new RuntimeException("unexpected; table should be present already..: " + tableName, e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 6be48ca..49f5487 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -215,6 +215,7 @@ TOK_TABCOLLIST;
 TOK_TABCOL;
 TOK_TABLECOMMENT;
 TOK_TABLEPARTCOLS;
+TOK_TABLEPARTCOLNAMES;
 TOK_TABLEROWFORMAT;
 TOK_TABLEROWFORMATFIELD;
 TOK_TABLEROWFORMATCOLLITEMS;
@@ -1088,7 +1089,7 @@ createTableStatement
          tablePropertiesPrefixed?
        | (LPAREN columnNameTypeOrConstraintList RPAREN)?
          tableComment?
-         tablePartition?
+         createTablePartitionSpec?
          tableBuckets?
          tableSkewed?
          tableRowFormat?
@@ -1101,7 +1102,7 @@ createTableStatement
          ^(TOK_LIKETABLE $likeName?)
          columnNameTypeOrConstraintList?
          tableComment?
-         tablePartition?
+         createTablePartitionSpec?
          tableBuckets?
          tableSkewed?
          tableRowFormat?
@@ -1987,13 +1988,28 @@ tableComment
       KW_COMMENT comment=StringLiteral  -> ^(TOK_TABLECOMMENT $comment)
     ;
 
-tablePartition
-@init { pushMsg("table partition specification", state); }
+createTablePartitionSpec
+@init { pushMsg("create table partition specification", state); }
 @after { popMsg(state); }
-    : KW_PARTITIONED KW_BY LPAREN columnNameTypeConstraint (COMMA columnNameTypeConstraint)* RPAREN
+    : KW_PARTITIONED KW_BY LPAREN (opt1 = createTablePartitionColumnTypeSpec | opt2 = createTablePartitionColumnSpec) RPAREN
+    -> {$opt1.tree != null}? $opt1
+    -> $opt2
+    ;
+
+createTablePartitionColumnTypeSpec
+@init { pushMsg("create table partition specification", state); }
+@after { popMsg(state); }
+    : columnNameTypeConstraint (COMMA columnNameTypeConstraint)*
     -> ^(TOK_TABLEPARTCOLS columnNameTypeConstraint+)
     ;
 
+createTablePartitionColumnSpec
+@init { pushMsg("create table partition specification", state); }
+@after { popMsg(state); }
+    : columnName (COMMA columnName)*
+    -> ^(TOK_TABLEPARTCOLNAMES columnName+)
+    ;
+
 tableBuckets
 @init { pushMsg("table buckets specification", state); }
 @after { popMsg(state); }
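
To make the two grammar alternatives above concrete: createTablePartitionColumnTypeSpec
covers the long-standing typed form, while the new createTablePartitionColumnSpec covers
the name-only form used by CTAS. A hedged HiveQL sketch (hypothetical names):

  -- typed form, regular CREATE TABLE:
  CREATE TABLE t1 (key INT, value STRING) PARTITIONED BY (ds STRING);
  -- name-only form, CTAS; the type of ds comes from the query:
  CREATE TABLE t2 PARTITIONED BY (ds) AS SELECT key, value, ds FROM t1;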

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b5adf1b..b28cf98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -38,6 +38,8 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -55,8 +57,9 @@ import org.antlr.runtime.tree.TreeWizard;
 import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -6953,7 +6956,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private void genPartnCols(String dest, Operator input, QB qb,
-                            TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException {
+      TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException {
     boolean enforceBucketing = false;
     ArrayList<ExprNodeDesc> partnColsNoConvert = new ArrayList<ExprNodeDesc>();
 
@@ -6986,13 +6989,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   @SuppressWarnings("unchecked")
-  private void setStatsForNonNativeTable(Table tab) throws SemanticException {
-    String tableName = DDLSemanticAnalyzer.getDotName(new String[] { tab.getDbName(),
-        tab.getTableName() });
+  private void setStatsForNonNativeTable(String dbName, String tableName) throws SemanticException {
+    String qTableName = DDLSemanticAnalyzer.getDotName(new String[] { dbName,
+        tableName });
     AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableTypes.DROPPROPS, null, false);
     HashMap<String, String> mapProp = new HashMap<>();
     mapProp.put(StatsSetupConst.COLUMN_STATS_ACCURATE, null);
-    alterTblDesc.setOldName(tableName);
+    alterTblDesc.setOldName(qTableName);
     alterTblDesc.setProps(mapProp);
     alterTblDesc.setDropIfExists(true);
     this.rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc)));
@@ -7227,21 +7230,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBMetaData qbm = qb.getMetaData();
-    Integer dest_type = qbm.getDestTypeForAlias(dest);
+    Integer destType = qbm.getDestTypeForAlias(dest);
 
-    Table dest_tab = null; // destination table if any
+    Table destinationTable = null; // destination table if any
     boolean destTableIsTransactional;     // true for full ACID table and MM table
     boolean destTableIsFullAcid; // should the destination table be written to using ACID
     boolean destTableIsTemporary = false;
     boolean destTableIsMaterialization = false;
-    Partition dest_part = null;// destination partition if any
+    Partition destinationPartition = null;// destination partition if any
     Path queryTmpdir = null; // the intermediate destination directory
-    Path dest_path = null; // the final destination directory
-    TableDesc table_desc = null;
+    Path destinationPath = null; // the final destination directory
+    TableDesc tableDescriptor = null;
     int currentTableId = 0;
     boolean isLocal = false;
     SortBucketRSCtx rsCtx = new SortBucketRSCtx();
     DynamicPartitionCtx dpCtx = null;
+    Table partitionedCTASOrMVTable = null; // destination partitioned CTAS or MV table if any
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
@@ -7249,24 +7253,24 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     Long writeId = null;
     HiveTxnManager txnMgr = getTxnMgr();
 
-    switch (dest_type.intValue()) {
+    switch (destType.intValue()) {
     case QBMetaData.DEST_TABLE: {
 
-      dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsTransactional = AcidUtils.isTransactionalTable(dest_tab);
-      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
-      destTableIsTemporary = dest_tab.isTemporary();
+      destinationTable = qbm.getDestTableForAlias(dest);
+      destTableIsTransactional = AcidUtils.isTransactionalTable(destinationTable);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable);
+      destTableIsTemporary = destinationTable.isTemporary();
 
      // Is the user trying to insert into an external table?
-      checkExternalTable(dest_tab);
+      checkExternalTable(destinationTable);
 
       partSpec = qbm.getPartSpecForAlias(dest);
-      dest_path = dest_tab.getPath();
+      destinationPath = destinationTable.getPath();
 
-      checkImmutableTable(qb, dest_tab, dest_path, false);
+      checkImmutableTable(qb, destinationTable, destinationPath, false);
 
       // check for partition
-      List<FieldSchema> parts = dest_tab.getPartitionKeys();
+      List<FieldSchema> parts = destinationTable.getPartitionKeys();
       if (parts != null && parts.size() > 0) { // table is partitioned
         if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
           throw new SemanticException(generateErrorMessage(
@@ -7275,8 +7279,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         dpCtx = qbm.getDPCtx(dest);
         if (dpCtx == null) {
-          dest_tab.validatePartColumnNames(partSpec, false);
-          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+          destinationTable.validatePartColumnNames(partSpec, false);
+          dpCtx = new DynamicPartitionCtx(partSpec,
               conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
               conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
           qbm.setDPCtx(dest, dpCtx);
@@ -7284,76 +7288,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
 
       // Check for dynamic partitions.
-      dpCtx = checkDynPart(qb, qbm, dest_tab, partSpec, dest);
+      dpCtx = checkDynPart(qb, qbm, destinationTable, partSpec, dest);
       if (dpCtx != null && dpCtx.getSPPath() != null) {
-        dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
+        destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath());
       }
 
-      boolean isNonNativeTable = dest_tab.isNonNative();
-      isMmTable = AcidUtils.isInsertOnlyTable(dest_tab.getParameters());
+      boolean isNonNativeTable = destinationTable.isNonNative();
+      isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
       if (isNonNativeTable || isMmTable) {
-        queryTmpdir = dest_path;
+        queryTmpdir = destinationPath;
       } else {
-        queryTmpdir = ctx.getTempDirForFinalJobPath(dest_path);
+        queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath);
       }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir
-            + " from " + dest_path);
+            + " from " + destinationPath);
       }
       if (dpCtx != null) {
         // set the root of the temporary path where dynamic partition columns will populate
         dpCtx.setRootPath(queryTmpdir);
       }
       // this table_desc does not contain the partitioning columns
-      table_desc = Utilities.getTableDesc(dest_tab);
+      tableDescriptor = Utilities.getTableDesc(destinationTable);
 
       // Add NOT NULL constraint check
       input = genConstraintsPlan(dest, qb, input);
 
       // Add sorting/bucketing if needed
-      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);
+      input = genBucketingSortingDest(dest, input, qb, tableDescriptor, destinationTable, rsCtx);
 
-      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
+      idToTableNameMap.put(String.valueOf(destTableId), destinationTable.getTableName());
       currentTableId = destTableId;
       destTableId++;
 
-      lbCtx = constructListBucketingCtx(dest_tab.getSkewedColNames(),
-          dest_tab.getSkewedColValues(), dest_tab.getSkewedColValueLocationMaps(),
-          dest_tab.isStoredAsSubDirectories(), conf);
+      lbCtx = constructListBucketingCtx(destinationTable.getSkewedColNames(),
+          destinationTable.getSkewedColValues(), destinationTable.getSkewedColValueLocationMaps(),
+          destinationTable.isStoredAsSubDirectories(), conf);
 
       // Create the work for moving the table
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
         if (destTableIsFullAcid) {
-          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
+          acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest);
           //todo: should this be done for MM?  is it ok to use CombineHiveInputFormat with MM
-          checkAcidConstraints(qb, table_desc, dest_tab);
+          checkAcidConstraints(qb, tableDescriptor, destinationTable);
         }
         try {
           if (ctx.getExplainConfig() != null) {
             writeId = null; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
           } else {
             if (isMmTable) {
-              writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
+              writeId = txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName());
             } else {
               writeId = acidOp == Operation.NOT_ACID ? null :
-                      txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
+                      txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName());
             }
           }
         } catch (LockException ex) {
           throw new SemanticException("Failed to allocate write Id", ex);
         }
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
-            dest_tab.getDbName(), dest_tab.getTableName());
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId);
+            destinationTable.getDbName(), destinationTable.getTableName());
+        ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, isReplace, writeId);
         if (writeId != null) {
           ltd.setStmtId(txnMgr.getCurrentStmtId());
         }
         // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
         boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
-            dest_tab.getDbName(), dest_tab.getTableName());
+            destinationTable.getDbName(), destinationTable.getTableName());
         LoadFileType loadType = (!isInsertInto && !destTableIsTransactional)
             ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
         ltd.setLoadFileType(loadType);
@@ -7363,88 +7367,88 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       } else {
         // This is a non-native table.
         // We need to set stats as inaccurate.
-        setStatsForNonNativeTable(dest_tab);
+        setStatsForNonNativeTable(destinationTable.getDbName(), destinationTable.getTableName());
         // true if it is insert overwrite.
         boolean overwrite = !qb.getParseInfo().isInsertIntoTable(
-            String.format("%s.%s", dest_tab.getDbName(), dest_tab.getTableName()));
-        createPreInsertDesc(dest_tab, overwrite);
+            String.format("%s.%s", destinationTable.getDbName(), destinationTable.getTableName()));
+        createPreInsertDesc(destinationTable, overwrite);
 
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, partSpec == null ? ImmutableMap.of() : partSpec);
+        ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, partSpec == null ? ImmutableMap.of() : partSpec);
         ltd.setInsertOverwrite(overwrite);
         ltd.setLoadFileType(overwrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING);
       }
 
-      if (dest_tab.isMaterializedView()) {
+      if (destinationTable.isMaterializedView()) {
         materializedViewUpdateDesc = new MaterializedViewDesc(
-            dest_tab.getFullyQualifiedName(), false, false, true);
+            destinationTable.getFullyQualifiedName(), false, false, true);
       }
 
       WriteEntity output = generateTableWriteEntity(
-          dest, dest_tab, partSpec, ltd, dpCtx, isNonNativeTable);
+          dest, destinationTable, partSpec, ltd, dpCtx, isNonNativeTable);
       ctx.getLoadTableOutputMap().put(ltd, output);
       break;
     }
     case QBMetaData.DEST_PARTITION: {
 
-      dest_part = qbm.getDestPartitionForAlias(dest);
-      dest_tab = dest_part.getTable();
-      destTableIsTransactional = AcidUtils.isTransactionalTable(dest_tab);
-      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destinationPartition = qbm.getDestPartitionForAlias(dest);
+      destinationTable = destinationPartition.getTable();
+      destTableIsTransactional = AcidUtils.isTransactionalTable(destinationTable);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable);
 
-      checkExternalTable(dest_tab);
+      checkExternalTable(destinationTable);
 
-      Path tabPath = dest_tab.getPath();
-      Path partPath = dest_part.getDataLocation();
+      Path tabPath = destinationTable.getPath();
+      Path partPath = destinationPartition.getDataLocation();
 
-      checkImmutableTable(qb, dest_tab, partPath, true);
+      checkImmutableTable(qb, destinationTable, partPath, true);
 
       // if the table is in a different dfs than the partition,
       // replace the partition's dfs with the table's dfs.
-      dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
+      destinationPath = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
           .getAuthority(), partPath.toUri().getPath());
 
-      isMmTable = AcidUtils.isInsertOnlyTable(dest_tab.getParameters());
-      queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForFinalJobPath(dest_path);
+      isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters());
+      queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying "
-            + queryTmpdir + " from " + dest_path);
+            + queryTmpdir + " from " + destinationPath);
       }
-      table_desc = Utilities.getTableDesc(dest_tab);
+      tableDescriptor = Utilities.getTableDesc(destinationTable);
 
       // Add NOT NULL constraint check
       input = genConstraintsPlan(dest, qb, input);
 
       // Add sorting/bucketing if needed
-      input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx);
+      input = genBucketingSortingDest(dest, input, qb, tableDescriptor, destinationTable, rsCtx);
 
-      idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName());
+      idToTableNameMap.put(String.valueOf(destTableId), destinationTable.getTableName());
       currentTableId = destTableId;
       destTableId++;
 
-      lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(),
-          dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
-          dest_part.isStoredAsSubDirectories(), conf);
+      lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(),
+          destinationPartition.getSkewedColValues(), destinationPartition.getSkewedColValueLocationMaps(),
+          destinationPartition.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsFullAcid) {
-        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
+        acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest);
         //todo: should this be done for MM?  is it ok to use CombineHiveInputFormat with MM?
-        checkAcidConstraints(qb, table_desc, dest_tab);
+        checkAcidConstraints(qb, tableDescriptor, destinationTable);
       }
       try {
         if (ctx.getExplainConfig() != null) {
           writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
         } else {
           if (isMmTable) {
-            writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
+            writeId = txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName());
           } else {
             writeId = (acidOp == Operation.NOT_ACID) ? null :
-                    txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
+                    txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName());
           }
         }
       } catch (LockException ex) {
         throw new SemanticException("Failed to allocate write Id", ex);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId);
+      ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, destinationPartition.getSpec(), acidOp, writeId);
       if (writeId != null) {
         ltd.setStmtId(txnMgr.getCurrentStmtId());
       }
@@ -7460,11 +7464,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
-      if (!outputs.add(new WriteEntity(dest_part,
-          determineWriteType(ltd, dest_tab.isNonNative(), dest)))) {
+      if (!outputs.add(new WriteEntity(destinationPartition,
+          determineWriteType(ltd, destinationTable.isNonNative(), dest)))) {
 
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-            .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
+            .getMsg(destinationTable.getTableName() + "@" + destinationPartition.getName()));
       }
      break;
     }
@@ -7472,18 +7476,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       isLocal = true;
       // fall through
     case QBMetaData.DEST_DFS_FILE: {
-      dest_path = new Path(qbm.getDestFileForAlias(dest));
-
-      ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
+      destinationPath = new Path(qbm.getDestFileForAlias(dest));
 
       // CTAS case: the file output format and serde are defined by the create
       // table command rather than taking the default value
-      List<FieldSchema> field_schemas = null;
+      List<FieldSchema> fieldSchemas = null;
+      List<FieldSchema> partitionColumns = null;
+      List<String> partitionColumnNames = null;
+      List<ColumnInfo> fileSinkColInfos = null;
       CreateTableDesc tblDesc = qb.getTableDesc();
       CreateViewDesc viewDesc = qb.getViewDesc();
-      boolean isCtas = false;
       if (tblDesc != null) {
-        field_schemas = new ArrayList<FieldSchema>();
+        fieldSchemas = new ArrayList<>();
+        partitionColumns = new ArrayList<>();
+        partitionColumnNames = tblDesc.getPartColNames();
+        fileSinkColInfos = new ArrayList<>();
         destTableIsTemporary = tblDesc.isTemporary();
         destTableIsMaterialization = tblDesc.isMaterialization();
         if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
@@ -7500,7 +7507,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           tblDesc.setInitialMmWriteId(writeId);
         }
       } else if (viewDesc != null) {
-        field_schemas = new ArrayList<FieldSchema>();
+        fieldSchemas = new ArrayList<>();
+        partitionColumns = new ArrayList<>();
+        partitionColumnNames = viewDesc.getPartColNames();
+        fileSinkColInfos = new ArrayList<>();
         destTableIsTemporary = false;
       }
 
@@ -7513,55 +7523,73 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // otherwise write to the file system implied by the directory
         // no copy is required. we may want to revisit this policy in future
         try {
-          Path qPath = FileUtils.makeQualified(dest_path, conf);
+          Path qPath = FileUtils.makeQualified(destinationPath, conf);
           queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath);
           if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
             Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir
-                + " from " + dest_path + " (" + isMmTable + ")");
+                + " from " + destinationPath + " (" + isMmTable + ")");
           }
         } catch (Exception e) {
           throw new SemanticException("Error creating temporary folder on: "
-              + dest_path, e);
+              + destinationPath, e);
         }
       }
 
-      ColsAndTypes ct = deriveFileSinkColTypes(inputRR, field_schemas);
-      String cols = ct.cols, colTypes = ct.colTypes;
+      // Check for dynamic partitions.
+      final String cols, colTypes;
+      final boolean isPartitioned;
+      if (dpCtx != null) {
+        throw new SemanticException("Dynamic partition context has already been created, this should not happen");
+      }
+      if (!CollectionUtils.isEmpty(partitionColumnNames)) {
+        ColsAndTypes ct = deriveFileSinkColTypes(
+            inputRR, partitionColumnNames, fieldSchemas, partitionColumns,
+            fileSinkColInfos);
+        cols = ct.cols;
+        colTypes = ct.colTypes;
+        dpCtx = new DynamicPartitionCtx(partitionColumnNames,
+            conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+            conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+        qbm.setDPCtx(dest, dpCtx);
+        // set the root of the temporary path where dynamic partition columns will populate
+        dpCtx.setRootPath(queryTmpdir);
+        isPartitioned = true;
+      } else {
+        ColsAndTypes ct = deriveFileSinkColTypes(inputRR, fieldSchemas);
+        cols = ct.cols;
+        colTypes = ct.colTypes;
+        isPartitioned = false;
+      }
 
       // update the create table descriptor with the resulting schema.
       if (tblDesc != null) {
-        tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas));
+        tblDesc.setCols(new ArrayList<>(fieldSchemas));
+        tblDesc.setPartCols(new ArrayList<>(partitionColumns));
       } else if (viewDesc != null) {
-        viewDesc.setSchema(new ArrayList<FieldSchema>(field_schemas));
+        viewDesc.setSchema(new ArrayList<>(fieldSchemas));
+        viewDesc.setPartCols(new ArrayList<>(partitionColumns));
       }
 
       destTableIsTransactional = tblDesc != null && AcidUtils.isTransactionalTable(tblDesc);
       destTableIsFullAcid = tblDesc != null && AcidUtils.isFullAcidTable(tblDesc);
 
       boolean isDestTempFile = true;
-      if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
-        idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString());
+      if (!ctx.isMRTmpFileURI(destinationPath.toUri().toString())) {
+        idToTableNameMap.put(String.valueOf(destTableId), destinationPath.toUri().toString());
         currentTableId = destTableId;
         destTableId++;
         isDestTempFile = false;
       }
 
-      boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
-      // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
-      loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols,
-          colTypes,
-          destTableIsFullAcid ?//there is a change here - prev version had 'transadtional', one beofre' acid'
-              Operation.INSERT : Operation.NOT_ACID,
-          isMmCtas));
       if (tblDesc == null) {
         if (viewDesc != null) {
-          table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes);
+          tableDescriptor = PlanUtils.getTableDesc(viewDesc, cols, colTypes);
         } else if (qb.getIsQuery()) {
           String fileFormat;
           if (SessionState.get().getIsUsingThriftJDBCBinarySerDe()) {
             fileFormat = "SequenceFile";
             HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat);
-            table_desc=
+            tableDescriptor =
                 PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                     ThriftJDBCBinarySerDe.class);
             // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll
@@ -7578,29 +7606,97 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 serdeClass = LazyBinarySerDe2.class;
               }
             }
-            table_desc =
+            tableDescriptor =
                 PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
                     serdeClass);
           }
         } else {
-          table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
+          tableDescriptor = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
         }
       } else {
-        table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
+        tableDescriptor = PlanUtils.getTableDesc(tblDesc, cols, colTypes);
       }
 
-      if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) {
-        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-            .getMsg(dest_path.toUri().toString()));
+      boolean isDfsDir = (destType.intValue() == QBMetaData.DEST_DFS_FILE);
+
+      if (isPartitioned) {
+        // Create a SELECT that may reorder the columns if needed
+        RowResolver rowResolver = new RowResolver();
+        List<ExprNodeDesc> columnExprs = new ArrayList<>();
+        List<String> colNames = new ArrayList<>();
+        Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
+        for (int i = 0; i < fileSinkColInfos.size(); i++) {
+          ColumnInfo ci = fileSinkColInfos.get(i);
+          ExprNodeDesc columnExpr = new ExprNodeColumnDesc(ci);
+          String name = getColumnInternalName(i);
+          rowResolver.put("", name, new ColumnInfo(name, columnExpr.getTypeInfo(), "", false));
+          columnExprs.add(columnExpr);
+          colNames.add(name);
+          colExprMap.put(name, columnExpr);
+        }
+        input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+            new SelectDesc(columnExprs, colNames), new RowSchema(rowResolver
+                .getColumnInfos()), input), rowResolver);
+        input.setColumnExprMap(colExprMap);
+        // If this is a partitioned CTAS or MV statement, we are going to create a LoadTableDesc
+        // object. Although the table does not exist in the metastore yet, we will swap the
+        // CreateTableTask and the MoveTask resulting from this LoadTableDesc so that, in this
+        // specific case, we first create the metastore table and then move and commit the
+        // partitions. At least for the time being, this order needs to be enforced because the
+        // metastore expects a table to exist before we can add any partitions to it.
+        boolean isNonNativeTable = tableDescriptor.isNonNative();
+        if (!isNonNativeTable) {
+          AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
+          if (destTableIsFullAcid) {
+            acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest);
+            //todo: should this be done for MM?  is it ok to use CombineHiveInputFormat with MM
+            checkAcidConstraints(qb, tableDescriptor, null);
+          }
+          // isReplace = false in case a concurrent operation is executed
+          ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, false, writeId);
+          if (writeId != null) {
+            ltd.setStmtId(txnMgr.getCurrentStmtId());
+          }
+          ltd.setLoadFileType(LoadFileType.KEEP_EXISTING);
+          ltd.setInsertOverwrite(false);
+          loadTableWork.add(ltd);
+        } else {
+          // This is a non-native table.
+          // We need to set stats as inaccurate.
+          setStatsForNonNativeTable(tableDescriptor.getDbName(), tableDescriptor.getTableName());
+          ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx.getPartSpec());
+          ltd.setInsertOverwrite(false);
+          ltd.setLoadFileType(LoadFileType.KEEP_EXISTING);
+        }
+        try {
+          partitionedCTASOrMVTable = tblDesc != null ? tblDesc.toTable(conf) : viewDesc.toTable(conf);
+          ltd.setMdTable(partitionedCTASOrMVTable);
+          WriteEntity output = generateTableWriteEntity(
+              dest, partitionedCTASOrMVTable, dpCtx.getPartSpec(), ltd, dpCtx, isNonNativeTable);
+          ctx.getLoadTableOutputMap().put(ltd, output);
+        } catch (HiveException e) {
+          throw new SemanticException(e);
+        }
+      } else {
+        // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
+        loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, destinationPath, isDfsDir, cols,
+            colTypes,
+            destTableIsFullAcid ?//there is a change here - prev version had 'transactional', one before 'acid'
+                Operation.INSERT : Operation.NOT_ACID,
+            isMmCtas));
+        if (!outputs.add(new WriteEntity(destinationPath, !isDfsDir, isDestTempFile))) {
+          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+              .getMsg(destinationPath.toUri().toString()));
+        }
       }
       break;
     }
     default:
-      throw new SemanticException("Unknown destination type: " + dest_type);
+      throw new SemanticException("Unknown destination type: " + destType);
     }
 
-    if (!(dest_type.intValue() == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) {
-      input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
+    if (!(destType.intValue() == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) {
+      input = genConversionSelectOperator(dest, qb, input, tableDescriptor, dpCtx);
     }
 
     inputRR = opParseCtx.get(input).getRowResolver();
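
The reordering SELECT created above means a partitioned CTAS may list its partition
columns anywhere in the query; the planner moves them to the end of the file sink row
so they can drive the dynamic-partition directories. A hedged sketch (names illustrative):

  -- ds is written last regardless of its position in the SELECT list:
  CREATE TABLE t3 PARTITIONED BY (ds) AS SELECT ds, key, value FROM t1;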
@@ -7612,7 +7708,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           "", true));
     } else {
       try {
-        StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
+        StructObjectInspector rowObjectInspector = (StructObjectInspector) tableDescriptor
             .getDeserializer(conf).getObjectInspector();
         List<? extends StructField> fields = rowObjectInspector
             .getAllStructFieldRefs();
@@ -7631,22 +7727,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // The output files of a FileSink can be merged if they are either not being written to a table
     // or are being written to a table which is not bucketed
    // and the table is not sorted
-    boolean canBeMerged = (dest_tab == null || !((dest_tab.getNumBuckets() > 0) ||
-        (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0)));
+    boolean canBeMerged = (destinationTable == null || !((destinationTable.getNumBuckets() > 0) ||
+        (destinationTable.getSortCols() != null && destinationTable.getSortCols().size() > 0)));
 
     // If this table is working with ACID semantics, turn off merging
     canBeMerged &= !destTableIsFullAcid;
 
     // Generate the partition columns from the parent input
-    if (dest_type.intValue() == QBMetaData.DEST_TABLE
-        || dest_type.intValue() == QBMetaData.DEST_PARTITION) {
-      genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
+    if (destType.intValue() == QBMetaData.DEST_TABLE
+        || destType.intValue() == QBMetaData.DEST_PARTITION) {
+      genPartnCols(dest, input, qb, tableDescriptor, destinationTable, rsCtx);
     }
 
-    FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
-        dest_path, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
+    FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
+        destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, dest_tab, writeId, isMmCtas, dest_type, qb);
+        canBeMerged, destinationTable, writeId, isMmCtas, destType, qb);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       tableDesc.setWriter(fileSinkDesc);
@@ -7657,7 +7753,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         ltd.setInsertOverwrite(true);
       }
     }
-    if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) {
+    if (null != tableDescriptor && useBatchingSerializer(tableDescriptor.getSerdeClassName())) {
       fileSinkDesc.setIsUsingBatchingSerDe(true);
     } else {
       fileSinkDesc.setIsUsingBatchingSerDe(false);
@@ -7670,26 +7766,24 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
-          + dest_path + " row schema: " + inputRR.toString());
+          + destinationPath + " row schema: " + inputRR.toString());
     }
 
     FileSinkOperator fso = (FileSinkOperator) output;
-    fso.getConf().setTable(dest_tab);
+    fso.getConf().setTable(destinationTable);
     // the following code is used to collect column stats when
     // hive.stats.autogather=true
     // and it is an insert overwrite or insert into table
-    if (dest_tab != null
-        && !dest_tab.isNonNative()
-        && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+    if (conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
         && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
         && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
-      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
-        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
-            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
-      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
-        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
-            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
-
+      // TODO: Column stats autogather does not work for CTAS statements
+      if (destType.intValue() == QBMetaData.DEST_TABLE && !destinationTable.isNonNative()) {
+        genAutoColumnStatsGatheringPipeline(qb, destinationTable, partSpec, input, qb.getParseInfo()
+            .isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName()));
+      } else if (destType.intValue() == QBMetaData.DEST_PARTITION && !destinationTable.isNonNative()) {
+        genAutoColumnStatsGatheringPipeline(qb, destinationTable, destinationPartition.getSpec(), input, qb
+            .getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName()));
       }
     }
     return output;
@@ -7706,61 +7800,101 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName());
   }
 
+  private ColsAndTypes deriveFileSinkColTypes(RowResolver inputRR, List<FieldSchema> field_schemas)
+      throws SemanticException {
+    return deriveFileSinkColTypes(inputRR, new ArrayList<>(), field_schemas, new ArrayList<>(), new ArrayList<>());
+  }
+
   private ColsAndTypes deriveFileSinkColTypes(
-      RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException {
+      RowResolver inputRR, List<String> partitionColumnNames,
+      List<FieldSchema> columns, List<FieldSchema> partitionColumns,
+      List<ColumnInfo> fileSinkColInfos) throws SemanticException {
     ColsAndTypes result = new ColsAndTypes("", "");
-    ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos();
+    List<String> allColumns = new ArrayList<>();
+    List<ColumnInfo> colInfos = inputRR.getColumnInfos();
+    List<ColumnInfo> nonPartColInfos = new ArrayList<>();
+    SortedMap<Integer, Pair<FieldSchema, ColumnInfo>> partColInfos = new TreeMap<>();
     boolean first = true;
-    for (ColumnInfo colInfo : colInfos) {
+    int numNonPartitionedCols = colInfos.size() - partitionColumnNames.size();
+    if (numNonPartitionedCols <= 0) {
+      throw new SemanticException("Too many partition columns declared");
+    }
+    for (int i = 0; i < colInfos.size(); i++) {
+      ColumnInfo colInfo = colInfos.get(i);
       String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
 
       if (nm[1] != null) { // non-null column alias
         colInfo.setAlias(nm[1]);
       }
 
+      boolean isPartitionCol = false;
       String colName = colInfo.getInternalName();  //default column name
-      if (field_schemas != null) {
+      if (columns != null) {
         FieldSchema col = new FieldSchema();
         if (!("".equals(nm[0])) && nm[1] != null) {
           colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove ``
         }
         colName = fixCtasColumnName(colName);
         col.setName(colName);
+        allColumns.add(colName);
         String typeName = colInfo.getType().getTypeName();
         // CTAS should NOT create a VOID type
         if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) {
           throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName));
         }
         col.setType(typeName);
-        field_schemas.add(col);
-      }
-
-      if (!first) {
-        result.cols = result.cols.concat(",");
-        result.colTypes = result.colTypes.concat(":");
-      }
-
-      first = false;
-      result.cols = result.cols.concat(colName);
-
-      // Replace VOID type with string when the output is a temp table or
-      // local files.
-      // A VOID type can be generated under the query:
-      //
-      // select NULL from tt;
-      // or
-      // insert overwrite local directory "abc" select NULL from tt;
-      //
-      // where there is no column type to which the NULL value should be
-      // converted.
-      //
-      String tName = colInfo.getType().getTypeName();
-      if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
-        result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME);
-      } else {
-        result.colTypes = result.colTypes.concat(tName);
+        int idx = partitionColumnNames.indexOf(colName);
+        if (idx >= 0) {
+          partColInfos.put(idx, Pair.of(col, colInfo));
+          isPartitionCol = true;
+        } else {
+          columns.add(col);
+          nonPartColInfos.add(colInfo);
+        }
+      }
+
+      if (!isPartitionCol) {
+        if (!first) {
+          result.cols = result.cols.concat(",");
+          result.colTypes = result.colTypes.concat(":");
+        }
+
+        first = false;
+        result.cols = result.cols.concat(colName);
+
+        // Replace VOID type with string when the output is a temp table or
+        // local files.
+        // A VOID type can be generated under the query:
+        //
+        // select NULL from tt;
+        // or
+        // insert overwrite local directory "abc" select NULL from tt;
+        //
+        // where there is no column type to which the NULL value should be
+        // converted.
+        //
+        String tName = colInfo.getType().getTypeName();
+        if (tName.equals(serdeConstants.VOID_TYPE_NAME)) {
+          result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME);
+        } else {
+          result.colTypes = result.colTypes.concat(tName);
+        }
       }
+
+    }
+
+    if (partColInfos.size() != partitionColumnNames.size()) {
+      throw new SemanticException("Table declaration contains partition columns that are not present " +
+        "in query result schema. " +
+        "Query columns: " + allColumns + ". " +
+        "Partition columns: " + partitionColumnNames);
     }
+
+    // FileSinkColInfos comprise nonPartCols followed by partCols
+    fileSinkColInfos.addAll(nonPartColInfos);
+    partitionColumns.addAll(partColInfos.values().stream().map(Pair::getLeft).collect(Collectors.toList()));
+    fileSinkColInfos.addAll(partColInfos.values().stream().map(Pair::getRight).collect(Collectors.toList()));
+
     return result;
   }
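
Both validation paths above can be triggered from the statement alone; hedged sketches
(table and column names hypothetical):

  -- Fails: ds is declared as a partition column but the query schema
  -- only produces key and value:
  CREATE TABLE err1 PARTITIONED BY (ds) AS SELECT key, value FROM t1;
  -- Fails with "Too many partition columns declared": every query column
  -- would be a partition column, leaving no data columns:
  CREATE TABLE err2 PARTITIONED BY (key, value) AS SELECT key, value FROM t1;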
 
@@ -7964,7 +8098,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
     if (dpCtx == null) {
       dest_tab.validatePartColumnNames(partSpec, false);
-      dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+      dpCtx = new DynamicPartitionCtx(partSpec,
           conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
           conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
       qbm.setDPCtx(dest, dpCtx);
@@ -7988,16 +8122,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
 
-  private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
+  private void genAutoColumnStatsGatheringPipeline(QB qb, Table table,
                                                    Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
-    String tableName = table_desc.getTableName();
-    Table table = null;
-    try {
-      table = db.getTable(tableName);
-    } catch (HiveException e) {
-      throw new SemanticException(e.getMessage());
-    }
-    LOG.info("Generate an operator pipeline to autogather column stats for table " + tableName
+    LOG.info("Generate an operator pipeline to autogather column stats for table " + table.getTableName()
         + " in query " + ctx.getCmd());
     ColumnStatsAutoGatherContext columnStatsAutoGatherContext = null;
     columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto, ctx);
@@ -13015,6 +13142,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     String likeTableName = null;
     List<FieldSchema> cols = new ArrayList<FieldSchema>();
     List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+    List<String> partColNames = new ArrayList<>();
     List<String> bucketCols = new ArrayList<String>();
     List<SQLPrimaryKey> primaryKeys = new ArrayList<SQLPrimaryKey>();
     List<SQLForeignKey> foreignKeys = new ArrayList<SQLForeignKey>();
@@ -13130,6 +13258,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                                                       "partition columns. "));
         }
         break;
+      case HiveParser.TOK_TABLEPARTCOLNAMES:
+        partColNames = getColumnNames(child);
+        break;
       case HiveParser.TOK_ALTERTABLE_BUCKETS:
         bucketCols = getColumnNames((ASTNode) child.getChild(0));
         if (child.getChildCount() == 2) {
@@ -13235,6 +13366,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     switch (command_type) {
 
     case CREATE_TABLE: // REGULAR CREATE TABLE DDL
+      if (!CollectionUtils.isEmpty(partColNames)) {
+        throw new SemanticException(
+            "Partition columns can only declared using their name and types in regular CREATE TABLE statements");
+      }
       tblProps = addDefaultProperties(
           tblProps, isExt, storageFormat, dbDotTab, sortCols, isMaterialization, isTemporary);
       addDbAndTabToOutputs(qualifiedTabName, TableType.MANAGED_TABLE, isTemporary, tblProps);
@@ -13338,12 +13473,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
         }
       }
+      if (!CollectionUtils.isEmpty(partCols)) {
+        throw new SemanticException(
+            "Partition columns can only declared using their names in CTAS statements");
+      }
 
       tblProps = addDefaultProperties(
           tblProps, isExt, storageFormat, dbDotTab, sortCols, isMaterialization, isTemporary);
       addDbAndTabToOutputs(qualifiedTabName, TableType.MANAGED_TABLE, isTemporary, tblProps);
       tableDesc = new CreateTableDesc(qualifiedTabName[0], dbDotTab, isExt, isTemporary, cols,
-          partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim,
+          partColNames, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim,
           rowFormatParams.fieldEscape, rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim,
           rowFormatParams.lineDelim, comment, storageFormat.getInputFormat(),
           storageFormat.getOutputFormat(), location, storageFormat.getSerde(),
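
The two checks added above reject mixing the forms; hedged sketches (hypothetical names):

  -- Rejected: name-only partition spec in a regular CREATE TABLE:
  CREATE TABLE t4 (key INT) PARTITIONED BY (ds);
  -- Rejected: typed partition spec in a CTAS; types must come from the query:
  CREATE TABLE t5 PARTITIONED BY (ds STRING) AS SELECT key, value, ds FROM t1;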

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7a2a2c7..005e7b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.parse;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.MaterializedViewDesc;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -327,13 +329,13 @@ public abstract class TaskCompiler {
       crtTblDesc.validate(conf);
       Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork(
           inputs, outputs, crtTblDesc));
-      patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtTblTask);
+      patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtTblTask, CollectionUtils.isEmpty(crtTblDesc.getPartColNames()));
     } else if (pCtx.getQueryProperties().isMaterializedView()) {
       // generate a DDL task and make it a dependent task of the leaf
       CreateViewDesc viewDesc = pCtx.getCreateViewDesc();
       Task<? extends Serializable> crtViewTask = TaskFactory.get(new DDLWork(
           inputs, outputs, viewDesc));
-      patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtViewTask);
+      patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtViewTask, CollectionUtils.isEmpty(viewDesc.getPartColNames()));
     } else if (pCtx.getMaterializedViewUpdateDesc() != null) {
       // If there is a materialized view update desc, we create introduce it at the end
       // of the tree.
@@ -458,9 +460,10 @@ public abstract class TaskCompiler {
     }
   }
 
-  private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>>  rootTasks,
+  private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>> rootTasks,
                                                   final HashSet<WriteEntity> outputs,
-                                                  Task<? extends Serializable> createTask) {
+                                                  Task<? extends Serializable> createTask,
+                                                  boolean createTaskAfterMoveTask) {
     // clear the mapredWork output file from outputs for CTAS
     // DDLWork at the tail of the chain will have the output
     Iterator<WriteEntity> outIter = outputs.iterator();
@@ -479,18 +482,32 @@ public abstract class TaskCompiler {
     HashSet<Task<? extends Serializable>> leaves = new LinkedHashSet<>();
     getLeafTasks(rootTasks, leaves);
     assert (leaves.size() > 0);
+    // Target task is supposed to be the last task
     Task<? extends Serializable> targetTask = createTask;
     for (Task<? extends Serializable> task : leaves) {
       if (task instanceof StatsTask) {
         // StatsTask requires the table to already exist
         for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) {
-          parentOfStatsTask.addDependentTask(createTask);
+          if (parentOfStatsTask instanceof MoveTask && !createTaskAfterMoveTask) {
+            // For partitioned CTAS, the table must be created before the move task
+            // runs: the move task creates the partitions in the metastore, which
+            // requires the table to be registered already
+            interleaveTask(parentOfStatsTask, createTask);
+          } else {
+            parentOfStatsTask.addDependentTask(createTask);
+          }
         }
         for (Task<? extends Serializable> parentOfCrtTblTask : createTask.getParentTasks()) {
           parentOfCrtTblTask.removeDependentTask(task);
         }
         createTask.addDependentTask(task);
         targetTask = task;
+      } else if (task instanceof MoveTask && !createTaskAfterMoveTask) {
+        // For partitioned CTAS, the table must be created before the move task
+        // runs: the move task creates the partitions in the metastore, which
+        // requires the table to be registered already
+        interleaveTask(task, createTask);
+        targetTask = task;
       } else {
         task.addDependentTask(createTask);
       }
@@ -523,6 +540,19 @@ public abstract class TaskCompiler {
   }
 
   /**
+   * Makes dependentTask dependent on task, by inserting task between dependentTask and its parents.
+   */
+  private void interleaveTask(Task<? extends Serializable> dependentTask, Task<? extends Serializable> task) {
+    for (Task<? extends Serializable> parentOfDependentTask : dependentTask.getParentTasks()) {
+      parentOfDependentTask.addDependentTask(task);
+    }
+    for (Task<? extends Serializable> parentOfTask : task.getParentTasks()) {
+      parentOfTask.removeDependentTask(dependentTask);
+    }
+    }
+    task.addDependentTask(dependentTask);
+  }
+
+  /**
    * A helper function to generate a column stats task on top of map-red task. The column stats
    * task fetches from the output of the map-red task, constructs the column stats object and
    * persists it to the metastore.
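
To see what interleaveTask does to the task graph, here is a self-contained
toy model (plain Java, deliberately not Hive's Task class) of the rewiring:
for a partitioned CTAS the create-table task is spliced in between the
query's leaf task and the MoveTask, so the table is already registered in
the metastore when the partitions are created:

    import java.util.ArrayList;
    import java.util.List;

    public class InterleaveDemo {
      // Toy stand-in for Hive's Task parent/child bookkeeping.
      static class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
        void addDependentTask(Node child) { children.add(child); child.parents.add(this); }
        void removeDependentTask(Node child) { children.remove(child); child.parents.remove(this); }
      }

      // Mirrors TaskCompiler.interleaveTask; iterates over copies to avoid
      // concurrent modification in this toy model.
      static void interleave(Node dependentTask, Node task) {
        for (Node parent : new ArrayList<>(dependentTask.parents)) {
          parent.addDependentTask(task);
        }
        for (Node parent : new ArrayList<>(task.parents)) {
          parent.removeDependentTask(dependentTask);
        }
        task.addDependentTask(dependentTask);
      }

      public static void main(String[] args) {
        Node query = new Node("query");            // leaf task producing the CTAS data
        Node move = new Node("MoveTask");          // loads data, creates partitions
        Node create = new Node("CreateTableTask"); // registers the table in the metastore
        query.addDependentTask(move);              // before: query -> move
        interleave(move, create);                  // after:  query -> create -> move
        System.out.println(move.parents.get(0).name);   // prints CreateTableTask
        System.out.println(create.parents.get(0).name); // prints query
      }
    }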

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index 871844b..0fadf1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -72,6 +72,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
   boolean isExternal;
   List<FieldSchema> cols;
   List<FieldSchema> partCols;
+  List<String> partColNames;
   List<String> bucketCols;
   List<Order> sortCols;
   int numBuckets;
@@ -137,29 +138,28 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
   }
 
   public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary,
-                         List<FieldSchema> cols, List<FieldSchema> partCols,
-                         List<String> bucketCols, List<Order> sortCols, int numBuckets,
-                         String fieldDelim, String fieldEscape, String collItemDelim,
-                         String mapKeyDelim, String lineDelim, String comment, String inputFormat,
-                         String outputFormat, String location, String serName,
-                         String storageHandler,
-                         Map<String, String> serdeProps,
-                         Map<String, String> tblProps,
-                         boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues,
-                         boolean isCTAS, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
-                         List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
-                         List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) {
-    this(databaseName, tableName, isExternal, isTemporary, cols, partCols,
-            bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
-            collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
-            outputFormat, location, serName, storageHandler, serdeProps,
-            tblProps, ifNotExists, skewedColNames, skewedColValues,
-            primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+      List<FieldSchema> cols, List<String> partColNames,
+      List<String> bucketCols, List<Order> sortCols, int numBuckets,
+      String fieldDelim, String fieldEscape, String collItemDelim,
+      String mapKeyDelim, String lineDelim, String comment, String inputFormat,
+      String outputFormat, String location, String serName,
+      String storageHandler,
+      Map<String, String> serdeProps,
+      Map<String, String> tblProps,
+      boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues,
+      boolean isCTAS, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
+      List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
+      List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) {
+    this(databaseName, tableName, isExternal, isTemporary, cols, new ArrayList<>(),
+        bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
+        collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat,
+        outputFormat, location, serName, storageHandler, serdeProps,
+        tblProps, ifNotExists, skewedColNames, skewedColValues,
+        primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+    this.partColNames = partColNames;
     this.isCTAS = isCTAS;
-
   }
 
-
   public CreateTableDesc(String tableName, boolean isExternal, boolean isTemporary,
       List<FieldSchema> cols, List<FieldSchema> partCols,
       List<String> bucketCols, List<Order> sortCols, int numBuckets,
@@ -257,6 +257,14 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
     this.partCols = partCols;
   }
 
+  public List<String> getPartColNames() {
+    return partColNames;
+  }
+
+  public void setPartColNames(ArrayList<String> partColNames) {
+    this.partColNames = partColNames;
+  }
+
   public List<SQLPrimaryKey> getPrimaryKeys() {
     return primaryKeys;
   }
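
For a partitioned CTAS, the new constructor above leaves the typed partCols
list empty and carries only the names; the types are filled in from the
query's schema. A small hedged sketch of the resulting state (it assumes a
no-argument CreateTableDesc constructor, which these *Desc classes typically
keep for serialization; setPartColNames is from the hunk above):

    import java.util.ArrayList;
    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.plan.CreateTableDesc;

    public class PartColNamesExample {
      public static void main(String[] args) {
        CreateTableDesc desc = new CreateTableDesc(); // assumed no-arg constructor
        desc.setPartColNames(new ArrayList<>(Arrays.asList("key")));
        // Only names are known at this point; the typed partition columns are
        // derived later from the CTAS query's SELECT schema.
        System.out.println(desc.getPartColNames()); // prints [key]
      }
    }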

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 6af7833..c1aeb8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 public class DynamicPartitionCtx implements Serializable {
@@ -54,7 +54,37 @@ public class DynamicPartitionCtx implements Serializable {
   public DynamicPartitionCtx() {
   }
 
-  public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
+  /**
+   * This constructor is used for partitioned CTAS. It takes only the names of
+   * the partition columns, which will all be dynamic partitions, since their
+   * values are not bound until the CTAS query has been executed.
+   */
+  public DynamicPartitionCtx(List<String> partColNames, String defaultPartName,
+      int maxParts) throws SemanticException {
+    this.partSpec = new LinkedHashMap<>();
+    this.spNames = new ArrayList<>();
+    this.dpNames = new ArrayList<>();
+    for (String colName : partColNames) {
+      this.partSpec.put(colName, null);
+      this.dpNames.add(colName);
+    }
+    this.numBuckets = 0;
+    this.maxPartsPerNode = maxParts;
+    this.defaultPartName = defaultPartName;
+
+    this.numDPCols = dpNames.size();
+    this.numSPCols = spNames.size();
+    this.spPath = null;
+    String confVal;
+    try {
+      confVal = Hive.get().getMetaConf(ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+    this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+  }
+
+  public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
       int maxParts) throws SemanticException {
     this.partSpec = partSpec;
     this.spNames = new ArrayList<String>();
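
A minimal usage sketch of the new CTAS constructor. It reads the
partition-name whitelist pattern through Hive.get(), so it needs an
initialized Hive session; the default-partition name and the partition
limit below are illustrative values (Hive derives them from configuration),
and getNumDPCols() is assumed to be the existing accessor for numDPCols:

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;

    public class DpCtxExample {
      public static void main(String[] args) throws SemanticException {
        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(
            Arrays.asList("key"),         // every CTAS partition column is dynamic
            "__HIVE_DEFAULT_PARTITION__", // illustrative default partition name
            1000);                        // illustrative max partitions per node
        System.out.println(dpCtx.getNumDPCols()); // 1; CTAS has no static partitions
      }
    }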

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index af2ece4..f320167 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 import java.io.Serializable;
@@ -44,6 +45,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private boolean isInsertOverwrite;
 
   // TODO: the below seem like they should just be combined into partitionDesc
+  private Table mdTable;
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
 
@@ -252,4 +254,12 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   public void setStmtId(int stmtId) {
     this.stmtId = stmtId;
   }
+
+  public Table getMdTable() {
+    return mdTable;
+  }
+
+  public void setMdTable(Table mdTable) {
+    this.mdTable = mdTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 71127c2..b369c96 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -283,7 +283,7 @@ public class TestFileSinkOperator {
       partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true));
       Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
       partColMap.put(PARTCOL_NAME, null);
-      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
+      DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
           false, 1, 1, partCols, dpCtx, null, null, false, false);

http://git-wip-us.apache.org/repos/asf/hive/blob/a3ed7d6f/ql/src/test/queries/clientpositive/partition_ctas.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/partition_ctas.q b/ql/src/test/queries/clientpositive/partition_ctas.q
new file mode 100644
index 0000000..470b86e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/partition_ctas.q
@@ -0,0 +1,51 @@
+--! qt:dataset:src
+
+EXPLAIN
+CREATE TABLE partition_ctas_1 PARTITIONED BY (key) AS
+SELECT value, key FROM src where key > 200 and key < 300;
+
+CREATE TABLE partition_ctas_1 PARTITIONED BY (key) AS
+SELECT value, key FROM src where key > 200 and key < 300;
+
+DESCRIBE FORMATTED partition_ctas_1;
+
+EXPLAIN
+SELECT * FROM partition_ctas_1 where key = 238;
+
+SELECT * FROM partition_ctas_1 where key = 238;
+
+CREATE TABLE partition_ctas_2 PARTITIONED BY (value) AS
+SELECT key, value FROM src where key > 200 and key < 300;
+
+EXPLAIN
+SELECT * FROM partition_ctas_2 where value = 'val_238';
+
+SELECT * FROM partition_ctas_2 where value = 'val_238';
+
+EXPLAIN
+SELECT value FROM partition_ctas_2 where key = 238;
+
+SELECT value FROM partition_ctas_2 where key = 238;
+
+CREATE TABLE partition_ctas_diff_order PARTITIONED BY (value) AS
+SELECT value, key FROM src where key > 200 and key < 300;
+
+EXPLAIN
+SELECT * FROM partition_ctas_diff_order where value = 'val_238';
+
+SELECT * FROM partition_ctas_diff_order where value = 'val_238';
+
+CREATE TABLE partition_ctas_complex_order PARTITIONED BY (c0, c4, c1) AS
+SELECT concat(value, '_0') as c0,
+       concat(value, '_1') as c1,
+       concat(value, '_2') as c2,
+       concat(value, '_3') as c3,
+       concat(value, '_5') as c5,
+       concat(value, '_4') as c4
+FROM src where key > 200 and key < 240;
+
+-- Resulting schema order: c2, c3, c5, c0, c4, c1 (partition columns move to the end, in PARTITIONED BY order)
+EXPLAIN
+SELECT * FROM partition_ctas_complex_order where c0 = 'val_238_0';
+
+SELECT * FROM partition_ctas_complex_order where c0 = 'val_238_0';