You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by zt...@apache.org on 2022/03/16 06:37:16 UTC
[hawq] 02/02: HAWQ-1834. add options for native orc table creation
This is an automated email from the ASF dual-hosted git repository.
ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git
commit c79ab3228fddc1620b6790b0fee8ffe629b77017
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Wed Mar 16 14:36:21 2022 +0800
HAWQ-1834. add options for native orc table creation
---
src/backend/access/common/reloptions.c | 278 +++++++++++++++++++++------------
src/backend/access/orc/orcam.c | 25 +--
src/backend/utils/cache/relcache.c | 37 +++++
src/include/access/orcam.h | 7 +
src/include/utils/rel.h | 5 +-
5 files changed, 228 insertions(+), 124 deletions(-)
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 6211fdd..5307f90 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -300,6 +300,175 @@ parseRelOptions(Datum options, int numkeywords, const char *const * keywords,
}
}
+/*
+ * Parse and validate the reloptions that only apply to native ORC tables,
+ * and store the ORC-specific results into *result; default_reloptions calls
+ * this after it has filled in the generic fields.  result->compresstype is
+ * set to a lower-cased JSON key/value fragment such as "compresstype":"zlib"
+ * rather than a bare name.  With validate = false, out-of-range sizes fall
+ * back to their defaults; a bad compresstype or dicthreshold always ERRORs.
+ */
+void
+checkOrcOptions(Datum reloptions, bool validate, StdRdOptions *result)
+{
+	/*
+	 * 1. 'appendonly' and 'orientation' need not be checked here, because
+	 *    default_reloptions has already checked them.
+	 * 2. 'compresslevel' is a default reloption, but native ORC format does
+	 *    not use it.
+	 * 3. Every keyword added to orc_keywords must also be added to
+	 *    default_keywords, because default_reloptions parses them first.
+	 */
+	const char *const orc_keywords[] = {
+		"appendonly",			/* 0 */
+		"orientation",			/* 1 */
+		"compresstype",			/* 2 */
+		"compresslevel",		/* 3 */
+		"dicthreshold",			/* 4 */
+		"compressblocksize",	/* 5 */
+		"rowindexstride",		/* 6 */
+		"stripesize",			/* 7 */
+		"bloomfilter",			/* 8 */
+		"bucketnum",			/* 9 */
+	};
+
+	char	   *compresstype = NULL;
+	int32		compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+	int32		rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+	int32		stripesize = DEFAULT_ORC_STRIPE_SIZE;
+	char	   *bloomfilter = NULL;
+	int			j;
+	StringInfoData buf;
+	char	   *orcOptionValues[ARRAY_SIZE(orc_keywords)];
+
+	parseRelOptions(reloptions, ARRAY_SIZE(orc_keywords), orc_keywords,
+					orcOptionValues, validate);
+
+	/* orc compresstype */
+	if (orcOptionValues[2] != NULL)
+	{
+		compresstype = orcOptionValues[2];
+
+		/*
+		 * XXX(changyong): The default zlib compression level of ORC table is
+		 * Z_DEFAULT_COMPRESSION, and this is different from hive of which
+		 * default compression level is (Z_BEST_SPEED + 1).
+		 */
+		if (strcmp(compresstype, "snappy") != 0 &&
+			strcmp(compresstype, "lz4") != 0 &&
+			strcmp(compresstype, "zlib") != 0 &&
+			strcmp(compresstype, "zstd") != 0 &&
+			strcmp(compresstype, "none") != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
+					 errOmitLocation(true)));
+
+		initStringInfo(&buf);
+		appendStringInfo(&buf, "\"compresstype\":\"%s\"", compresstype);
+		compresstype = buf.data;
+	}
+
+	/* orc dicthreshold: a fraction in [0, 1], appended to the fragment */
+	if (orcOptionValues[4] != NULL)
+	{
+		char	   *end;
+		double		threshold = strtod(orcOptionValues[4], &end);
+
+		/*
+		 * Test !(in range) rather than (< 0 || > 1): strtod accepts "nan",
+		 * which fails every comparison and would otherwise slip through.
+		 */
+		if (end == orcOptionValues[4] || *end != '\0' ||
+			!(threshold >= 0 && threshold <= 1))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("\'dicthreshold\' must be within [0-1]"),
+					 errOmitLocation(true)));
+
+		initStringInfo(&buf);
+		if (compresstype != NULL)
+			appendStringInfo(&buf, "%s,", compresstype);
+		/* emit the dicthreshold value itself (index 4), not orientation */
+		appendStringInfo(&buf, "\"dicthreshold\": \"%s\"", orcOptionValues[4]);
+		compresstype = buf.data;
+	}
+
+	/* orc compressblocksize, in bytes */
+	if (orcOptionValues[5] != NULL)
+	{
+		compressblocksize = pg_atoi(orcOptionValues[5], sizeof(int32), 0);
+		if (compressblocksize < MIN_ORC_COMPRESS_BLOCK_SIZE ||
+			compressblocksize > MAX_ORC_COMPRESS_BLOCK_SIZE)
+		{
+			if (validate)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("compressblock size for orc table should between 1B and 1GB and should be specified in Bytes. "
+								"Got %d Bytes", compressblocksize), errOmitLocation(true)));
+			compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+		}
+	}
+
+	/* orc rowindexstride (row group size) */
+	if (orcOptionValues[6] != NULL)
+	{
+		rowindexstride = pg_atoi(orcOptionValues[6], sizeof(int32), 0);
+		if (rowindexstride < MIN_ORC_ROW_GROUP_SIZE ||
+			rowindexstride > MAX_ORC_ROW_GROUP_SIZE)
+		{
+			if (validate)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("row group size for orc table should between 1000 and 1024*1024*1024. "
+								"Got %d", rowindexstride), errOmitLocation(true)));
+			rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+		}
+	}
+
+	/* orc stripesize, in MBytes */
+	if (orcOptionValues[7] != NULL)
+	{
+		stripesize = pg_atoi(orcOptionValues[7], sizeof(int32), 0);
+		if (stripesize < MIN_ORC_STRIPE_SIZE || stripesize > MAX_ORC_STRIPE_SIZE)
+		{
+			if (validate)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
+								"Got %d MB", stripesize), errOmitLocation(true)));
+			stripesize = DEFAULT_ORC_STRIPE_SIZE;
+		}
+	}
+
+	/*
+	 * orc bloomfilter: kept verbatim (user text must never be used as a
+	 * format string).  Its column names are checked in relcache.c.
+	 */
+	if (orcOptionValues[8] != NULL)
+		bloomfilter = pstrdup(orcOptionValues[8]);
+
+	/* orc bucketnum: validated only; the caller sets result->bucket_num */
+	if (orcOptionValues[9] != NULL)
+	{
+		int32		bucket_num = pg_atoi(orcOptionValues[9], sizeof(int32), 0);
+
+		if (bucket_num <= 0 && validate)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("bucket number should be greater than 0. "
+							"Got %d", bucket_num), errOmitLocation(true)));
+	}
+
+	result->compressblocksize = compressblocksize;
+	result->stripesize = stripesize;
+	result->rowindexstride = rowindexstride;
+	if (compresstype != NULL)
+		for (j = 0; compresstype[j] != '\0'; j++)
+			compresstype[j] = pg_tolower((unsigned char) compresstype[j]);
+	result->compresstype = compresstype;
+	result->bloomfilter = bloomfilter;
+}
/*
* Parse reloptions for anything using StdRdOptions
@@ -323,6 +492,8 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
"dicthreshold",
"bloomfilter",
"stripesize",
+ "rowindexstride",
+ "compressblocksize",
};
char *values[ARRAY_SIZE(default_keywords)];
@@ -330,7 +501,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
int32 blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE;
int32 pagesize = DEFAULT_PARQUET_PAGE_SIZE;
int32 rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE;
- int32 stripesize = DEFAULT_ORC_STRIPE_SIZE;
bool appendonly = false;
bool checksum = false;
char* compresstype = NULL;
@@ -542,28 +712,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
errmsg("non-parquet table doesn't support compress type: \'%s\'", compresstype),
errOmitLocation(true)));
}
-
- if ((columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "snappy") != 0)
- && (strcmp(compresstype, "lz4") != 0)
- // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
- // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
- && (strcmp(compresstype, "zlib") != 0)
- && (strcmp(compresstype, "zstd") != 0)
- && (strcmp(compresstype, "none") != 0))
- {
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
- errOmitLocation(true)));
- }
-
- if (!(columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "lz4") == 0))
- {
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("non-orc table doesn't support compress type: \'%s\'", compresstype),
- errOmitLocation(true)));
- }
}
/* compression level */
@@ -644,14 +792,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
compresslevel = setDefaultCompressionLevel(compresstype);
}
- if (columnstore == RELSTORAGE_ORC && compresstype) {
- StringInfoData option;
- initStringInfo(&option);
- appendStringInfo(&option, "\"compresstype\":\"%s\"",
- compresstype);
- compresstype = pstrdup(option.data);
- }
-
/* checksum */
if (values[7] != NULL)
{
@@ -811,75 +951,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
errOmitLocation(true)));
}
- /* stripesize */
- if (values[13] != NULL)
- {
- if(!(columnstore == RELSTORAGE_ORC)){
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid option \'stripesize\' for non-orc table"),
- errOmitLocation(true)));
- }
-
- stripesize = pg_atoi(values[13], sizeof(int32), 0);
-
- if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
- {
- if (validate)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
- "Got %d MB", stripesize), errOmitLocation(true)));
-
- stripesize = DEFAULT_ORC_STRIPE_SIZE;
- }
- }
-
- // dicthreshold
- if (values[11] != NULL) {
- if(!(columnstore == RELSTORAGE_ORC)){
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid option \'dicthreshold\' for non-orc table"),
- errOmitLocation(true)));
- }
- char *end;
- double threshold = strtod(values[11], &end);
- if (end == values[11] || *end != '\0' || threshold < 0 || threshold > 1)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("\'dicthreshold\' must be within [0-1]"),
- errOmitLocation(true)));
- StringInfoData option;
- initStringInfo(&option);
- if (compresstype != NULL)
- appendStringInfo(&option, "%s,",compresstype);
- appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
- values[11]);
- compresstype = pstrdup(option.data);
- }
-
- // bloomfilter
- if (values[12] != NULL) {
- if(!(columnstore == RELSTORAGE_ORC)){
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid option \'bloomfilter\' for non-orc table"),
- errOmitLocation(true)));
- }
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("option \'bloomfilter\' for orc table not supported yet"),
- errOmitLocation(true)));
- StringInfoData option;
- initStringInfo(&option);
- if (compresstype != NULL)
- appendStringInfo(&option, "%s",compresstype);
- appendStringInfo(&option, ",\"bloomfilter\": \"%s\"",
- values[12]);
- compresstype = pstrdup(option.data);
- }
-
result = (StdRdOptions *) palloc(sizeof(StdRdOptions));
SET_VARSIZE(result, sizeof(StdRdOptions));
@@ -888,7 +959,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
result->blocksize = blocksize;
result->pagesize = pagesize;
result->rowgroupsize = rowgroupsize;
- result->stripesize = stripesize;
result->compresslevel = compresslevel;
if (compresstype != NULL)
for (j = 0;j < strlen(compresstype); j++)
@@ -900,6 +970,12 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
result->errorTable = errorTable;
result->bucket_num = bucket_num;
+ // extra parse and check for ORC format
+ if (columnstore == RELSTORAGE_ORC)
+ {
+ checkOrcOptions(reloptions, validate, result);
+ }
+
return (bytea *) result;
}
diff --git a/src/backend/access/orc/orcam.c b/src/backend/access/orc/orcam.c
index f226844..f7260c3 100644
--- a/src/backend/access/orc/orcam.c
+++ b/src/backend/access/orc/orcam.c
@@ -279,7 +279,6 @@ static int32 GetSplitCount(List *fileSplits, Oid idxId) {
return ret;
}
->>>>>>> 7910d663d... step2
OrcInsertDescData *orcBeginInsert(Relation rel,
ResultRelSegFileInfo *segfileinfo) {
OrcInsertDescData *insertDesc =
@@ -299,17 +298,7 @@ OrcInsertDescData *orcBeginInsert(Relation rel,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
- initStringInfo(&option);
- appendStringInfoChar(&option, '{');
- appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]);
- appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64,
- segfileinfo->uncompressed_eof[0]);
- appendStringInfo(
- &option, ", \"stripeSize\": %" PRId64,
- ((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024);
- if (aoentry->compresstype)
- appendStringInfo(&option, ", %s", aoentry->compresstype);
- appendStringInfoChar(&option, '}');
+ constructOrcFormatOptionString(&option, rel, segfileinfo, aoentry);
insertDesc->orcFormatData = palloc0(sizeof(OrcFormatData));
insertDesc->orcFormatData->fmt =
@@ -929,11 +918,7 @@ OrcDeleteDescData *orcBeginDelete(Relation rel, List *fileSplits,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
- initStringInfo(&option);
- appendStringInfoChar(&option, '{');
- if (aoentry->compresstype)
- appendStringInfo(&option, "%s", aoentry->compresstype);
- appendStringInfoChar(&option, '}');
+ constructOrcFormatOptionString(&option, rel, NULL, aoentry);
int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
@@ -1047,11 +1032,7 @@ OrcUpdateDescData *orcBeginUpdate(Relation rel, List *fileSplits,
AppendOnlyEntry *aoentry =
GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
StringInfoData option;
- initStringInfo(&option);
- appendStringInfoChar(&option, '{');
- if (aoentry->compresstype)
- appendStringInfo(&option, "%s", aoentry->compresstype);
- appendStringInfoChar(&option, '}');
+ constructOrcFormatOptionString(&option, rel, NULL, aoentry);
int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index a9f62ac..6c69dd7 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -789,9 +789,46 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
case RELKIND_AOSEGMENTS:
case RELKIND_AOBLOCKDIR:
case RELKIND_UNCATALOGED:
+ {
+ // Validate the 'bloomfilter' column list against the relation's attribute
+ // names here, because default_reloptions has no tuple descriptor to use.
+ const char *const keywords[] =
+ { "bloomfilter" };
+ const int32_t keywords_size = 1; // must match the number of entries in keywords[]
+ char *values[keywords_size]; // NOTE(review): this is a VLA in C (a const variable is not a constant expression)
+ char *bloomfilter = NULL;
+ char *key = "bloomfilter";
+
+ parseRelOptions(datum, keywords_size, keywords, values, false);
+ if (values[0] != NULL) // NOTE(review): no storage check here, so this runs for any relation carrying 'bloomfilter'
+ {
+ TupleDesc tup_desc = relation->rd_att;
+ int attnum = tup_desc->natts;
+ char **attribute_names = palloc0(attnum * sizeof(char*)); // NOTE(review): this array, its strings and dup_val are never pfree'd in this block
+ for (int i = 0; i < attnum; ++i)
+ {
+ int name_len =
+ strlen(
+ ((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data);
+ char *attribute = palloc0(name_len + 1); // zeroed, so the strncpy copy below stays NUL-terminated
+ strncpy(attribute, ((Form_pg_attribute )
+ (tup_desc->attrs[i]))->attname.data, name_len);
+ attribute_names[i] = attribute;
+ }
+ char *dup_val = pstrdup(values[0]); // copy, because strtok overwrites the ',' separators with NULs
+ char *token = strtok(dup_val, ","); // NOTE(review): strtok keeps static state; confirm checkPlugStorageFormatOption never calls strtok
+ while (token)
+ {
+ checkPlugStorageFormatOption(&bloomfilter, key, token, true, // presumably raises ERROR for a token not in attribute_names -- TODO confirm
+ attnum, attribute_names);
+ bloomfilter = NULL; // any value the call wrote into bloomfilter is dropped; only the check is kept
+ token = strtok(NULL, ",");
+ }
+ }
options = heap_reloptions(relation->rd_rel->relkind, datum,
false);
break;
+ }
case RELKIND_INDEX:
options = index_reloptions(relation->rd_am->amoptions, datum,
false);
diff --git a/src/include/access/orcam.h b/src/include/access/orcam.h
index 47d24c2..871fc05 100644
--- a/src/include/access/orcam.h
+++ b/src/include/access/orcam.h
@@ -26,6 +26,13 @@
#include "cdb/cdbquerycontextdispatching.h"
#include "nodes/relation.h"
+#define DEFAULT_ORC_ROW_GROUP_SIZE 65536 // default and bounds for the 'rowindexstride' reloption
+#define MIN_ORC_ROW_GROUP_SIZE 1000
+#define MAX_ORC_ROW_GROUP_SIZE (1024 * 1024 * 1024)
+// here we use orc compress block size in Bytes ('compressblocksize' reloption)
+#define DEFAULT_ORC_COMPRESS_BLOCK_SIZE (256 * 1024)
+#define MIN_ORC_COMPRESS_BLOCK_SIZE 1
+#define MAX_ORC_COMPRESS_BLOCK_SIZE (1024 * 1024 * 1024)
// here we use orc stripe size in MBytes
#define DEFAULT_ORC_STRIPE_SIZE 64
#define MIN_ORC_STRIPE_SIZE 1
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 3fbeee3..2fbd2ac 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -285,7 +285,6 @@ typedef struct StdRdOptions
int blocksize; /* max varblock size (AO rels only) */
int pagesize; /* page size(Parquet rels only) */
int rowgroupsize; /* row group size (Parquet rels only)*/
- int stripesize; /* stripe size (ORC rels only) */
int compresslevel; /* compression level (AO rels only) */
char* compresstype; /* compression type (AO rels only) */
bool checksum; /* checksum (AO rels only) */
@@ -293,6 +292,10 @@ typedef struct StdRdOptions
bool forceHeap; /* specified appendonly=false */
bool errorTable; /* skip GOH tablespace checking. */
int bucket_num; /* default init segment num for random/hash/external table */
+ char* bloomfilter; /* columns using bloomfilter (ORC rels only) */
+ int stripesize; /* stripe size (ORC rels only) */
+ int rowindexstride; /* row index stride (ORC rels only) */
+ int compressblocksize; /* compressblocksize in native orc, different from blocksize (ORC rels only) */
} StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10