You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by zt...@apache.org on 2022/03/16 06:37:16 UTC

[hawq] 02/02: HAWQ-1834. add options for native orc table creation

This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit c79ab3228fddc1620b6790b0fee8ffe629b77017
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Wed Mar 16 14:36:21 2022 +0800

    HAWQ-1834. add options for native orc table creation
---
 src/backend/access/common/reloptions.c | 278 +++++++++++++++++++++------------
 src/backend/access/orc/orcam.c         |  25 +--
 src/backend/utils/cache/relcache.c     |  37 +++++
 src/include/access/orcam.h             |   7 +
 src/include/utils/rel.h                |   5 +-
 5 files changed, 228 insertions(+), 124 deletions(-)

diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 6211fdd..5307f90 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -300,6 +300,175 @@ parseRelOptions(Datum options, int numkeywords, const char *const * keywords,
 	}
 }
 
+/*
+ * checkOrcOptions
+ *     Parse the ORC-specific reloptions of a native ORC table and fill the
+ *     ORC fields of 'result' (compresstype, bloomfilter, stripesize,
+ *     rowindexstride, compressblocksize). Called from default_reloptions.
+ */
+void
+checkOrcOptions(Datum reloptions, bool validate, StdRdOptions *result)
+{
+  /*
+   * 1. Needn't check option 'appendonly' and 'orientation' because we already
+   *    check them in default_reloptions.
+   * 2. 'compresslevel' is a default option in reloptions, but we actually don't
+   *    use it in native orc format.
+   * 3. Every time an option is added to orc_keywords, it must also be added to
+   *    default_keywords, because default_reloptions parses the options first.
+   */
+  const char *const orc_keywords[] = {
+    "appendonly",         /* 0 */
+    "orientation",        /* 1 */
+    "compresstype",       /* 2 */
+    "compresslevel",      /* 3 */
+    "dicthreshold",       /* 4 */
+    "compressblocksize",  /* 5 */
+    "rowindexstride",     /* 6 */
+    "stripesize",         /* 7 */
+    "bloomfilter",        /* 8 */
+    "bucketnum",          /* 9 */
+  };
+
+  char*   compresstype = NULL;
+  int32   compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+  int32   rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+  int32   stripesize = DEFAULT_ORC_STRIPE_SIZE;
+  char*   bloomfilter = NULL;
+  int32   bucket_num = 0;
+  int     j = 0;
+
+  /* orcOptionValues[i] holds the value given for orc_keywords[i], or NULL */
+  char     *orcOptionValues[ARRAY_SIZE(orc_keywords)];
+  parseRelOptions(reloptions, ARRAY_SIZE(orc_keywords), orc_keywords, orcOptionValues, validate);
+
+  /* orc compresstype */
+  if (orcOptionValues[2] != NULL)
+  {
+    compresstype = orcOptionValues[2];
+    if ((strcmp(compresstype, "snappy") != 0) && (strcmp(compresstype, "lz4") != 0)
+            // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
+            // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
+            && (strcmp(compresstype, "zlib") != 0)
+            && (strcmp(compresstype, "zstd") != 0)
+            && (strcmp(compresstype, "none") != 0))
+    {
+      ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
+             errOmitLocation(true)));
+    }
+
+    /* Wrap as a JSON member, e.g. "compresstype":"zlib" */
+    StringInfoData option;
+    initStringInfo(&option);
+    appendStringInfo(&option, "\"compresstype\":\"%s\"",
+                     compresstype);
+    compresstype = pstrdup(option.data);
+  }
+
+  /* orc dicthreshold */
+  if (orcOptionValues[4] != NULL)
+  {
+    char *end;
+    double threshold = strtod(orcOptionValues[4], &end);
+    if (end == orcOptionValues[4] || *end != '\0' || threshold < 0 || threshold > 1)
+      ereport(ERROR,
+        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+         errmsg("\'dicthreshold\' must be within [0-1]"),
+             errOmitLocation(true)));
+    /* Append to the compresstype JSON fragment (comma-separated members). */
+    StringInfoData option;
+    initStringInfo(&option);
+    if (compresstype != NULL)
+      appendStringInfo(&option, "%s,", compresstype);
+    appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
+                     orcOptionValues[4]);
+    compresstype = pstrdup(option.data);
+  }
+
+  /* orc compressblocksize */
+  if (orcOptionValues[5] != NULL)
+  {
+    compressblocksize = pg_atoi(orcOptionValues[5], sizeof(int32), 0);
+    if ((compressblocksize < MIN_ORC_COMPRESS_BLOCK_SIZE) || (compressblocksize > MAX_ORC_COMPRESS_BLOCK_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("compressblock size for orc table should between 1B and 1GB and should be specified in Bytes. "
+                 "Got %d Bytes", compressblocksize), errOmitLocation(true)));
+
+      compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+    }
+  }
+
+  /* orc rowindexstride (row group size) */
+  if (orcOptionValues[6] != NULL)
+  {
+    rowindexstride = pg_atoi(orcOptionValues[6], sizeof(int32), 0);
+
+    if ((rowindexstride < MIN_ORC_ROW_GROUP_SIZE) || (rowindexstride > MAX_ORC_ROW_GROUP_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("row group size for orc table should between 1000 and 1024*1024*1024. "
+                 "Got %d", rowindexstride), errOmitLocation(true)));
+
+      rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+    }
+  }
+
+  /* orc stripesize */
+  if (orcOptionValues[7] != NULL)
+  {
+    stripesize = pg_atoi(orcOptionValues[7], sizeof(int32), 0);
+
+    if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
+                 "Got %d MB", stripesize), errOmitLocation(true)));
+
+      stripesize = DEFAULT_ORC_STRIPE_SIZE;
+    }
+  }
+
+  /* orc bloomfilter */
+  if (orcOptionValues[8] != NULL)
+  {
+    /* Copy verbatim: this is user input and must never be used as a printf
+     * format string. Column names are checked in RelationParseRelOptions. */
+    bloomfilter = pstrdup(orcOptionValues[8]);
+  }
+
+  /* orc bucket_num: validated only (default_reloptions sets result->bucket_num) */
+  if (orcOptionValues[9] != NULL)
+  {
+    bucket_num = pg_atoi(orcOptionValues[9], sizeof(int32), 0);
+    if (bucket_num <= 0)
+    {
+      if (validate)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+             errmsg("bucket number should be greater than 0. "
+                 "Got %d", bucket_num), errOmitLocation(true)));
+
+      bucket_num = 0;
+    }
+  }
+  result->compressblocksize = compressblocksize;
+  result->stripesize = stripesize;
+  result->rowindexstride = rowindexstride;
+  if (compresstype != NULL)
+    for (j = 0; compresstype[j] != '\0'; j++)
+      compresstype[j] = pg_tolower(compresstype[j]);
+  result->compresstype = compresstype;
+  result->bloomfilter = bloomfilter;
+}
 
 /*
  * Parse reloptions for anything using StdRdOptions
@@ -323,6 +492,8 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 		"dicthreshold",
 		"bloomfilter",
 		"stripesize",
+		"rowindexstride",
+		"compressblocksize",
 	};
 
 	char	   *values[ARRAY_SIZE(default_keywords)];
@@ -330,7 +501,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 	int32		blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE;
 	int32		pagesize = DEFAULT_PARQUET_PAGE_SIZE;
 	int32		rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE;
-	int32   stripesize = DEFAULT_ORC_STRIPE_SIZE;
 	bool		appendonly = false;
 	bool		checksum = false;
 	char*		compresstype = NULL;
@@ -542,28 +712,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 						 errmsg("non-parquet table doesn't support compress type: \'%s\'", compresstype),
 						 errOmitLocation(true)));
 		}
-
-		if ((columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "snappy") != 0)
-		        && (strcmp(compresstype, "lz4") != 0)
-		        // XXX(changyong): The default zlib compression level of ORC table is Z_DEFAULT_COMPRESSION,
-		        // and this is different from hive of which default compression level is (Z_BEST_SPEED + 1).
-		        && (strcmp(compresstype, "zlib") != 0)
-		        && (strcmp(compresstype, "zstd") != 0)
-		        && (strcmp(compresstype, "none") != 0))
-    {
-      ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("orc table doesn't support compress type: \'%s\'", compresstype),
-             errOmitLocation(true)));
-    }
-
-		if (!(columnstore == RELSTORAGE_ORC) && (strcmp(compresstype, "lz4") == 0))
-    {
-      ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("non-orc table doesn't support compress type: \'%s\'", compresstype),
-             errOmitLocation(true)));
-    }
 	}
 
 	/* compression level */
@@ -644,14 +792,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 		compresslevel = setDefaultCompressionLevel(compresstype);
 	}
 
-	if (columnstore == RELSTORAGE_ORC && compresstype) {
-    StringInfoData option;
-    initStringInfo(&option);
-    appendStringInfo(&option, "\"compresstype\":\"%s\"",
-                           compresstype);
-    compresstype = pstrdup(option.data);
-  }
-
 	/* checksum */
 	if (values[7] != NULL)
 	{
@@ -811,75 +951,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 					 errOmitLocation(true)));
 	}
 
-  /* stripesize */
-  if (values[13] != NULL)
-  {
-    if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'stripesize\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-
-    stripesize = pg_atoi(values[13], sizeof(int32), 0);
-
-    if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
-    {
-      if (validate)
-        ereport(ERROR,
-            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-             errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
-                 "Got %d MB", stripesize), errOmitLocation(true)));
-
-      stripesize = DEFAULT_ORC_STRIPE_SIZE;
-    }
-  }
-
-	// dicthreshold
-	if (values[11] != NULL) {
-	  if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'dicthreshold\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-	  char *end;
-	  double threshold = strtod(values[11], &end);
-	  if (end == values[11] || *end != '\0' || threshold < 0 || threshold > 1)
-	    ereport(ERROR,
-        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-         errmsg("\'dicthreshold\' must be within [0-1]"),
-             errOmitLocation(true)));
-	  StringInfoData option;
-	  initStringInfo(&option);
-	  if (compresstype != NULL)
-	    appendStringInfo(&option, "%s,",compresstype);
-	  appendStringInfo(&option, "\"dicthreshold\": \"%s\"",
-	                   values[11]);
-	  compresstype = pstrdup(option.data);
-	}
-
-	// bloomfilter
-	if (values[12] != NULL) {
-    if(!(columnstore == RELSTORAGE_ORC)){
-      ereport(ERROR,
-          (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-           errmsg("invalid option \'bloomfilter\' for non-orc table"),
-           errOmitLocation(true)));
-    }
-    ereport(ERROR,
-        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-         errmsg("option \'bloomfilter\' for orc table not supported yet"),
-         errOmitLocation(true)));
-    StringInfoData option;
-    initStringInfo(&option);
-    if (compresstype != NULL)
-      appendStringInfo(&option, "%s",compresstype);
-    appendStringInfo(&option, ",\"bloomfilter\": \"%s\"",
-                     values[12]);
-    compresstype = pstrdup(option.data);
-  }
-
 	result = (StdRdOptions *) palloc(sizeof(StdRdOptions));
 	SET_VARSIZE(result, sizeof(StdRdOptions));
 
@@ -888,7 +959,6 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 	result->blocksize = blocksize;
 	result->pagesize = pagesize;
 	result->rowgroupsize = rowgroupsize;
-	result->stripesize = stripesize;
 	result->compresslevel = compresslevel;
 	if (compresstype != NULL)
 		for (j = 0;j < strlen(compresstype); j++)
@@ -900,6 +970,12 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
 	result->errorTable = errorTable;
 	result->bucket_num = bucket_num;
 
+	/* ORC-only fields: defaults here; checkOrcOptions overrides them for ORC. */
+	result->bloomfilter = NULL;
+	result->stripesize = DEFAULT_ORC_STRIPE_SIZE;
+	result->rowindexstride = DEFAULT_ORC_ROW_GROUP_SIZE;
+	result->compressblocksize = DEFAULT_ORC_COMPRESS_BLOCK_SIZE;
+	if (columnstore == RELSTORAGE_ORC) checkOrcOptions(reloptions, validate, result);
 	return (bytea *) result;
 }
 
diff --git a/src/backend/access/orc/orcam.c b/src/backend/access/orc/orcam.c
index f226844..f7260c3 100644
--- a/src/backend/access/orc/orcam.c
+++ b/src/backend/access/orc/orcam.c
@@ -279,7 +279,6 @@ static int32 GetSplitCount(List *fileSplits, Oid idxId) {
   return ret;
 }
 
->>>>>>> 7910d663d... step2
 OrcInsertDescData *orcBeginInsert(Relation rel,
                                   ResultRelSegFileInfo *segfileinfo) {
   OrcInsertDescData *insertDesc =
@@ -299,17 +298,7 @@ OrcInsertDescData *orcBeginInsert(Relation rel,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]);
-  appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64,
-                   segfileinfo->uncompressed_eof[0]);
-  appendStringInfo(
-      &option, ", \"stripeSize\": %" PRId64,
-      ((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024);
-  if (aoentry->compresstype)
-    appendStringInfo(&option, ", %s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, segfileinfo, aoentry);
 
   insertDesc->orcFormatData = palloc0(sizeof(OrcFormatData));
   insertDesc->orcFormatData->fmt =
@@ -929,11 +918,7 @@ OrcDeleteDescData *orcBeginDelete(Relation rel, List *fileSplits,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  if (aoentry->compresstype)
-    appendStringInfo(&option, "%s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, NULL, aoentry);
 
   int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
   char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
@@ -1047,11 +1032,7 @@ OrcUpdateDescData *orcBeginUpdate(Relation rel, List *fileSplits,
   AppendOnlyEntry *aoentry =
       GetAppendOnlyEntry(RelationGetRelid(rel), SnapshotNow);
   StringInfoData option;
-  initStringInfo(&option);
-  appendStringInfoChar(&option, '{');
-  if (aoentry->compresstype)
-    appendStringInfo(&option, "%s", aoentry->compresstype);
-  appendStringInfoChar(&option, '}');
+  constructOrcFormatOptionString(&option, rel, NULL, aoentry);
 
   int hdfsPathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
   char *hdfsPath = (char *)palloc0(hdfsPathMaxLen);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index a9f62ac..6c69dd7 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -789,9 +789,46 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
 		case RELKIND_AOSEGMENTS:
 		case RELKIND_AOBLOCKDIR:
 		case RELKIND_UNCATALOGED:
+		{
+			// check for bloom filter here because
+			// we could not get tupledesc in default_reloptions.
+			const char *const keywords[] =
+			{ "bloomfilter" };
+			const int32_t keywords_size = 1;
+			char *values[keywords_size];
+			char *bloomfilter = NULL;
+			char *key = "bloomfilter";
+
+			parseRelOptions(datum, keywords_size, keywords, values, false);
+			if (values[0] != NULL)
+			{
+				TupleDesc tup_desc = relation->rd_att;
+				int attnum = tup_desc->natts;
+				char **attribute_names = palloc0(attnum * sizeof(char*));
+				for (int i = 0; i < attnum; ++i)
+				{
+					int name_len =
+							strlen(
+									((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data);
+					char *attribute = palloc0(name_len + 1);
+					strncpy(attribute, ((Form_pg_attribute )
+					(tup_desc->attrs[i]))->attname.data, name_len);
+					attribute_names[i] = attribute;
+				}
+				char *dup_val = pstrdup(values[0]);
+				char *token = strtok(dup_val, ",");
+				while (token)
+				{
+					checkPlugStorageFormatOption(&bloomfilter, key, token, true,
+							attnum, attribute_names);
+					bloomfilter = NULL;
+					token = strtok(NULL, ",");
+				}
+			}
 			options = heap_reloptions(relation->rd_rel->relkind, datum,
 									  false);
 			break;
+		}
 		case RELKIND_INDEX:
 			options = index_reloptions(relation->rd_am->amoptions, datum,
 									   false);
diff --git a/src/include/access/orcam.h b/src/include/access/orcam.h
index 47d24c2..871fc05 100644
--- a/src/include/access/orcam.h
+++ b/src/include/access/orcam.h
@@ -26,6 +26,13 @@
 #include "cdb/cdbquerycontextdispatching.h"
 #include "nodes/relation.h"
 
+#define DEFAULT_ORC_ROW_GROUP_SIZE 65536
+#define MIN_ORC_ROW_GROUP_SIZE 1000
+#define MAX_ORC_ROW_GROUP_SIZE (1024 * 1024 * 1024)
+// compress block sizes are in Bytes (unlike the stripe sizes below, in MBytes)
+#define DEFAULT_ORC_COMPRESS_BLOCK_SIZE (256 * 1024)
+#define MIN_ORC_COMPRESS_BLOCK_SIZE 1
+#define MAX_ORC_COMPRESS_BLOCK_SIZE (1024 * 1024 * 1024)
 // here we use orc stripe size in MBytes
 #define DEFAULT_ORC_STRIPE_SIZE 64
 #define MIN_ORC_STRIPE_SIZE 1
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 3fbeee3..2fbd2ac 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -285,7 +285,6 @@ typedef struct StdRdOptions
 	int			blocksize;		/* max varblock size (AO rels only) */
 	int			pagesize;		/* page size(Parquet rels only) */
 	int			rowgroupsize;	/* row group size (Parquet rels only)*/
-	int     stripesize;  /* stripe size (ORC rels only) */
 	int			compresslevel;  /* compression level (AO rels only) */
 	char*		compresstype;   /* compression type (AO rels only) */
 	bool		checksum;		/* checksum (AO rels only) */
@@ -293,6 +292,10 @@ typedef struct StdRdOptions
 	bool		forceHeap;		/* specified appendonly=false */
 	bool		errorTable;		/* skip GOH tablespace checking. */
 	int 		bucket_num;		/* default init segment num for random/hash/external table */
+	char*		bloomfilter;	/* columns using bloomfilter (ORC rels only) */
+	int 		stripesize; 	/* stripe size (ORC rels only) */
+	int 		rowindexstride;	/* row index stride (ORC rels only) */
+	int 		compressblocksize;  /* compressblocksize in native orc, different from blocksize (ORC rels only) */
 } StdRdOptions;
 
 #define HEAP_MIN_FILLFACTOR			10