You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by xu...@apache.org on 2016/11/22 08:53:03 UTC

incubator-hawq git commit: HAWQ-1145. Add UDF gp_relfile_insert_for_register and add insert metadata into gp_relfile_node and gp_persistent_relfile_node for HAWQ register

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 68570a720 -> 787c096d4


HAWQ-1145. Add UDF gp_relfile_insert_for_register and add insert metadata into gp_relfile_node and gp_persistent_relfile_node for HAWQ register


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/787c096d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/787c096d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/787c096d

Branch: refs/heads/master
Commit: 787c096d43e4b27a20cc484e422f53ac81093d56
Parents: 68570a7
Author: Chunling Wang <wa...@126.com>
Authored: Fri Nov 18 15:09:24 2016 +0800
Committer: xunzhang <xu...@gmail.com>
Committed: Tue Nov 22 16:52:50 2016 +0800

----------------------------------------------------------------------
 depends/libhdfs3/src/client/Hdfs.cpp            |   4 +-
 src/backend/cdb/cdbpersistentbuild.c            | 117 +++++++++++++++++++
 src/include/catalog/pg_proc.h                   |   4 +
 src/include/catalog/pg_proc.sql                 |   2 +
 src/include/utils/builtins.h                    |   1 +
 .../test_hawq_register_partition.cpp            |   7 ++
 .../data/upgrade41/upg2_catupgrade_41.sql.in    |  18 +++
 tools/bin/hawqregister                          |  45 +++++++
 8 files changed, 196 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/depends/libhdfs3/src/client/Hdfs.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Hdfs.cpp b/depends/libhdfs3/src/client/Hdfs.cpp
index 9906a87..395f4f8 100644
--- a/depends/libhdfs3/src/client/Hdfs.cpp
+++ b/depends/libhdfs3/src/client/Hdfs.cpp
@@ -646,9 +646,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char * path, int flags, int bufferSize,
         if ((flags & O_CREAT) || (flags & O_APPEND) || (flags & O_WRONLY)) {
             int internalFlags = 0;
 
-            if ((flags & O_CREAT) && (flags & O_EXCL)) {
+            if (flags & O_CREAT)  {
                 internalFlags |= Hdfs::Create;
-            } else if ((flags & O_CREAT) || ((flags & O_APPEND) && (flags & O_WRONLY))) {
+            } else if ((flags & O_APPEND) && (flags & O_WRONLY)) {
                 internalFlags |= Hdfs::Create;
                 internalFlags |= Hdfs::Append;
             } else if (flags & O_WRONLY) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/backend/cdb/cdbpersistentbuild.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpersistentbuild.c b/src/backend/cdb/cdbpersistentbuild.c
index 3fea2bb..17dba4b 100644
--- a/src/backend/cdb/cdbpersistentbuild.c
+++ b/src/backend/cdb/cdbpersistentbuild.c
@@ -831,6 +831,123 @@ gp_persistent_build_all(PG_FUNCTION_ARGS)
 	PG_RETURN_INT32(1);
 }
 
+/*
+ * gp_relfile_insert_for_register
+ * 
+ * This function is an internal function, for hawq register to insert metadata
+ * into gp_relfile_node and gp_persistent_relfile_node.
+ */
+Datum
+gp_relfile_insert_for_register(PG_FUNCTION_ARGS)
+{
+	Oid		tablespace = PG_GETARG_OID(0);
+	Oid		database = PG_GETARG_OID(1);
+	Oid		relation = PG_GETARG_OID(2);
+	Oid		relfilenode = PG_GETARG_OID(3);
+	Oid		segfile = PG_GETARG_OID(4);
+	char           *relname = PG_GETARG_CSTRING(5);
+	char		relkind = PG_GETARG_CHAR(6);
+	char		relstorage = PG_GETARG_CHAR(7);
+	Oid		relam = PG_GETARG_OID(8);
+
+	Relation	gp_relfile_node;
+
+	RelFileNode	relFileNode;
+
+	PersistentFileSysRelStorageMgr relStorageMgr;
+
+	ItemPointerData	persistentTid;
+	int64		persistentSerialNum;
+
+	/* Fetch a copy of the tuple to scribble on */
+	HeapTuple	tuple = SearchSysCacheCopy(DATABASEOID,
+					     ObjectIdGetDatum(database),
+					     0, 0, 0);
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "could not find tuple for database %u", database);
+	Form_pg_database form_pg_database = (Form_pg_database) GETSTRUCT(tuple);
+
+	Oid		defaultTablespace = form_pg_database->dattablespace;
+	gp_relfile_node =
+		DirectOpen_GpRelfileNodeOpen(
+					     defaultTablespace,
+					     database);
+
+	relFileNode.spcNode = tablespace;
+	relFileNode.dbNode = database;
+	relFileNode.relNode = relfilenode;
+
+	relStorageMgr = (
+			 (relstorage == RELSTORAGE_AOROWS ||
+			  relstorage == RELSTORAGE_PARQUET) ?
+			 PersistentFileSysRelStorageMgr_AppendOnly :
+			 PersistentFileSysRelStorageMgr_BufferPool);
+
+	gp_before_persistence_work = true;
+
+	if (relStorageMgr == PersistentFileSysRelStorageMgr_BufferPool) {
+		PersistentFileSysRelStorageMgr localRelStorageMgr;
+		PersistentFileSysRelBufpoolKind relBufpoolKind;
+
+		GpPersistentRelfileNode_GetRelfileInfo(
+						       relkind,
+						       relstorage,
+						       relam,
+						       &localRelStorageMgr,
+						       &relBufpoolKind);
+		Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool);
+
+		/*
+		 * Heap tables only ever add a single segment_file_num=0
+		 * entry to gp_persistent_relation regardless of how many
+		 * segment files there really are.
+		 */
+		PersistentRelfile_AddCreated(
+					     &relFileNode,
+					      /* segmentFileNum */ 0,
+					     relStorageMgr,
+					     relBufpoolKind,
+					     relname,
+					     &persistentTid,
+					     &persistentSerialNum,
+					      /* flushToXLog */ false);
+
+		InsertGpRelfileNodeTuple(
+					 gp_relfile_node,
+					 relation, //pg_class OID
+					 relname,
+				 relFileNode.relNode, //pg_class relfilenode
+					  /* segmentFileNum */ 0,
+					  /* updateIndex */ true,
+					 &persistentTid,
+					 persistentSerialNum);
+	} else {
+		PersistentRelfile_AddCreated(
+					     &relFileNode,
+					     segfile,
+					     relStorageMgr,
+				       PersistentFileSysRelBufpoolKind_None,
+					     relname,
+					     &persistentTid,
+					     &persistentSerialNum,
+					      /* flushToXLog */ false);
+
+		InsertGpRelfileNodeTuple(
+					 gp_relfile_node,
+					 relation, //pg_class OID
+					 relname,
+				 relFileNode.relNode, //pg_class relfilenode
+					 segfile,
+					  /* updateIndex */ true,
+					 &persistentTid,
+					 persistentSerialNum);
+	}
+
+	gp_before_persistence_work = false;
+
+	PG_RETURN_INT32(1);
+}
+
 static void
 PersistentBuild_FindGpRelationNodeIndex(
 	Oid				database,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5393282..0b1da34 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -9924,6 +9924,10 @@ DESCR("Remove an entry specified by TID from a persistent table for the current
 DATA(insert OID = 7182 ( gp_persistent_set_relation_bufpool_kind_all  PGNSP PGUID 12 f f f f v 0 23 f "" _null_ _null_ _null_ gp_persistent_set_relation_bufpool_kind_all - _null_ n ));
 DESCR("Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1");
 
+/* gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) => int4 */
+DATA(insert OID = 7185 ( gp_relfile_insert_for_register  PGNSP PGUID 12 f f f f v 9 23 f "26 26 26 26 26 2275 18 18 26" _null_ _null_ _null_ gp_relfile_insert_for_register - _null_ n ));
+DESCR("insert record into gp_persistent_relfile_node and gp_relfile_node");
+
 
 /* GIN array support */
 /* ginarrayextract(anyarray, internal) => internal */ 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.sql
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql
index 50985d4..1d79f36 100644
--- a/src/include/catalog/pg_proc.sql
+++ b/src/include/catalog/pg_proc.sql
@@ -5243,6 +5243,8 @@
 
  CREATE FUNCTION gp_persistent_set_relation_bufpool_kind_all() RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_persistent_set_relation_bufpool_kind_all' WITH (OID=7182, DESCRIPTION="Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1");
 
+ CREATE FUNCTION gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_relfile_insert_for_register' WITH (OID=7185, DESCRIPTION="insert record into gp_persistent_relfile_node and gp_relfile_node");
+
 -- GIN array support
  CREATE FUNCTION ginarrayextract(anyarray, internal) RETURNS internal LANGUAGE internal IMMUTABLE STRICT AS 'ginarrayextract' WITH (OID=2743, DESCRIPTION="GIN array support");
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/utils/builtins.h
----------------------------------------------------------------------
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 0b7e30d..64b251b 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1106,6 +1106,7 @@ Datum gp_persistent_build_all(PG_FUNCTION_ARGS);
 Datum gp_persistent_reset_all(PG_FUNCTION_ARGS);
 Datum gp_persistent_repair_delete(PG_FUNCTION_ARGS);
 Datum gp_persistent_set_relation_bufpool_kind_all(PG_FUNCTION_ARGS);
+Datum gp_relfile_insert_for_register(PG_FUNCTION_ARGS);
 
 
 /* utils/error/elog.c */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
index bb5e970..196f2c3 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
@@ -51,6 +51,8 @@ void TestHawqRegister::runYamlCaseTableExistsPartition(std::string casename, std
         util.query("select * from nt;", 100);
     } else {
         util.query("select * from nt;", checknum);
+        util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+        util.query("select * from nt;", checknum + 100);
     }
     
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
@@ -91,6 +93,8 @@ void TestHawqRegister::runYamlCaseTableNotExistsPartition(std::string casename,
         util.query("select * from pg_class where relname = 'nt';", 0);
     } else {
         util.query("select * from nt;", checknum);
+        util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+        util.query("select * from nt;", checknum + 100);
     }
     
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
@@ -164,6 +168,9 @@ void TestHawqRegister::runYamlCaseForceModePartition(std::string casename, std::
     EXPECT_EQ(result1_2_1 + result1_2_2, result2_2);
     EXPECT_EQ(result1_3_1 + result1_3_2, result2_3);
 
+    util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+    util.query("select * from nt;", checknum + 100);
+
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
     util.execute("drop table t;");
     util.execute("drop table nt;");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
----------------------------------------------------------------------
diff --git a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
index eb40c01..13d61db 100644
--- a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
+++ b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
@@ -61,6 +61,24 @@ is 'Populate the gp_persistent_relation_node table''s relation_bufpool_kind colu
 select gp_persistent_set_relation_bufpool_kind_all();
 select gp_persistent_set_relation_bufpool_kind_all() from gp_dist_random('gp_version_at_initdb');
 
+create function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.cstring,
+     @gpupgradeschemaname@.char,
+     @gpupgradeschemaname@.char,
+     @gpupgradeschemaname@.oid)
+returns @gpupgradeschemaname@.int4
+language internal AS 'gp_relfile_insert_for_register' volatile
+with (oid = 7185);
+select @gpupgradeschemaname@.addProcPIN(7185);
+comment on function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.oid,
+     @gpupgradeschemaname@.cstring,
+     @gpupgradeschemaname@.char,
+     @gpupgradeschemaname@.char,
+     @gpupgradeschemaname@.oid)
+is 'insert record into gp_persistent_relfile_node and gp_relfile_node';
+
 update @gpupgradeschemaname@.pg_class set relhaspkey = true where oid = 5094;
 select catDML('update @gpupgradeschemaname@.pg_class set relhaspkey = true where oid = 5094') from gp_dist_random('gp_version_at_initdb');
 --

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 5f1c47c..4adbdcc 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -348,6 +348,18 @@ class GpRegisterAccessor(object):
         query = "select pg_encoding_to_char(%s);" % encoding_indx
         return self.exec_query(query)[0]['pg_encoding_to_char']
 
+    def get_metadata_for_relfile_insert(self, database, tablename):
+        _, tablename = tablename_handler(tablename)
+        query = "select reltablespace from pg_class where relname = '%s';" % tablename
+        tablespace_oid = int(self.exec_query(query)[0]['reltablespace'])
+        if tablespace_oid == 0:
+            query = "select oid from pg_tablespace where spcname='dfs_default';"
+            tablespace_oid = int(self.exec_query(query)[0]['oid'])
+        query = "select oid from pg_database where datname='%s';" % database
+        database_oid = int(self.exec_query(query)[0]['oid'])
+        query = "select oid, relfilenode, relname, relkind, relstorage, relam from pg_class where relname='%s';" % tablename
+        return [tablespace_oid, database_oid, int(self.exec_query(query)[0]['oid']), int(self.exec_query(query)[0]['relfilenode']), str(self.exec_query(query)[0]['relname']), str(self.exec_query(query)[0]['relkind']), str(self.exec_query(query)[0]['relstorage']), int(self.exec_query(query)[0]['relam'])]
+
     def update_catalog(self, query):
         self.conn.query(query)
 
@@ -430,6 +442,9 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
 
+    def _get_metadata_for_relfile_insert(self):
+        return self.accessor.get_metadata_for_relfile_insert(self.database, self.tablename)
+
     def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, file_locations,\
                       bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
                       partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, \
@@ -885,7 +900,9 @@ class HawqRegister(object):
         same_path_varblockcounts = self.varblockcounts_same_path
         update_segno_lst = [f.split('/')[-1] for f in self.files_update]
         same_path_segno_lst = [seg for seg in self.segnos_same_path]
+        relfile_data = self._get_metadata_for_relfile_insert()
         query = ""
+        insert_relfile_segs = []
         
         if mode == 'force':
             query += "delete from pg_aoseg.%s;" % (self.seg_name)
@@ -900,17 +917,21 @@ class HawqRegister(object):
                 query += ';'
             if len(same_path_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_eofuncompresseds[0])
+                insert_relfile_segs.append(int(same_path_segno_lst[0]));
                 k = 0
                 for same_path_eof, same_path_tupcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_eofuncompressed)
+                    insert_relfile_segs.append(int(same_path_segno_lst[k + 1]));
                     k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_eofuncompresseds[0])
+                insert_relfile_segs.append(segno);
                 k = 0
                 for append_eof, append_tupcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_eofuncompressed)
+                    insert_relfile_segs.append(segno + k + 1);
                     k += 1
                 query += ';'
         else:
@@ -923,20 +944,27 @@ class HawqRegister(object):
                 query += ';'
             if len(same_path_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_varblockcounts[0], same_path_eofuncompresseds[0])
+                insert_relfile_segs.append(int(same_path_segno_lst[0]));
                 k = 0
                 for same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_varblockcounts[1:], same_path_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed)
+                    insert_relfile_segs.append(int(same_path_segno_lst[k + 1]));
                     k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_varblockcounts[0], append_eofuncompresseds[0])
+                insert_relfile_segs.append(segno);
                 k = 0
                 for append_eof, append_tupcount, append_varblockcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_varblockcounts[1:], append_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_varblockcount, append_eofuncompressed)
+                    insert_relfile_segs.append(segno + k + 1);
                     k += 1
                 query += ';'
         self.queries += query
+        for seg in insert_relfile_segs:
+            self.queries += "select gp_relfile_insert_for_register(%d, %d, %d, %d, %d, '%s', '%s', '%s', %d);" % (relfile_data[0], relfile_data[1], relfile_data[2], relfile_data[3], seg, relfile_data[4], relfile_data[5], relfile_data[6], relfile_data[7])
+
     
     def _modify_metadata(self):
         try:
@@ -1016,6 +1044,15 @@ class HawqRegisterPartition(HawqRegister):
             logger.error('Multi-level partition table is not supported!')
             sys.exit(1)
 
+        parent_tablename = self.tablename
+        parent_files = self.files
+        parent_sizes = self.sizes
+        parent_tupcounts = self.tupcounts
+        parent_eofuncompresseds = self.eofuncompresseds
+        parent_varblockcounts = self.varblockcounts
+        if self.yml:
+            self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
+            self._check_file_not_folder()
         for k, pn in enumerate(self.partitions_name):
             self.tablename = pn
             self.files = self.partitions_filepaths[k]
@@ -1034,6 +1071,14 @@ class HawqRegisterPartition(HawqRegister):
         self.queries = "set allow_system_table_mods='dml';"
         self.queries += "begin transaction;"
         self._check_duplicate_constraint()
+        self.tablename = parent_tablename
+        self.files = parent_files
+        self.sizes = parent_sizes
+        self.tupcounts = parent_tupcounts
+        self.eofuncompresseds = parent_eofuncompresseds
+        self.varblockcounts = parent_varblockcounts
+        self._do_check()
+        self._prepare_register()
         schemaname, _ = tablename_handler(self.dst_table_name)
         for k, pn in enumerate(self.partitions_name):
             self.constraint = self.partitions_constraint[k]