You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by zt...@apache.org on 2021/12/13 14:23:16 UTC

[hawq] branch ztao updated (3bc605d -> 7947f7e)

This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a change to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git.


 discard 3bc605d  HAWQ-1811. Sync with OushuDB - Phase II
     new 7947f7e  HAWQ-1811. Sync with OushuDB - Phase II

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (3bc605d)
            \
             N -- N -- N   refs/heads/ztao (7947f7e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/include/cwrapper/magma/cwrapper/magma-client-c.h | 3 +++
 src/include/cwrapper/univplan/cwrapper/univplan-c.h  | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

[hawq] 01/01: HAWQ-1811. Sync with OushuDB - Phase II

Posted by zt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit 7947f7efbf483d03871bff2fd0edabdc9e7de602
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Mon Dec 13 22:18:51 2021 +0800

    HAWQ-1811. Sync with OushuDB - Phase II
---
 contrib/hornet/orc_debug_statistics.py             |  11 +-
 contrib/magma/magma.c                              | 153 ++++++++++++--------
 src/backend/access/common/reloptions.c             |   1 +
 src/backend/access/transam/xact.c                  | 139 ++++++++++++------
 src/backend/catalog/aoseg.c                        | 158 ++++++++++++++++++++-
 src/backend/catalog/heap.c                         | 121 ++++++++--------
 src/backend/catalog/index.c                        |  17 ++-
 src/backend/catalog/pg_compression.c               |   3 +-
 src/backend/cdb/cdbdatalocality.c                  |  16 ++-
 src/backend/cdb/cdbquerycontextdispatching.c       |   4 +-
 src/backend/cdb/dispatcher.c                       |   3 +
 src/backend/cdb/dispatcher_new.c                   |   3 +
 src/backend/cdb/motion/ic_udp.c                    |  39 +++--
 src/backend/commands/analyze.c                     |   6 +-
 src/backend/commands/copy.c                        |   7 +-
 src/backend/commands/dbcommands.c                  |   5 +-
 src/backend/commands/indexcmds.c                   |  36 ++---
 src/backend/commands/tablecmds.c                   |   5 +-
 src/backend/executor/execDML.c                     |   6 +-
 src/backend/executor/execMain.c                    |   2 +-
 src/backend/executor/nodeExternalscan.c            |   2 +-
 src/backend/optimizer/path/allpaths.c              |   7 +-
 src/backend/optimizer/plan/newPlanner.c            | 121 +++++++++++++---
 src/backend/parser/analyze.c                       |   5 +
 src/backend/storage/buffer/bufmgr.c                |   1 +
 src/backend/tcop/utility.c                         | 143 +++++++++----------
 src/backend/utils/adt/dbsize.c                     |   2 +-
 src/backend/utils/gp/segadmin.c                    |   9 +-
 src/backend/utils/misc/guc.c                       |  10 +-
 src/include/access/orcsegfiles.h                   |   5 +
 src/include/access/xact.h                          |  21 ++-
 src/include/catalog/aoseg.h                        |   1 +
 src/include/cdb/ml_ipc.h                           |   2 +
 .../cwrapper/magma/cwrapper/magma-client-c.h       |  30 +++-
 .../cwrapper/univplan/cwrapper/univplan-c.h        |   3 +-
 src/include/optimizer/newPlanner.h                 |   1 +
 src/include/tcop/utility.h                         |   2 +
 37 files changed, 755 insertions(+), 345 deletions(-)

diff --git a/contrib/hornet/orc_debug_statistics.py b/contrib/hornet/orc_debug_statistics.py
index 3ddfb0d..eb61af6 100755
--- a/contrib/hornet/orc_debug_statistics.py
+++ b/contrib/hornet/orc_debug_statistics.py
@@ -81,10 +81,15 @@ col_l = len(col_name)
 for i in range(0,col_l):
     str_ans = str_ans.replace(' Column ' + str(i+1) + ' ',col_name[i])
 
-str_ans = str_ans.replace(',}}}',"}}]}")
+if stripe_num == 0:
+    str_ans = str_ans.replace(',}}}',"}}]}")
+else:
+    str_ans = str_ans.replace(',}}}',"}}}]}")
 str_ans = str_ans.replace('" Stripe 0 ":','"Stripes":[')
-for i in range(1,stripe_num):
-    str_ans = str_ans.replace('" Stripe {} "'.format(i),'},{"Stripe {}"'.format(i))
+for i in range(1,2):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}},{{"Stripe {}"'.format(i))
+for i in range(2,stripe_num+1):
+    str_ans = str_ans.replace('" Stripe {} "'.format(i),'}}}},{{"Stripe {}"'.format(i))
 file_path = '"File_path":"' + sys.argv[2] + '",'
 pre_json = str_ans[0] + file_path + str_ans[1:len(str_ans)]
 print(sys.argv[1] + '|' + pre_json.replace(',}','}'))
diff --git a/contrib/magma/magma.c b/contrib/magma/magma.c
index 08bbf40..b7d6758 100644
--- a/contrib/magma/magma.c
+++ b/contrib/magma/magma.c
@@ -405,7 +405,8 @@ Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaTablePtr table = MagmaClientC_FetchTable(client, snapshot, useClientCacheDirectly);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaTablePtr table = MagmaClientC_FetchTable(client, useClientCacheDirectly);
   magma_check_result(&client);
 
   elog(LOG, "magma_protocol_blocklocation pass fetch table");
@@ -527,10 +528,11 @@ Datum magma_protocol_tablesize(PG_FUNCTION_ARGS) {
          fmt_name);
   }
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
+  MagmaClientC_SetupSnapshot(client, snapshot);
 
   // set size of table in tp type to zero.
   if (tableType == MAGMACLIENTC_TABLETYPE_AP) {
-    tsdata->tablesize = MagmaClientC_GetTableSize(client, snapshot);
+    tsdata->tablesize = MagmaClientC_GetTableSize(client);
   } else {
     tsdata->tablesize = 0;
   }
@@ -583,7 +585,8 @@ Datum magma_protocol_databasesize(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupDatabaseInfo(client, dbname);
-  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client);
   elog(LOG,"dbsize in magma.c is %llu", dbsdata->dbsize);
   magma_check_result(&client);
 
@@ -837,7 +840,8 @@ Datum magma_createindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_CreateIndex(client, snapshot, magmaidx);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateIndex(client, magmaidx);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -861,7 +865,8 @@ Datum magma_dropindex(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_DropIndex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropIndex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -885,7 +890,8 @@ Datum magma_reindex_index(PG_FUNCTION_ARGS) {
 
   int16_t tableType = 0;
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_Reindex(client, snapshot, indexname);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_Reindex(client, indexname);
   magma_check_result(&client);
   PG_RETURN_VOID();
 }
@@ -1029,7 +1035,8 @@ Datum magma_createtable(PG_FUNCTION_ARGS) {
   }
 
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_CreateTable(client, snapshot, ncols, cols);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_CreateTable(client, ncols, cols);
   magma_check_result(&client);
   pfree(cols);
   list_free(pk_names);
@@ -1061,7 +1068,8 @@ Datum magma_droptable(PG_FUNCTION_ARGS) {
   int16_t tableType = 0;
   // for drop table, tableType won't be used in the process, set it as default
   MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
-  MagmaClientC_DropTable(client, snapshot);
+  MagmaClientC_SetupSnapshot(client, snapshot);
+  MagmaClientC_DropTable(client);
   magma_check_result(&client);
 
   PG_RETURN_VOID();
@@ -3099,80 +3107,111 @@ Datum magma_transaction(PG_FUNCTION_ARGS) {
     elog(ERROR, "failed to connect to magma service");
   }
 
+  MagmaClientC_SetupSnapshot(client, pst->pst_transaction_snapshot);
+
   switch (txn_command) {
-    case PS_TXN_CMD_BEGIN: {
-      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
-      MagmaTableFullName *magmaTableFullNames = (MagmaTableFullName *) palloc0(magmaTableFullNamesSize * sizeof(MagmaTableFullName));
-      int i = 0;
-      ListCell *lc;
-      foreach (lc, ps->magma_talbe_full_names) {
-        MagmaTableFullName* mtfn = lfirst(lc);
-        magmaTableFullNames[i].databaseName = pstrdup(mtfn->databaseName);
-        magmaTableFullNames[i].schemaName = pstrdup(mtfn->schemaName);
-        magmaTableFullNames[i].tableName = pstrdup(mtfn->tableName);
-        ++i;
-      }
-      pst->pst_transaction_dist =
-          MagmaClientC_BeginTransaction(client, magmaTableFullNames, magmaTableFullNamesSize);
-      for (int i = 0; i < magmaTableFullNamesSize; ++i) {
-        pfree(magmaTableFullNames[i].databaseName);
-        pfree(magmaTableFullNames[i].schemaName);
-        pfree(magmaTableFullNames[i].tableName);
-      }
-      pfree(magmaTableFullNames);
-      if (pst->pst_transaction_dist == NULL) {
-        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
-        pst->pst_transaction_id = InvalidTransactionId;
-        pst->pst_transaction_dist = NULL;
-        elog(DEBUG1, "magma_transaction: begin snapshot: NULL");
-      } else {
-        elog(DEBUG1, "magma_transaction: begin snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
-      }
+    case PS_TXN_CMD_START_TRANSACTION: {
+      pst->pst_transaction_state = MagmaClientC_StartTransaction(client);
+      pst->pst_transaction_snapshot = NULL;
+      pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+      pst->pst_transaction_id = InvalidTransactionId;
+      pst->pst_transaction_snapshot = NULL;
+      elog(DEBUG1, "magma_transaction: start transaction");
       magma_check_result(&client);
       break;
     }
-    case PS_TXN_CMD_COMMIT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_COMMIT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: commit snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: commit snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
-      MagmaClientC_CommitTransaction(client, pst->pst_transaction_dist);
+      MagmaClientC_CommitTransaction(client);
       magma_check_result(&client);
       break;
-    case PS_TXN_CMD_ABORT:
-      if (pst->pst_transaction_dist == NULL) {
+    case PS_TXN_CMD_ABORT_TRANSACTION:
+      if (pst->pst_transaction_snapshot == NULL) {
         elog(DEBUG1, "magma_transaction: abort snapshot: NULL");
       } else {
         elog(DEBUG1,
              "magma_transaction: abort snapshot: (%llu, %u, %llu, %u)",
-             pst->pst_transaction_dist->currentTransaction.txnId,
-             pst->pst_transaction_dist->currentTransaction.txnStatus,
-             pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-             pst->pst_transaction_dist->txnActions.txnActionSize);
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
       }
 
       if (pst->pst_transaction_status != PS_TXN_STS_DEFAULT &&
           pst->pst_transaction_id != InvalidTransactionId &&
-          pst->pst_transaction_dist != NULL) {
-        MagmaClientC_AbortTransaction(client, pst->pst_transaction_dist,
-                                      PlugStorageGetIsCleanupAbort());
-        pst->pst_transaction_dist = NULL;
+          pst->pst_transaction_snapshot != NULL) {
+        MagmaClientC_AbortTransaction(client, PlugStorageGetIsCleanupAbort());
+        pst->pst_transaction_snapshot = NULL;
         pst->pst_transaction_id = InvalidTransactionId;
         pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
         magma_check_result(&client);
       }
       break;
+    case PS_TXN_CMD_GET_SNAPSHOT: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get snapshot: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get snapshot: (%llu, %u, %llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
+    case PS_TXN_CMD_GET_TRANSACTIONID: {
+      MagmaClientC_CleanupTableInfo(client);
+      int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+      int i = 0;
+      ListCell *lc;
+      foreach (lc, ps->magma_talbe_full_names) {
+        MagmaTableFullName* mtfn = lfirst(lc);
+        MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+                                  mtfn->tableName, 0);
+        ++i;
+      }
+      pst->pst_transaction_state = MagmaClientC_GetTransctionId(client);
+      pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+      if (pst->pst_transaction_snapshot == NULL) {
+        pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+        pst->pst_transaction_id = InvalidTransactionId;
+        pst->pst_transaction_snapshot = NULL;
+        elog(DEBUG1, "magma_transaction: get transaction state: NULL");
+      } else {
+        elog(DEBUG1, "magma_transaction: get transaction state: (%llu, %u, %llu, %u)",
+             pst->pst_transaction_snapshot->currentTransaction.txnId,
+             pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+             pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+             pst->pst_transaction_snapshot->txnActions.txnActionSize);
+      }
+      magma_check_result(&client);
+      break;
+    }
     default:
       elog(ERROR, "Transaction command for magma is invalid %d", txn_command);
       break;
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 976fc71..c0f9236 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -545,6 +545,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 		        // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
 		        // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
 		        && (strcmp(compresstype, "zlib") != 0)
+		        && (strcmp(compresstype, "zstd") != 0)
 		        && (strcmp(compresstype, "none") != 0))
     {
       ereport(ERROR,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 393852d..3762036 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -270,7 +270,7 @@ static PlugStorageTransactionData TopPlugStorageTransactionData = {
 	.pst_transaction_id      = InvalidTransactionId,/* transaction id */
 	.pst_transaction_status  = PS_TXN_STS_DEFAULT,  /* transaction status */
 	.pst_transaction_command = PS_TXN_CMD_INVALID,  /* transaction command */
-	.pst_transaction_dist    = NULL                 /* magma transaction info */
+	.pst_transaction_snapshot = NULL                 /* magma transaction info */
 };
 
 static PlugStorageTransaction TopPlugStorageTransaction = &TopPlugStorageTransactionData;
@@ -372,12 +372,6 @@ PlugStorageGetTransactionStatus(void)
 	return TopPlugStorageTransaction->pst_transaction_status;
 }
 
-MagmaSnapshot *
-PlugStorageGetTransactionSnapshot(void)
-{
-	return TopPlugStorageTransaction->pst_transaction_dist;
-}
-
 void PlugStorageSetIsCleanupAbort(bool isCleanup)
 {
   isCleanupAbortTransaction = isCleanup;
@@ -397,52 +391,54 @@ extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot)
 	 */
 	if (Gp_role == GP_ROLE_DISPATCH)
 	{
-	    Insist(TopPlugStorageTransaction->pst_transaction_dist != NULL);
+	    Insist(TopPlugStorageTransaction->pst_transaction_snapshot != NULL);
 	}
 	else if (Gp_role == GP_ROLE_EXECUTE &&
-	         TopPlugStorageTransaction->pst_transaction_dist == NULL)
+	         TopPlugStorageTransaction->pst_transaction_snapshot == NULL)
 	{
-	    TopPlugStorageTransaction->pst_transaction_dist = malloc(sizeof(MagmaSnapshot));
-	    memset(TopPlugStorageTransaction->pst_transaction_dist, 0, sizeof(MagmaSnapshot));
-	    TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset = 0;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions = NULL;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot = malloc(sizeof(MagmaSnapshot));
+	    memset(TopPlugStorageTransaction->pst_transaction_snapshot, 0, sizeof(MagmaSnapshot));
+	    TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnId = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot
+                ->currentTransaction.txnStatus = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset = 0;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions = NULL;
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize = 0;
 	}
 
 	// set current transaction for current snapshot
-	TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId =
+	TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnId =
 	        snapshot->currentTransaction.txnId;
-	TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus =
+	TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnStatus =
 	        snapshot->currentTransaction.txnStatus;
 
 	// set command id
-	TopPlugStorageTransaction->pst_transaction_dist->cmdIdInTransaction =
+	TopPlugStorageTransaction->pst_transaction_snapshot
+            ->cmdIdInTransaction =
 	        snapshot->cmdIdInTransaction;
 
 	// reallocate memory for visible HLC map for current snapshot
-	free(TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions);
+	free(TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions);
 
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset =
 	    snapshot->txnActions.txnActionStartOffset;
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions =
 	    malloc(sizeof(MagmaTxnAction) *snapshot->txnActions.txnActionSize);
 
 	// set visible HLC map for current snapshot
-	TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize =
+	TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize =
 	    snapshot->txnActions.txnActionSize;
 	for (int i = 0; i < snapshot->txnActions.txnActionSize; ++i)
 	{
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnId =
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnId =
 	            snapshot->txnActions.txnActions[i].txnId;
-	    TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnStatus =
+	    TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnStatus =
 	            snapshot->txnActions.txnActions[i].txnStatus;
     }
 }
 
-void
-PlugStorageBeginTransaction(List* magmaTableFullNames)
+void PlugStorageStartTransaction(List* magmaTableFullNames)
 {
 	if ((Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode())
 	{
@@ -460,13 +456,9 @@ PlugStorageBeginTransaction(List* magmaTableFullNames)
 			}
 
 			Assert(pst->pst_transaction_status == PS_TXN_STS_DEFAULT);
-			pst->pst_transaction_command = PS_TXN_CMD_BEGIN;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_START_TRANSACTION;
 			InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
-			elog(DEBUG1, "PS TXN: BEGIN (%llu, %u, %llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
 			pst->pst_transaction_id = GetTopTransactionId();
 			pst->pst_transaction_status = PS_TXN_STS_STARTED;
 		}
@@ -492,14 +484,15 @@ PlugStorageCommitTransaction(void)
 
 			Assert(pst->pst_transaction_id == GetTopTransactionId());
 			Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
-			pst->pst_transaction_command = PS_TXN_CMD_COMMIT;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_COMMIT_TRANSACTION;
 			elog(DEBUG1, "PS TXN: COMMIT (%llu, %u, %llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
+			     pst->pst_transaction_snapshot->currentTransaction.txnId,
+			     pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+			     pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+			     pst->pst_transaction_snapshot->txnActions.txnActionSize);
 			InvokePlugStorageFormatTransaction(pst, NULL);
-			pst->pst_transaction_dist = NULL;
+			pst->pst_transaction_snapshot = NULL;
 			pst->pst_transaction_id = InvalidTransactionId;
 			pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
 		}
@@ -525,20 +518,74 @@ PlugStorageAbortTransaction(void)
 
 			Assert(pst->pst_transaction_id == GetTopTransactionId());
 			Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
-			pst->pst_transaction_command = PS_TXN_CMD_ABORT;
+			pst->pst_transaction_command =
+                            PS_TXN_CMD_ABORT_TRANSACTION;
 			elog(DEBUG1, "PS TXN: ABORT (%llu, %u, llu, %u)",
-			     pst->pst_transaction_dist->currentTransaction.txnId,
-			     pst->pst_transaction_dist->currentTransaction.txnStatus,
-			     pst->pst_transaction_dist->txnActions.txnActionStartOffset,
-			     pst->pst_transaction_dist->txnActions.txnActionSize);
+			     pst->pst_transaction_snapshot->currentTransaction.txnId,
+			     pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+			     pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+			     pst->pst_transaction_snapshot->txnActions.txnActionSize);
 			InvokePlugStorageFormatTransaction(pst, NULL);
-			pst->pst_transaction_dist = NULL;
+			pst->pst_transaction_snapshot = NULL;
 			pst->pst_transaction_id = InvalidTransactionId;
 			pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
 		}
 	}
 }
 
+
+MagmaSnapshot *
+PlugStorageGetTransactionSnapshot(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      (pst->pst_transaction_snapshot == NULL) &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_SNAPSHOT;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET SNAPSHOT (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+  return TopPlugStorageTransaction->pst_transaction_snapshot;
+}
+
+void PlugStorageGetTransactionId(List* magmaTableFullNames)
+{
+  PlugStorageTransaction pst = TopPlugStorageTransaction;
+  if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+      pst->pst_transaction_state->currentTransaction.txnId == 0 &&
+      (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+    if (!OidIsValid(pst->pst_proc_oid)) {
+      pst->pst_proc_oid =
+          LookupPlugStorageValidatorFunc("magma", "transaction");
+      Assert(OidIsValid(pst->pst_proc_oid));
+
+      fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+    }
+
+    pst->pst_transaction_command = PS_TXN_CMD_GET_TRANSACTIONID;
+    InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+    elog(DEBUG1, "PS TXN: GET TRANSACTION ID (%llu, %u, %llu, %u)",
+         pst->pst_transaction_snapshot->currentTransaction.txnId,
+         pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+         pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+         pst->pst_transaction_snapshot->txnActions.txnActionSize);
+    pst->pst_transaction_id = GetTopTransactionId();
+  }
+}
+
 /* ----------------------------------------------------------------
  *	transaction state accessors
  * ----------------------------------------------------------------
@@ -2456,7 +2503,7 @@ StartTransaction(void)
 	/*
 	 * begin transaction in magma service now
 	 */
-	/* PlugStorageBeginTransaction(); */
+	/* PlugStorageStartTransaction(); */
 }
 
 /*
diff --git a/src/backend/catalog/aoseg.c b/src/backend/catalog/aoseg.c
index b7b93fd..7e88b91 100644
--- a/src/backend/catalog/aoseg.c
+++ b/src/backend/catalog/aoseg.c
@@ -56,7 +56,9 @@
 #include "cdb/cdbvars.h"
 
 static bool create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
+static bool create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
 static bool needs_aoseg_table(Relation rel);
+static bool needs_aoseg_index_table(Relation rel);
 
 /*
  * AlterTableCreateAoSegTable
@@ -106,6 +108,19 @@ AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid, Oid newIndexOid,
 	heap_close(rel, NoLock);
 }
 
+void
+AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child)
+{
+	Relation	rel;
+	Assert(!is_part_child);
+	rel = heap_open(relOid, AccessShareLock);
+
+	/* create_aoseg_index_table does all the work */
+	(void) create_aoseg_index_table(rel, InvalidOid, InvalidOid, NULL);
+
+	heap_close(rel, AccessShareLock);
+}
+
 /*
  * create_aoseg_table --- internal workhorse
  *
@@ -310,6 +325,139 @@ create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptype
 	return true;
 }
 
+static bool
+create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid)
+{
+	Oid			relOid = RelationGetRelid(rel);
+	TupleDesc	tupdesc;
+	bool		shared_relation;
+	Oid		  blkdirrelid;
+	Oid		  blkdiridxid;
+	char		aoseg_relname[NAMEDATALEN];
+	char		aoseg_idxname[NAMEDATALEN];
+	IndexInfo  *indexInfo;
+	Oid			classObjectId[2];
+	ObjectAddress baseobject,
+				aosegobject;
+	Oid			tablespaceOid = ChooseTablespaceForLimitedObject(rel->rd_rel->reltablespace);
+
+	/*
+	 * Check to see whether the table actually needs an aoseg index table.
+	 */
+	if (!needs_aoseg_index_table(rel))
+		return false;
+
+	shared_relation = rel->rd_rel->relisshared;
+
+	/* can't have shared AO tables after initdb */
+	/* TODO: disallow it at CREATE TABLE time */
+	Assert(!(shared_relation && !IsBootstrapProcessingMode()) );
+
+	GetAppendOnlyEntryAuxOids(relOid, SnapshotNow, NULL, NULL, &blkdirrelid, &blkdiridxid);
+
+	/*
+	 * Was a aoseg index table already created?
+	 */
+	if (blkdirrelid != InvalidOid)
+	{
+		return false;
+	}
+
+	snprintf(aoseg_relname, sizeof(aoseg_relname), "pg_orcseg_idx_%u",
+					 relOid);
+	snprintf(aoseg_idxname, sizeof(aoseg_idxname), "pg_orcseg_idx_%u_index",
+					 relOid);
+
+	tupdesc = CreateTemplateTupleDesc(Natts_pg_orcseg_idx, false);
+
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_idxoid, "idxoid",
+										 INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_segno, "segno",
+										 INT4OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_eof, "eof",
+										 FLOAT8OID, -1, 0);
+
+	blkdirrelid = heap_create_with_catalog(aoseg_relname,
+										   PG_AOSEGMENT_NAMESPACE,
+										   tablespaceOid,
+										   aosegOid,
+										   rel->rd_rel->relowner,
+										   tupdesc,
+										   /* relam */ InvalidOid,
+										   RELKIND_AOSEGMENTS,
+										   RELSTORAGE_HEAP,
+										   shared_relation,
+										   true,
+										   /* bufferPoolBulkLoad */ false,
+										   0,
+										   ONCOMMIT_NOOP,
+										   NULL, /* CDB POLICY */
+										   (Datum) 0,
+										   true,
+										   comptypeOid,
+						 				   /* persistentTid */ NULL,
+						 				   /* persistentSerialNum */ NULL,
+						 				   /* formattername */ NULL);
+
+	/* make the toast relation visible, else index creation will fail */
+	CommandCounterIncrement();
+
+	/*
+	 * Create unique index on index oid.
+	 */
+	indexInfo = makeNode(IndexInfo);
+	indexInfo->ii_NumIndexAttrs = 1;
+	indexInfo->ii_NumIndexKeyAttrs = 1;
+	indexInfo->ii_KeyAttrNumbers[0] = 1;
+	indexInfo->ii_Expressions = NIL;
+	indexInfo->ii_ExpressionsState = NIL;
+	indexInfo->ii_Predicate = NIL;
+	indexInfo->ii_PredicateState = NIL;
+	indexInfo->ii_Unique = true;
+	indexInfo->ii_Concurrent = false;
+
+	classObjectId[0] = INT4_BTREE_OPS_OID;
+	classObjectId[1] = INT4_BTREE_OPS_OID;
+
+	blkdiridxid = index_create(blkdirrelid, aoseg_idxname, aosegIndexOid,
+							   indexInfo,
+							   BTREE_AM_OID,
+							   tablespaceOid,
+							   classObjectId, (Datum) 0,
+							   true, false, (Oid *) NULL, true, false, false, NULL);
+
+	/* Unlock target table -- no one can see it */
+	UnlockRelationOid(blkdirrelid, ShareLock);
+	/* Unlock the index -- no one can see it anyway */
+	UnlockRelationOid(blkdiridxid, AccessExclusiveLock);
+
+	/*
+	 * Store the aoseg table's OID in the parent relation's pg_appendonly row
+	 */
+	UpdateAppendOnlyEntryAuxOids(relOid, InvalidOid, InvalidOid, blkdirrelid, blkdiridxid);
+
+	/*
+	 * Register dependency from the aoseg table to the master, so that the
+	 * aoseg table will be deleted if the master is.
+	 */
+	baseobject.classId = RelationRelationId;
+	baseobject.objectId = relOid;
+	baseobject.objectSubId = 0;
+	aosegobject.classId = RelationRelationId;
+	aosegobject.objectId = blkdirrelid;
+	aosegobject.objectSubId = 0;
+
+	recordDependencyOn(&aosegobject, &baseobject, DEPENDENCY_INTERNAL);
+
+	/*
+	 * Make changes visible
+	 */
+	CommandCounterIncrement();
+
+	return true;
+}
+
+
 /*
  * Check to see whether the table needs an aoseg table.	It does only if it is
  * an append-only relation.
@@ -320,4 +468,12 @@ needs_aoseg_table(Relation rel)
 	return RelationIsAo(rel);
 }
 
-
+/*
+ * Check to see whether the table needs an aoseg index table. It does only
+ * if it is an ORC relation, i.e. when RelationIsOrc(rel) is true.
+ */
+static bool
+needs_aoseg_index_table(Relation rel)
+{
+	return RelationIsOrc(rel);
+}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e7e3887..e13b97b 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -129,7 +129,7 @@ extern void PQclear(PGresult *res);
 
 static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
 									Oid			classid,
-									Oid			objoid, 
+									Oid			objoid,
 									Oid			relowner,
 									char*		actionname,
 									char*		subtype,
@@ -546,14 +546,14 @@ heap_create(const char *relname,
 							rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
 			heap_close(gp_relation_node, RowExclusiveLock);
 		}
-#endif	
+#endif
 	}
 
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(&rel->rd_relationnodeinfo.persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "setNewRelfilenodeCommon has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
 			 rel->rd_node.spcNode,
 			 rel->rd_node.dbNode,
@@ -563,7 +563,7 @@ heap_create(const char *relname,
 	}
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 		     "heap_create: '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT " for CREATE",
 			 relpath(rel->rd_node),
 			 (isAppendOnly ? "true" : "false"),
@@ -917,8 +917,8 @@ static void MetaTrackAddUpdInternal(cqContext  *pcqCtx,
 } /* end MetaTrackAddUpdInternal */
 
 
-void MetaTrackAddObject(Oid		classid, 
-						Oid		objoid, 
+void MetaTrackAddObject(Oid		classid,
+						Oid		objoid,
 						Oid		relowner,
 						char*	actionname,
 						char*	subtype)
@@ -959,8 +959,8 @@ void MetaTrackAddObject(Oid		classid,
 
 } /* end MetaTrackAddObject */
 
-void MetaTrackUpdObject(Oid		classid, 
-						Oid		objoid, 
+void MetaTrackUpdObject(Oid		classid,
+						Oid		objoid,
 						Oid		relowner,
 						char*	actionname,
 						char*	subtype)
@@ -1015,7 +1015,7 @@ void MetaTrackUpdObject(Oid		classid,
 									classid, objoid, relowner,
 									actionname, subtype,
 									rel, tuple);
-			
+
 /*			CommandCounterIncrement(); */
 
 			ii++;
@@ -1026,14 +1026,14 @@ void MetaTrackUpdObject(Oid		classid,
 
 	/* add it if it didn't already exist */
 	if (!ii)
-		MetaTrackAddObject(classid, 
-						   objoid, 
+		MetaTrackAddObject(classid,
+						   objoid,
 						   relowner,
 						   actionname,
 						   subtype);
 
 } /* end MetaTrackUpdObject */
-void MetaTrackDropObject(Oid		classid, 
+void MetaTrackDropObject(Oid		classid,
 						 Oid		objoid)
 {
 	int ii = 0;
@@ -1307,7 +1307,7 @@ AddNewRelationTuple(Relation pg_class_desc,
 			/* NOTE: look at cdb_estimate_rel_size() if changing these values */
 			if(relstorage_is_external(relstorage))
 			{
-				new_rel_reltup->relpages = gp_external_table_default_number_of_pages; 
+				new_rel_reltup->relpages = gp_external_table_default_number_of_pages;
 				new_rel_reltup->reltuples = gp_external_table_default_number_of_tuples;
 			}
 			break;
@@ -1430,8 +1430,8 @@ InsertGpRelfileNodeTuple(
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "Inserting with invalid TID (0,0) into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
 			 relationId,
 			 relname,
@@ -1444,7 +1444,7 @@ InsertGpRelfileNodeTuple(
 	memset(nulls, false, sizeof(nulls));
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "InsertGpRelationNodeTuple: Inserting into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT ", TID %s",
 			 relationId,
 			 relname,
@@ -1499,8 +1499,8 @@ UpdateGpRelfileNodeTuple(
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "Updating with invalid TID (0,0) in relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
 			 relfilenode,
 			 segmentFileNum,
@@ -1508,7 +1508,7 @@ UpdateGpRelfileNodeTuple(
 	}
 
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "UpdateGpRelationNodeTuple: Updating relfilenode %u, segment file #%d, serial number " INT64_FORMAT " at TID %s",
 			 relfilenode,
 			 segmentFileNum,
@@ -1521,7 +1521,7 @@ UpdateGpRelfileNodeTuple(
 
 	repl_repl[Anum_gp_relfile_node_relfilenode_oid - 1] = true;
 	repl_val[Anum_gp_relfile_node_relfilenode_oid - 1] = ObjectIdGetDatum(relfilenode);
-	
+
 	repl_repl[Anum_gp_relfile_node_segment_file_num - 1] = true;
 	repl_val[Anum_gp_relfile_node_segment_file_num - 1] = Int32GetDatum(segmentFileNum);
 
@@ -1529,12 +1529,12 @@ UpdateGpRelfileNodeTuple(
 
 	repl_repl[Anum_gp_relfile_node_persistent_tid- 1] = true;
 	repl_val[Anum_gp_relfile_node_persistent_tid- 1] = PointerGetDatum(persistentTid);
-	
+
 	repl_repl[Anum_gp_relfile_node_persistent_serial_num - 1] = true;
 	repl_val[Anum_gp_relfile_node_persistent_serial_num - 1] = Int64GetDatum(persistentSerialNum);
 
 	newtuple = heap_modify_tuple(tuple, RelationGetDescr(gp_relfile_node), repl_val, repl_null, repl_repl);
-	
+
 	simple_heap_update(gp_relfile_node, &newtuple->t_self, newtuple);
 
 	CatalogUpdateIndexes(gp_relfile_node, newtuple);
@@ -1559,7 +1559,7 @@ AddNewRelfileNodeTuple(
 							/* updateIndex */ true,
 							&new_rel->rd_relationnodeinfo.persistentTid,
 							new_rel->rd_relationnodeinfo.persistentSerialNum);
-							
+
 	}
 }
 
@@ -1603,7 +1603,7 @@ heap_create_with_catalog(const char *relname,
 
 	pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
 
-    // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node 
+    // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node
     // for this table is renamed from gp_relation_node, also it's schema changed.
 	if (IsBootstrapProcessingMode()|| (gp_upgrade_mode && GpPersistent_IsPersistentRelation(relid)))
 		gp_relfile_node_desc = NULL;
@@ -1630,7 +1630,7 @@ heap_create_with_catalog(const char *relname,
 		}
 		relstorage = stdRdOptions->columnstore;
 	}
-	
+
 	if (IsBuiltinTablespace(reltablespace) && appendOnlyRel) {
 		ereport(ERROR,
 			(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1669,7 +1669,7 @@ heap_create_with_catalog(const char *relname,
 								 stdRdOptions->columnstore);
 
 	/* MPP-8058: disallow OIDS on column-oriented tables */
-	if (tupdesc->tdhasoid && 
+	if (tupdesc->tdhasoid &&
 		IsNormalProcessingMode() &&
         (Gp_role == GP_ROLE_DISPATCH))
 	{
@@ -1982,11 +1982,11 @@ heap_create_with_catalog(const char *relname,
 		}
 
 		/* MPP-7576: don't track internal namespace tables */
-		switch (relnamespace) 
+		switch (relnamespace)
 		{
 			case PG_CATALOG_NAMESPACE:
 				/* MPP-7773: don't track objects in system namespace
-				 * if modifying system tables (eg during upgrade)  
+				 * if modifying system tables (eg during upgrade)
 				 */
 				if (allowSystemTableModsDDL)
 					doIt = false;
@@ -2330,7 +2330,7 @@ RemoveAttrDefault(Oid relid, AttrNumber attnum,
 		object.objectSubId = 0;
 
 		performDeletion(&object, behavior);
-		
+
 		found = true;
 	}
 
@@ -2429,9 +2429,9 @@ remove_gp_relation_node_and_schedule_drop(
 	Relation	rel)
 {
 	PersistentFileSysRelStorageMgr relStorageMgr;
-	
+
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "remove_gp_relation_node_and_schedule_drop: dropping relation '%s', relation id %u '%s', relfilenode %u",
 			 rel->rd_rel->relname.data,
 			 rel->rd_id,
@@ -2449,9 +2449,9 @@ remove_gp_relation_node_and_schedule_drop(
 		DeleteGpRelfileNodeTuple(
 								rel,
 								/* segmentFileNum */ 0);
-		
+
 		if (Debug_persistent_print)
-			elog(Persistent_DebugPrintLevel(), 
+			elog(Persistent_DebugPrintLevel(),
 				 "remove_gp_relation_node_and_schedule_drop: For Buffer Pool managed relation '%s' persistent TID %s and serial number " INT64_FORMAT " for DROP",
 				 relpath(rel->rd_node),
 				 ItemPointerToString(&rel->rd_relationnodeinfo.persistentTid),
@@ -2465,7 +2465,7 @@ remove_gp_relation_node_and_schedule_drop(
 		int32 segmentFileNum;
 		ItemPointerData persistentTid;
 		int64 persistentSerialNum;
-		
+
 		relNodeRelation = heap_open(GpRelfileNodeRelationId, RowExclusiveLock);
 
 		GpRelfileNodeBeginScan(
@@ -2473,7 +2473,7 @@ remove_gp_relation_node_and_schedule_drop(
 						rel->rd_id,
 						rel->rd_rel->relfilenode,
 						&gpRelfileNodeScan);
-		
+
 		while ((tuple = GpRelfileNodeGetNext(
 								&gpRelfileNodeScan,
 								&segmentFileNum,
@@ -2481,16 +2481,16 @@ remove_gp_relation_node_and_schedule_drop(
 								&persistentSerialNum)))
 		{
 			if (Debug_persistent_print)
-				elog(Persistent_DebugPrintLevel(), 
+				elog(Persistent_DebugPrintLevel(),
 					 "remove_gp_relation_node_and_schedule_drop: For Append-Only relation %u relfilenode %u scanned segment file #%d, serial number " INT64_FORMAT " at TID %s for DROP",
 					 rel->rd_id,
 					 rel->rd_rel->relfilenode,
 					 segmentFileNum,
 					 persistentSerialNum,
 					 ItemPointerToString(&persistentTid));
-			
+
 			simple_heap_delete(relNodeRelation, &tuple->t_self);
-			
+
 			MirroredFileSysObj_ScheduleDropAppendOnlyFile(
 											&rel->rd_node,
 											segmentFileNum,
@@ -2498,9 +2498,9 @@ remove_gp_relation_node_and_schedule_drop(
 											&persistentTid,
 											persistentSerialNum);
 		}
-		
+
 		GpRelfileNodeEndScan(&gpRelfileNodeScan);
-		
+
 		heap_close(relNodeRelation, RowExclusiveLock);
 
 		/*
@@ -2682,8 +2682,9 @@ heap_drop_with_catalog(Oid relid)
 				// start transaction in magma for DROP TABLE
 				if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 				{
-				    PlugStorageBeginTransaction(NULL);
+                                  PlugStorageStartTransaction();
 				}
+                                PlugStorageGetTransactionId(NULL);
 				Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 				// drop table in magma now
@@ -2693,7 +2694,7 @@ heap_drop_with_catalog(Oid relid)
 				                     database_name,
 				                     schema_name,
 				                     table_name,
-				                     PlugStorageGetTransactionSnapshot());
+				                     PlugStorageGetTransactionSnapshot(NULL));
 				ReadCacheHashEntryReviseOnCommit(RelationGetRelid(rel), true);
 
 			}
@@ -2718,7 +2719,7 @@ heap_drop_with_catalog(Oid relid)
 
 	if (is_foreign_rel)
 		RemoveForeignTableEntry(relid);
-	
+
 	/*
  	 * delete distribution policy if present
  	 */
@@ -2815,12 +2816,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 
 	adrcqCtx = caql_beginscan(
 			caql_addrel(cqclr(&cqc), adrel),
-			cql("INSERT INTO pg_attrdef ", 
+			cql("INSERT INTO pg_attrdef ",
 				NULL));
 
 	if (Debug_check_for_invalid_persistent_tid)
-	{	
-		elog(LOG, 
+	{
+		elog(LOG,
 			 "StoreAttrDefault[1] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
 			 adrel->rd_node.spcNode,
 			 adrel->rd_node.dbNode,
@@ -2835,8 +2836,8 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 	RelationFetchGpRelationNodeForXLog(adrel);
 
 	if (Debug_check_for_invalid_persistent_tid)
-	{	
-		elog(LOG, 
+	{
+		elog(LOG,
 			 "StoreAttrDefault[2] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
 			 adrel->rd_node.spcNode,
 			 adrel->rd_node.dbNode,
@@ -3518,7 +3519,7 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
 					" WHERE starelid = :1 "
 					" AND staattnum = :2 ",
 					ObjectIdGetDatum(relid),
-					Int16GetDatum(attnum)));		 
+					Int16GetDatum(attnum)));
 	}
 }
 
@@ -3550,7 +3551,7 @@ RelationTruncateIndexes(Relation heapRelation)
 
 		/* Now truncate the actual file (and discard buffers) */
 		RelationTruncate(
-					currentIndex, 
+					currentIndex,
 					0,
 					/* markPersistentAsPhysicallyTruncated */ true);
 
@@ -3636,7 +3637,7 @@ heap_truncate(List *relids)
 
 		/* Truncate the actual file (and discard buffers) */
 		RelationTruncate(
-					rel, 
+					rel,
 					0,
 					/* markPersistentAsPhysicallyTruncated */ false);
 
@@ -3888,7 +3889,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	newrnode.relNode = newrelfilenode;
 
 	isAppendOnly = RelationIsAo(relation);
-	
+
 	relname = RelationGetRelationName(relation);
 
 	if (!isAppendOnly)
@@ -3897,7 +3898,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 
 		PersistentFileSysRelStorageMgr localRelStorageMgr;
 		PersistentFileSysRelBufpoolKind relBufpoolKind;
-		
+
 		GpPersistentRelfileNode_GetRelfileInfo(
 											relation->rd_rel->relkind,
 											relation->rd_rel->relstorage,
@@ -3905,9 +3906,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 											&localRelStorageMgr,
 											&relBufpoolKind);
 		Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool);
-		
+
 		srel = smgropen(newrnode);
-	
+
 		MirroredFileSysObj_TransactionCreateBufferPoolFile(
 											srel,
 											relBufpoolKind,
@@ -3931,8 +3932,8 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	if (Debug_check_for_invalid_persistent_tid &&
 		!Persistent_BeforePersistenceWork() &&
 		PersistentStore_IsZeroTid(&relation->rd_relationnodeinfo.persistentTid))
-	{	
-		elog(ERROR, 
+	{
+		elog(ERROR,
 			 "setNewRelfilenodeCommon has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
 			 newrnode.spcNode,
 			 newrnode.dbNode,
@@ -3942,9 +3943,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
 	}
 
 	relation->rd_relationnodeinfo.isPresent = true;
-	
+
 	if (Debug_persistent_print)
-		elog(Persistent_DebugPrintLevel(), 
+		elog(Persistent_DebugPrintLevel(),
 			 "setNewRelfilenodeCommon: NEW '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT,
 			 relpath(newrnode),
 			 (isAppendOnly ? "true" : "false"),
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9b4762e..7747e5f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -977,8 +977,9 @@ index_create(Oid heapRelationId,
 		// 2. start transaction in magma for CREATE INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaCreateIndex
@@ -1006,7 +1007,7 @@ index_create(Oid heapRelationId,
 		InvokeMagmaCreateIndex(
 				&procInfo, database_name, schemaname,
 				tablename, &idxinfo,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 		// free memory
 		pfree(idxinfo.indkey);
 	}
@@ -1281,8 +1282,9 @@ index_drop(Oid indexId)
 		// 2. start transaction in magma for DROP INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaDropIndex
@@ -1302,7 +1304,7 @@ index_drop(Oid indexId)
 		InvokeMagmaDropIndex(
 				&procInfo, database_name, schemaname,
 				tablename, indexName,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 	}
 
 	/*
@@ -1475,7 +1477,7 @@ index_update_stats(Relation rel, bool hasindex, bool isprimary,
 	 * this relpages are only needed by QE,
 	 * when this is a magma table, just ignore this info.
 	 */
-	if (!((RelationIsExternal(rel) && RelationIsMagmaTable(rel->rd_id))))
+	if (!RelationIsMagmaTable2(rel->rd_id))
 		relpages = RelationGetNumberOfBlocks(rel);
 	Oid			relid = RelationGetRelid(rel);
 	Relation	pg_class;
@@ -2846,8 +2848,9 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
 		// 2. start transaction in magma for REINDEX INDEX
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-			PlugStorageBeginTransaction(NULL);
+                  PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
 		// 3. call InvokeMagmaReindexIndex
@@ -2867,7 +2870,7 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
 		InvokeMagmaReindexIndex(
 				&procInfo, database_name, schemaname,
 				tablename, indexName,
-				PlugStorageGetTransactionSnapshot());
+				PlugStorageGetTransactionSnapshot(NULL));
 	}
 
 	/* Close rels, but keep locks */
diff --git a/src/backend/catalog/pg_compression.c b/src/backend/catalog/pg_compression.c
index 1a10d09..4df1427 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -582,7 +582,8 @@ compresstype_is_valid(char *comptype)
 	{
 		if(strcmp(comptype, "snappy") == 0 ||
 		    strcmp(comptype, "gzip") == 0 ||
-		    strcmp(comptype, "lz4") == 0)
+		    strcmp(comptype, "lz4") == 0 ||
+		    strcmp(comptype, "zstd") == 0)
 			found = true;
 	}
 
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 278103b..0e908fc 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -1073,9 +1073,16 @@ int64 get_block_locations_and_calculate_table_size(split_to_segment_mapping_cont
           // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
           if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
           {
-                  PlugStorageBeginTransaction(magmaTableFullNames);
+            PlugStorageStartTransaction();
                   useClientCacheDirectly = true;
           }
+          if (((PlannedStmt *)context->srtc_context.base.node)->commandType ==
+                  CMD_SELECT ||
+              context->isTargetNoMagma) {
+            PlugStorageGetTransactionSnapshot(magmaTableFullNames);
+          } else {
+            PlugStorageGetTransactionId(magmaTableFullNames);
+          }
           Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   }
 
@@ -2886,7 +2893,7 @@ static void ExternalGetMagmaRangeDataLocation(
 			// get range location from magma now
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 			InvokeMagmaProtocolBlockLocation(ext_entry, procOid, dbname, schemaname,
-			                                 tablename, PlugStorageGetTransactionSnapshot(),
+			                                 tablename, PlugStorageGetTransactionSnapshot(NULL),
 			                                 useClientCacheDirectly,
 			                                 &bldata);
 		}
@@ -6698,14 +6705,15 @@ void build_magma_scansplits_for_result_relations(List **alloc_result, List *relO
     // start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
+    PlugStorageGetTransactionId(NULL);
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
     ExtProtocolBlockLocationData *bldata = NULL;
     InvokeMagmaProtocolBlockLocation(
         ext_entry, procOid, dbname, schemaname, tablename,
-        PlugStorageGetTransactionSnapshot(), false, &bldata);
+        PlugStorageGetTransactionSnapshot(NULL), false, &bldata);
 
     pfree(dbname);
     pfree(schemaname);
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index 9ffb5e5..fcf11d8 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -868,7 +868,7 @@ RebuildPlugStorageSnapshot(QueryContextInfo *cxt)
 
 	pfree(snapshot.txnActions.txnActions);
 
-	MagmaSnapshot *s = PlugStorageGetTransactionSnapshot();
+	MagmaSnapshot *s = PlugStorageGetTransactionSnapshot(NULL);
 
 	elog(LOG, "SNAPSHOT DEBUG: GET TOP (%llu, %u, %llu, %u, %d)",
 	     s->currentTransaction.txnId,
@@ -1703,7 +1703,7 @@ prepareDispatchedCatalogSingleRelation(QueryContextInfo *cxt, Oid relid,
 			pfree(formatterName);
 	}
 	/* The pluggable storage snapshot must be dispatched */
-	prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot());
+	prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot(NULL));
 
 	/* The distribution policy for table */
 	prepareDispatchedCatalogDistributionPolicy(cxt, relid);
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 226ffae..f29514a 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -1180,6 +1180,9 @@ static void dispatcher_serialize_common_plan(DispatchData *data, CommonPlanConte
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index f00afce..ee6cc69 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -709,6 +709,9 @@ static void dispatcher_serialize_common_plan(MainDispatchData *data,
             new_executor_partitioned_hash_recursive_depth_limit);
     univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
                    numberStrBuf);
+    sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+    univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+                   numberStrBuf);
 
     univPlanAddGuc(ctx->univplan, "new_interconnect_type",
                    show_new_interconnect_type());
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index 0c7030c..2f754c3 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -778,8 +778,6 @@ static bool SendChunkUDP(MotionLayerState *mlStates, ChunkTransportState *transp
 
 static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID);
 static bool dispatcherAYT(void);
-static void checkQDConnectionAlive(void);
-
 
 static void *rxThreadFunc(void *arg);
 
@@ -5923,24 +5921,6 @@ formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
 }								/* formatSockAddr */
 
 /*
- * checkQDConnectionAlive
- * 		Check whether QD connection is still alive. If not, report error.
- */
-static void
-checkQDConnectionAlive(void)
-{
-	if (!dispatch_validate_conn(MyProcPort->sock))
-	{
-		if (Gp_role == GP_ROLE_EXECUTE)
-			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-							errmsg("Interconnect error segment lost contact with master (recv)")));
-		else
-			ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-							errmsg("Interconnect error master lost contact with client (recv)")));
-	}
-}
-
-/*
  * getCurrentTime
  * 		get current time
  *
@@ -6967,3 +6947,22 @@ WaitInterconnectQuitUDP(void)
 	}
 	ic_control_info.threadCreated = false;
 }
+
+
+/*
+ * checkQDConnectionAlive
+ *    Raise an interconnect ERROR if dispatch_validate_conn(MyProcPort->sock) fails; the message text depends on Gp_role.
+ */
+void
+checkQDConnectionAlive(void)
+{
+  if (!dispatch_validate_conn(MyProcPort->sock))
+  {
+    if (Gp_role == GP_ROLE_EXECUTE)
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error segment lost contact with master (recv)")));
+    else
+      ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+              errmsg("Interconnect error master lost contact with client (recv)")));
+  }
+}
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index b3fcf4a..6b61415 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -3872,7 +3872,7 @@ int64 GetExternalTotalBytesMAGMA(Relation relation){
 
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
     InvokeMagmaProtocolTableSize(ext_entry, procOid, dbname, schemaname, tablename,
-                                 PlugStorageGetTransactionSnapshot(), &ts);
+                                 PlugStorageGetTransactionSnapshot(NULL), &ts);
 
     pfree(dbname);
     pfree(schemaname);
@@ -3899,7 +3899,7 @@ int64 GetDatabaseTotalBytesMAGMA(Oid dbOid){
 
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		InvokeMagmaProtocolDatabaseSize(procOid, dbname,
-		                                PlugStorageGetTransactionSnapshot(),
+		                                PlugStorageGetTransactionSnapshot(NULL),
 		                                &dbs);
 
 		pfree(dbname);
@@ -4099,7 +4099,7 @@ uint64 GetExternalTotalBytes(Relation rel)
 			/* start transaction for magma table */
       if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
       {
-        PlugStorageBeginTransaction(NULL);
+        PlugStorageStartTransaction();
       }
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 92f7098..ed98b00 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1290,8 +1290,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 		{
 			if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 			{
-				PlugStorageBeginTransaction(NULL);
+                          PlugStorageStartTransaction();
 			}
+                        PlugStorageGetTransactionId(NULL);
 			Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		}
 
@@ -2574,7 +2575,7 @@ CopyTo(CopyState cstate)
 					currentScanDesc = InvokePlugStorageFormatBeginScan(
 							&beginScanFunc, cstate->planstmt, node, &(externalstate.ss),
 							serializeSchema, serializeSchemaLen, rel,
-							formatterType, formatterName, PlugStorageGetTransactionSnapshot());
+							formatterType, formatterName, PlugStorageGetTransactionSnapshot(NULL));
 				}
 				else
 				{
@@ -4562,7 +4563,7 @@ CopyFrom(CopyState cstate)
 							                                          formatterName,
 							                                          plannedstmt,
 							                                          segfileinfo->segno,
-							                                          PlugStorageGetTransactionSnapshot());
+							                                          PlugStorageGetTransactionSnapshot(NULL));
 
 							pfree(insertInitFunc);
 							pfree(plannedstmt);
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 072197a..126861c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1530,8 +1530,9 @@ dropdb(const char *dbname, bool missing_ok)
 						// start transaction in magma for DROP TABLE
 						if (PlugStorageGetTransactionStatus()
 								== PS_TXN_STS_DEFAULT) {
-							PlugStorageBeginTransaction(NULL);
+                                                  PlugStorageStartTransaction();
 						}
+                                                PlugStorageGetTransactionId(NULL);
 						Assert(
 								PlugStorageGetTransactionStatus()
 										== PS_TXN_STS_STARTED);
@@ -1539,7 +1540,7 @@ dropdb(const char *dbname, bool missing_ok)
 						// drop table in magma now
 						InvokeMagmaDropTable(&procInfo, dbInfoRel->exttable, database_name,
 								schema_name, table_name,
-								PlugStorageGetTransactionSnapshot());
+								PlugStorageGetTransactionSnapshot(NULL));
 						ReadCacheHashEntryReviseOnCommit(dbInfoRel->relationOid, true);
 					}
 				}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 7d43c8f..345de0b 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -34,6 +34,7 @@
 
 #include "postgres.h"
 
+#include "access/aosegfiles.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/fileam.h"
@@ -423,10 +424,11 @@ DefineIndex(Oid relationId,
 				 errmsg("access method \"%s\" does not support multicolumn indexes",
 						accessMethodName)));
 
-    if  (unique && RelationIsAo(rel))
-        ereport(ERROR,
-                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                 errmsg("append-only tables do not support unique indexes")));
+
+	/* native ORC tables do not support unique or primary-key indexes */
+	if (unique && RelationIsOrc(rel))
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("native orc do not support unique indexes")));
 
 	amoptions = accessMethodForm->amoptions;
 
@@ -752,24 +754,14 @@ DefineIndex(Oid relationId,
 							   errOmitLocation(true)));
         }
         else
-		    {
-          char *formatOpt = caql_getcstring(
-                  NULL,
-                  cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
-                  ObjectIdGetDatum(relationId)));
-          if (!formatOpt) {
-            ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                    errmsg("Cannot support DefineIndex")));
-          }
-          else {
-            char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-            if (!formatName ||
-                (!(pg_strncasecmp(formatName, "magma", strlen("magma")) == 0))) {
-              ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-                      errmsg("Cannot support DefineIndex")));
-            }
-          }
-		    }
+        {
+        	/* magma and native orc support index */
+        	if (!(RelationIsOrc(rel) || RelationIsMagmaTable2(relationId)))
+        	{
+        		ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support DefineIndex")));
+        	}
+        	// dispatch_statement_node((Node *)stmt, NULL, NULL, NULL);
+        }
 	}
 
 	/* save lockrelid for below, then close rel */
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3460af4..a92eaa6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1833,14 +1833,15 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
 		// start transaction in magma for CREATE TABLE
 		if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
 		{
-		    PlugStorageBeginTransaction(NULL);
+		    PlugStorageStartTransaction();
 		}
+                PlugStorageGetTransactionId(NULL);
 		Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 		InvokeMagmaCreateTable(&procInfo,
 		                       database_name,
 		                       schema_name,
 		                       table_name,
-		                       PlugStorageGetTransactionSnapshot(),
+		                       PlugStorageGetTransactionSnapshot(NULL),
 		                       createExtStmt->base.tableElts,
 		                       createExtStmt->pkey,
 		                       createExtStmt->base.distributedBy,
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index c6a0384..bd06077 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -470,7 +470,7 @@ ExecInsert(TupleTableSlot *slot,
 															 formatterName,
 															 estate->es_plannedstmt,
 															 segfileinfo->segno,
-															 PlugStorageGetTransactionSnapshot());
+															 PlugStorageGetTransactionSnapshot(NULL));
 				}
 				else
 				{
@@ -723,7 +723,7 @@ ldelete:;
 	                    InvokeMagmaBeginDelete(&procInfo,
 	                                           resultRelationDesc,
 	                                           estate->es_plannedstmt,
-	                                           PlugStorageGetTransactionSnapshot());
+	                                           PlugStorageGetTransactionSnapshot(NULL));
 		    }
 	        else
 	        {
@@ -1043,7 +1043,7 @@ lreplace:;
 		                    InvokeMagmaBeginUpdate(&procInfo,
 		                                           resultRelationDesc,
 		                                           estate->es_plannedstmt,
-		                                           PlugStorageGetTransactionSnapshot());
+		                                           PlugStorageGetTransactionSnapshot(NULL));
 		            elog(LOG, "exec update begin update: %d", extUpdDescEntry->ext_ins_oid);
 		        }
 		        else
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 71f49ef..5de2981 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -5084,7 +5084,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 	                               formatterName,
 	                               estate->es_plannedstmt,
 	                               segfileinfo->segno,
-	                               PlugStorageGetTransactionSnapshot());
+	                               PlugStorageGetTransactionSnapshot(NULL));
 	        }
 	        else
 	        {
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index 68d7add..2d203a7 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -355,7 +355,7 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
                                        currentRelation,
                                        formatterType,
                                        formatterName,
-                                       PlugStorageGetTransactionSnapshot());
+                                       PlugStorageGetTransactionSnapshot(NULL));
 		}
 		else
 		{
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c635d8f..df5775c 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -386,8 +386,11 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
         pathlist = lappend(pathlist, seqpath);
 
 	/* Consider index and bitmap scans */
-	create_index_paths(root, rel, relstorage, 
-					   &indexpathlist, &bitmappathlist);
+  if (!relstorage_is_ao(relstorage))
+  {
+  	/* Temporarily disable index for ao table */
+  	create_index_paths(root, rel, relstorage, &indexpathlist, &bitmappathlist);
+  }
 
 	/* deal with magma index scan */
 	if (relstorage == RELSTORAGE_EXTERNAL)
diff --git a/src/backend/optimizer/plan/newPlanner.c b/src/backend/optimizer/plan/newPlanner.c
index 4840f47..dc5546e 100644
--- a/src/backend/optimizer/plan/newPlanner.c
+++ b/src/backend/optimizer/plan/newPlanner.c
@@ -17,6 +17,7 @@
  */
 
 #include "optimizer/newPlanner.h"
+#include "catalog/catalog.h"
 
 #include "access/aomd.h"
 #include "access/fileam.h"
@@ -54,6 +55,7 @@ char *new_executor_enable_partitioned_hashjoin_mode;
 char *new_executor_enable_external_sort_mode;
 int new_executor_partitioned_hash_recursive_depth_limit;
 int new_executor_ic_tcp_client_limit_per_query_per_segment;
+int new_executor_external_sort_memory_limit_size_mb;
 
 const char *new_executor_runtime_filter_mode;
 const char *new_executor_runtime_filter_mode_local = "local";
@@ -99,6 +101,8 @@ static void
 do_convert_magma_rangevseg_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_rangetbl_to_common_plan(List *rtable,
                                                CommonPlanContext *ctx);
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx);
 static void do_convert_token_map_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx);
 static void do_convert_splits_list_to_common_plan(List *splits, Oid relOid,
@@ -133,6 +137,19 @@ static bool checkSupportedSubLinkType(SubLinkType sublinkType);
 static bool checkInsertSupportTable(PlannedStmt *stmt);
 static bool checkIsPrepareQuery(QueryDesc *queryDesc);
 
+// @return "{" + compresstype (when set) + "}" for the given relation; the
+// string's life time goes along with the current MemoryContext
+static const char *buildInternalTableFormatOptionStringInJson(Relation rel) {
+  StringInfoData json;
+  AppendOnlyEntry *ao = GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
+
+  initStringInfo(&json);
+  appendStringInfoChar(&json, '{');
+  if (ao->compresstype) appendStringInfo(&json, "%s", ao->compresstype);
+  appendStringInfoChar(&json, '}');
+  return json.data;
+}
+
 #define DIRECT_LEFT_CHILD_VAR 0
 #define INT64_MAX_LENGTH 20
 
@@ -313,6 +330,27 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
         pfree(rgId);
         pfree(rgUrl);
       }
+      // For append-only internal table
+      if (get_relation_storage_type(oid) == RELSTORAGE_ORC) {
+        ListCell *lc;
+        foreach (lc, stmt->result_segfileinfos) {
+          ResultRelSegFileInfoMapNode *pRelSegFileInfoMapNode =
+              (ResultRelSegFileInfoMapNode *)lfirst(lc);
+          ListCell *lc;
+          foreach (lc, pRelSegFileInfoMapNode->segfileinfos) {
+            ResultRelSegFileInfo *pSegFileInfo = lfirst(lc);
+            if (pSegFileInfo->numfiles == 0) {
+              // detect mixed-up partition of external table
+              ctx->convertible = false;
+              return;
+            }
+            univPlanAddResultRelSegFileInfo(
+                ctx->univplan, pRelSegFileInfoMapNode->relid,
+                pSegFileInfo->segno, pSegFileInfo->eof[0],
+                pSegFileInfo->uncompressed_eof[0]);
+          }
+        }
+      }
       univPlanAddToPlanNode(ctx->univplan, true);
     }
     do_convert_plantree_to_common_plan(stmt->planTree, pid, true, false, NIL,
@@ -327,8 +365,9 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
       do_convert_plantree_to_common_plan(subplan, -1, true, true, NIL, NULL,
                                          true, ctx);
   }
-  if (ctx->convertible)
-    do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible) do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+  if (ctx->convertible && stmt->result_partitions)
+    do_convert_result_partitions_to_common_plan(stmt->result_partitions, ctx);
   if (ctx->convertible && enable_secure_filesystem)
     do_convert_token_map_to_common_plan(ctx);
   if (ctx->convertible && ctx->isMagma)
@@ -1294,9 +1333,11 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
       columnDataTypeMod[i] = att->atttypmod;
     }
     FormatType fmttype = UnivPlanOrcFormat;
-    univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, "dummy", "{}",
-                                  attNum, (const char **)columnName,
-                                  columnDataType, columnDataTypeMod, NULL);
+    univPlanRangeTblEntryAddTable(
+        ctx->univplan, relid, fmttype, relpath(rel->rd_node),
+        buildInternalTableFormatOptionStringInJson(rel), attNum,
+        (const char **)columnName, columnDataType, columnDataTypeMod, NULL,
+        rel->rd_rel->relname.data);
   } else if (RelationIsExternal(rel)) {
     TupleDesc tableAttrs = rel->rd_att;
     attNum = tableAttrs->natts;
@@ -1394,7 +1435,7 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
     univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, location,
                                   fmtOptsJson, attNum,
                                   (const char **)columnName, columnDataType,
-                                  columnDataTypeMod, targetName);
+                                  columnDataTypeMod, targetName, NULL);
 
     if (fmtOptsJson != NULL)
       pfree(fmtOptsJson);
@@ -1419,6 +1460,53 @@ end:
   pfree(columnDataTypeMod);
 }
 
+// Adds one partition rule and its parlistvalues to the univplan; a rule that
+// has children (sub-partitions) marks the plan as not convertible instead.
+static void do_convert_result_partition_rule_to_common_plan(
+    CommonPlanContext *ctx, PartitionRule *partitionRule,
+    bool isDefaultPartition) {
+  if (partitionRule->children) {
+    // TODO(chiyang): sub-partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanResultPartitionsAddPartitionRule(
+      ctx->univplan, partitionRule->parchildrelid, partitionRule->parname,
+      isDefaultPartition);
+  ListCell *valuesCell;
+  foreach (valuesCell, partitionRule->parlistvalues) {
+    univPlanPartitionRuleAddPartitionValue(ctx->univplan, isDefaultPartition);
+    ListCell *constCell;  // distinct name so the outer cell is not shadowed
+    foreach (constCell, (List *)lfirst(valuesCell)) {
+      Const *val = (Const *)lfirst(constCell);  // cast matches declared type
+      do_convert_expr_to_common_plan(-1, val, ctx);
+      univPlanPartitionValueAddConst(ctx->univplan, isDefaultPartition);
+    }
+  }
+}
+
+// Emits the result partition node into the univplan (parkind 'l' only),
+// followed by each rule and then the default partition when one is present.
+static void do_convert_result_partitions_to_common_plan(
+    PartitionNode *partitionNode, CommonPlanContext *ctx) {
+  if (partitionNode->part->parkind != 'l') {
+    // TODO(chiyang): range partition
+    ctx->convertible = false;
+    return;
+  }
+  univPlanAddResultPartitions(
+      ctx->univplan, partitionNode->part->parrelid,
+      partitionNode->part->parkind, partitionNode->part->paratts,
+      partitionNode->part->parnatts);
+  ListCell *ruleCell;
+  foreach (ruleCell, partitionNode->rules)
+    do_convert_result_partition_rule_to_common_plan(
+        ctx, (PartitionRule *)lfirst(ruleCell), false);
+  if (partitionNode->default_part)
+    do_convert_result_partition_rule_to_common_plan(
+        ctx, partitionNode->default_part, true);
+}
+
 void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
   HASH_SEQ_STATUS status;
   struct FileSystemCredential *entry;
@@ -1453,14 +1541,14 @@ void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
 // it's convertible and it's a magma scan
 void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx) {
   // start transaction in magma for SELECT in new executor
-  if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
-    PlugStorageBeginTransaction(NULL);
-  }
+  // if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
+  //   PlugStorageStartTransaction(NULL);
+  // }
   Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
   int32_t size = 0;
   char *snapshot = NULL;
-  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(), &snapshot,
-                                 &size);
+  MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(NULL),
+                                 &snapshot, &size);
   if (snapshot && size != 0) {
     univPlanAddSnapshot(ctx->univplan, snapshot, size);
   }
@@ -1959,15 +2047,14 @@ end:
 }
 
 bool checkInsertSupportTable(PlannedStmt *stmt) {
-  // disable partitioned result target
-  if (stmt->result_partitions)
-    return false;
-  if (list_length(stmt->resultRelations) > 1)
-    return false;
+  if (list_length(stmt->resultRelations) > 1) return false;
   int32_t index = list_nth_int(stmt->resultRelations, 0);
   RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1);
 
-  // if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+  if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+
+  // disable partition table insert for external table
+  if (stmt->result_partitions) return false;
 
   Relation pgExtTableRel = heap_open(ExtTableRelationId, RowExclusiveLock);
   cqContext cqc;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be53f20..ce75706 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -8112,6 +8112,11 @@ static Query *transformIndexStmt(ParseState *pstate, IndexStmt *stmt,
 
     if (RelationBuildPartitionDesc(rel, false)) stmt->do_part = true;
 
+    /* native orc can't create index in parent relation */
+    if (RelationIsOrc(rel) && stmt->do_part)
+  		ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+  				errmsg("Cannot support create index statement in native orc parent relation yet")));
+
     if (stmt->do_part && Gp_role != GP_ROLE_EXECUTE) {
       List *children;
       struct HTAB *nameCache;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ed4b5dc..ee8958d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -67,6 +67,7 @@
 #include "cdb/cdbpersistentrelfile.h"
 
 #include "access/aosegfiles.h"
+#include "access/orcsegfiles.h"
 #include "access/parquetsegfiles.h"
 #include "cdb/cdbappendonlyam.h"
 #include "cdb/cdbvars.h"
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 27e7894..89d20e0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -17,6 +17,7 @@
 #include "postgres.h"
 #include "port.h"
 
+#include "access/aosegfiles.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/fileam.h"
@@ -415,6 +416,72 @@ QueryIsReadOnly(Query *parsetree)
 }
 
 /*
+ * CanSupportIndex: check whether CREATE INDEX is supported for relid.
+ * So far, only magma tables and native orc tables support index; any other
+ * relation raises ERROR unless gp_upgrade_mode is set.
+ */
+void CanSupportIndex(IndexStmt *stmt, Oid relid)
+{
+	Relation	rel;
+	bool		isNativeOrc;
+	bool		isMagma;
+	ListCell   *lc;
+
+	/* 1. upgrade mode should support index operation */
+	if (gp_upgrade_mode)
+		return;
+
+	/* the relation is only opened to find out whether it is native orc */
+	rel = heap_open(relid, AccessShareLock);
+	isNativeOrc = RelationIsOrc(rel);
+	heap_close(rel, AccessShareLock);
+
+	/*
+	 * 2. deal with magma table and native orc
+	 * "stmt->magma" handles a special partition situation, oushu issue #1049;
+	 * it is ugly, but there is no elegant way now
+	 */
+	isMagma = RelationIsMagmaTable2(relid) || stmt->magma;
+	if (!isMagma && !isNativeOrc)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+				 errmsg("Cannot support create index statement yet")));
+	}
+
+	/* add pg_aoseg.pg_orcseg_idx_xxx and its index pg_aoseg.pg_orcseg_idx_xxx_index */
+	if (isNativeOrc)
+		AlterTableCreateAoSegIndexTableWithOid(relid, stmt->is_part_child);
+
+	/* 3. magma/native orc index can't support the accessory conditions */
+	if (stmt->options)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+				 errmsg("magma/native orc Index cannot support create index with clause")));
+	}
+	if (stmt->whereClause)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+				 errmsg("magma/native orc Index cannot support create index where predicate")));
+	}
+	if (stmt->tableSpace)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+				 errmsg("magma/native orc Index cannot support create index with tableSpace")));
+	}
+	foreach(lc, stmt->indexParams)
+	{
+		IndexElem  *elem = (IndexElem *) lfirst(lc);
+		if (elem->expr || elem->opclass)
+			ereport(ERROR,
+					(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+					 errmsg("magma/native orc Index cannot support create index with expr or opclass")));
+	}
+}
+
+/*
  * CommandIsReadOnly: is an executable query read-only?
  *
  * This is a much stricter test than we apply for XactReadOnly mode;
@@ -1349,83 +1416,13 @@ ProcessUtility(Node *parsetree,
 				lockmode = stmt->concurrent ? ShareUpdateExclusiveLock
 						: ShareLock;
 				relid = RangeVarGetRelid(stmt->relation, false, false/*allowHcatalog*/);
-
-				/* Only create index for external table with magma */
 				Assert(OidIsValid(relid));
-				char *formatOpt = caql_getcstring(
-				        NULL,
-				        cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
-				        ObjectIdGetDatum(relid)));
-
-				if (!formatOpt)
-				{
-				  if (stmt->magma) {}
-				  else if (!gp_upgrade_mode)
-					{
-						ereport(ERROR,
-						(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-					}
-				}
-				else
-				{
-					char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
-					if (!formatName)
-					{
-						if (!gp_upgrade_mode)
-						{
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-						}
-					}
-					/* in order to support magmatp/magmaap */
-					else if ((pg_strncasecmp(formatName, FORMAT_MAGMA_TP_STR,
-																	 sizeof(FORMAT_MAGMA_TP_STR)-1) == 0) ||
-							(pg_strncasecmp(formatName, FORMAT_MAGMA_AP_STR,
-															sizeof(FORMAT_MAGMA_AP_STR)-1) == 0))
-					{
-						if (stmt->options) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index with clause")));
-						}
-						if (stmt->whereClause) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index where predicate")));
-						}
-						if (stmt->tableSpace) {
-							ereport(ERROR,
-											(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-													errmsg("magma Index cannot support create index with tableSpace")));
-						}
-						ListCell   *cell;
-						foreach(cell, stmt->indexParams)
-						{
-							IndexElem *elem = (IndexElem *) lfirst(cell);
-							if (elem->expr || elem->opclass)
-							{
-								ereport(ERROR,
-												(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-														errmsg("magma Index cannot support create index with expr or opclass")));
-							}
-						}
-						pfree(formatName);
-						// break;
-					}
-					else
-					{
-						pfree(formatName);
-						if (!gp_upgrade_mode)
-						{
-							ereport(ERROR,
-							        (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
-						}
-					}
-				}
-
 				LockRelationOid(relid, lockmode);
 				CheckRelationOwnership(relid, true);
 
+				// check whether can support index
+				CanSupportIndex(stmt, relid);
+
 				DefineIndex(relid,		/* relation */
 							stmt->idxname,		/* index name */
 							InvalidOid, /* no predefined OID */
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index de91837..a5e327e 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -222,7 +222,7 @@ calculate_database_size(Oid dbOid)
     /* start transaction for magma table */
     if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
     {
-      PlugStorageBeginTransaction(NULL);
+      PlugStorageStartTransaction();
     }
     Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
 
diff --git a/src/backend/utils/gp/segadmin.c b/src/backend/utils/gp/segadmin.c
index ab4bff0..a615242 100644
--- a/src/backend/utils/gp/segadmin.c
+++ b/src/backend/utils/gp/segadmin.c
@@ -309,13 +309,16 @@ Datum hawq_magma_status(PG_FUNCTION_ARGS)
 	  }
 	  tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 	  // free memory
-	  free(data->magmaNodes[funcctx->call_cntr].node);
-	  free(data->magmaNodes[funcctx->call_cntr].dirs);
+	  if (data->magmaNodes[funcctx->call_cntr].node)
+	    free(data->magmaNodes[funcctx->call_cntr].node);
+	  if (data->magmaNodes[funcctx->call_cntr].dirs)
+	    free(data->magmaNodes[funcctx->call_cntr].dirs);
 	  result = HeapTupleGetDatum(tuple);
 	  SRF_RETURN_NEXT(funcctx, result);
 	} else {
 	  // free memory
-	  free(data->magmaNodes);
+	  if (data->magmaNodes)
+	    free(data->magmaNodes);
 	  SRF_RETURN_DONE(funcctx);
 	}
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4bb606b..20f8f1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4678,6 +4678,15 @@ static struct config_int ConfigureNamesInt[] =
 		10000, 0, 65535, NULL, NULL
 	},
 
+  {
+    {"new_executor_external_sort_memory_limit", PGC_USERSET, QUERY_TUNING_OTHER,
+      gettext_noop("Sets the memory usage (in MB) limit of external sort for new executor."),
+      NULL
+    },
+    &new_executor_external_sort_memory_limit_size_mb,
+    256, 0, 1024, NULL, NULL
+  },
+
 	{
 		{"default_magma_hash_table_nvseg_per_seg", PGC_USERSET, QUERY_TUNING_OTHER,
 			gettext_noop("Sets default vseg number per node for Magma hash table"),
@@ -7490,7 +7499,6 @@ static struct config_string ConfigureNamesString[] =
 		"AUTO", assign_new_executor_mode, NULL
 	},
 
-
 	{
 		{"new_scheduler", PGC_USERSET, EXTERNAL_TABLES,
 			gettext_noop("Enable the new scheduler."),
diff --git a/src/include/access/orcsegfiles.h b/src/include/access/orcsegfiles.h
index 4a87981..f8b4582 100644
--- a/src/include/access/orcsegfiles.h
+++ b/src/include/access/orcsegfiles.h
@@ -30,6 +30,11 @@
 #define Anum_pg_orcseg_tupcount 3
 #define Anum_pg_orcseg_eofuncompressed 4
 
+#define Natts_pg_orcseg_idx 3
+#define Anum_pg_orcseg_idx_idxoid 1
+#define Anum_pg_orcseg_idx_segno 2
+#define Anum_pg_orcseg_idx_eof 3
+
 extern void insertInitialOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo);
 extern void insertOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo,
                                 float8 tupleCount, float8 eof,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4b825ac..93e922d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -188,10 +188,15 @@ typedef enum PlugStorageTransactionStatus
 
 typedef enum PlugStorageTransactionCommand
 {
-	PS_TXN_CMD_BEGIN   = 0,
-	PS_TXN_CMD_COMMIT  = 1,
-	PS_TXN_CMD_ABORT   = 2,
-	PS_TXN_CMD_INVALID = 3
+    PS_TXN_CMD_START_TRANSACTION        = 0,
+    PS_TXN_CMD_COMMIT_TRANSACTION       = 1,
+    PS_TXN_CMD_ABORT_TRANSACTION        = 2,
+    PS_TXN_CMD_START_SUB_TRANSACTION    = 3,
+    PS_TXN_CMD_COMMIT_SUB_TRANSACTION   = 4,
+    PS_TXN_CMD_ABORT_SUB_TRANSACTION    = 5,
+    PS_TXN_CMD_GET_SNAPSHOT             = 6,
+    PS_TXN_CMD_GET_TRANSACTIONID        = 7,
+    PS_TXN_CMD_INVALID                  = 8
 } PlugStorageTransactionCommand;
 
 typedef struct PlugStorageTransactionData
@@ -202,7 +207,8 @@ typedef struct PlugStorageTransactionData
 	TransactionId                  pst_transaction_id;
 	PlugStorageTransactionStatus   pst_transaction_status;
 	PlugStorageTransactionCommand  pst_transaction_command;
-	MagmaSnapshot                 *pst_transaction_dist;     /* magma format */
+	MagmaSnapshot                 *pst_transaction_snapshot;     /* magma format */
+	MagmaTransactionState         *pst_transaction_state;        /* magma format */
 } PlugStorageTransactionData;
 
 typedef PlugStorageTransactionData *PlugStorageTransaction;
@@ -217,9 +223,10 @@ extern bool isCleanupAbortTransaction;
  */
 extern PlugStorageTransaction PlugStorageGetTransaction(void);
 extern PlugStorageTransactionStatus PlugStorageGetTransactionStatus(void);
-extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(void);
 extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot);
-extern void PlugStorageBeginTransaction(List* magmaTableFullNames);
+extern void PlugStorageStartTransaction();
+extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(List* magmaTableFullNames);
+extern void PlugStorageGetTransactionId(List* magmaTableFullNames);
 extern void PlugStorageCommitTransaction(void);
 extern void PlugStorageAbortTransaction(void);
 extern void PlugStorageSetIsCleanupAbort(bool isCleanup);
diff --git a/src/include/catalog/aoseg.h b/src/include/catalog/aoseg.h
index 0e3d5ff..e653fb7 100644
--- a/src/include/catalog/aoseg.h
+++ b/src/include/catalog/aoseg.h
@@ -34,6 +34,7 @@ extern void AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid,
 											  Oid newIndexOid,
 											  Oid *comptypeOid,
 											  bool is_part_child);
+extern void AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child);
 
 extern void gpsql_appendonly_segfile_create(PG_FUNCTION_ARGS);
 
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index 50b6c0d..8284945 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -354,4 +354,6 @@ extern void CleanUpNewInterconnect();
 
 extern void ResetRpcClientInstance();
 
+extern void checkQDConnectionAlive(void);
+
 #endif   /* ML_IPC_H */
diff --git a/src/include/cwrapper/magma/cwrapper/magma-client-c.h b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
index bef1a13..9eed545 100644
--- a/src/include/cwrapper/magma/cwrapper/magma-client-c.h
+++ b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
@@ -127,6 +127,34 @@ __attribute__((weak)) typedef struct MagmaColumn {
   int32_t id;
 } MagmaColumn;
 
+__attribute__((weak)) typedef int BackendId;
+
+__attribute__((weak)) typedef uint32_t LocalTransactionId;
+
+__attribute__((weak)) typedef struct VirtualTransactionId {
+  BackendId backendId;
+  LocalTransactionId localTransactionId;
+} VirtualTransactionId;
+
+__attribute__((weak)) typedef uint64_t MagmaTransactionId;
+
+__attribute__((weak)) typedef uint8_t TransactionStatus;
+
+__attribute__((weak)) struct MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaRgIds MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaTransactionState {
+  VirtualTransactionId
+      virtualTransactionId;  // used for 'magma lock'; generated by the magma
+  // client in startTransaction
+  MagmaTransactionId transactionId;  // not used by read-only transactions
+  uint32_t commandId;                // not used by read-only transactions
+  TransactionStatus state;           // uint8_t status code (see typedef above)
+  MagmaRgIds *relatedRgIds;  // opaque: MagmaRgIds is only forward-declared here
+  MagmaTxnAction currentTransaction;
+} MagmaTransactionState;
+
 __attribute__((weak)) typedef struct MagmaReplicaGroup {
   uint32_t id;
   uint16_t port;
@@ -144,8 +172,6 @@ __attribute__((weak)) typedef void *MagmaTablePtr;
 __attribute__((weak)) typedef void *MagmaRangeDistPtr;
 __attribute__((weak)) typedef void *MagmaRangePtr;
 
-__attribute__((weak)) typedef struct MagmaTransactionState MagmaTransactionState;
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/include/cwrapper/univplan/cwrapper/univplan-c.h b/src/include/cwrapper/univplan/cwrapper/univplan-c.h
index 34f4c88..a52c829 100644
--- a/src/include/cwrapper/univplan/cwrapper/univplan-c.h
+++ b/src/include/cwrapper/univplan/cwrapper/univplan-c.h
@@ -72,7 +72,8 @@ __attribute__((weak)) void univPlanRangeTblEntryAddTable(UnivPlanC *up, uint64_t
                                    const char **columnName,
                                    int32_t *columnDataType,
                                    int64_t *columnDataTypeMod,
-                                   const char *targetName) {}
+                                   const char *targetName,
+                                   const char *tableName) {}
 __attribute__((weak)) void univPlanRangeTblEntryAddDummy(UnivPlanC *up) {}
 
 // construct interconnect info
diff --git a/src/include/optimizer/newPlanner.h b/src/include/optimizer/newPlanner.h
index 97bcdf6..6067169 100644
--- a/src/include/optimizer/newPlanner.h
+++ b/src/include/optimizer/newPlanner.h
@@ -41,6 +41,7 @@ extern char *new_executor_enable_partitioned_hashjoin_mode;
 extern char *new_executor_enable_external_sort_mode;
 extern int new_executor_partitioned_hash_recursive_depth_limit;
 extern int new_executor_ic_tcp_client_limit_per_query_per_segment;
+extern int new_executor_external_sort_memory_limit_size_mb;
 
 extern const char *new_executor_runtime_filter_mode;
 extern const char *new_executor_runtime_filter_mode_local;
diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h
index 99eec7c..776cf35 100644
--- a/src/include/tcop/utility.h
+++ b/src/include/tcop/utility.h
@@ -43,4 +43,6 @@ extern void CheckRelationOwnership(Oid relOid, bool noCatalogs);
 
 extern void DropErrorMsgNonExistent(const RangeVar *rel, char rightkind, bool missing_ok);
 
+extern void CanSupportIndex(IndexStmt *stmt, Oid relid);
+
 #endif   /* UTILITY_H */