You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by zt...@apache.org on 2021/12/13 08:14:48 UTC

[hawq] branch ztao updated: HAWQ-1811. Sync with OushuDB - Phase II

This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git


The following commit(s) were added to refs/heads/ztao by this push:
     new 1a20da7  HAWQ-1811. Sync with OushuDB - Phase II
1a20da7 is described below

commit 1a20da7c5ca83ed7698f4917d13bd50e28fd740f
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Mon Dec 13 16:05:15 2021 +0800

    HAWQ-1811. Sync with OushuDB - Phase II
---
 contrib/hornet/orc_debug_statistics.py       |  11 +-
 contrib/magma/magma.c                        | 153 ++++++++++++++++----------
 src/backend/access/common/reloptions.c       |   1 +
 src/backend/access/transam/xact.c            | 139 +++++++++++++++--------
 src/backend/catalog/aoseg.c                  | 158 ++++++++++++++++++++++++++-
 src/backend/catalog/heap.c                   | 121 ++++++++++----------
 src/backend/catalog/index.c                  |  17 +--
 src/backend/catalog/pg_compression.c         |   3 +-
 src/backend/cdb/cdbdatalocality.c            |  16 ++-
 src/backend/cdb/cdbquerycontextdispatching.c |   4 +-
 src/backend/cdb/dispatcher.c                 |   3 +
 src/backend/cdb/dispatcher_new.c             |   3 +
 src/backend/cdb/motion/ic_udp.c              |  39 ++++---
 src/backend/commands/analyze.c               |   6 +-
 src/backend/commands/copy.c                  |   7 +-
 src/backend/commands/dbcommands.c            |   5 +-
 src/backend/commands/indexcmds.c             |  36 +++---
 src/backend/commands/tablecmds.c             |   5 +-
 src/backend/executor/execDML.c               |   6 +-
 src/backend/executor/execMain.c              |   2 +-
 src/backend/executor/nodeExternalscan.c      |   2 +-
 src/backend/optimizer/path/allpaths.c        |   7 +-
 src/backend/optimizer/plan/newPlanner.c      | 121 +++++++++++++++++---
 src/backend/parser/analyze.c                 |   5 +
 src/backend/storage/buffer/bufmgr.c          |   1 +
 src/backend/tcop/utility.c                   | 143 ++++++++++++------------
 src/backend/utils/adt/dbsize.c               |   2 +-
 src/backend/utils/gp/segadmin.c              |   9 +-
 src/backend/utils/misc/guc.c                 |  10 +-
 src/include/access/orcsegfiles.h             |   5 +
 src/include/access/xact.h                    |  21 ++--
 src/include/catalog/aoseg.h                  |   1 +
 src/include/cdb/ml_ipc.h                     |   2 +
 src/include/optimizer/newPlanner.h           |   1 +
 src/include/tcop/utility.h                   |   2 +
 35 files changed, 725 insertions(+), 342 deletions(-)

diff --git a/contrib/hornet/orc_debug_statistics.py b/contrib/hornet/orc_debug_statistics.py
index 3ddfb0d..eb61af6 100755
--- a/contrib/hornet/orc_debug_statistics.py
+++ b/contrib/hornet/orc_debug_statistics.py
@@ -81,10 +81,15 @@ col_l = len(col_name)
 for i in range(0,col_l):
     str_ans = str_ans.replace(' Column ' + str(i+1) + ' ',col_name[i])
 
-str_ans = str_ans.replace(',}}}',"}}]}")
+if stripe_num == 0:
+    str_ans = str_ans.replace(',}}}',"}}]}")
+else:
+    str_ans = str_ans.replace(',}}}',"}}}]}")
 str_ans = str_ans.replace('" Stripe 0 ":','"Stripes":[')
-for i in range(1,stripe_num):
-    str_ans = str_ans.replace('" Stripe {} "'.format(i),'},{"Stripe {}"'.format(i))
+for i in range(1,2):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}},{{"Stripe {}"'.format(i))
+for i in range(2,stripe_num+1):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}}}},{{"Stripe {}"'.format(i))
 file_path = '"File_path":"' + sys.argv[2] + '",'
 pre_json = str_ans[0] + file_path + str_ans[1:len(str_ans)]
 print(sys.argv[1] + '|' + pre_json.replace(',}','}'))
diff --git a/contrib/magma/magma.c b/contrib/magma/magma.c
index 08bbf40..b7d6758 100644
--- a/contrib/magma/magma.c
+++ b/contrib/magma/magma.c
@@ -405,7 +405,8 @@ Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaTablePtr table = MagmaClientC_FetchTable(client, snapshot, useClientCacheDirectly);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaTablePtr table = MagmaClientC_FetchTable(client, useClientCacheDirectly);
   magma_check_result(&client);
 
   elog(LOG, "magma_protocol_blocklocation pass fetch table");
@@ -527,10 +528,11 @@ Datum magma_protocol_tablesize(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
+  MagmaClientC_SetupSnapshot(client, snapshot);
 
   // set size of table in tp type to zero.
   if (tableType == MAGMACLIENTC_TABLETYPE_AP) {
-    tsdata->tablesize = MagmaClientC_GetTableSize(client, snapshot);
+    tsdata->tablesize = MagmaClientC_GetTableSize(client);
   } else {
     tsdata->tablesize = 0;
   }
@@ -583,7 +585,8 @@ Datum magma_protocol_databasesize(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupDatabaseInfo(client, dbname);
-  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client);
   elog(LOG,"dbsize in magma.c is %llu", dbsdata->dbsize);
   magma_check_result(&client);
 
@@ -837,7 +840,8 @@ Datum magma_createindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_CreateIndex(client, snapshot, magmaidx);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateIndex(client, magmaidx);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -861,7 +865,8 @@ Datum magma_dropindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_DropIndex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropIndex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -885,7 +890,8 @@ Datum magma_reindex_index(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_Reindex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_Reindex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -1029,7 +1035,8 @@ Datum magma_createtable(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_CreateTable(client, snapshot, ncols, cols);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateTable(client, ncols, cols);
   magma_check_result(&client);
   pfree(cols);
   list_free(pk_names);
@@ -1061,7 +1068,8 @@ Datum magma_droptable(PG_FUNCTION_ARGS) {
   int16_t tableType = 0;
   // for drop table, tableType won't be used in the process, set it as default
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_DropTable(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropTable(client);
   magma_check_result(&client);
 
   PG_RETURN_VOID();
@@ -3099,80 +3107,111 @@ Datum magma_transaction(PG_FUNCTION_ARGS) {
     elog(ERROR, "failed to connect to magma service");
   }
 
+  MagmaClientC_SetupSnapshot(client, pst->pst_transaction_snapshot);
+
   switch (txn_command) {
-    case PS_TXN_CMD_BEGIN: {
-      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
-      MagmaTableFullName *magmaTableFullNames = (MagmaTableFullName *) palloc0(magmaTableFullNamesSize * sizeof(MagmaTableFullName));
-      int i = 0;
-      ListCell *lc;
-      foreach (lc, ps->magma_talbe_full_names) {
-        MagmaTableFullName* mtfn = lfirst(lc);
-        magmaTableFullNames[i].databaseName = pstrdup(mtfn->databaseName);
-        magmaTableFullNames[i].schemaName = pstrdup(mtfn->schemaName);
-        magmaTableFullNames[i].tableName = pstrdup(mtfn->tableName);
-        ++i;
-      }
-      pst->pst_transaction_dist =
-          MagmaClientC_BeginTransaction(client, magmaTableFullNames, magmaTableFullNamesSize);
-      for (int i = 0; i < magmaTableFullNamesSize; ++i) {
-        pfree(magmaTableFullNames[i].databaseName);
-        pfree(magmaTableFullNames[i].schemaName);
-        pfree(magmaTableFullNames[i].tableName);
-      }
-      pfree(magmaTableFullNames);
-      if (pst->pst_transaction_dist == NULL) {
-        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
-        pst->pst_transaction_id = InvalidTransactionId;
-        pst->pst_transaction_dist = NULL;
-        elog(DEBUG1, "magma_transaction: begin snapshot: NULL");
-      } else {
-        elog(DEBUG1, "magma_transaction: begin snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
-      }
+    case PS_TXN_CMD_START_TRANSACTION: {
+      pst->pst_transaction_state = MagmaClientC_StartTransaction(client);
+      pst->pst_transaction_snapshot = NULL;
+      pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+      pst->pst_transaction_id = InvalidTransactionId;
+      pst->pst_transaction_snapshot = NULL;
+      elog(DEBUG1, "magma_transaction: start transaction");
       magma_check_result(&client);
       break;
     }
-    case PS_TXN_CMD_COMMIT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_COMMIT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: commit snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: commit snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
-      MagmaClientC_CommitTransaction(client, pst->pst_transaction_dist);
+      MagmaClientC_CommitTransaction(client);
       magma_check_result(&client);
       break;
-    case PS_TXN_CMD_ABORT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_ABORT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: abort snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: abort snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
       if (pst->pst_transaction_status != PS_TXN_STS_DEFAULT &&
           pst->pst_transaction_id != InvalidTransactionId &&
-          pst->pst_transaction_dist != NULL) {
-        MagmaClientC_AbortTransaction(client, pst->pst_transaction_dist,
-                                      PlugStorageGetIsCleanupAbort());
-        pst->pst_transaction_dist = NULL;
+          pst->pst_transaction_snapshot != NULL) {
+        MagmaClientC_AbortTransaction(client, PlugStorageGetIsCleanupAbort());
+        pst->pst_transaction_snapshot = NULL;
         pst->pst_transaction_id = InvalidTransactionId;
         pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
         magma_check_result(&client);
       }
       break;
+    case PS_TXN_CMD_GET_SNAPSHOT: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get snapshot: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get snapshot: (%llu, %u, %llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
+    case PS_TXN_CMD_GET_TRANSACTIONID: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_state = MagmaClientC_GetTransctionId(client);
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get transaction state: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get transaction state: (%llu, %u, %llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
     default:
       elog(ERROR, "Transaction command for magma is invalid %d", txn_command);
       break;
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 976fc71..c0f9236 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -545,6 +545,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 		        // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
 		        // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
 		        && (strcmp(compresstype, "zlib") != 0)
+		        && (strcmp(compresstype, "zstd") != 0)
 		        && (strcmp(compresstype, "none") != 0))
     {
       ereport(ERROR,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 393852d..3762036 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -270,7 +270,7 @@ static PlugStorageTransactionData TopPlugStorageTransactionData = {
 	.pst_transaction_id      = InvalidTransactionId,/* transaction id */
 	.pst_transaction_status  = PS_TXN_STS_DEFAULT,  /* transaction status */
 	.pst_transaction_command = PS_TXN_CMD_INVALID,  /* transaction command */
-	.pst_transaction_dist    = NULL                 /* magma transaction info */
+	.pst_transaction_snapshot = NULL                 /* magma transaction info */
 };
 
 static PlugStorageTransaction TopPlugStorageTransaction = &TopPlugStorageTransactionData;
@@ -372,12 +372,6 @@ PlugStorageGetTransactionStatus(void)
 	return TopPlugStorageTransaction->pst_transaction_status;
 }
 
-MagmaSnapshot *
-PlugStorageGetTransactionSnapshot(void)
-{
-	return TopPlugStorageTransaction->pst_transaction_dist;
-}
-
 void PlugStorageSetIsCleanupAbort(bool isCleanup)
 {
   isCleanupAbortTransaction = isCleanup;
@@ -397,52 +391,54 @@ extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot)
 	 */
 	if (Gp_role == GP_ROLE_DISPATCH)
 	{
-	    Insist(TopPlugStorageTransaction->pst_transaction_dist != NULL);
+	    Insist(TopPlugStorageTransaction->pst_transaction_snapshot != NULL);
 	}
 	else if (Gp_role == GP_ROLE_EXECUTE &&
-	         TopPlugStorageTransaction->pst_transaction_dist == NULL)
+	         TopPlugStorageTransaction->pst_transaction_snapshot == NULL)
 	{
-	    TopPlugStorageTransaction->pst_transaction_dist = malloc(sizeof(MagmaSnapshot));
-	    memset(TopPlugStorageTransaction->pst_transaction_dist, 0, sizeof(MagmaSnapshot));
-	    TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions = NULL;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot = malloc(sizeof(MagmaSnapshot));
+	    memset(TopPlugStorageTransaction->pst_transaction_snapshot, 0, sizeof(MagmaSnapshot));
+	    TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnId = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnStatus = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions = NULL;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize = 0;
 	}
 
 	// set current transaction for current snapshot
-	TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId =
+	TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnId =
 	        snapshot->currentTransaction.txnId;
-	TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus =
+	TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnStatus =
 	        snapshot->currentTransaction.txnStatus;
 
 	// set command id
-	TopPlugStorageTransaction->pst_transaction_dist->cmdIdInTransaction =
+	TopPlugStorageTransaction->pst_transaction_snapshot
+            ->cmdIdInTransaction =
 	        snapshot->cmdIdInTransaction;
 
 	// reallocate memory for visible HLC map for current snapshot
-	free(TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions);
+	free(TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions);
 
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset =
 	    snapshot->txnActions.txnActionStartOffset;
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions =
 	    malloc(sizeof(MagmaTxnAction) *snapshot->txnActions.txnActionSize);
 
 	// set visible HLC map for current snapshot
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize =
 	    snapshot->txnActions.txnActionSize;
 	for (int i = 0; i < snapshot->txnActions.txnActionSize; ++i)
 	{
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnId =
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnId =
 	            snapshot->txnActions.txnActions[i].txnId;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnStatus =
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnStatus =
 	            snapshot->txnActions.txnActions[i].txnStatus;
     }
 }
 
-void
-PlugStorageBeginTransaction(List* magmaTableFullNames)
+void PlugStorageStartTransaction(List* magmaTableFullNames)
 {
 	if ((Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode())
 	{
@@ -460,13 +456,9 @@ PlugStorageBeginTransaction(List* magmaTableFullNames)
 			}
 
 			Assert(pst->pst_transaction_status == PS_TXN_STS_DEFAULT);
-			pst->pst_transaction_command = PS_TXN_CMD_BEGIN;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_START_TRANSACTION;
 			InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
-			elog(DEBUG1, "PS TXN: BEGIN (%llu, %u, %llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
 			pst->pst_transaction_id = GetTopTransactionId();
 			pst->pst_transaction_status = PS_TXN_STS_STARTED;
 		}
@@ -492,14 +484,15 @@ PlugStorageCommitTransaction(void)
 
 			Assert(pst->pst_transaction_id == GetTopTransactionId());
 			Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
-			pst->pst_transaction_command = PS_TXN_CMD_COMMIT;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_COMMIT_TRANSACTION;
 			elog(DEBUG1, "PS TXN: COMMIT (%llu, %u, %llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
+			     pst->pst_transaction_snapshot->currentTransaction.txnId,
+			     pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+			     pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+			     pst->pst_transaction_snapshot->txnActions.txnActionSize);
 			InvokePlugStorageFormatTransaction(pst, NULL);
-			pst->pst_transaction_dist = NULL;
+			pst->pst_transaction_snapshot = NULL;
 			pst->pst_transaction_id = InvalidTransactionId;
 			pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
 		}
@@ -525,20 +518,74 @@ PlugStorageAbortTransaction(void)
 
 			Assert(pst->pst_transaction_id == GetTopTransactionId());
 			Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
-			pst->pst_transaction_command = PS_TXN_CMD_ABORT;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_ABORT_TRANSACTION;
 			elog(DEBUG1, "PS TXN: ABORT (%llu, %u, llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
+			     pst->pst_transaction_snapshot->currentTransaction.txnId,
+			     pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+			     pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+			     pst->pst_transaction_snapshot->txnActions.txnActionSize);
 			InvokePlugStorageFormatTransaction(pst, NULL);
-			pst->pst_transaction_dist = NULL;
+			pst->pst_transaction_snapshot = NULL;
 			pst->pst_transaction_id = InvalidTransactionId;
 			pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
 		}
 	}
 }
 
+
+MagmaSnapshot *
+PlugStorageGetTransactionSnapshot(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      (pst->pst_transaction_snapshot == NULL) &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_SNAPSHOT;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET SNAPSHOT (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+  return TopPlugStorageTransaction->pst_transaction_snapshot;
+}
+
+void PlugStorageGetTransactionId(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      pst->pst_transaction_state->currentTransaction.txnId == 0 &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_TRANSACTIONID;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET TRANSACTION ID (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+}
+
 /* ----------------------------------------------------------------
  *	transaction state accessors
  * ----------------------------------------------------------------
@@ -2456,7 +2503,7 @@ StartTransaction(void)
 	/*
 	 * begin transaction in magma service now
 	 */
-	/* PlugStorageBeginTransaction(); */
+	/* PlugStorageStartTransaction(); */
 }
 
 /*
diff --git a/src/backend/catalog/aoseg.c b/src/backend/catalog/aoseg.c
index b7b93fd..7e88b91 100644
--- a/src/backend/catalog/aoseg.c
+++ b/src/backend/catalog/aoseg.c
@@ -56,7 +56,9 @@
 #include "cdb/cdbvars.h"
 
 static bool create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
+static bool create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
 static bool needs_aoseg_table(Relation rel);
+static bool needs_aoseg_index_table(Relation rel);
 
 /*
  * AlterTableCreateAoSegTable
@@ -106,6 +108,19 @@ AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid, Oid newIndexOid,
 	heap_close(rel, NoLock);
 }
 
+void
+AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child)
+{
+	Relation	rel;
+	Assert(!is_part_child);
+	rel = heap_open(relOid, AccessShareLock);
+
+	/* create_aoseg_index_table does all the work */
+	(void) create_aoseg_index_table(rel, InvalidOid, InvalidOid, NULL);
+
+	heap_close(rel, AccessShareLock);
+}
+
 /*
  * create_aoseg_table --- internal workhorse
  *
@@ -310,6 +325,139 @@ create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptype
 	return true;
 }
 
+static bool
+create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid)
+{
+	Oid			relOid = RelationGetRelid(rel);
+	TupleDesc	tupdesc;
+	bool		shared_relation;
+	Oid		  blkdirrelid;
+	Oid		  blkdiridxid;
+	char		aoseg_relname[NAMEDATALEN];
+	char		aoseg_idxname[NAMEDATALEN];
+	IndexInfo  *indexInfo;
+	Oid			classObjectId[2];
+	ObjectAddress baseobject,
+				aosegobject;
+	Oid			tablespaceOid = ChooseTablespaceForLimitedObject(rel->rd_rel->reltablespace);
+
+	/*
+	 * Check to see whether the table actually needs an aoseg index table.
+	 */
+	if (!needs_aoseg_index_table(rel))
+		return false;
+
+	shared_relation = rel->rd_rel->relisshared;
+
+	/* can't have shared AO tables after initdb */
+	/* TODO: disallow it at CREATE TABLE time */
+	Assert(!(shared_relation && !IsBootstrapProcessingMode()) );
+
+	GetAppendOnlyEntryAuxOids(relOid, SnapshotNow, NULL, NULL, &blkdirrelid, &blkdiridxid);
+
+	/*
+	 * Was a aoseg index table already created?
+	 */
+	if (blkdirrelid != InvalidOid)
+	{
+		return false;
+	}
+
+	snprintf(aoseg_relname, sizeof(aoseg_relname), "pg_orcseg_idx_%u",
+					 relOid);
+	snprintf(aoseg_idxname, sizeof(aoseg_idxname), "pg_orcseg_idx_%u_index",
+					 relOid);
+
+	tupdesc = CreateTemplateTupleDesc(Natts_pg_orcseg_idx, false);
+
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_idxoid, "idxoid",
+										 INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_segno, "segno",
+										 INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_eof, "eof",
+										 FLOAT8OID, -1, 0);
+
+	blkdirrelid = heap_create_with_catalog(aoseg_relname,
+										   PG_AOSEGMENT_NAMESPACE,
+										   tablespaceOid,
+										   aosegOid,
+										   rel->rd_rel->relowner,
+										   tupdesc,
+										   /* relam */ InvalidOid,
+										   RELKIND_AOSEGMENTS,
+										   RELSTORAGE_HEAP,
+										   shared_relation,
+										   true,
+										   /* bufferPoolBulkLoad */ false,
+										   0,
+										   ONCOMMIT_NOOP,
+										   NULL, /* CDB POLICY */
+										   (Datum) 0,
+										   true,
+										   comptypeOid,
+						 				   /* persistentTid */ NULL,
+						 				   /* persistentSerialNum */ NULL,
+						 				   /* formattername */ NULL);
+
+	/* make the toast relation visible, else index creation will fail */
+	CommandCounterIncrement();
+
+	/*
+	 * Create unique index on index oid.
+	 */
+	indexInfo = makeNode(IndexInfo);
+	indexInfo->ii_NumIndexAttrs = 1;
+	indexInfo->ii_NumIndexKeyAttrs = 1;
+	indexInfo->ii_KeyAttrNumbers[0] = 1;
+	indexInfo->ii_Expressions = NIL;
+	indexInfo->ii_ExpressionsState = NIL;
+	indexInfo->ii_Predicate = NIL;
+	indexInfo->ii_PredicateState = NIL;
+	indexInfo->ii_Unique = true;
+	indexInfo->ii_Concurrent = false;
+
+	classObjectId[0] = INT4_BTREE_OPS_OID;
+	classObjectId[1] = INT4_BTREE_OPS_OID;
+
+	blkdiridxid = index_create(blkdirrelid, aoseg_idxname, aosegIndexOid,
+							   indexInfo,
+							   BTREE_AM_OID,
+							   tablespaceOid,
+							   classObjectId, (Datum) 0,
+							   true, false, (Oid *) NULL, true, false, false, NULL);
+
+	/* Unlock target table -- no one can see it */
+	UnlockRelationOid(blkdirrelid, ShareLock);
+	/* Unlock the index -- no one can see it anyway */
+	UnlockRelationOid(blkdiridxid, AccessExclusiveLock);
+
+	/*
+	 * Store the aoseg table's OID in the parent relation's pg_appendonly row
+	 */
+	UpdateAppendOnlyEntryAuxOids(relOid, InvalidOid, InvalidOid, blkdirrelid, blkdiridxid);
+
+	/*
+	 * Register dependency from the aoseg table to the master, so that the
+	 * aoseg table will be deleted if the master is.
+	 */
+	baseobject.classId = RelationRelationId;
+	baseobject.objectId = relOid;
+	baseobject.objectSubId = 0;
+	aosegobject.classId = RelationRelationId;
+	aosegobject.objectId = blkdirrelid;
+	aosegobject.objectSubId = 0;
+
+	recordDependencyOn(&aosegobject, &baseobject, DEPENDENCY_INTERNAL);
+
+	/*
+	 * Make changes visible
+	 */
+	CommandCounterIncrement();
+
+	return true;
+}
+
+
 /*
  * Check to see whether the table needs an aoseg table.	It does only if it is
  * an append-only relation.
@@ -320,4 +468,12 @@ needs_aoseg_table(Relation rel)
 	return RelationIsAo(rel);
 }
 
-
+/*
+ * Check to see whether the table needs an aoseg index table.	It does only if it is
+ * an append-only orc relation.
+ */
+static bool
+needs_aoseg_index_table(Relation rel)
+{
+	return RelationIsOrc(rel);
+}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e7e3887..e13b97b 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -129,7 +129,7 @@ extern void PQclear(PGresult *res);
 
 static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
 									Oid			classid,
-									Oid			objoid, 
+									Oid			objoid,
 									Oid			relowner,
 									char*		actionname,
 									char*		subtype,
@@ -546,14 +546,14 @@ heap_create(const char *relname,
 							rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
 			heap_close(gp_relation_node, RowExclusiveLock);
 		}
-#endif	
+#endif
 	}
 
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(&rel->rd_relationnodeinfo.persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "setNewRelfilenodeCommon has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
 			 rel->rd_node.spcNode,
 			 rel->rd_node.dbNode,
@@ -563,7 +563,7 @@ heap_create(const char *relname,
 	}
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 		     "heap_create: '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT " for CREATE",
 			 relpath(rel->rd_node),
 			 (isAppendOnly ? "true" : "false"),
@@ -917,8 +917,8 @@ static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
 } /* end MetaTrackAddUpdInternal */
 
 
-void MetaTrackAddObject(Oid		classid, 
-						Oid		objoid, 
+void MetaTrackAddObject(Oid		classid,
+						Oid		objoid,
 						Oid		relowner,
 						char*	actionname,
 						char*	subtype)
@@ -959,8 +959,8 @@ void MetaTrackAddObject(Oid		classid,
 
 } /* end MetaTrackAddObject */
 
-void MetaTrackUpdObject(Oid		classid, 
-						Oid		objoid, 
+void MetaTrackUpdObject(Oid		classid,
+						Oid		objoid,
 						Oid		relowner,
 						char*	actionname,
 						char*	subtype)
@@ -1015,7 +1015,7 @@ void MetaTrackUpdObject(Oid		classid,
 									classid, objoid, relowner,
 									actionname, subtype,
 									rel, tuple);
-			
+
 /*			CommandCounterIncrement(); */
 
 			ii++;
@@ -1026,14 +1026,14 @@ void MetaTrackUpdObject(Oid		classid,
 
 	/* add it if it didn't already exist */
 	if (!ii)
-		MetaTrackAddObject(classid, 
-						   objoid, 
+		MetaTrackAddObject(classid,
+						   objoid,
 						   relowner,
 						   actionname,
 						   subtype);
 
 } /* end MetaTrackUpdObject */
-void MetaTrackDropObject(Oid		classid, 
+void MetaTrackDropObject(Oid		classid,
 						 Oid		objoid)
 {
 	int ii = 0;
@@ -1307,7 +1307,7 @@ AddNewRelationTuple(Relation pg_class_desc,
 			/* NOTE: look at cdb_estimate_rel_size() if changing these values */
 			if(relstorage_is_external(relstorage))
 			{
-				new_rel_reltup->relpages = gp_external_table_default_number_of_pages; 
+				new_rel_reltup->relpages = gp_external_table_default_number_of_pages;
 				new_rel_reltup->reltuples = gp_external_table_default_number_of_tuples;
 			}
 			break;
@@ -1430,8 +1430,8 @@ InsertGpRelfileNodeTuple(
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "Inserting with invalid TID (0,0) into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
 			 relationId,
 			 relname,
@@ -1444,7 +1444,7 @@ InsertGpRelfileNodeTuple(
 	memset(nulls, false, sizeof(nulls));
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "InsertGpRelationNodeTuple: Inserting into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT ", TID %s",
 			 relationId,
 			 relname,
@@ -1499,8 +1499,8 @@ UpdateGpRelfileNodeTuple(
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "Updating with invalid TID (0,0) in relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
 			 relfilenode,
 			 segmentFileNum,
@@ -1508,7 +1508,7 @@ UpdateGpRelfileNodeTuple(
 	}
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "UpdateGpRelationNodeTuple: Updating relfilenode %u, segment file #%d, serial number " INT64_FORMAT " at TID %s",
 			 relfilenode,
 			 segmentFileNum,
@@ -1521,7 +1521,7 @@ UpdateGpRelfileNodeTuple(
 
 	repl_repl[Anum_gp_relfile_node_relfilenode_oid - 1] = true;
 	repl_val[Anum_gp_relfile_node_relfilenode_oid - 1] = ObjectIdGetDatum(relfilenode);
-	
+
 	repl_repl[Anum_gp_relfile_node_segment_file_num - 1] = true;
 	repl_val[Anum_gp_relfile_node_segment_file_num - 1] = Int32GetDatum(segmentFileNum);
 
@@ -1529,12 +1529,12 @@ UpdateGpRelfileNodeTuple(
 
 	repl_repl[Anum_gp_relfile_node_persistent_tid- 1] = true;
 	repl_val[Anum_gp_relfile_node_persistent_tid- 1] = PointerGetDatum(persistentTid);
-	
+
 	repl_repl[Anum_gp_relfile_node_persistent_serial_num - 1] = true;
 	repl_val[Anum_gp_relfile_node_persistent_serial_num - 1] = Int64GetDatum(persistentSerialNum);
 
 	newtuple = heap_modify_tuple(tuple, RelationGetDescr(gp_relfile_node), repl_val, repl_null, repl_repl);
-	
+
 	simple_heap_update(gp_relfile_node, &newtuple->t_self, newtuple);
 
 	CatalogUpdateIndexes(gp_relfile_node, newtuple);
@@ -1559,7 +1559,7 @@ AddNewRelfileNodeTuple(
 							/* updateIndex */ true,
 							&new_rel->rd_relationnodeinfo.persistentTid,
 							new_rel->rd_relationnodeinfo.persistentSerialNum);
-							
+
 	}
 }
 
@@ -1603,7 +1603,7 @@ heap_create_with_catalog(const char *relname,
 
 	pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
-    // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node 
+    // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node
     // for this table is renamed from gp_relation_node, also it's schema changed.
 	if (IsBootstrapProcessingMode()|| (gp_upgrade_mode && GpPersistent_IsPersistentRelation(relid)))
 		gp_relfile_node_desc = NULL;
@@ -1630,7 +1630,7 @@ heap_create_with_catalog(const char *relname,
 		}
 		relstorage = stdRdOptions->columnstore;
 	}
-	
+
 	if (IsBuiltinTablespace(reltablespace) && appendOnlyRel) {
 		ereport(ERROR,
 			(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1669,7 +1669,7 @@ heap_create_with_catalog(const char *relname,
 								 stdRdOptions->columnstore);
 
 	/* MPP-8058: disallow OIDS on column-oriented tables */
-	if (tupdesc->tdhasoid && 
+	if (tupdesc->tdhasoid &&
 		IsNormalProcessingMode() &&
         (Gp_role == GP_ROLE_DISPATCH))
 	{
@@ -1982,11 +1982,11 @@ heap_create_with_catalog(const char *relname,
 		}
 
 		/* MPP-7576: don't track internal namespace tables */
-		switch (relnamespace) 
+		switch (relnamespace)
 		{
 			case PG_CATALOG_NAMESPACE:
 				/* MPP-7773: don't track objects in system namespace
-				 * if modifying system tables (eg during upgrade)  
+				 * if modifying system tables (eg during upgrade)
 				 */
 				if (allowSystemTableModsDDL)
 					doIt = false;
@@ -2330,7 +2330,7 @@ RemoveAttrDefault(Oid relid, AttrNumber attnum,
 		object.objectSubId = 0;
 
 		performDeletion(&object, behavior);
-		
+
 		found = true;
 	}
 
@@ -2429,9 +2429,9 @@ remove_gp_relation_node_and_schedule_drop(
 	Relation	rel)
 {
 	PersistentFileSysRelStorageMgr relStorageMgr;
-	
+
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "remove_gp_relation_node_and_schedule_drop: dropping relation '%s', relation id %u '%s', relfilenode %u",
 			 rel->rd_rel->relname.data,
 			 rel->rd_id,
@@ -2449,9 +2449,9 @@ remove_gp_relation_node_and_schedule_drop(
 		DeleteGpRelfileNodeTuple(
 								rel,
 								/* segmentFileNum */ 0);
-		
+
 		if (Debug_persistent_print)
-			elog(Persistent_DebugPrintLevel(), 
+			elog(Persistent_DebugPrintLevel(),
 				 "remove_gp_relation_node_and_schedule_drop: For Buffer Pool managed relation '%s' persistent TID %s and serial number " INT64_FORMAT " for DROP",
 				 relpath(rel->rd_node),
 				 ItemPointerToString(&rel->rd_relationnodeinfo.persistentTid),
@@ -2465,7 +2465,7 @@ remove_gp_relation_node_and_schedule_drop(
 		int32 segmentFileNum;
 		ItemPointerData persistentTid;
 		int64 persistentSerialNum;
-		
+
 		relNodeRelation = heap_open(GpRelfileNodeRelationId, RowExclusiveLock);
 
 		GpRelfileNodeBeginScan(
@@ -2473,7 +2473,7 @@ remove_gp_relation_node_and_schedule_drop(
 						rel->rd_id,
 						rel->rd_rel->relfilenode,
 						&gpRelfileNodeScan);
-		
+
 		while ((tuple = GpRelfileNodeGetNext(
 								&gpRelfileNodeScan,
 								&segmentFileNum,
@@ -2481,16 +2481,16 @@ remove_gp_relation_node_and_schedule_drop(
 								&persistentSerialNum)))
 		{
 			if (Debug_persistent_print)
-				elog(Persistent_DebugPrintLevel(), 
+				elog(Persistent_DebugPrintLevel(),
 					 "remove_gp_relation_node_and_schedule_drop: For Append-Only relation %u relfilenode %u scanned segment file #%d, serial number " INT64_FORMAT " at TID %s for DROP",
 					 rel->rd_id,
 					 rel->rd_rel->relfilenode,
 					 segmentFileNum,
 					 persistentSerialNum,
 					 ItemPointerToString(&persistentTid));
-			
+
 			simple_heap_delete(relNodeRelation, &tuple->t_self);
-			
+
 			MirroredFileSysObj_ScheduleDropAppendOnlyFile(
 											&rel->rd_node,
 											segmentFileNum,
@@ -2498,9 +2498,9 @@ remove_gp_relation_node_and_schedule_drop(
 											&persistentTid,
 											persistentSerialNum);
 		}
-		
+
 		GpRelfileNodeEndScan(&gpRelfileNodeScan);
-		
+
 		heap_close(relNodeRelation, RowExclusiveLock);
 
 		/*
@@ -2682,8 +2682,9 @@ heap_drop_with_catalog(Oid relid)
 				// start transaction in magma for DROP TABLE
 				if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 				{
-				    PlugStorageBeginTransaction(NULL);
+                                  PlugStorageStartTransaction();
 				}
+                                PlugStorageGetTransactionId(NULL);
 				Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 				// drop table in magma now
@@ -2693,7 +2694,7 @@ heap_drop_with_catalog(Oid relid)
 				                     database_name,
 				                     schema_name,
 				                     table_name,
-				                     PlugStorageGetTransactionSnapshot());
+				                     PlugStorageGetTransactionSnapshot(NULL));
 				ReadCacheHashEntryReviseOnCommit(RelationGetRelid(rel), true);
 
 			}
@@ -2718,7 +2719,7 @@ heap_drop_with_catalog(Oid relid)
 
 	if (is_foreign_rel)
 		RemoveForeignTableEntry(relid);
-	
+
 	/*
  	 * delete distribution policy if present
  	 */
@@ -2815,12 +2816,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 
 	adrcqCtx = caql_beginscan(
 			caql_addrel(cqclr(&cqc), adrel),
-			cql("INSERT INTO pg_attrdef ", 
+			cql("INSERT INTO pg_attrdef ",
 				NULL));
 
 	if (Debug_check_for_invalid_persistent_tid)
-	{	
-		elog(LOG, 
+	{
+		elog(LOG,
 			 "StoreAttrDefault[1] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
 			 adrel->rd_node.spcNode,
 			 adrel->rd_node.dbNode,
@@ -2835,8 +2836,8 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 	RelationFetchGpRelationNodeForXLog(adrel);
 
 	if (Debug_check_for_invalid_persistent_tid)
-	{	
-		elog(LOG, 
+	{
+		elog(LOG,
 			 "StoreAttrDefault[2] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
 			 adrel->rd_node.spcNode,
 			 adrel->rd_node.dbNode,
@@ -3518,7 +3519,7 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
 					" WHERE starelid = :1 "
 					" AND staattnum = :2 ",
 					ObjectIdGetDatum(relid),
-					Int16GetDatum(attnum)));		 
+					Int16GetDatum(attnum)));
 	}
 }
 
@@ -3550,7 +3551,7 @@ RelationTruncateIndexes(Relation heapRelation)
 
 		/* Now truncate the actual file (and discard buffers) */
 		RelationTruncate(
-					currentIndex, 
+					currentIndex,
 					0,
 					/* markPersistentAsPhysicallyTruncated */ true);
 
@@ -3636,7 +3637,7 @@ heap_truncate(List *relids)
 
 		/* Truncate the actual file (and discard buffers) */
 		RelationTruncate(
-					rel, 
+					rel,
 					0,
 					/* markPersistentAsPhysicallyTruncated */ false);
 
@@ -3888,7 +3889,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	newrnode.relNode = newrelfilenode;
 
 	isAppendOnly = RelationIsAo(relation);
-	
+
 	relname = RelationGetRelationName(relation);
 
 	if (!isAppendOnly)
@@ -3897,7 +3898,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 
 		PersistentFileSysRelStorageMgr localRelStorageMgr;
 		PersistentFileSysRelBufpoolKind relBufpoolKind;
-		
+
 		GpPersistentRelfileNode_GetRelfileInfo(
 											relation->rd_rel->relkind,
 											relation->rd_rel->relstorage,
@@ -3905,9 +3906,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 											&localRelStorageMgr,
 											&relBufpoolKind);
 		Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool);
-		
+
 		srel = smgropen(newrnode);
-	
+
 		MirroredFileSysObj_TransactionCreateBufferPoolFile(
 											srel,
 											relBufpoolKind,
@@ -3931,8 +3932,8 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(&relation->rd_relationnodeinfo.persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "setNewRelfilenodeCommon has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
 			 newrnode.spcNode,
 			 newrnode.dbNode,
@@ -3942,9 +3943,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	}
 
 	relation->rd_relationnodeinfo.isPresent = true;
-	
+
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "setNewRelfilenodeCommon: NEW '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT,
 			 relpath(newrnode),
 			 (isAppendOnly ? "true" : "false"),
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9b4762e..7747e5f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -977,8 +977,9 @@ index_create(Oid heapRelationId,
 		// 2. start transaction in magma for CREATE INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaCreateIndex
@@ -1006,7 +1007,7 @@ index_create(Oid heapRelationId,
 		InvokeMagmaCreateIndex(
 				&procInfo, database_name, schemaname,
 				tablename, &idxinfo,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 		// free memory
 		pfree(idxinfo.indkey);
 	}
@@ -1281,8 +1282,9 @@ index_drop(Oid indexId)
 		// 2. start transaction in magma for DROP INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaDropIndex
@@ -1302,7 +1304,7 @@ index_drop(Oid indexId)
 		InvokeMagmaDropIndex(
 				&procInfo, database_name, schemaname,
 				tablename, indexName,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 	}
 
 	/*
@@ -1475,7 +1477,7 @@ index_update_stats(Relation rel, bool hasindex, bool isprimary,
 	 * this relpages are only needed by QE,
 	 * when this is a magma table, just ignore this info.
 	 */
-	if (!((RelationIsExternal(rel) && RelationIsMagmaTable(rel->rd_id))))
+	if (!RelationIsMagmaTable2(rel->rd_id))
 		relpages = RelationGetNumberOfBlocks(rel);
 	Oid			relid = RelationGetRelid(rel);
 	Relation	pg_class;
@@ -2846,8 +2848,9 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
 		// 2. start transaction in magma for REINDEX INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaReindexIndex
@@ -2867,7 +2870,7 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
 		InvokeMagmaReindexIndex(
 				&procInfo, database_name, schemaname,
 				tablename, indexName,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 	}
 
 	/* Close rels, but keep locks */
diff --git a/src/backend/catalog/pg_compression.c b/src/backend/catalog/pg_compression.c
index 1a10d09..4df1427 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -582,7 +582,8 @@ compresstype_is_valid(char *comptype)
 	{
 		if(strcmp(comptype, "snappy") == 0 ||
 		    strcmp(comptype, "gzip") == 0 ||
-		    strcmp(comptype, "lz4") == 0)
+		    strcmp(comptype, "lz4") == 0 ||
+		    strcmp(comptype, "zstd") == 0)
 			found = true;
 	}
 
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 278103b..0e908fc 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -1073,9 +1073,16 @@ int64 get_block_locations_and_calculate_table_size(split_to_segment_mapping_cont
           // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
           if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
           {
-                  PlugStorageBeginTransaction(magmaTableFullNames);
+            PlugStorageStartTransaction();
                   useClientCacheDirectly = true;
           }
+          if (((PlannedStmt *)context->srtc_context.base.node)->commandType ==
+                  CMD_SELECT ||
+              context->isTargetNoMagma) {
+            PlugStorageGetTransactionSnapshot(magmaTableFullNames);
+          } else {
+            PlugStorageGetTransactionId(magmaTableFullNames);
+          }
           Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   }
 
@@ -2886,7 +2893,7 @@ static void ExternalGetMagmaRangeDataLocation(
 			// get range location from magma now
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 			InvokeMagmaProtocolBlockLocation(ext_entry, procOid, dbname, schemaname,
-			                                 tablename, PlugStorageGetTransactionSnapshot(),
+			                                 tablename, PlugStorageGetTransactionSnapshot(NULL),
 			                                 useClientCacheDirectly,
 			                                 &bldata);
 		}
@@ -6698,14 +6705,15 @@ void build_magma_scansplits_for_result_relations(List **alloc_result, List *relO
     // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
+    PlugStorageGetTransactionId(NULL);
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
     ExtProtocolBlockLocationData *bldata = NULL;
     InvokeMagmaProtocolBlockLocation(
         ext_entry, procOid, dbname, schemaname, tablename,
-        PlugStorageGetTransactionSnapshot(), false, &bldata);
+        PlugStorageGetTransactionSnapshot(NULL), false, &bldata);
 
     pfree(dbname);
     pfree(schemaname);
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index 9ffb5e5..fcf11d8 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -868,7 +868,7 @@ RebuildPlugStorageSnapshot(QueryContextInfo *cxt)
 
 	pfree(snapshot.txnActions.txnActions);
 
-	MagmaSnapshot *s = PlugStorageGetTransactionSnapshot();
+	MagmaSnapshot *s = PlugStorageGetTransactionSnapshot(NULL);
 
 	elog(LOG, "SNAPSHOT DEBUG: GET TOP (%llu, %u, %llu, %u, %d)",
 	     s->currentTransaction.txnId,
@@ -1703,7 +1703,7 @@ prepareDispatchedCatalogSingleRelation(QueryContextInfo *cxt, Oid relid,
 			pfree(formatterName);
 	}
 	/* The pluggable storage snapshot must be dispatched */
-	prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot());
+	prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot(NULL));
 
 	/* The distribution policy for table */
 	prepareDispatchedCatalogDistributionPolicy(cxt, relid);
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 226ffae..f29514a 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -1180,6 +1180,9 @@ static void dispatcher_serialize_common_plan(DispatchData *data, CommonPlanConte
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index f00afce..ee6cc69 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -709,6 +709,9 @@ static void dispatcher_serialize_common_plan(MainDispatchData *data,
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index 0c7030c..2f754c3 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -778,8 +778,6 @@ static bool SendChunkUDP(MotionLayerState *mlStates, ChunkTransportState *transp
 
 static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID);
 static bool dispatcherAYT(void);
-static void checkQDConnectionAlive(void);
-
 
 static void *rxThreadFunc(void *arg);
 
@@ -5923,24 +5921,6 @@ formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
 }								/* formatSockAddr */
 
 /*
- * checkQDConnectionAlive
- * 		Check whether QD connection is still alive. If not, report error.
- */
-static void
-checkQDConnectionAlive(void)
-{
-	if (!dispatch_validate_conn(MyProcPort->sock))
-	{
-		if (Gp_role == GP_ROLE_EXECUTE)
-			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-							errmsg("Interconnect error segment lost contact with master (recv)")));
-		else
-			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-							errmsg("Interconnect error master lost contact with client (recv)")));
-	}
-}
-
-/*
  * getCurrentTime
  * 		get current time
  *
@@ -6967,3 +6947,22 @@ WaitInterconnectQuitUDP(void)
 	}
 	ic_control_info.threadCreated = false;
 }
+
+
+/*
+ * checkQDConnectionAlive
+ *    Check whether QD connection is still alive. If not, report error.
+ */
+void
+checkQDConnectionAlive(void)
+{
+  if (!dispatch_validate_conn(MyProcPort->sock))
+  {
+    if (Gp_role == GP_ROLE_EXECUTE)
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error segment lost contact with master (recv)")));
+    else
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error master lost contact with client (recv)")));
+  }
+}
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index b3fcf4a..6b61415 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -3872,7 +3872,7 @@ int64 GetExternalTotalBytesMAGMA(Relation relation){
 
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
     InvokeMagmaProtocolTableSize(ext_entry, procOid, dbname, schemaname, tablename,
-                                 PlugStorageGetTransactionSnapshot(), &ts);
+                                 PlugStorageGetTransactionSnapshot(NULL), &ts);
 
     pfree(dbname);
     pfree(schemaname);
@@ -3899,7 +3899,7 @@ int64 GetDatabaseTotalBytesMAGMA(Oid dbOid){
 
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		InvokeMagmaProtocolDatabaseSize(procOid, dbname,
-		                                PlugStorageGetTransactionSnapshot(),
+		                                PlugStorageGetTransactionSnapshot(NULL),
 		                                &dbs);
 
 		pfree(dbname);
@@ -4099,7 +4099,7 @@ uint64 GetExternalTotalBytes(Relation rel)
 			/* start transaction for magma table */
       if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
       {
-        PlugStorageBeginTransaction(NULL);
+        PlugStorageStartTransaction();
       }
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 92f7098..ed98b00 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1290,8 +1290,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 		{
 			if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 			{
-				PlugStorageBeginTransaction(NULL);
+                          PlugStorageStartTransaction();
 			}
+                        PlugStorageGetTransactionId(NULL);
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		}
 
@@ -2574,7 +2575,7 @@ CopyTo(CopyState cstate)
 					currentScanDesc = InvokePlugStorageFormatBeginScan(
 							&beginScanFunc, cstate->planstmt, node, &(externalstate.ss),
 							serializeSchema, serializeSchemaLen, rel,
-							formatterType, formatterName, PlugStorageGetTransactionSnapshot());
+							formatterType, formatterName, PlugStorageGetTransactionSnapshot(NULL));
 				}
 				else
 				{
@@ -4562,7 +4563,7 @@ CopyFrom(CopyState cstate)
 							                                          formatterName,
 							                                          plannedstmt,
 							                                          segfileinfo->segno,
-							                                          PlugStorageGetTransactionSnapshot());
+							                                          PlugStorageGetTransactionSnapshot(NULL));
 
 							pfree(insertInitFunc);
 							pfree(plannedstmt);
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 072197a..126861c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1530,8 +1530,9 @@ dropdb(const char *dbname, bool missing_ok)
 						// start transaction in magma for DROP TABLE
 						if (PlugStorageGetTransactionStatus()
 								== PS_TXN_STS_DEFAULT) {
-							PlugStorageBeginTransaction(NULL);
+                                                  PlugStorageStartTransaction();
 						}
+                                                PlugStorageGetTransactionId(NULL);
 						Assert(
 								PlugStorageGetTransactionStatus()
 										== PS_TXN_STS_STARTED);
@@ -1539,7 +1540,7 @@ dropdb(const char *dbname, bool missing_ok)
 						// drop table in magma now
 						InvokeMagmaDropTable(&procInfo, dbInfoRel->exttable, database_name,
 								schema_name, table_name,
-								PlugStorageGetTransactionSnapshot());
+								PlugStorageGetTransactionSnapshot(NULL));
 						ReadCacheHashEntryReviseOnCommit(dbInfoRel->relationOid, true);
 					}
 				}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 7d43c8f..345de0b 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -34,6 +34,7 @@
 
 #include "postgres.h"
 
+#include "access/aosegfiles.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/fileam.h"
@@ -423,10 +424,11 @@ DefineIndex(Oid relationId,
 				 errmsg("access method \"%s\" does not support multicolumn indexes",
 						accessMethodName)));
 
-    if  (unique && RelationIsAo(rel))
-        ereport(ERROR,
-                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                 errmsg("append-only tables do not support unique indexes")));
+
+	/* native orc can't support unique/primary index */
+	if (unique && RelationIsOrc(rel))
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("native orc do not support unique indexes")));
 
 	amoptions = accessMethodForm->amoptions;
 
@@ -752,24 +754,14 @@ DefineIndex(Oid relationId,
 							   errOmitLocation(true)));
         }
         else
-		    {
-          char *formatOpt = caql_getcstring(
-                  NULL,
-                  cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
-                  ObjectIdGetDatum(relationId)));
-          if (!formatOpt) {
-            ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                    errmsg("Cannot support DefineIndex")));
-          }
-          else {
-            char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-            if (!formatName ||
-                (!(pg_strncasecmp(formatName, "magma", strlen("magma")) == 0))) {
-              ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                      errmsg("Cannot support DefineIndex")));
-            }
-          }
-		    }
+        {
+        	/* magma and native orc support index */
+        	if (!(RelationIsOrc(rel) || RelationIsMagmaTable2(relationId)))
+        	{
+        		ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support DefineIndex")));
+        	}
+        	// dispatch_statement_node((Node *)stmt, NULL, NULL, NULL);
+        }
 	}
 
 	/* save lockrelid for below, then close rel */
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3460af4..a92eaa6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1833,14 +1833,15 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
 		// start transaction in magma for CREATE TABLE
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-		    PlugStorageBeginTransaction(NULL);
+		    PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		InvokeMagmaCreateTable(&procInfo,
 		                       database_name,
 		                       schema_name,
 		                       table_name,
-		                       PlugStorageGetTransactionSnapshot(),
+		                       PlugStorageGetTransactionSnapshot(NULL),
 		                       createExtStmt->base.tableElts,
 		                       createExtStmt->pkey,
 		                       createExtStmt->base.distributedBy,
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index c6a0384..bd06077 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -470,7 +470,7 @@ ExecInsert(TupleTableSlot *slot,
 															 formatterName,
 															 estate->es_plannedstmt,
 															 segfileinfo->segno,
-															 PlugStorageGetTransactionSnapshot());
+															 PlugStorageGetTransactionSnapshot(NULL));
 				}
 				else
 				{
@@ -723,7 +723,7 @@ ldelete:;
 	                    InvokeMagmaBeginDelete(&procInfo,
 	                                           resultRelationDesc,
 	                                           estate->es_plannedstmt,
-	                                           PlugStorageGetTransactionSnapshot());
+	                                           PlugStorageGetTransactionSnapshot(NULL));
 		    }
 	        else
 	        {
@@ -1043,7 +1043,7 @@ lreplace:;
 		                    InvokeMagmaBeginUpdate(&procInfo,
 		                                           resultRelationDesc,
 		                                           estate->es_plannedstmt,
-		                                           PlugStorageGetTransactionSnapshot());
+		                                           PlugStorageGetTransactionSnapshot(NULL));
 		            elog(LOG, "exec update begin update: %d", extUpdDescEntry->ext_ins_oid);
 		        }
 		        else
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 71f49ef..5de2981 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -5084,7 +5084,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	                               formatterName,
 	                               estate->es_plannedstmt,
 	                               segfileinfo->segno,
-	                               PlugStorageGetTransactionSnapshot());
+	                               PlugStorageGetTransactionSnapshot(NULL));
 	        }
 	        else
 	        {
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index 68d7add..2d203a7 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -355,7 +355,7 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
                                        currentRelation,
                                        formatterType,
                                        formatterName,
-                                       PlugStorageGetTransactionSnapshot());
+                                       PlugStorageGetTransactionSnapshot(NULL));
 		}
 		else
 		{
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c635d8f..df5775c 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -386,8 +386,11 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
         pathlist = lappend(pathlist, seqpath);
 
 	/* Consider index and bitmap scans */
-	create_index_paths(root, rel, relstorage, 
-					   &indexpathlist, &bitmappathlist);
+  if (!relstorage_is_ao(relstorage))
+  {
+  	/* Temporarily disable index for ao table */
+  	create_index_paths(root, rel, relstorage, &indexpathlist, &bitmappathlist);
+  }
 
 	/* deal with magma index scan */
 	if (relstorage == RELSTORAGE_EXTERNAL)
diff --git a/src/backend/optimizer/plan/newPlanner.c b/src/backend/optimizer/plan/newPlanner.c
index 4840f47..dc5546e 100644
--- a/src/backend/optimizer/plan/newPlanner.c
+++ b/src/backend/optimizer/plan/newPlanner.c
@@ -17,6 +17,7 @@
  */
 
 #include "optimizer/newPlanner.h"
+#include "catalog/catalog.h"
 
 #include "access/aomd.h"
 #include "access/fileam.h"
@@ -54,6 +55,7 @@ char *new_executor_enable_partitioned_hashjoin_mode;
 char *new_executor_enable_external_sort_mode;
 int new_executor_partitioned_hash_recursive_depth_limit;
 int new_executor_ic_tcp_client_limit_per_query_per_segment;
+int new_executor_external_sort_memory_limit_size_mb;
 
 const char *new_executor_runtime_filter_mode;
 const char *new_executor_runtime_filter_mode_local = "local";
@@ -99,6 +101,8 @@ static void
 do_convert_magma_rangevseg_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_rangetbl_to_common_plan(List *rtable,
                                                CommonPlanContext *ctx);
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx);
 static void do_convert_token_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_splits_list_to_common_plan(List *splits, Oid relOid,
@@ -133,6 +137,19 @@ static bool checkSupportedSubLinkType(SubLinkType sublinkType);
 static bool checkInsertSupportTable(PlannedStmt *stmt);
 static bool checkIsPrepareQuery(QueryDesc *queryDesc);
 
+// @return format string whose life time goes along with current MemoryContext
+static const char *buildInternalTableFormatOptionStringInJson(Relation rel) {
+  AppendOnlyEntry *aoentry =
+      GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
+  StringInfoData option;
+  initStringInfo(&option);
+  appendStringInfoChar(&option, '{');
+  if (aoentry->compresstype)
+    appendStringInfo(&option, "%s", aoentry->compresstype);
+  appendStringInfoChar(&option, '}');
+  return option.data;
+}
+
 #define DIRECT_LEFT_CHILD_VAR 0
 #define INT64_MAX_LENGTH 20
 
@@ -313,6 +330,27 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
         pfree(rgId);
         pfree(rgUrl);
       }
+      // For append-only internal table
+      if (get_relation_storage_type(oid) == RELSTORAGE_ORC) {
+        ListCell *lc;
+        foreach (lc, stmt->result_segfileinfos) {
+          ResultRelSegFileInfoMapNode *pRelSegFileInfoMapNode =
+              (ResultRelSegFileInfoMapNode *)lfirst(lc);
+          ListCell *lc;
+          foreach (lc, pRelSegFileInfoMapNode->segfileinfos) {
+            ResultRelSegFileInfo *pSegFileInfo = lfirst(lc);
+            if (pSegFileInfo->numfiles == 0) {
+              // detect mixed-up partition of external table
+              ctx->convertible = false;
+              return;
+            }
+            univPlanAddResultRelSegFileInfo(
+                ctx->univplan, pRelSegFileInfoMapNode->relid,
+                pSegFileInfo->segno, pSegFileInfo->eof[0],
+                pSegFileInfo->uncompressed_eof[0]);
+          }
+        }
+      }
       univPlanAddToPlanNode(ctx->univplan, true);
     }
     do_convert_plantree_to_common_plan(stmt->planTree, pid, true, false, NIL,
@@ -327,8 +365,9 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
       do_convert_plantree_to_common_plan(subplan, -1, true, true, NIL, NULL,
                                          true, ctx);
   }
-  if (ctx->convertible)
-    do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible) do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible && stmt->result_partitions)
+    do_convert_result_partitions_to_common_plan(stmt->result_partitions, ctx);
   if (ctx->convertible && enable_secure_filesystem)
     do_convert_token_map_to_common_plan(ctx);
   if (ctx->convertible && ctx->isMagma)
@@ -1294,9 +1333,11 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
       columnDataTypeMod[i] = att->atttypmod;
     }
     FormatType fmttype = UnivPlanOrcFormat;
-    univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, "dummy", "{}",
-                                  attNum, (const char **)columnName,
-                                  columnDataType, columnDataTypeMod, NULL);
+    univPlanRangeTblEntryAddTable(
+        ctx->univplan, relid, fmttype, relpath(rel->rd_node),
+        buildInternalTableFormatOptionStringInJson(rel), attNum,
+        (const char **)columnName, columnDataType, columnDataTypeMod, NULL,
+        rel->rd_rel->relname.data);
   } else if (RelationIsExternal(rel)) {
     TupleDesc tableAttrs = rel->rd_att;
     attNum = tableAttrs->natts;
@@ -1394,7 +1435,7 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
     univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, location,
                                   fmtOptsJson, attNum,
                                   (const char **)columnName, columnDataType,
-                                  columnDataTypeMod, targetName);
+                                  columnDataTypeMod, targetName, NULL);
 
     if (fmtOptsJson != NULL)
       pfree(fmtOptsJson);
@@ -1419,6 +1460,53 @@ end:
   pfree(columnDataTypeMod);
 }
 
+static void do_convert_result_partition_rule_to_common_plan(
+    CommonPlanContext *ctx, PartitionRule *partitionRule,
+    bool isDefaultPartition) {
+  if (partitionRule->children) {
+    // TODO(chiyang): sub-partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanResultPartitionsAddPartitionRule(
+      ctx->univplan, partitionRule->parchildrelid, partitionRule->parname,
+      isDefaultPartition);
+
+  ListCell *lc;
+  foreach (lc, partitionRule->parlistvalues) {
+    univPlanPartitionRuleAddPartitionValue(ctx->univplan, isDefaultPartition);
+    List *partitionListValues = (List *)lfirst(lc);
+    ListCell *lc;
+    foreach (lc, partitionListValues) {
+      Const *val = (List *)lfirst(lc);
+      do_convert_expr_to_common_plan(-1, val, ctx);
+      univPlanPartitionValueAddConst(ctx->univplan, isDefaultPartition);
+    }
+  }
+}
+
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx) {
+  if (partitionNode->part->parkind != 'l') {
+    // TODO(chiyang): range partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanAddResultPartitions(ctx->univplan, partitionNode->part->parrelid,
+                              partitionNode->part->parkind,
+                              partitionNode->part->paratts,
+                              partitionNode->part->parnatts);
+  ListCell *lc;
+  foreach (lc, partitionNode->rules) {
+    PartitionRule *partitionRule = (PartitionRule *)lfirst(lc);
+    do_convert_result_partition_rule_to_common_plan(ctx, partitionRule, false);
+  }
+  if (partitionNode->default_part) {
+    do_convert_result_partition_rule_to_common_plan(
+        ctx, partitionNode->default_part, true);
+  }
+}
+
 void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
   HASH_SEQ_STATUS status;
   struct FileSystemCredential *entry;
@@ -1453,14 +1541,14 @@ void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
 // it's convertible and it's a magma scan
 void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx) {
   // start transaction in magma for SELECT in new executor
-  if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
-    PlugStorageBeginTransaction(NULL);
-  }
+  // if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
+  //   PlugStorageStartTransaction(NULL);
+  // }
   Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   int32_t size = 0;
   char *snapshot = NULL;
-  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(), &snapshot,
-                                 &size);
+  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(NULL),
+                                 &snapshot, &size);
   if (snapshot && size != 0) {
     univPlanAddSnapshot(ctx->univplan, snapshot, size);
   }
@@ -1959,15 +2047,14 @@ end:
 }
 
 bool checkInsertSupportTable(PlannedStmt *stmt) {
-  // disable partitioned result target
-  if (stmt->result_partitions)
-    return false;
-  if (list_length(stmt->resultRelations) > 1)
-    return false;
+  if (list_length(stmt->resultRelations) > 1) return false;
   int32_t index = list_nth_int(stmt->resultRelations, 0);
   RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1);
 
-  // if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+  if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+
+  // disable partition table insert for external table
+  if (stmt->result_partitions) return false;
 
   Relation pgExtTableRel = heap_open(ExtTableRelationId, RowExclusiveLock);
   cqContext cqc;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be53f20..ce75706 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -8112,6 +8112,11 @@ static Query *transformIndexStmt(ParseState *pstate, IndexStmt *stmt,
 
     if (RelationBuildPartitionDesc(rel, false)) stmt->do_part = true;
 
+    /* native orc can't create index in parent relation */
+    if (RelationIsOrc(rel) && stmt->do_part)
+  		ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+  				errmsg("Cannot support create index statement in native orc parent relation yet")));
+
     if (stmt->do_part && Gp_role != GP_ROLE_EXECUTE) {
       List *children;
       struct HTAB *nameCache;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ed4b5dc..ee8958d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -67,6 +67,7 @@
 #include "cdb/cdbpersistentrelfile.h"
 
 #include "access/aosegfiles.h"
+#include "access/orcsegfiles.h"
 #include "access/parquetsegfiles.h"
 #include "cdb/cdbappendonlyam.h"
 #include "cdb/cdbvars.h"
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 27e7894..89d20e0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -17,6 +17,7 @@
 #include "postgres.h"
 #include "port.h"
 
+#include "access/aosegfiles.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/fileam.h"
@@ -415,6 +416,72 @@ QueryIsReadOnly(Query *parsetree)
 }
 
 /*
+ * CanCreateIndex: can support create index
+ * So far, magma table and native orc could support index
+ */
+void CanSupportIndex(IndexStmt *stmt, Oid relid)
+{
+	/* 1. upgrade mode should support index operation */
+	if (gp_upgrade_mode) return;
+
+	bool supportIndex = false;
+	Relation rel = heap_open(relid, AccessShareLock);
+	bool nativeOrc = RelationIsOrc(rel);
+	heap_close(rel, AccessShareLock);
+
+	/*
+	 * 2. deal magma table and native orc
+	 * for "stmt->magma", deal with special partition situation, oushu issue #1049
+	 * its ugly, but there is no elegant way now
+	 */
+	if (RelationIsMagmaTable2(relid) || stmt->magma || nativeOrc)
+	{
+			supportIndex = true;
+			if (nativeOrc)
+			{
+				/* add pg_aoseg.pg_orcseg_idx_xxx and its index pg_aoseg.pg_orcseg_idx_xxx_index */
+				AlterTableCreateAoSegIndexTableWithOid(relid, stmt->is_part_child);
+			}
+	}
+	if (supportIndex)
+	{
+		/*
+		 * 3. magma/native orc index cant support the accessory conditions
+		 */
+		if (stmt->options) {
+			ereport(ERROR,
+							(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+									errmsg("magma/native orc Index cannot support create index with clause")));
+		}
+		if (stmt->whereClause) {
+			ereport(ERROR,
+							(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+									errmsg("magma/native orc Index cannot support create index where predicate")));
+		}
+		if (stmt->tableSpace) {
+			ereport(ERROR,
+							(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+									errmsg("magma/native orc Index cannot support create index with tableSpace")));
+		}
+		ListCell   *cell;
+		foreach(cell, stmt->indexParams)
+		{
+			IndexElem *elem = (IndexElem *) lfirst(cell);
+			if (elem->expr || elem->opclass)
+			{
+				ereport(ERROR,
+								(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+										errmsg("magma/native orc Index cannot support create index with expr or opclass")));
+			}
+		}
+	}
+	else
+	{
+		ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet")));
+	}
+}
+
+/*
  * CommandIsReadOnly: is an executable query read-only?
  *
  * This is a much stricter test than we apply for XactReadOnly mode;
@@ -1349,83 +1416,13 @@ ProcessUtility(Node *parsetree,
 				lockmode = stmt->concurrent ? ShareUpdateExclusiveLock
 						: ShareLock;
 				relid = RangeVarGetRelid(stmt->relation, false, false/*allowHcatalog*/);
-
-				/* Only create index for external table with magma */
 				Assert(OidIsValid(relid));
-				char *formatOpt = caql_getcstring(
-				        NULL,
-				        cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
-				        ObjectIdGetDatum(relid)));
-
-				if (!formatOpt)
-				{
-				  if (stmt->magma) {}
-				  else if (!gp_upgrade_mode)
-					{
-						ereport(ERROR,
-						(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-					}
-				}
-				else
-				{
-					char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-					if (!formatName)
-					{
-						if (!gp_upgrade_mode)
-						{
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-						}
-					}
-					/* in order to support magmatp/magmaap */
-					else if ((pg_strncasecmp(formatName, FORMAT_MAGMA_TP_STR,
-																	 sizeof(FORMAT_MAGMA_TP_STR)-1) == 0) ||
-							(pg_strncasecmp(formatName, FORMAT_MAGMA_AP_STR,
-															sizeof(FORMAT_MAGMA_AP_STR)-1) == 0))
-					{
-						if (stmt->options) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index with clause")));
-						}
-						if (stmt->whereClause) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index where predicate")));
-						}
-						if (stmt->tableSpace) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index with tableSpace")));
-						}
-						ListCell   *cell;
-						foreach(cell, stmt->indexParams)
-						{
-							IndexElem *elem = (IndexElem *) lfirst(cell);
-							if (elem->expr || elem->opclass)
-							{
-								ereport(ERROR,
-												(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-														errmsg("magma Index cannot support create index with expr or opclass")));
-							}
-						}
-						pfree(formatName);
-						// break;
-					}
-					else
-					{
-						pfree(formatName);
-						if (!gp_upgrade_mode)
-						{
-							ereport(ERROR,
-							        (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-						}
-					}
-				}
-
 				LockRelationOid(relid, lockmode);
 				CheckRelationOwnership(relid, true);
 
+				// check whether can support index
+				CanSupportIndex(stmt, relid);
+
 				DefineIndex(relid,		/* relation */
 							stmt->idxname,		/* index name */
 							InvalidOid, /* no predefined OID */
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index de91837..a5e327e 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -222,7 +222,7 @@ calculate_database_size(Oid dbOid)
     /* start transaction for magma table */
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
diff --git a/src/backend/utils/gp/segadmin.c b/src/backend/utils/gp/segadmin.c
index ab4bff0..a615242 100644
--- a/src/backend/utils/gp/segadmin.c
+++ b/src/backend/utils/gp/segadmin.c
@@ -309,13 +309,16 @@ Datum hawq_magma_status(PG_FUNCTION_ARGS)
 	  }
 	  tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 	  // free memory
-	  free(data->magmaNodes[funcctx->call_cntr].node);
-	  free(data->magmaNodes[funcctx->call_cntr].dirs);
+	  if (data->magmaNodes[funcctx->call_cntr].node)
+	    free(data->magmaNodes[funcctx->call_cntr].node);
+	  if (data->magmaNodes[funcctx->call_cntr].dirs)
+	    free(data->magmaNodes[funcctx->call_cntr].dirs);
 	  result = HeapTupleGetDatum(tuple);
 	  SRF_RETURN_NEXT(funcctx, result);
 	} else {
 	  // free memory
-	  free(data->magmaNodes);
+	  if (data->magmaNodes)
+	    free(data->magmaNodes);
 	  SRF_RETURN_DONE(funcctx);
 	}
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4bb606b..20f8f1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4678,6 +4678,15 @@ static struct config_int ConfigureNamesInt[] =
 		10000, 0, 65535, NULL, NULL
 	},
 
+  {
+    {"new_executor_external_sort_memory_limit", PGC_USERSET, QUERY_TUNING_OTHER,
+      gettext_noop("Sets the memory usage (in MB) limit of external sort for new executor."),
+      NULL
+    },
+    &new_executor_external_sort_memory_limit_size_mb,
+    256, 0, 1024, NULL, NULL
+  },
+
 	{
 		{"default_magma_hash_table_nvseg_per_seg", PGC_USERSET, QUERY_TUNING_OTHER,
 			gettext_noop("Sets default vseg number per node for Magma hash table"),
@@ -7490,7 +7499,6 @@ static struct config_string ConfigureNamesString[] =
 		"AUTO", assign_new_executor_mode, NULL
 	},
 
-
 	{
 		{"new_scheduler", PGC_USERSET, EXTERNAL_TABLES,
 			gettext_noop("Enable the new scheduler."),
diff --git a/src/include/access/orcsegfiles.h b/src/include/access/orcsegfiles.h
index 4a87981..f8b4582 100644
--- a/src/include/access/orcsegfiles.h
+++ b/src/include/access/orcsegfiles.h
@@ -30,6 +30,11 @@
 #define Anum_pg_orcseg_tupcount 3
 #define Anum_pg_orcseg_eofuncompressed 4
 
+#define Natts_pg_orcseg_idx 3
+#define Anum_pg_orcseg_idx_idxoid 1
+#define Anum_pg_orcseg_idx_segno 2
+#define Anum_pg_orcseg_idx_eof 3
+
 extern void insertInitialOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo);
 extern void insertOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo,
                                 float8 tupleCount, float8 eof,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4b825ac..93e922d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -188,10 +188,15 @@ typedef enum PlugStorageTransactionStatus
 
 typedef enum PlugStorageTransactionCommand
 {
-	PS_TXN_CMD_BEGIN   = 0,
-	PS_TXN_CMD_COMMIT  = 1,
-	PS_TXN_CMD_ABORT   = 2,
-	PS_TXN_CMD_INVALID = 3
+    PS_TXN_CMD_START_TRANSACTION        = 0,
+    PS_TXN_CMD_COMMIT_TRANSACTION       = 1,
+    PS_TXN_CMD_ABORT_TRANSACTION        = 2,
+    PS_TXN_CMD_START_SUB_TRANSACTION    = 3,
+    PS_TXN_CMD_COMMIT_SUB_TRANSACTION   = 4,
+    PS_TXN_CMD_ABORT_SUB_TRANSACTION    = 5,
+    PS_TXN_CMD_GET_SNAPSHOT             = 6,
+    PS_TXN_CMD_GET_TRANSACTIONID        = 7,
+    PS_TXN_CMD_INVALID                  = 8
 } PlugStorageTransactionCommand;
 
 typedef struct PlugStorageTransactionData
@@ -202,7 +207,8 @@ typedef struct PlugStorageTransactionData
 	TransactionId                  pst_transaction_id;
 	PlugStorageTransactionStatus   pst_transaction_status;
 	PlugStorageTransactionCommand  pst_transaction_command;
-	MagmaSnapshot                 *pst_transaction_dist;     /* magma format */
+	MagmaSnapshot                 *pst_transaction_snapshot;     /* magma format */
+	MagmaTransactionState         *pst_transaction_state;        /* magma format */
 } PlugStorageTransactionData;
 
 typedef PlugStorageTransactionData *PlugStorageTransaction;
@@ -217,9 +223,10 @@ extern bool isCleanupAbortTransaction;
  */
 extern PlugStorageTransaction PlugStorageGetTransaction(void);
 extern PlugStorageTransactionStatus PlugStorageGetTransactionStatus(void);
-extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(void);
 extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot);
-extern void PlugStorageBeginTransaction(List* magmaTableFullNames);
+extern void PlugStorageStartTransaction();
+extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(List* magmaTableFullNames);
+extern void PlugStorageGetTransactionId(List* magmaTableFullNames);
 extern void PlugStorageCommitTransaction(void);
 extern void PlugStorageAbortTransaction(void);
 extern void PlugStorageSetIsCleanupAbort(bool isCleanup);
diff --git a/src/include/catalog/aoseg.h b/src/include/catalog/aoseg.h
index 0e3d5ff..e653fb7 100644
--- a/src/include/catalog/aoseg.h
+++ b/src/include/catalog/aoseg.h
@@ -34,6 +34,7 @@ extern void AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid,
 											  Oid newIndexOid,
 											  Oid *comptypeOid,
 											  bool is_part_child);
+extern void AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child);
 
 extern void gpsql_appendonly_segfile_create(PG_FUNCTION_ARGS);
 
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index 50b6c0d..8284945 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -354,4 +354,6 @@ extern void CleanUpNewInterconnect();
 
 extern void ResetRpcClientInstance();
 
+extern void checkQDConnectionAlive(void);
+
 #endif   /* ML_IPC_H */
diff --git a/src/include/optimizer/newPlanner.h b/src/include/optimizer/newPlanner.h
index 97bcdf6..6067169 100644
--- a/src/include/optimizer/newPlanner.h
+++ b/src/include/optimizer/newPlanner.h
@@ -41,6 +41,7 @@ extern char *new_executor_enable_partitioned_hashjoin_mode;
 extern char *new_executor_enable_external_sort_mode;
 extern int new_executor_partitioned_hash_recursive_depth_limit;
 extern int new_executor_ic_tcp_client_limit_per_query_per_segment;
+extern int new_executor_external_sort_memory_limit_size_mb;
 
 extern const char *new_executor_runtime_filter_mode;
 extern const char *new_executor_runtime_filter_mode_local;
diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h
index 99eec7c..776cf35 100644
--- a/src/include/tcop/utility.h
+++ b/src/include/tcop/utility.h
@@ -43,4 +43,6 @@ extern void CheckRelationOwnership(Oid relOid, bool noCatalogs);
 
 extern void DropErrorMsgNonExistent(const RangeVar *rel, char rightkind, bool missing_ok);
 
+extern void CanSupportIndex(IndexStmt *stmt, Oid relid);
+
 #endif   /* UTILITY_H */