You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by zt...@apache.org on 2021/12/13 08:42:32 UTC
[hawq] 01/01: HAWQ-1811. Sync with OushuDB - Phase II
This is an automated email from the ASF dual-hosted git repository.
ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git
commit 3bc605d2de98935eadde78ec46e5a5dba168bdcb
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Mon Dec 13 16:41:57 2021 +0800
HAWQ-1811. Sync with OushuDB - Phase II
---
contrib/hornet/orc_debug_statistics.py | 11 +-
contrib/magma/magma.c | 153 ++++++++++++--------
src/backend/access/common/reloptions.c | 1 +
src/backend/access/transam/xact.c | 139 ++++++++++++------
src/backend/catalog/aoseg.c | 158 ++++++++++++++++++++-
src/backend/catalog/heap.c | 121 ++++++++--------
src/backend/catalog/index.c | 17 ++-
src/backend/catalog/pg_compression.c | 3 +-
src/backend/cdb/cdbdatalocality.c | 16 ++-
src/backend/cdb/cdbquerycontextdispatching.c | 4 +-
src/backend/cdb/dispatcher.c | 3 +
src/backend/cdb/dispatcher_new.c | 3 +
src/backend/cdb/motion/ic_udp.c | 39 +++--
src/backend/commands/analyze.c | 6 +-
src/backend/commands/copy.c | 7 +-
src/backend/commands/dbcommands.c | 5 +-
src/backend/commands/indexcmds.c | 36 ++---
src/backend/commands/tablecmds.c | 5 +-
src/backend/executor/execDML.c | 6 +-
src/backend/executor/execMain.c | 2 +-
src/backend/executor/nodeExternalscan.c | 2 +-
src/backend/optimizer/path/allpaths.c | 7 +-
src/backend/optimizer/plan/newPlanner.c | 121 +++++++++++++---
src/backend/parser/analyze.c | 5 +
src/backend/storage/buffer/bufmgr.c | 1 +
src/backend/tcop/utility.c | 143 +++++++++----------
src/backend/utils/adt/dbsize.c | 2 +-
src/backend/utils/gp/segadmin.c | 9 +-
src/backend/utils/misc/guc.c | 10 +-
src/include/access/orcsegfiles.h | 5 +
src/include/access/xact.h | 21 ++-
src/include/catalog/aoseg.h | 1 +
src/include/cdb/ml_ipc.h | 2 +
.../cwrapper/magma/cwrapper/magma-client-c.h | 27 +++-
src/include/optimizer/newPlanner.h | 1 +
src/include/tcop/utility.h | 2 +
36 files changed, 750 insertions(+), 344 deletions(-)
diff --git a/contrib/hornet/orc_debug_statistics.py b/contrib/hornet/orc_debug_statistics.py
index 3ddfb0d..eb61af6 100755
--- a/contrib/hornet/orc_debug_statistics.py
+++ b/contrib/hornet/orc_debug_statistics.py
@@ -81,10 +81,15 @@ col_l = len(col_name)
for i in range(0,col_l):
str_ans = str_ans.replace(' Column ' + str(i+1) + ' ',col_name[i])
-str_ans = str_ans.replace(',}}}',"}}]}")
+if stripe_num == 0:
+ str_ans = str_ans.replace(',}}}',"}}]}")
+else:
+ str_ans = str_ans.replace(',}}}',"}}}]}")
str_ans = str_ans.replace('" Stripe 0 ":','"Stripes":[')
-for i in range(1,stripe_num):
- str_ans = str_ans.replace('" Stripe {} "'.format(i),'},{"Stripe {}"'.format(i))
+for i in range(1,2):
+ str_ans = str_ans.replace('" Stripe {} "'.format(i),'}},{{"Stripe {}"'.format(i))
+for i in range(2,stripe_num+1):
+ str_ans = str_ans.replace('" Stripe {} "'.format(i),'}}}},{{"Stripe {}"'.format(i))
file_path = '"File_path":"' + sys.argv[2] + '",'
pre_json = str_ans[0] + file_path + str_ans[1:len(str_ans)]
print(sys.argv[1] + '|' + pre_json.replace(',}','}'))
diff --git a/contrib/magma/magma.c b/contrib/magma/magma.c
index 08bbf40..b7d6758 100644
--- a/contrib/magma/magma.c
+++ b/contrib/magma/magma.c
@@ -405,7 +405,8 @@ Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS) {
fmt_name);
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaTablePtr table = MagmaClientC_FetchTable(client, snapshot, useClientCacheDirectly);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaTablePtr table = MagmaClientC_FetchTable(client, useClientCacheDirectly);
magma_check_result(&client);
elog(LOG, "magma_protocol_blocklocation pass fetch table");
@@ -527,10 +528,11 @@ Datum magma_protocol_tablesize(PG_FUNCTION_ARGS) {
fmt_name);
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
+ MagmaClientC_SetupSnapshot(client, snapshot);
// set size of table in tp type to zero.
if (tableType == MAGMACLIENTC_TABLETYPE_AP) {
- tsdata->tablesize = MagmaClientC_GetTableSize(client, snapshot);
+ tsdata->tablesize = MagmaClientC_GetTableSize(client);
} else {
tsdata->tablesize = 0;
}
@@ -583,7 +585,8 @@ Datum magma_protocol_databasesize(PG_FUNCTION_ARGS) {
}
MagmaClientC_SetupDatabaseInfo(client, dbname);
- dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client, snapshot);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client);
elog(LOG,"dbsize in magma.c is %llu", dbsdata->dbsize);
magma_check_result(&client);
@@ -837,7 +840,8 @@ Datum magma_createindex(PG_FUNCTION_ARGS) {
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaClientC_CreateIndex(client, snapshot, magmaidx);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaClientC_CreateIndex(client, magmaidx);
magma_check_result(&client);
PG_RETURN_VOID();
}
@@ -861,7 +865,8 @@ Datum magma_dropindex(PG_FUNCTION_ARGS) {
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaClientC_DropIndex(client, snapshot, indexname);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaClientC_DropIndex(client, indexname);
magma_check_result(&client);
PG_RETURN_VOID();
}
@@ -885,7 +890,8 @@ Datum magma_reindex_index(PG_FUNCTION_ARGS) {
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaClientC_Reindex(client, snapshot, indexname);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaClientC_Reindex(client, indexname);
magma_check_result(&client);
PG_RETURN_VOID();
}
@@ -1029,7 +1035,8 @@ Datum magma_createtable(PG_FUNCTION_ARGS) {
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaClientC_CreateTable(client, snapshot, ncols, cols);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaClientC_CreateTable(client, ncols, cols);
magma_check_result(&client);
pfree(cols);
list_free(pk_names);
@@ -1061,7 +1068,8 @@ Datum magma_droptable(PG_FUNCTION_ARGS) {
int16_t tableType = 0;
// for drop table, tableType won't be used in the process, set it as default
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
- MagmaClientC_DropTable(client, snapshot);
+ MagmaClientC_SetupSnapshot(client, snapshot);
+ MagmaClientC_DropTable(client);
magma_check_result(&client);
PG_RETURN_VOID();
@@ -3099,80 +3107,111 @@ Datum magma_transaction(PG_FUNCTION_ARGS) {
elog(ERROR, "failed to connect to magma service");
}
+ MagmaClientC_SetupSnapshot(client, pst->pst_transaction_snapshot);
+
switch (txn_command) {
- case PS_TXN_CMD_BEGIN: {
- int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
- MagmaTableFullName *magmaTableFullNames = (MagmaTableFullName *) palloc0(magmaTableFullNamesSize * sizeof(MagmaTableFullName));
- int i = 0;
- ListCell *lc;
- foreach (lc, ps->magma_talbe_full_names) {
- MagmaTableFullName* mtfn = lfirst(lc);
- magmaTableFullNames[i].databaseName = pstrdup(mtfn->databaseName);
- magmaTableFullNames[i].schemaName = pstrdup(mtfn->schemaName);
- magmaTableFullNames[i].tableName = pstrdup(mtfn->tableName);
- ++i;
- }
- pst->pst_transaction_dist =
- MagmaClientC_BeginTransaction(client, magmaTableFullNames, magmaTableFullNamesSize);
- for (int i = 0; i < magmaTableFullNamesSize; ++i) {
- pfree(magmaTableFullNames[i].databaseName);
- pfree(magmaTableFullNames[i].schemaName);
- pfree(magmaTableFullNames[i].tableName);
- }
- pfree(magmaTableFullNames);
- if (pst->pst_transaction_dist == NULL) {
- pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
- pst->pst_transaction_id = InvalidTransactionId;
- pst->pst_transaction_dist = NULL;
- elog(DEBUG1, "magma_transaction: begin snapshot: NULL");
- } else {
- elog(DEBUG1, "magma_transaction: begin snapshot: (%llu, %u, %llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
- }
+ case PS_TXN_CMD_START_TRANSACTION: {
+ pst->pst_transaction_state = MagmaClientC_StartTransaction(client);
+ pst->pst_transaction_snapshot = NULL;
+ pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+ pst->pst_transaction_id = InvalidTransactionId;
+ pst->pst_transaction_snapshot = NULL;
+ elog(DEBUG1, "magma_transaction: start transaction");
magma_check_result(&client);
break;
}
- case PS_TXN_CMD_COMMIT:
- if (pst->pst_transaction_dist == NULL) {
+ case PS_TXN_CMD_COMMIT_TRANSACTION:
+ if (pst->pst_transaction_snapshot == NULL) {
elog(DEBUG1, "magma_transaction: commit snapshot: NULL");
} else {
elog(DEBUG1,
"magma_transaction: commit snapshot: (%llu, %u, %llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
- MagmaClientC_CommitTransaction(client, pst->pst_transaction_dist);
+ MagmaClientC_CommitTransaction(client);
magma_check_result(&client);
break;
- case PS_TXN_CMD_ABORT:
- if (pst->pst_transaction_dist == NULL) {
+ case PS_TXN_CMD_ABORT_TRANSACTION:
+ if (pst->pst_transaction_snapshot == NULL) {
elog(DEBUG1, "magma_transaction: abort snapshot: NULL");
} else {
elog(DEBUG1,
"magma_transaction: abort snapshot: (%llu, %u, %llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
if (pst->pst_transaction_status != PS_TXN_STS_DEFAULT &&
pst->pst_transaction_id != InvalidTransactionId &&
- pst->pst_transaction_dist != NULL) {
- MagmaClientC_AbortTransaction(client, pst->pst_transaction_dist,
- PlugStorageGetIsCleanupAbort());
- pst->pst_transaction_dist = NULL;
+ pst->pst_transaction_snapshot != NULL) {
+ MagmaClientC_AbortTransaction(client, PlugStorageGetIsCleanupAbort());
+ pst->pst_transaction_snapshot = NULL;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
magma_check_result(&client);
}
break;
+ case PS_TXN_CMD_GET_SNAPSHOT: {
+ MagmaClientC_CleanupTableInfo(client);
+ int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+ int i = 0;
+ ListCell *lc;
+ foreach (lc, ps->magma_talbe_full_names) {
+ MagmaTableFullName* mtfn = lfirst(lc);
+ MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+ mtfn->tableName, 0);
+ ++i;
+ }
+ pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+ if (pst->pst_transaction_snapshot == NULL) {
+ pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+ pst->pst_transaction_id = InvalidTransactionId;
+ pst->pst_transaction_snapshot = NULL;
+ elog(DEBUG1, "magma_transaction: get snapshot: NULL");
+ } else {
+ elog(DEBUG1, "magma_transaction: get snapshot: (%llu, %u, %llu, %u)",
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
+ }
+ magma_check_result(&client);
+ break;
+ }
+ case PS_TXN_CMD_GET_TRANSACTIONID: {
+ MagmaClientC_CleanupTableInfo(client);
+ int magmaTableFullNamesSize = list_length(ps->magma_talbe_full_names);
+ int i = 0;
+ ListCell *lc;
+ foreach (lc, ps->magma_talbe_full_names) {
+ MagmaTableFullName* mtfn = lfirst(lc);
+ MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
+ mtfn->tableName, 0);
+ ++i;
+ }
+ pst->pst_transaction_state = MagmaClientC_GetTransctionId(client);
+ pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
+ if (pst->pst_transaction_snapshot == NULL) {
+ pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
+ pst->pst_transaction_id = InvalidTransactionId;
+ pst->pst_transaction_snapshot = NULL;
+ elog(DEBUG1, "magma_transaction: get transaction state: NULL");
+ } else {
+ elog(DEBUG1, "magma_transaction: get transaction state: (%llu, %u, %llu, %u)",
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
+ }
+ magma_check_result(&client);
+ break;
+ }
default:
elog(ERROR, "Transaction command for magma is invalid %d", txn_command);
break;
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 976fc71..c0f9236 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -545,6 +545,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
// XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
// and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
&& (strcmp(compresstype, "zlib") != 0)
+ && (strcmp(compresstype, "zstd") != 0)
&& (strcmp(compresstype, "none") != 0))
{
ereport(ERROR,
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 393852d..3762036 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -270,7 +270,7 @@ static PlugStorageTransactionData TopPlugStorageTransactionData = {
.pst_transaction_id = InvalidTransactionId,/* transaction id */
.pst_transaction_status = PS_TXN_STS_DEFAULT, /* transaction status */
.pst_transaction_command = PS_TXN_CMD_INVALID, /* transaction command */
- .pst_transaction_dist = NULL /* magma transaction info */
+ .pst_transaction_snapshot = NULL /* magma transaction info */
};
static PlugStorageTransaction TopPlugStorageTransaction = &TopPlugStorageTransactionData;
@@ -372,12 +372,6 @@ PlugStorageGetTransactionStatus(void)
return TopPlugStorageTransaction->pst_transaction_status;
}
-MagmaSnapshot *
-PlugStorageGetTransactionSnapshot(void)
-{
- return TopPlugStorageTransaction->pst_transaction_dist;
-}
-
void PlugStorageSetIsCleanupAbort(bool isCleanup)
{
isCleanupAbortTransaction = isCleanup;
@@ -397,52 +391,54 @@ extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot)
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
- Insist(TopPlugStorageTransaction->pst_transaction_dist != NULL);
+ Insist(TopPlugStorageTransaction->pst_transaction_snapshot != NULL);
}
else if (Gp_role == GP_ROLE_EXECUTE &&
- TopPlugStorageTransaction->pst_transaction_dist == NULL)
+ TopPlugStorageTransaction->pst_transaction_snapshot == NULL)
{
- TopPlugStorageTransaction->pst_transaction_dist = malloc(sizeof(MagmaSnapshot));
- memset(TopPlugStorageTransaction->pst_transaction_dist, 0, sizeof(MagmaSnapshot));
- TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId = 0;
- TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus = 0;
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset = 0;
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions = NULL;
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize = 0;
+ TopPlugStorageTransaction->pst_transaction_snapshot = malloc(sizeof(MagmaSnapshot));
+ memset(TopPlugStorageTransaction->pst_transaction_snapshot, 0, sizeof(MagmaSnapshot));
+ TopPlugStorageTransaction->pst_transaction_snapshot
+ ->currentTransaction.txnId = 0;
+ TopPlugStorageTransaction->pst_transaction_snapshot
+ ->currentTransaction.txnStatus = 0;
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset = 0;
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions = NULL;
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize = 0;
}
// set current transaction for current snapshot
- TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnId =
+ TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnId =
snapshot->currentTransaction.txnId;
- TopPlugStorageTransaction->pst_transaction_dist->currentTransaction.txnStatus =
+ TopPlugStorageTransaction->pst_transaction_snapshot->currentTransaction.txnStatus =
snapshot->currentTransaction.txnStatus;
// set command id
- TopPlugStorageTransaction->pst_transaction_dist->cmdIdInTransaction =
+ TopPlugStorageTransaction->pst_transaction_snapshot
+ ->cmdIdInTransaction =
snapshot->cmdIdInTransaction;
// reallocate memory for visible HLC map for current snapshot
- free(TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions);
+ free(TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions);
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionStartOffset =
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionStartOffset =
snapshot->txnActions.txnActionStartOffset;
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions =
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions =
malloc(sizeof(MagmaTxnAction) *snapshot->txnActions.txnActionSize);
// set visible HLC map for current snapshot
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActionSize =
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActionSize =
snapshot->txnActions.txnActionSize;
for (int i = 0; i < snapshot->txnActions.txnActionSize; ++i)
{
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnId =
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnId =
snapshot->txnActions.txnActions[i].txnId;
- TopPlugStorageTransaction->pst_transaction_dist->txnActions.txnActions[i].txnStatus =
+ TopPlugStorageTransaction->pst_transaction_snapshot->txnActions.txnActions[i].txnStatus =
snapshot->txnActions.txnActions[i].txnStatus;
}
}
-void
-PlugStorageBeginTransaction(List* magmaTableFullNames)
+void PlugStorageStartTransaction(List* magmaTableFullNames)
{
if ((Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode())
{
@@ -460,13 +456,9 @@ PlugStorageBeginTransaction(List* magmaTableFullNames)
}
Assert(pst->pst_transaction_status == PS_TXN_STS_DEFAULT);
- pst->pst_transaction_command = PS_TXN_CMD_BEGIN;
+ pst->pst_transaction_command =
+ PS_TXN_CMD_START_TRANSACTION;
InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
- elog(DEBUG1, "PS TXN: BEGIN (%llu, %u, %llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
pst->pst_transaction_id = GetTopTransactionId();
pst->pst_transaction_status = PS_TXN_STS_STARTED;
}
@@ -492,14 +484,15 @@ PlugStorageCommitTransaction(void)
Assert(pst->pst_transaction_id == GetTopTransactionId());
Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
- pst->pst_transaction_command = PS_TXN_CMD_COMMIT;
+ pst->pst_transaction_command =
+ PS_TXN_CMD_COMMIT_TRANSACTION;
elog(DEBUG1, "PS TXN: COMMIT (%llu, %u, %llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
InvokePlugStorageFormatTransaction(pst, NULL);
- pst->pst_transaction_dist = NULL;
+ pst->pst_transaction_snapshot = NULL;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
}
@@ -525,20 +518,74 @@ PlugStorageAbortTransaction(void)
Assert(pst->pst_transaction_id == GetTopTransactionId());
Assert(pst->pst_transaction_status == PS_TXN_STS_STARTED);
- pst->pst_transaction_command = PS_TXN_CMD_ABORT;
+ pst->pst_transaction_command =
+ PS_TXN_CMD_ABORT_TRANSACTION;
elog(DEBUG1, "PS TXN: ABORT (%llu, %u, llu, %u)",
- pst->pst_transaction_dist->currentTransaction.txnId,
- pst->pst_transaction_dist->currentTransaction.txnStatus,
- pst->pst_transaction_dist->txnActions.txnActionStartOffset,
- pst->pst_transaction_dist->txnActions.txnActionSize);
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
InvokePlugStorageFormatTransaction(pst, NULL);
- pst->pst_transaction_dist = NULL;
+ pst->pst_transaction_snapshot = NULL;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
}
}
}
+
+MagmaSnapshot *
+PlugStorageGetTransactionSnapshot(List* magmaTableFullNames)
+{
+ PlugStorageTransaction pst = TopPlugStorageTransaction;
+ if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+ (pst->pst_transaction_snapshot == NULL) &&
+ (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+ if (!OidIsValid(pst->pst_proc_oid)) {
+ pst->pst_proc_oid =
+ LookupPlugStorageValidatorFunc("magma", "transaction");
+ Assert(OidIsValid(pst->pst_proc_oid));
+
+ fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+ }
+
+ pst->pst_transaction_command = PS_TXN_CMD_GET_SNAPSHOT;
+ InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+ elog(DEBUG1, "PS TXN: GET SNAPSHOT (%llu, %u, %llu, %u)",
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
+ pst->pst_transaction_id = GetTopTransactionId();
+ }
+ return TopPlugStorageTransaction->pst_transaction_snapshot;
+}
+
+void PlugStorageGetTransactionId(List* magmaTableFullNames)
+{
+ PlugStorageTransaction pst = TopPlugStorageTransaction;
+ if (pst->pst_transaction_status == PS_TXN_STS_STARTED &&
+ pst->pst_transaction_state->currentTransaction.txnId == 0 &&
+ (Gp_role == GP_ROLE_DISPATCH) && IsNormalProcessingMode()) {
+ if (!OidIsValid(pst->pst_proc_oid)) {
+ pst->pst_proc_oid =
+ LookupPlugStorageValidatorFunc("magma", "transaction");
+ Assert(OidIsValid(pst->pst_proc_oid));
+
+ fmgr_info(pst->pst_proc_oid, &(pst->pst_transaction_fmgr_info));
+ }
+
+ pst->pst_transaction_command = PS_TXN_CMD_GET_TRANSACTIONID;
+ InvokePlugStorageFormatTransaction(pst, magmaTableFullNames);
+ elog(DEBUG1, "PS TXN: GET TRANSACTION ID (%llu, %u, %llu, %u)",
+ pst->pst_transaction_snapshot->currentTransaction.txnId,
+ pst->pst_transaction_snapshot->currentTransaction.txnStatus,
+ pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
+ pst->pst_transaction_snapshot->txnActions.txnActionSize);
+ pst->pst_transaction_id = GetTopTransactionId();
+ }
+}
+
/* ----------------------------------------------------------------
* transaction state accessors
* ----------------------------------------------------------------
@@ -2456,7 +2503,7 @@ StartTransaction(void)
/*
* begin transaction in magma service now
*/
- /* PlugStorageBeginTransaction(); */
+ /* PlugStorageStartTransaction(); */
}
/*
diff --git a/src/backend/catalog/aoseg.c b/src/backend/catalog/aoseg.c
index b7b93fd..7e88b91 100644
--- a/src/backend/catalog/aoseg.c
+++ b/src/backend/catalog/aoseg.c
@@ -56,7 +56,9 @@
#include "cdb/cdbvars.h"
static bool create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
+static bool create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid);
static bool needs_aoseg_table(Relation rel);
+static bool needs_aoseg_index_table(Relation rel);
/*
* AlterTableCreateAoSegTable
@@ -106,6 +108,19 @@ AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid, Oid newIndexOid,
heap_close(rel, NoLock);
}
+void
+AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child)
+{
+ Relation rel;
+ Assert(!is_part_child);
+ rel = heap_open(relOid, AccessShareLock);
+
+ /* create_aoseg_index_table does all the work */
+ (void) create_aoseg_index_table(rel, InvalidOid, InvalidOid, NULL);
+
+ heap_close(rel, AccessShareLock);
+}
+
/*
* create_aoseg_table --- internal workhorse
*
@@ -310,6 +325,139 @@ create_aoseg_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptype
return true;
}
+static bool
+create_aoseg_index_table(Relation rel, Oid aosegOid, Oid aosegIndexOid, Oid * comptypeOid)
+{
+ Oid relOid = RelationGetRelid(rel);
+ TupleDesc tupdesc;
+ bool shared_relation;
+ Oid blkdirrelid;
+ Oid blkdiridxid;
+ char aoseg_relname[NAMEDATALEN];
+ char aoseg_idxname[NAMEDATALEN];
+ IndexInfo *indexInfo;
+ Oid classObjectId[2];
+ ObjectAddress baseobject,
+ aosegobject;
+ Oid tablespaceOid = ChooseTablespaceForLimitedObject(rel->rd_rel->reltablespace);
+
+ /*
+ * Check to see whether the table actually needs an aoseg index table.
+ */
+ if (!needs_aoseg_index_table(rel))
+ return false;
+
+ shared_relation = rel->rd_rel->relisshared;
+
+ /* can't have shared AO tables after initdb */
+ /* TODO: disallow it at CREATE TABLE time */
+ Assert(!(shared_relation && !IsBootstrapProcessingMode()) );
+
+ GetAppendOnlyEntryAuxOids(relOid, SnapshotNow, NULL, NULL, &blkdirrelid, &blkdiridxid);
+
+ /*
+ * Was a aoseg index table already created?
+ */
+ if (blkdirrelid != InvalidOid)
+ {
+ return false;
+ }
+
+ snprintf(aoseg_relname, sizeof(aoseg_relname), "pg_orcseg_idx_%u",
+ relOid);
+ snprintf(aoseg_idxname, sizeof(aoseg_idxname), "pg_orcseg_idx_%u_index",
+ relOid);
+
+ tupdesc = CreateTemplateTupleDesc(Natts_pg_orcseg_idx, false);
+
+ TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_idxoid, "idxoid",
+ INT4OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_segno, "segno",
+ INT4OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber)Anum_pg_orcseg_idx_eof, "eof",
+ FLOAT8OID, -1, 0);
+
+ blkdirrelid = heap_create_with_catalog(aoseg_relname,
+ PG_AOSEGMENT_NAMESPACE,
+ tablespaceOid,
+ aosegOid,
+ rel->rd_rel->relowner,
+ tupdesc,
+ /* relam */ InvalidOid,
+ RELKIND_AOSEGMENTS,
+ RELSTORAGE_HEAP,
+ shared_relation,
+ true,
+ /* bufferPoolBulkLoad */ false,
+ 0,
+ ONCOMMIT_NOOP,
+ NULL, /* CDB POLICY */
+ (Datum) 0,
+ true,
+ comptypeOid,
+ /* persistentTid */ NULL,
+ /* persistentSerialNum */ NULL,
+ /* formattername */ NULL);
+
+ /* make the toast relation visible, else index creation will fail */
+ CommandCounterIncrement();
+
+ /*
+ * Create unique index on index oid.
+ */
+ indexInfo = makeNode(IndexInfo);
+ indexInfo->ii_NumIndexAttrs = 1;
+ indexInfo->ii_NumIndexKeyAttrs = 1;
+ indexInfo->ii_KeyAttrNumbers[0] = 1;
+ indexInfo->ii_Expressions = NIL;
+ indexInfo->ii_ExpressionsState = NIL;
+ indexInfo->ii_Predicate = NIL;
+ indexInfo->ii_PredicateState = NIL;
+ indexInfo->ii_Unique = true;
+ indexInfo->ii_Concurrent = false;
+
+ classObjectId[0] = INT4_BTREE_OPS_OID;
+ classObjectId[1] = INT4_BTREE_OPS_OID;
+
+ blkdiridxid = index_create(blkdirrelid, aoseg_idxname, aosegIndexOid,
+ indexInfo,
+ BTREE_AM_OID,
+ tablespaceOid,
+ classObjectId, (Datum) 0,
+ true, false, (Oid *) NULL, true, false, false, NULL);
+
+ /* Unlock target table -- no one can see it */
+ UnlockRelationOid(blkdirrelid, ShareLock);
+ /* Unlock the index -- no one can see it anyway */
+ UnlockRelationOid(blkdiridxid, AccessExclusiveLock);
+
+ /*
+ * Store the aoseg table's OID in the parent relation's pg_appendonly row
+ */
+ UpdateAppendOnlyEntryAuxOids(relOid, InvalidOid, InvalidOid, blkdirrelid, blkdiridxid);
+
+ /*
+ * Register dependency from the aoseg table to the master, so that the
+ * aoseg table will be deleted if the master is.
+ */
+ baseobject.classId = RelationRelationId;
+ baseobject.objectId = relOid;
+ baseobject.objectSubId = 0;
+ aosegobject.classId = RelationRelationId;
+ aosegobject.objectId = blkdirrelid;
+ aosegobject.objectSubId = 0;
+
+ recordDependencyOn(&aosegobject, &baseobject, DEPENDENCY_INTERNAL);
+
+ /*
+ * Make changes visible
+ */
+ CommandCounterIncrement();
+
+ return true;
+}
+
+
/*
* Check to see whether the table needs an aoseg table. It does only if it is
* an append-only relation.
@@ -320,4 +468,12 @@ needs_aoseg_table(Relation rel)
return RelationIsAo(rel);
}
-
+/*
+ * Check to see whether the table needs an aoseg index table. It does only if it is
+ * an append-only orc relation.
+ */
+static bool
+needs_aoseg_index_table(Relation rel)
+{
+ return RelationIsOrc(rel);
+}
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e7e3887..e13b97b 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -129,7 +129,7 @@ extern void PQclear(PGresult *res);
static void MetaTrackAddUpdInternal(cqContext *pcqCtx,
Oid classid,
- Oid objoid,
+ Oid objoid,
Oid relowner,
char* actionname,
char* subtype,
@@ -546,14 +546,14 @@ heap_create(const char *relname,
rel->rd_segfile0_relationnodeinfo.persistentSerialNum);
heap_close(gp_relation_node, RowExclusiveLock);
}
-#endif
+#endif
}
if (Debug_check_for_invalid_persistent_tid &&
!Persistent_BeforePersistenceWork() &&
PersistentStore_IsZeroTid(&rel->rd_relationnodeinfo.persistentTid))
- {
- elog(ERROR,
+ {
+ elog(ERROR,
"setNewRelfilenodeCommon has invalid TID (0,0) into relation %u/%u/%u '%s', serial number " INT64_FORMAT,
rel->rd_node.spcNode,
rel->rd_node.dbNode,
@@ -563,7 +563,7 @@ heap_create(const char *relname,
}
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"heap_create: '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT " for CREATE",
relpath(rel->rd_node),
(isAppendOnly ? "true" : "false"),
@@ -917,8 +917,8 @@ static void MetaTrackAddUpdInternal(cqContext *pcqCtx,
} /* end MetaTrackAddUpdInternal */
-void MetaTrackAddObject(Oid classid,
- Oid objoid,
+void MetaTrackAddObject(Oid classid,
+ Oid objoid,
Oid relowner,
char* actionname,
char* subtype)
@@ -959,8 +959,8 @@ void MetaTrackAddObject(Oid classid,
} /* end MetaTrackAddObject */
-void MetaTrackUpdObject(Oid classid,
- Oid objoid,
+void MetaTrackUpdObject(Oid classid,
+ Oid objoid,
Oid relowner,
char* actionname,
char* subtype)
@@ -1015,7 +1015,7 @@ void MetaTrackUpdObject(Oid classid,
classid, objoid, relowner,
actionname, subtype,
rel, tuple);
-
+
/* CommandCounterIncrement(); */
ii++;
@@ -1026,14 +1026,14 @@ void MetaTrackUpdObject(Oid classid,
/* add it if it didn't already exist */
if (!ii)
- MetaTrackAddObject(classid,
- objoid,
+ MetaTrackAddObject(classid,
+ objoid,
relowner,
actionname,
subtype);
} /* end MetaTrackUpdObject */
-void MetaTrackDropObject(Oid classid,
+void MetaTrackDropObject(Oid classid,
Oid objoid)
{
int ii = 0;
@@ -1307,7 +1307,7 @@ AddNewRelationTuple(Relation pg_class_desc,
/* NOTE: look at cdb_estimate_rel_size() if changing these values */
if(relstorage_is_external(relstorage))
{
- new_rel_reltup->relpages = gp_external_table_default_number_of_pages;
+ new_rel_reltup->relpages = gp_external_table_default_number_of_pages;
new_rel_reltup->reltuples = gp_external_table_default_number_of_tuples;
}
break;
@@ -1430,8 +1430,8 @@ InsertGpRelfileNodeTuple(
if (Debug_check_for_invalid_persistent_tid &&
!Persistent_BeforePersistenceWork() &&
PersistentStore_IsZeroTid(persistentTid))
- {
- elog(ERROR,
+ {
+ elog(ERROR,
"Inserting with invalid TID (0,0) into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
relationId,
relname,
@@ -1444,7 +1444,7 @@ InsertGpRelfileNodeTuple(
memset(nulls, false, sizeof(nulls));
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"InsertGpRelationNodeTuple: Inserting into relation id %u '%s', relfilenode %u, segment file #%d, serial number " INT64_FORMAT ", TID %s",
relationId,
relname,
@@ -1499,8 +1499,8 @@ UpdateGpRelfileNodeTuple(
if (Debug_check_for_invalid_persistent_tid &&
!Persistent_BeforePersistenceWork() &&
PersistentStore_IsZeroTid(persistentTid))
- {
- elog(ERROR,
+ {
+ elog(ERROR,
"Updating with invalid TID (0,0) in relfilenode %u, segment file #%d, serial number " INT64_FORMAT,
relfilenode,
segmentFileNum,
@@ -1508,7 +1508,7 @@ UpdateGpRelfileNodeTuple(
}
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"UpdateGpRelationNodeTuple: Updating relfilenode %u, segment file #%d, serial number " INT64_FORMAT " at TID %s",
relfilenode,
segmentFileNum,
@@ -1521,7 +1521,7 @@ UpdateGpRelfileNodeTuple(
repl_repl[Anum_gp_relfile_node_relfilenode_oid - 1] = true;
repl_val[Anum_gp_relfile_node_relfilenode_oid - 1] = ObjectIdGetDatum(relfilenode);
-
+
repl_repl[Anum_gp_relfile_node_segment_file_num - 1] = true;
repl_val[Anum_gp_relfile_node_segment_file_num - 1] = Int32GetDatum(segmentFileNum);
@@ -1529,12 +1529,12 @@ UpdateGpRelfileNodeTuple(
repl_repl[Anum_gp_relfile_node_persistent_tid- 1] = true;
repl_val[Anum_gp_relfile_node_persistent_tid- 1] = PointerGetDatum(persistentTid);
-
+
repl_repl[Anum_gp_relfile_node_persistent_serial_num - 1] = true;
repl_val[Anum_gp_relfile_node_persistent_serial_num - 1] = Int64GetDatum(persistentSerialNum);
newtuple = heap_modify_tuple(tuple, RelationGetDescr(gp_relfile_node), repl_val, repl_null, repl_repl);
-
+
simple_heap_update(gp_relfile_node, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(gp_relfile_node, newtuple);
@@ -1559,7 +1559,7 @@ AddNewRelfileNodeTuple(
/* updateIndex */ true,
&new_rel->rd_relationnodeinfo.persistentTid,
new_rel->rd_relationnodeinfo.persistentSerialNum);
-
+
}
}
@@ -1603,7 +1603,7 @@ heap_create_with_catalog(const char *relname,
pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
- // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node
+ // When creating gp_persistent_relfile_node, we can't directly insert meta info into gp_relfile_node
// for this table is renamed from gp_relation_node, also it's schema changed.
if (IsBootstrapProcessingMode()|| (gp_upgrade_mode && GpPersistent_IsPersistentRelation(relid)))
gp_relfile_node_desc = NULL;
@@ -1630,7 +1630,7 @@ heap_create_with_catalog(const char *relname,
}
relstorage = stdRdOptions->columnstore;
}
-
+
if (IsBuiltinTablespace(reltablespace) && appendOnlyRel) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1669,7 +1669,7 @@ heap_create_with_catalog(const char *relname,
stdRdOptions->columnstore);
/* MPP-8058: disallow OIDS on column-oriented tables */
- if (tupdesc->tdhasoid &&
+ if (tupdesc->tdhasoid &&
IsNormalProcessingMode() &&
(Gp_role == GP_ROLE_DISPATCH))
{
@@ -1982,11 +1982,11 @@ heap_create_with_catalog(const char *relname,
}
/* MPP-7576: don't track internal namespace tables */
- switch (relnamespace)
+ switch (relnamespace)
{
case PG_CATALOG_NAMESPACE:
/* MPP-7773: don't track objects in system namespace
- * if modifying system tables (eg during upgrade)
+ * if modifying system tables (eg during upgrade)
*/
if (allowSystemTableModsDDL)
doIt = false;
@@ -2330,7 +2330,7 @@ RemoveAttrDefault(Oid relid, AttrNumber attnum,
object.objectSubId = 0;
performDeletion(&object, behavior);
-
+
found = true;
}
@@ -2429,9 +2429,9 @@ remove_gp_relation_node_and_schedule_drop(
Relation rel)
{
PersistentFileSysRelStorageMgr relStorageMgr;
-
+
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"remove_gp_relation_node_and_schedule_drop: dropping relation '%s', relation id %u '%s', relfilenode %u",
rel->rd_rel->relname.data,
rel->rd_id,
@@ -2449,9 +2449,9 @@ remove_gp_relation_node_and_schedule_drop(
DeleteGpRelfileNodeTuple(
rel,
/* segmentFileNum */ 0);
-
+
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"remove_gp_relation_node_and_schedule_drop: For Buffer Pool managed relation '%s' persistent TID %s and serial number " INT64_FORMAT " for DROP",
relpath(rel->rd_node),
ItemPointerToString(&rel->rd_relationnodeinfo.persistentTid),
@@ -2465,7 +2465,7 @@ remove_gp_relation_node_and_schedule_drop(
int32 segmentFileNum;
ItemPointerData persistentTid;
int64 persistentSerialNum;
-
+
relNodeRelation = heap_open(GpRelfileNodeRelationId, RowExclusiveLock);
GpRelfileNodeBeginScan(
@@ -2473,7 +2473,7 @@ remove_gp_relation_node_and_schedule_drop(
rel->rd_id,
rel->rd_rel->relfilenode,
&gpRelfileNodeScan);
-
+
while ((tuple = GpRelfileNodeGetNext(
&gpRelfileNodeScan,
&segmentFileNum,
@@ -2481,16 +2481,16 @@ remove_gp_relation_node_and_schedule_drop(
&persistentSerialNum)))
{
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"remove_gp_relation_node_and_schedule_drop: For Append-Only relation %u relfilenode %u scanned segment file #%d, serial number " INT64_FORMAT " at TID %s for DROP",
rel->rd_id,
rel->rd_rel->relfilenode,
segmentFileNum,
persistentSerialNum,
ItemPointerToString(&persistentTid));
-
+
simple_heap_delete(relNodeRelation, &tuple->t_self);
-
+
MirroredFileSysObj_ScheduleDropAppendOnlyFile(
&rel->rd_node,
segmentFileNum,
@@ -2498,9 +2498,9 @@ remove_gp_relation_node_and_schedule_drop(
&persistentTid,
persistentSerialNum);
}
-
+
GpRelfileNodeEndScan(&gpRelfileNodeScan);
-
+
heap_close(relNodeRelation, RowExclusiveLock);
/*
@@ -2682,8 +2682,9 @@ heap_drop_with_catalog(Oid relid)
// start transaction in magma for DROP TABLE
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
// drop table in magma now
@@ -2693,7 +2694,7 @@ heap_drop_with_catalog(Oid relid)
database_name,
schema_name,
table_name,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
ReadCacheHashEntryReviseOnCommit(RelationGetRelid(rel), true);
}
@@ -2718,7 +2719,7 @@ heap_drop_with_catalog(Oid relid)
if (is_foreign_rel)
RemoveForeignTableEntry(relid);
-
+
/*
* delete distribution policy if present
*/
@@ -2815,12 +2816,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
adrcqCtx = caql_beginscan(
caql_addrel(cqclr(&cqc), adrel),
- cql("INSERT INTO pg_attrdef ",
+ cql("INSERT INTO pg_attrdef ",
NULL));
if (Debug_check_for_invalid_persistent_tid)
- {
- elog(LOG,
+ {
+ elog(LOG,
"StoreAttrDefault[1] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
adrel->rd_node.spcNode,
adrel->rd_node.dbNode,
@@ -2835,8 +2836,8 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
RelationFetchGpRelationNodeForXLog(adrel);
if (Debug_check_for_invalid_persistent_tid)
- {
- elog(LOG,
+ {
+ elog(LOG,
"StoreAttrDefault[2] relation %u/%u/%u '%s', isPresent %s, serial number " INT64_FORMAT ", TID %s",
adrel->rd_node.spcNode,
adrel->rd_node.dbNode,
@@ -3518,7 +3519,7 @@ RemoveStatistics(Oid relid, AttrNumber attnum)
" WHERE starelid = :1 "
" AND staattnum = :2 ",
ObjectIdGetDatum(relid),
- Int16GetDatum(attnum)));
+ Int16GetDatum(attnum)));
}
}
@@ -3550,7 +3551,7 @@ RelationTruncateIndexes(Relation heapRelation)
/* Now truncate the actual file (and discard buffers) */
RelationTruncate(
- currentIndex,
+ currentIndex,
0,
/* markPersistentAsPhysicallyTruncated */ true);
@@ -3636,7 +3637,7 @@ heap_truncate(List *relids)
/* Truncate the actual file (and discard buffers) */
RelationTruncate(
- rel,
+ rel,
0,
/* markPersistentAsPhysicallyTruncated */ false);
@@ -3888,7 +3889,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
newrnode.relNode = newrelfilenode;
isAppendOnly = RelationIsAo(relation);
-
+
relname = RelationGetRelationName(relation);
if (!isAppendOnly)
@@ -3897,7 +3898,7 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
PersistentFileSysRelStorageMgr localRelStorageMgr;
PersistentFileSysRelBufpoolKind relBufpoolKind;
-
+
GpPersistentRelfileNode_GetRelfileInfo(
relation->rd_rel->relkind,
relation->rd_rel->relstorage,
@@ -3905,9 +3906,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
&localRelStorageMgr,
&relBufpoolKind);
Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool);
-
+
srel = smgropen(newrnode);
-
+
MirroredFileSysObj_TransactionCreateBufferPoolFile(
srel,
relBufpoolKind,
@@ -3931,8 +3932,8 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
if (Debug_check_for_invalid_persistent_tid &&
!Persistent_BeforePersistenceWork() &&
PersistentStore_IsZeroTid(&relation->rd_relationnodeinfo.persistentTid))
- {
- elog(ERROR,
+ {
+ elog(ERROR,
"setNewRelfilenodeCommon has invalid TID (0,0) for relation %u/%u/%u '%s', serial number " INT64_FORMAT,
newrnode.spcNode,
newrnode.dbNode,
@@ -3942,9 +3943,9 @@ setNewRelfilenodeCommon(Relation relation, Oid newrelfilenode)
}
relation->rd_relationnodeinfo.isPresent = true;
-
+
if (Debug_persistent_print)
- elog(Persistent_DebugPrintLevel(),
+ elog(Persistent_DebugPrintLevel(),
"setNewRelfilenodeCommon: NEW '%s', Append-Only '%s', persistent TID %s and serial number " INT64_FORMAT,
relpath(newrnode),
(isAppendOnly ? "true" : "false"),
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9b4762e..7747e5f 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -977,8 +977,9 @@ index_create(Oid heapRelationId,
// 2. start transaction in magma for CREATE INDEX
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
// 3. call InvokeMagmaCreateIndex
@@ -1006,7 +1007,7 @@ index_create(Oid heapRelationId,
InvokeMagmaCreateIndex(
&procInfo, database_name, schemaname,
tablename, &idxinfo,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
// free memory
pfree(idxinfo.indkey);
}
@@ -1281,8 +1282,9 @@ index_drop(Oid indexId)
// 2. start transaction in magma for DROP INDEX
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
// 3. call InvokeMagmaDropIndex
@@ -1302,7 +1304,7 @@ index_drop(Oid indexId)
InvokeMagmaDropIndex(
&procInfo, database_name, schemaname,
tablename, indexName,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
/*
@@ -1475,7 +1477,7 @@ index_update_stats(Relation rel, bool hasindex, bool isprimary,
* this relpages are only needed by QE,
* when this is a magma table, just ignore this info.
*/
- if (!((RelationIsExternal(rel) && RelationIsMagmaTable(rel->rd_id))))
+ if (!RelationIsMagmaTable2(rel->rd_id))
relpages = RelationGetNumberOfBlocks(rel);
Oid relid = RelationGetRelid(rel);
Relation pg_class;
@@ -2846,8 +2848,9 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
// 2. start transaction in magma for REINDEX INDEX
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
// 3. call InvokeMagmaReindexIndex
@@ -2867,7 +2870,7 @@ reindex_index(Oid indexId, Oid newrelfilenode, List **extra_oids)
InvokeMagmaReindexIndex(
&procInfo, database_name, schemaname,
tablename, indexName,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
/* Close rels, but keep locks */
diff --git a/src/backend/catalog/pg_compression.c b/src/backend/catalog/pg_compression.c
index 1a10d09..4df1427 100644
--- a/src/backend/catalog/pg_compression.c
+++ b/src/backend/catalog/pg_compression.c
@@ -582,7 +582,8 @@ compresstype_is_valid(char *comptype)
{
if(strcmp(comptype, "snappy") == 0 ||
strcmp(comptype, "gzip") == 0 ||
- strcmp(comptype, "lz4") == 0)
+ strcmp(comptype, "lz4") == 0 ||
+ strcmp(comptype, "zstd") == 0)
found = true;
}
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 278103b..0e908fc 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -1073,9 +1073,16 @@ int64 get_block_locations_and_calculate_table_size(split_to_segment_mapping_cont
// start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(magmaTableFullNames);
+ PlugStorageStartTransaction();
useClientCacheDirectly = true;
}
+ if (((PlannedStmt *)context->srtc_context.base.node)->commandType ==
+ CMD_SELECT ||
+ context->isTargetNoMagma) {
+ PlugStorageGetTransactionSnapshot(magmaTableFullNames);
+ } else {
+ PlugStorageGetTransactionId(magmaTableFullNames);
+ }
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
}
@@ -2886,7 +2893,7 @@ static void ExternalGetMagmaRangeDataLocation(
// get range location from magma now
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
InvokeMagmaProtocolBlockLocation(ext_entry, procOid, dbname, schemaname,
- tablename, PlugStorageGetTransactionSnapshot(),
+ tablename, PlugStorageGetTransactionSnapshot(NULL),
useClientCacheDirectly,
&bldata);
}
@@ -6698,14 +6705,15 @@ void build_magma_scansplits_for_result_relations(List **alloc_result, List *relO
// start transaction in magma for SELECT/INSERT/UPDATE/DELETE/ANALYZE
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
ExtProtocolBlockLocationData *bldata = NULL;
InvokeMagmaProtocolBlockLocation(
ext_entry, procOid, dbname, schemaname, tablename,
- PlugStorageGetTransactionSnapshot(), false, &bldata);
+ PlugStorageGetTransactionSnapshot(NULL), false, &bldata);
pfree(dbname);
pfree(schemaname);
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index 9ffb5e5..fcf11d8 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -868,7 +868,7 @@ RebuildPlugStorageSnapshot(QueryContextInfo *cxt)
pfree(snapshot.txnActions.txnActions);
- MagmaSnapshot *s = PlugStorageGetTransactionSnapshot();
+ MagmaSnapshot *s = PlugStorageGetTransactionSnapshot(NULL);
elog(LOG, "SNAPSHOT DEBUG: GET TOP (%llu, %u, %llu, %u, %d)",
s->currentTransaction.txnId,
@@ -1703,7 +1703,7 @@ prepareDispatchedCatalogSingleRelation(QueryContextInfo *cxt, Oid relid,
pfree(formatterName);
}
/* The pluggable storage snapshot must be dispatched */
- prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot());
+ prepareDispatchedPlugStorageSnapshot(cxt, PlugStorageGetTransactionSnapshot(NULL));
/* The distribution policy for table */
prepareDispatchedCatalogDistributionPolicy(cxt, relid);
diff --git a/src/backend/cdb/dispatcher.c b/src/backend/cdb/dispatcher.c
index 226ffae..f29514a 100644
--- a/src/backend/cdb/dispatcher.c
+++ b/src/backend/cdb/dispatcher.c
@@ -1180,6 +1180,9 @@ static void dispatcher_serialize_common_plan(DispatchData *data, CommonPlanConte
new_executor_partitioned_hash_recursive_depth_limit);
univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
numberStrBuf);
+ sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+ univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+ numberStrBuf);
univPlanAddGuc(ctx->univplan, "new_interconnect_type",
show_new_interconnect_type());
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index f00afce..ee6cc69 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -709,6 +709,9 @@ static void dispatcher_serialize_common_plan(MainDispatchData *data,
new_executor_partitioned_hash_recursive_depth_limit);
univPlanAddGuc(ctx->univplan, "partitioned_hash_recursive_depth_limit",
numberStrBuf);
+ sprintf(numberStrBuf, "%d", new_executor_external_sort_memory_limit_size_mb);
+ univPlanAddGuc(ctx->univplan, "external_sort_memory_limit_size_mb",
+ numberStrBuf);
univPlanAddGuc(ctx->univplan, "new_interconnect_type",
show_new_interconnect_type());
diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c
index 0c7030c..2f754c3 100644
--- a/src/backend/cdb/motion/ic_udp.c
+++ b/src/backend/cdb/motion/ic_udp.c
@@ -778,8 +778,6 @@ static bool SendChunkUDP(MotionLayerState *mlStates, ChunkTransportState *transp
static void doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID);
static bool dispatcherAYT(void);
-static void checkQDConnectionAlive(void);
-
static void *rxThreadFunc(void *arg);
@@ -5923,24 +5921,6 @@ formatSockAddr(struct sockaddr *sa, char* buf, int bufsize)
} /* formatSockAddr */
/*
- * checkQDConnectionAlive
- * Check whether QD connection is still alive. If not, report error.
- */
-static void
-checkQDConnectionAlive(void)
-{
- if (!dispatch_validate_conn(MyProcPort->sock))
- {
- if (Gp_role == GP_ROLE_EXECUTE)
- ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
- errmsg("Interconnect error segment lost contact with master (recv)")));
- else
- ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
- errmsg("Interconnect error master lost contact with client (recv)")));
- }
-}
-
-/*
* getCurrentTime
* get current time
*
@@ -6967,3 +6947,22 @@ WaitInterconnectQuitUDP(void)
}
ic_control_info.threadCreated = false;
}
+
+
+/*
+ * checkQDConnectionAlive
+ * Check whether QD connection is still alive. If not, report error.
+ */
+void
+checkQDConnectionAlive(void)
+{
+ if (!dispatch_validate_conn(MyProcPort->sock))
+ {
+ if (Gp_role == GP_ROLE_EXECUTE)
+ ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+ errmsg("Interconnect error segment lost contact with master (recv)")));
+ else
+ ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+ errmsg("Interconnect error master lost contact with client (recv)")));
+ }
+}
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index b3fcf4a..6b61415 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -3872,7 +3872,7 @@ int64 GetExternalTotalBytesMAGMA(Relation relation){
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
InvokeMagmaProtocolTableSize(ext_entry, procOid, dbname, schemaname, tablename,
- PlugStorageGetTransactionSnapshot(), &ts);
+ PlugStorageGetTransactionSnapshot(NULL), &ts);
pfree(dbname);
pfree(schemaname);
@@ -3899,7 +3899,7 @@ int64 GetDatabaseTotalBytesMAGMA(Oid dbOid){
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
InvokeMagmaProtocolDatabaseSize(procOid, dbname,
- PlugStorageGetTransactionSnapshot(),
+ PlugStorageGetTransactionSnapshot(NULL),
&dbs);
pfree(dbname);
@@ -4099,7 +4099,7 @@ uint64 GetExternalTotalBytes(Relation rel)
/* start transaction for magma table */
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 92f7098..ed98b00 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -1290,8 +1290,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
{
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
}
@@ -2574,7 +2575,7 @@ CopyTo(CopyState cstate)
currentScanDesc = InvokePlugStorageFormatBeginScan(
&beginScanFunc, cstate->planstmt, node, &(externalstate.ss),
serializeSchema, serializeSchemaLen, rel,
- formatterType, formatterName, PlugStorageGetTransactionSnapshot());
+ formatterType, formatterName, PlugStorageGetTransactionSnapshot(NULL));
}
else
{
@@ -4562,7 +4563,7 @@ CopyFrom(CopyState cstate)
formatterName,
plannedstmt,
segfileinfo->segno,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
pfree(insertInitFunc);
pfree(plannedstmt);
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 072197a..126861c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1530,8 +1530,9 @@ dropdb(const char *dbname, bool missing_ok)
// start transaction in magma for DROP TABLE
if (PlugStorageGetTransactionStatus()
== PS_TXN_STS_DEFAULT) {
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(
PlugStorageGetTransactionStatus()
== PS_TXN_STS_STARTED);
@@ -1539,7 +1540,7 @@ dropdb(const char *dbname, bool missing_ok)
// drop table in magma now
InvokeMagmaDropTable(&procInfo, dbInfoRel->exttable, database_name,
schema_name, table_name,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
ReadCacheHashEntryReviseOnCommit(dbInfoRel->relationOid, true);
}
}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 7d43c8f..345de0b 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -34,6 +34,7 @@
#include "postgres.h"
+#include "access/aosegfiles.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/fileam.h"
@@ -423,10 +424,11 @@ DefineIndex(Oid relationId,
errmsg("access method \"%s\" does not support multicolumn indexes",
accessMethodName)));
- if (unique && RelationIsAo(rel))
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("append-only tables do not support unique indexes")));
+
+ /* native orc can't support unique/primary index */
+ if (unique && RelationIsOrc(rel))
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("native orc do not support unique indexes")));
amoptions = accessMethodForm->amoptions;
@@ -752,24 +754,14 @@ DefineIndex(Oid relationId,
errOmitLocation(true)));
}
else
- {
- char *formatOpt = caql_getcstring(
- NULL,
- cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
- ObjectIdGetDatum(relationId)));
- if (!formatOpt) {
- ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("Cannot support DefineIndex")));
- }
- else {
- char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
- if (!formatName ||
- (!(pg_strncasecmp(formatName, "magma", strlen("magma")) == 0))) {
- ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("Cannot support DefineIndex")));
- }
- }
- }
+ {
+ /* magma and native orc support index */
+ if (!(RelationIsOrc(rel) || RelationIsMagmaTable2(relationId)))
+ {
+ ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support DefineIndex")));
+ }
+ // dispatch_statement_node((Node *)stmt, NULL, NULL, NULL);
+ }
}
/* save lockrelid for below, then close rel */
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3460af4..a92eaa6 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1833,14 +1833,15 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
// start transaction in magma for CREATE TABLE
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
+ PlugStorageGetTransactionId(NULL);
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
InvokeMagmaCreateTable(&procInfo,
database_name,
schema_name,
table_name,
- PlugStorageGetTransactionSnapshot(),
+ PlugStorageGetTransactionSnapshot(NULL),
createExtStmt->base.tableElts,
createExtStmt->pkey,
createExtStmt->base.distributedBy,
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index c6a0384..bd06077 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -470,7 +470,7 @@ ExecInsert(TupleTableSlot *slot,
formatterName,
estate->es_plannedstmt,
segfileinfo->segno,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
else
{
@@ -723,7 +723,7 @@ ldelete:;
InvokeMagmaBeginDelete(&procInfo,
resultRelationDesc,
estate->es_plannedstmt,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
else
{
@@ -1043,7 +1043,7 @@ lreplace:;
InvokeMagmaBeginUpdate(&procInfo,
resultRelationDesc,
estate->es_plannedstmt,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
elog(LOG, "exec update begin update: %d", extUpdDescEntry->ext_ins_oid);
}
else
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 71f49ef..5de2981 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -5084,7 +5084,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
formatterName,
estate->es_plannedstmt,
segfileinfo->segno,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
else
{
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index 68d7add..2d203a7 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -355,7 +355,7 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
currentRelation,
formatterType,
formatterName,
- PlugStorageGetTransactionSnapshot());
+ PlugStorageGetTransactionSnapshot(NULL));
}
else
{
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c635d8f..df5775c 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -386,8 +386,11 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
pathlist = lappend(pathlist, seqpath);
/* Consider index and bitmap scans */
- create_index_paths(root, rel, relstorage,
- &indexpathlist, &bitmappathlist);
+ if (!relstorage_is_ao(relstorage))
+ {
+ /* Temporarily disable index for ao table */
+ create_index_paths(root, rel, relstorage, &indexpathlist, &bitmappathlist);
+ }
/* deal with magma index scan */
if (relstorage == RELSTORAGE_EXTERNAL)
diff --git a/src/backend/optimizer/plan/newPlanner.c b/src/backend/optimizer/plan/newPlanner.c
index 4840f47..dc5546e 100644
--- a/src/backend/optimizer/plan/newPlanner.c
+++ b/src/backend/optimizer/plan/newPlanner.c
@@ -17,6 +17,7 @@
*/
#include "optimizer/newPlanner.h"
+#include "catalog/catalog.h"
#include "access/aomd.h"
#include "access/fileam.h"
@@ -54,6 +55,7 @@ char *new_executor_enable_partitioned_hashjoin_mode;
char *new_executor_enable_external_sort_mode;
int new_executor_partitioned_hash_recursive_depth_limit;
int new_executor_ic_tcp_client_limit_per_query_per_segment;
+int new_executor_external_sort_memory_limit_size_mb;
const char *new_executor_runtime_filter_mode;
const char *new_executor_runtime_filter_mode_local = "local";
@@ -99,6 +101,8 @@ static void
do_convert_magma_rangevseg_map_to_common_plan(CommonPlanContext *ctx);
static void do_convert_rangetbl_to_common_plan(List *rtable,
CommonPlanContext *ctx);
+static void do_convert_result_partitions_to_common_plan(
+ PartitionNode *partitionNode, CommonPlanContext *ctx);
static void do_convert_token_map_to_common_plan(CommonPlanContext *ctx);
static void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx);
static void do_convert_splits_list_to_common_plan(List *splits, Oid relOid,
@@ -133,6 +137,19 @@ static bool checkSupportedSubLinkType(SubLinkType sublinkType);
static bool checkInsertSupportTable(PlannedStmt *stmt);
static bool checkIsPrepareQuery(QueryDesc *queryDesc);
+// @return format string whose life time goes along with current MemoryContext
+static const char *buildInternalTableFormatOptionStringInJson(Relation rel) {
+ AppendOnlyEntry *aoentry =
+ GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
+ StringInfoData option;
+ initStringInfo(&option);
+ appendStringInfoChar(&option, '{');
+ if (aoentry->compresstype)
+ appendStringInfo(&option, "%s", aoentry->compresstype);
+ appendStringInfoChar(&option, '}');
+ return option.data;
+}
+
#define DIRECT_LEFT_CHILD_VAR 0
#define INT64_MAX_LENGTH 20
@@ -313,6 +330,27 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
pfree(rgId);
pfree(rgUrl);
}
+ // For append-only internal table
+ if (get_relation_storage_type(oid) == RELSTORAGE_ORC) {
+ ListCell *lc;
+ foreach (lc, stmt->result_segfileinfos) {
+ ResultRelSegFileInfoMapNode *pRelSegFileInfoMapNode =
+ (ResultRelSegFileInfoMapNode *)lfirst(lc);
+ ListCell *lc;
+ foreach (lc, pRelSegFileInfoMapNode->segfileinfos) {
+ ResultRelSegFileInfo *pSegFileInfo = lfirst(lc);
+ if (pSegFileInfo->numfiles == 0) {
+ // detect mixed-up partition of external table
+ ctx->convertible = false;
+ return;
+ }
+ univPlanAddResultRelSegFileInfo(
+ ctx->univplan, pRelSegFileInfoMapNode->relid,
+ pSegFileInfo->segno, pSegFileInfo->eof[0],
+ pSegFileInfo->uncompressed_eof[0]);
+ }
+ }
+ }
univPlanAddToPlanNode(ctx->univplan, true);
}
do_convert_plantree_to_common_plan(stmt->planTree, pid, true, false, NIL,
@@ -327,8 +365,9 @@ void convert_to_common_plan(PlannedStmt *stmt, CommonPlanContext *ctx) {
do_convert_plantree_to_common_plan(subplan, -1, true, true, NIL, NULL,
true, ctx);
}
- if (ctx->convertible)
- do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+ if (ctx->convertible) do_convert_rangetbl_to_common_plan(stmt->rtable, ctx);
+ if (ctx->convertible && stmt->result_partitions)
+ do_convert_result_partitions_to_common_plan(stmt->result_partitions, ctx);
if (ctx->convertible && enable_secure_filesystem)
do_convert_token_map_to_common_plan(ctx);
if (ctx->convertible && ctx->isMagma)
@@ -1294,9 +1333,11 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
columnDataTypeMod[i] = att->atttypmod;
}
FormatType fmttype = UnivPlanOrcFormat;
- univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, "dummy", "{}",
- attNum, (const char **)columnName,
- columnDataType, columnDataTypeMod, NULL);
+ univPlanRangeTblEntryAddTable(
+ ctx->univplan, relid, fmttype, relpath(rel->rd_node),
+ buildInternalTableFormatOptionStringInJson(rel), attNum,
+ (const char **)columnName, columnDataType, columnDataTypeMod, NULL,
+ rel->rd_rel->relname.data);
} else if (RelationIsExternal(rel)) {
TupleDesc tableAttrs = rel->rd_att;
attNum = tableAttrs->natts;
@@ -1394,7 +1435,7 @@ void do_convert_onetbl_to_common_plan(Oid relid, CommonPlanContext *ctx) {
univPlanRangeTblEntryAddTable(ctx->univplan, relid, fmttype, location,
fmtOptsJson, attNum,
(const char **)columnName, columnDataType,
- columnDataTypeMod, targetName);
+ columnDataTypeMod, targetName, NULL);
if (fmtOptsJson != NULL)
pfree(fmtOptsJson);
@@ -1419,6 +1460,53 @@ end:
pfree(columnDataTypeMod);
}
+static void do_convert_result_partition_rule_to_common_plan(
+ CommonPlanContext *ctx, PartitionRule *partitionRule,
+ bool isDefaultPartition) {
+ if (partitionRule->children) {
+ // TODO(chiyang): sub-partition
+ ctx->convertible = false;
+ return;
+ }
+ univPlanResultPartitionsAddPartitionRule(
+ ctx->univplan, partitionRule->parchildrelid, partitionRule->parname,
+ isDefaultPartition);
+
+ ListCell *lc;
+ foreach (lc, partitionRule->parlistvalues) {
+ univPlanPartitionRuleAddPartitionValue(ctx->univplan, isDefaultPartition);
+ List *partitionListValues = (List *)lfirst(lc);
+ ListCell *lc;
+ foreach (lc, partitionListValues) {
+ Const *val = (List *)lfirst(lc);
+ do_convert_expr_to_common_plan(-1, val, ctx);
+ univPlanPartitionValueAddConst(ctx->univplan, isDefaultPartition);
+ }
+ }
+}
+
+static void do_convert_result_partitions_to_common_plan(
+ PartitionNode *partitionNode, CommonPlanContext *ctx) {
+ if (partitionNode->part->parkind != 'l') {
+ // TODO(chiyang): range partition
+ ctx->convertible = false;
+ return;
+ }
+ univPlanAddResultPartitions(ctx->univplan, partitionNode->part->parrelid,
+ partitionNode->part->parkind,
+ partitionNode->part->paratts,
+ partitionNode->part->parnatts);
+ ListCell *lc;
+ foreach (lc, partitionNode->rules) {
+ PartitionRule *partitionRule = (PartitionRule *)lfirst(lc);
+ do_convert_result_partition_rule_to_common_plan(ctx, partitionRule, false);
+ }
+ if (partitionNode->default_part) {
+ do_convert_result_partition_rule_to_common_plan(
+ ctx, partitionNode->default_part, true);
+ }
+}
+
void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
HASH_SEQ_STATUS status;
struct FileSystemCredential *entry;
@@ -1453,14 +1541,14 @@ void do_convert_token_map_to_common_plan(CommonPlanContext *ctx) {
// it's convertible and it's a magma scan
void do_convert_snapshot_to_common_plan(CommonPlanContext *ctx) {
// start transaction in magma for SELECT in new executor
- if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
- PlugStorageBeginTransaction(NULL);
- }
+ // if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT) {
+ // PlugStorageStartTransaction(NULL);
+ // }
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
int32_t size = 0;
char *snapshot = NULL;
- MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(), &snapshot,
- &size);
+ MagmaClientC_SerializeSnapshot(PlugStorageGetTransactionSnapshot(NULL),
+ &snapshot, &size);
if (snapshot && size != 0) {
univPlanAddSnapshot(ctx->univplan, snapshot, size);
}
@@ -1959,15 +2047,14 @@ end:
}
bool checkInsertSupportTable(PlannedStmt *stmt) {
- // disable partitioned result target
- if (stmt->result_partitions)
- return false;
- if (list_length(stmt->resultRelations) > 1)
- return false;
+ if (list_length(stmt->resultRelations) > 1) return false;
int32_t index = list_nth_int(stmt->resultRelations, 0);
RangeTblEntry *rte = (RangeTblEntry *)list_nth(stmt->rtable, index - 1);
- // if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+ if (RELSTORAGE_ORC == get_rel_relstorage(rte->relid)) return true;
+
+ // disable partition table insert for external table
+ if (stmt->result_partitions) return false;
Relation pgExtTableRel = heap_open(ExtTableRelationId, RowExclusiveLock);
cqContext cqc;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be53f20..ce75706 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -8112,6 +8112,11 @@ static Query *transformIndexStmt(ParseState *pstate, IndexStmt *stmt,
if (RelationBuildPartitionDesc(rel, false)) stmt->do_part = true;
+ /* native orc can't create index in parent relation */
+ if (RelationIsOrc(rel) && stmt->do_part)
+ ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+ errmsg("Cannot support create index statement in native orc parent relation yet")));
+
if (stmt->do_part && Gp_role != GP_ROLE_EXECUTE) {
List *children;
struct HTAB *nameCache;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ed4b5dc..ee8958d 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -67,6 +67,7 @@
#include "cdb/cdbpersistentrelfile.h"
#include "access/aosegfiles.h"
+#include "access/orcsegfiles.h"
#include "access/parquetsegfiles.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbvars.h"
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 27e7894..89d20e0 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -17,6 +17,7 @@
#include "postgres.h"
#include "port.h"
+#include "access/aosegfiles.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/fileam.h"
@@ -415,6 +416,72 @@ QueryIsReadOnly(Query *parsetree)
}
/*
+ * CanCreateIndex: can support create index
+ * So far, magma table and native orc could support index
+ */
+void CanSupportIndex(IndexStmt *stmt, Oid relid)
+{
+ /* 1. upgrade mode should support index operation */
+ if (gp_upgrade_mode) return;
+
+ bool supportIndex = false;
+ Relation rel = heap_open(relid, AccessShareLock);
+ bool nativeOrc = RelationIsOrc(rel);
+ heap_close(rel, AccessShareLock);
+
+ /*
+ * 2. deal magma table and native orc
+ * for "stmt->magma", deal with special partition situation, oushu issue #1049
+ * its ugly, but there is no elegant way now
+ */
+ if (RelationIsMagmaTable2(relid) || stmt->magma || nativeOrc)
+ {
+ supportIndex = true;
+ if (nativeOrc)
+ {
+ /* add pg_aoseg.pg_orcseg_idx_xxx and its index pg_aoseg.pg_orcseg_idx_xxx_index */
+ AlterTableCreateAoSegIndexTableWithOid(relid, stmt->is_part_child);
+ }
+ }
+ if (supportIndex)
+ {
+ /*
+ * 3. magma/native orc index cant support the accessory conditions
+ */
+ if (stmt->options) {
+ ereport(ERROR,
+ (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+ errmsg("magma/native orc Index cannot support create index with clause")));
+ }
+ if (stmt->whereClause) {
+ ereport(ERROR,
+ (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+ errmsg("magma/native orc Index cannot support create index where predicate")));
+ }
+ if (stmt->tableSpace) {
+ ereport(ERROR,
+ (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+ errmsg("magma/native orc Index cannot support create index with tableSpace")));
+ }
+ ListCell *cell;
+ foreach(cell, stmt->indexParams)
+ {
+ IndexElem *elem = (IndexElem *) lfirst(cell);
+ if (elem->expr || elem->opclass)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
+ errmsg("magma/native orc Index cannot support create index with expr or opclass")));
+ }
+ }
+ }
+ else
+ {
+ ereport(ERROR, (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet")));
+ }
+}
+
+/*
* CommandIsReadOnly: is an executable query read-only?
*
* This is a much stricter test than we apply for XactReadOnly mode;
@@ -1349,83 +1416,13 @@ ProcessUtility(Node *parsetree,
lockmode = stmt->concurrent ? ShareUpdateExclusiveLock
: ShareLock;
relid = RangeVarGetRelid(stmt->relation, false, false/*allowHcatalog*/);
-
- /* Only create index for external table with magma */
Assert(OidIsValid(relid));
- char *formatOpt = caql_getcstring(
- NULL,
- cql("SELECT fmtopts FROM pg_exttable WHERE reloid = :1",
- ObjectIdGetDatum(relid)));
-
- if (!formatOpt)
- {
- if (stmt->magma) {}
- else if (!gp_upgrade_mode)
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
- }
- }
- else
- {
- char *formatName = getExtTblFormatterTypeInFmtOptsStr(formatOpt);
- if (!formatName)
- {
- if (!gp_upgrade_mode)
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
- }
- }
- /* in order to support magmatp/magmaap */
- else if ((pg_strncasecmp(formatName, FORMAT_MAGMA_TP_STR,
- sizeof(FORMAT_MAGMA_TP_STR)-1) == 0) ||
- (pg_strncasecmp(formatName, FORMAT_MAGMA_AP_STR,
- sizeof(FORMAT_MAGMA_AP_STR)-1) == 0))
- {
- if (stmt->options) {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("magma Index cannot support create index with clause")));
- }
- if (stmt->whereClause) {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("magma Index cannot support create index where predicate")));
- }
- if (stmt->tableSpace) {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("magma Index cannot support create index with tableSpace")));
- }
- ListCell *cell;
- foreach(cell, stmt->indexParams)
- {
- IndexElem *elem = (IndexElem *) lfirst(cell);
- if (elem->expr || elem->opclass)
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET),
- errmsg("magma Index cannot support create index with expr or opclass")));
- }
- }
- pfree(formatName);
- // break;
- }
- else
- {
- pfree(formatName);
- if (!gp_upgrade_mode)
- {
- ereport(ERROR,
- (errcode(ERRCODE_CDB_FEATURE_NOT_YET), errmsg("Cannot support create index statement yet") ));
- }
- }
- }
-
LockRelationOid(relid, lockmode);
CheckRelationOwnership(relid, true);
+ // check whether can support index
+ CanSupportIndex(stmt, relid);
+
DefineIndex(relid, /* relation */
stmt->idxname, /* index name */
InvalidOid, /* no predefined OID */
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index de91837..a5e327e 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -222,7 +222,7 @@ calculate_database_size(Oid dbOid)
/* start transaction for magma table */
if (PlugStorageGetTransactionStatus() == PS_TXN_STS_DEFAULT)
{
- PlugStorageBeginTransaction(NULL);
+ PlugStorageStartTransaction();
}
Assert(PlugStorageGetTransactionStatus() == PS_TXN_STS_STARTED);
diff --git a/src/backend/utils/gp/segadmin.c b/src/backend/utils/gp/segadmin.c
index ab4bff0..a615242 100644
--- a/src/backend/utils/gp/segadmin.c
+++ b/src/backend/utils/gp/segadmin.c
@@ -309,13 +309,16 @@ Datum hawq_magma_status(PG_FUNCTION_ARGS)
}
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
// free memory
- free(data->magmaNodes[funcctx->call_cntr].node);
- free(data->magmaNodes[funcctx->call_cntr].dirs);
+ if (data->magmaNodes[funcctx->call_cntr].node)
+ free(data->magmaNodes[funcctx->call_cntr].node);
+ if (data->magmaNodes[funcctx->call_cntr].dirs)
+ free(data->magmaNodes[funcctx->call_cntr].dirs);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
} else {
// free memory
- free(data->magmaNodes);
+ if (data->magmaNodes)
+ free(data->magmaNodes);
SRF_RETURN_DONE(funcctx);
}
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4bb606b..20f8f1c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4678,6 +4678,15 @@ static struct config_int ConfigureNamesInt[] =
10000, 0, 65535, NULL, NULL
},
+ {
+ {"new_executor_external_sort_memory_limit", PGC_USERSET, QUERY_TUNING_OTHER,
+ gettext_noop("Sets the memory usage (in MB) limit of external sort for new executor."),
+ NULL
+ },
+ &new_executor_external_sort_memory_limit_size_mb,
+ 256, 0, 1024, NULL, NULL
+ },
+
{
{"default_magma_hash_table_nvseg_per_seg", PGC_USERSET, QUERY_TUNING_OTHER,
gettext_noop("Sets default vseg number per node for Magma hash table"),
@@ -7490,7 +7499,6 @@ static struct config_string ConfigureNamesString[] =
"AUTO", assign_new_executor_mode, NULL
},
-
{
{"new_scheduler", PGC_USERSET, EXTERNAL_TABLES,
gettext_noop("Enable the new scheduler."),
diff --git a/src/include/access/orcsegfiles.h b/src/include/access/orcsegfiles.h
index 4a87981..f8b4582 100644
--- a/src/include/access/orcsegfiles.h
+++ b/src/include/access/orcsegfiles.h
@@ -30,6 +30,11 @@
#define Anum_pg_orcseg_tupcount 3
#define Anum_pg_orcseg_eofuncompressed 4
+#define Natts_pg_orcseg_idx 3
+#define Anum_pg_orcseg_idx_idxoid 1
+#define Anum_pg_orcseg_idx_segno 2
+#define Anum_pg_orcseg_idx_eof 3
+
extern void insertInitialOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo);
extern void insertOrcSegnoEntry(AppendOnlyEntry *aoEntry, int segNo,
float8 tupleCount, float8 eof,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4b825ac..93e922d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -188,10 +188,15 @@ typedef enum PlugStorageTransactionStatus
typedef enum PlugStorageTransactionCommand
{
- PS_TXN_CMD_BEGIN = 0,
- PS_TXN_CMD_COMMIT = 1,
- PS_TXN_CMD_ABORT = 2,
- PS_TXN_CMD_INVALID = 3
+ PS_TXN_CMD_START_TRANSACTION = 0,
+ PS_TXN_CMD_COMMIT_TRANSACTION = 1,
+ PS_TXN_CMD_ABORT_TRANSACTION = 2,
+ PS_TXN_CMD_START_SUB_TRANSACTION = 3,
+ PS_TXN_CMD_COMMIT_SUB_TRANSACTION = 4,
+ PS_TXN_CMD_ABORT_SUB_TRANSACTION = 5,
+ PS_TXN_CMD_GET_SNAPSHOT = 6,
+ PS_TXN_CMD_GET_TRANSACTIONID = 7,
+ PS_TXN_CMD_INVALID = 8
} PlugStorageTransactionCommand;
typedef struct PlugStorageTransactionData
@@ -202,7 +207,8 @@ typedef struct PlugStorageTransactionData
TransactionId pst_transaction_id;
PlugStorageTransactionStatus pst_transaction_status;
PlugStorageTransactionCommand pst_transaction_command;
- MagmaSnapshot *pst_transaction_dist; /* magma format */
+ MagmaSnapshot *pst_transaction_snapshot; /* magma format */
+ MagmaTransactionState *pst_transaction_state; /* magma format */
} PlugStorageTransactionData;
typedef PlugStorageTransactionData *PlugStorageTransaction;
@@ -217,9 +223,10 @@ extern bool isCleanupAbortTransaction;
*/
extern PlugStorageTransaction PlugStorageGetTransaction(void);
extern PlugStorageTransactionStatus PlugStorageGetTransactionStatus(void);
-extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(void);
extern void PlugStorageSetTransactionSnapshot(MagmaSnapshot *snapshot);
-extern void PlugStorageBeginTransaction(List* magmaTableFullNames);
+extern void PlugStorageStartTransaction();
+extern MagmaSnapshot *PlugStorageGetTransactionSnapshot(List* magmaTableFullNames);
+extern void PlugStorageGetTransactionId(List* magmaTableFullNames);
extern void PlugStorageCommitTransaction(void);
extern void PlugStorageAbortTransaction(void);
extern void PlugStorageSetIsCleanupAbort(bool isCleanup);
diff --git a/src/include/catalog/aoseg.h b/src/include/catalog/aoseg.h
index 0e3d5ff..e653fb7 100644
--- a/src/include/catalog/aoseg.h
+++ b/src/include/catalog/aoseg.h
@@ -34,6 +34,7 @@ extern void AlterTableCreateAoSegTableWithOid(Oid relOid, Oid newOid,
Oid newIndexOid,
Oid *comptypeOid,
bool is_part_child);
+extern void AlterTableCreateAoSegIndexTableWithOid(Oid relOid, bool is_part_child);
extern void gpsql_appendonly_segfile_create(PG_FUNCTION_ARGS);
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index 50b6c0d..8284945 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -354,4 +354,6 @@ extern void CleanUpNewInterconnect();
extern void ResetRpcClientInstance();
+extern void checkQDConnectionAlive(void);
+
#endif /* ML_IPC_H */
diff --git a/src/include/cwrapper/magma/cwrapper/magma-client-c.h b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
index bef1a13..fa7b0dc 100644
--- a/src/include/cwrapper/magma/cwrapper/magma-client-c.h
+++ b/src/include/cwrapper/magma/cwrapper/magma-client-c.h
@@ -127,6 +127,31 @@ __attribute__((weak)) typedef struct MagmaColumn {
int32_t id;
} MagmaColumn;
+__attribute__((weak)) typedef int BackendId;
+
+__attribute__((weak)) typedef uint32_t LocalTransactionId;
+
+__attribute__((weak)) typedef struct VirtualTransactionId {
+ BackendId backendId;
+ LocalTransactionId localTransactionId;
+} VirtualTransactionId;
+
+__attribute__((weak)) typedef uint8_t TransactionStatus;
+
+__attribute__((weak)) struct MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaRgIds MagmaRgIds;
+
+__attribute__((weak)) typedef struct MagmaTransactionState {
+ VirtualTransactionId
+ virtualTransactionId; // used for 'magma lock', generated on magma
+ // client startTransaction
+ MagmaTransactionId transactionId; // useless for read only transaction
+ uint32_t commandId; // useless for read only transaction
+ TransactionStatus state;
+ MagmaRgIds *relatedRgIds;
+} MagmaTransactionState;
+
__attribute__((weak)) typedef struct MagmaReplicaGroup {
uint32_t id;
uint16_t port;
@@ -144,8 +169,6 @@ __attribute__((weak)) typedef void *MagmaTablePtr;
__attribute__((weak)) typedef void *MagmaRangeDistPtr;
__attribute__((weak)) typedef void *MagmaRangePtr;
-__attribute__((weak)) typedef struct MagmaTransactionState MagmaTransactionState;
-
#ifdef __cplusplus
}
#endif
diff --git a/src/include/optimizer/newPlanner.h b/src/include/optimizer/newPlanner.h
index 97bcdf6..6067169 100644
--- a/src/include/optimizer/newPlanner.h
+++ b/src/include/optimizer/newPlanner.h
@@ -41,6 +41,7 @@ extern char *new_executor_enable_partitioned_hashjoin_mode;
extern char *new_executor_enable_external_sort_mode;
extern int new_executor_partitioned_hash_recursive_depth_limit;
extern int new_executor_ic_tcp_client_limit_per_query_per_segment;
+extern int new_executor_external_sort_memory_limit_size_mb;
extern const char *new_executor_runtime_filter_mode;
extern const char *new_executor_runtime_filter_mode_local;
diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h
index 99eec7c..776cf35 100644
--- a/src/include/tcop/utility.h
+++ b/src/include/tcop/utility.h
@@ -43,4 +43,6 @@ extern void CheckRelationOwnership(Oid relOid, bool noCatalogs);
extern void DropErrorMsgNonExistent(const RangeVar *rel, char rightkind, bool missing_ok);
+extern void CanSupportIndex(IndexStmt *stmt, Oid relid);
+
#endif /* UTILITY_H */