You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by xu...@apache.org on 2016/11/22 08:53:03 UTC
incubator-hawq git commit: HAWQ-1145. Add UDF
gp_relfile_insert_for_register and add insert metadata into gp_relfile_node
and gp_persistent_relfile_node for HAWQ register
Repository: incubator-hawq
Updated Branches:
refs/heads/master 68570a720 -> 787c096d4
HAWQ-1145. Add UDF gp_relfile_insert_for_register and add insert metadata into gp_relfile_node and gp_persistent_relfile_node for HAWQ register
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/787c096d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/787c096d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/787c096d
Branch: refs/heads/master
Commit: 787c096d43e4b27a20cc484e422f53ac81093d56
Parents: 68570a7
Author: Chunling Wang <wa...@126.com>
Authored: Fri Nov 18 15:09:24 2016 +0800
Committer: xunzhang <xu...@gmail.com>
Committed: Tue Nov 22 16:52:50 2016 +0800
----------------------------------------------------------------------
depends/libhdfs3/src/client/Hdfs.cpp | 4 +-
src/backend/cdb/cdbpersistentbuild.c | 117 +++++++++++++++++++
src/include/catalog/pg_proc.h | 4 +
src/include/catalog/pg_proc.sql | 2 +
src/include/utils/builtins.h | 1 +
.../test_hawq_register_partition.cpp | 7 ++
.../data/upgrade41/upg2_catupgrade_41.sql.in | 18 +++
tools/bin/hawqregister | 45 +++++++
8 files changed, 196 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/depends/libhdfs3/src/client/Hdfs.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Hdfs.cpp b/depends/libhdfs3/src/client/Hdfs.cpp
index 9906a87..395f4f8 100644
--- a/depends/libhdfs3/src/client/Hdfs.cpp
+++ b/depends/libhdfs3/src/client/Hdfs.cpp
@@ -646,9 +646,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char * path, int flags, int bufferSize,
if ((flags & O_CREAT) || (flags & O_APPEND) || (flags & O_WRONLY)) {
int internalFlags = 0;
- if ((flags & O_CREAT) && (flags & O_EXCL)) {
+ if (flags & O_CREAT) {
internalFlags |= Hdfs::Create;
- } else if ((flags & O_CREAT) || ((flags & O_APPEND) && (flags & O_WRONLY))) {
+ } else if ((flags & O_APPEND) && (flags & O_WRONLY)) {
internalFlags |= Hdfs::Create;
internalFlags |= Hdfs::Append;
} else if (flags & O_WRONLY) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/backend/cdb/cdbpersistentbuild.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpersistentbuild.c b/src/backend/cdb/cdbpersistentbuild.c
index 3fea2bb..17dba4b 100644
--- a/src/backend/cdb/cdbpersistentbuild.c
+++ b/src/backend/cdb/cdbpersistentbuild.c
@@ -831,6 +831,123 @@ gp_persistent_build_all(PG_FUNCTION_ARGS)
PG_RETURN_INT32(1);
}
+/*
+ * gp_relfile_insert_for_register
+ *
+ * This function is an internal function, for hawq register to insert metadata
+ * into gp_relfile_node and gp_persistent_relfile_node.
+ */
+Datum
+gp_relfile_insert_for_register(PG_FUNCTION_ARGS)
+{
+ Oid tablespace = PG_GETARG_OID(0);
+ Oid database = PG_GETARG_OID(1);
+ Oid relation = PG_GETARG_OID(2);
+ Oid relfilenode = PG_GETARG_OID(3);
+ Oid segfile = PG_GETARG_OID(4);
+ char *relname = PG_GETARG_CSTRING(5);
+ char relkind = PG_GETARG_CHAR(6);
+ char relstorage = PG_GETARG_CHAR(7);
+ Oid relam = PG_GETARG_OID(8);
+
+ Relation gp_relfile_node;
+
+ RelFileNode relFileNode;
+
+ PersistentFileSysRelStorageMgr relStorageMgr;
+
+ ItemPointerData persistentTid;
+ int64 persistentSerialNum;
+
+ /* Fetch a copy of the tuple to scribble on */
+ HeapTuple tuple = SearchSysCacheCopy(DATABASEOID,
+ ObjectIdGetDatum(database),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "could not find tuple for database %u", database);
+ Form_pg_database form_pg_database = (Form_pg_database) GETSTRUCT(tuple);
+
+ Oid defaultTablespace = form_pg_database->dattablespace;
+ gp_relfile_node =
+ DirectOpen_GpRelfileNodeOpen(
+ defaultTablespace,
+ database);
+
+ relFileNode.spcNode = tablespace;
+ relFileNode.dbNode = database;
+ relFileNode.relNode = relfilenode;
+
+ relStorageMgr = (
+ (relstorage == RELSTORAGE_AOROWS ||
+ relstorage == RELSTORAGE_PARQUET) ?
+ PersistentFileSysRelStorageMgr_AppendOnly :
+ PersistentFileSysRelStorageMgr_BufferPool);
+
+ gp_before_persistence_work = true;
+
+ if (relStorageMgr == PersistentFileSysRelStorageMgr_BufferPool) {
+ PersistentFileSysRelStorageMgr localRelStorageMgr;
+ PersistentFileSysRelBufpoolKind relBufpoolKind;
+
+ GpPersistentRelfileNode_GetRelfileInfo(
+ relkind,
+ relstorage,
+ relam,
+ &localRelStorageMgr,
+ &relBufpoolKind);
+ Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool);
+
+ /*
+ * Heap tables only ever add a single segment_file_num=0
+ * entry to gp_persistent_relation regardless of how many
+ * segment files there really are.
+ */
+ PersistentRelfile_AddCreated(
+ &relFileNode,
+ /* segmentFileNum */ 0,
+ relStorageMgr,
+ relBufpoolKind,
+ relname,
+ &persistentTid,
+ &persistentSerialNum,
+ /* flushToXLog */ false);
+
+ InsertGpRelfileNodeTuple(
+ gp_relfile_node,
+ relation, //pg_class OID
+ relname,
+ relFileNode.relNode, //pg_class relfilenode
+ /* segmentFileNum */ 0,
+ /* updateIndex */ true,
+ &persistentTid,
+ persistentSerialNum);
+ } else {
+ PersistentRelfile_AddCreated(
+ &relFileNode,
+ segfile,
+ relStorageMgr,
+ PersistentFileSysRelBufpoolKind_None,
+ relname,
+ &persistentTid,
+ &persistentSerialNum,
+ /* flushToXLog */ false);
+
+ InsertGpRelfileNodeTuple(
+ gp_relfile_node,
+ relation, //pg_class OID
+ relname,
+ relFileNode.relNode, //pg_class relfilenode
+ segfile,
+ /* updateIndex */ true,
+ &persistentTid,
+ persistentSerialNum);
+ }
+
+ gp_before_persistence_work = false;
+
+ PG_RETURN_INT32(1);
+}
+
static void
PersistentBuild_FindGpRelationNodeIndex(
Oid database,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5393282..0b1da34 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -9924,6 +9924,10 @@ DESCR("Remove an entry specified by TID from a persistent table for the current
DATA(insert OID = 7182 ( gp_persistent_set_relation_bufpool_kind_all PGNSP PGUID 12 f f f f v 0 23 f "" _null_ _null_ _null_ gp_persistent_set_relation_bufpool_kind_all - _null_ n ));
DESCR("Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1");
+/* gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) => int4 */
+DATA(insert OID = 7185 ( gp_relfile_insert_for_register PGNSP PGUID 12 f f f f v 9 23 f "26 26 26 26 26 2275 18 18 26" _null_ _null_ _null_ gp_relfile_insert_for_register - _null_ n ));
+DESCR("insert record into gp_persistent_relfile_node and gp_relfile_node");
+
/* GIN array support */
/* ginarrayextract(anyarray, internal) => internal */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.sql
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql
index 50985d4..1d79f36 100644
--- a/src/include/catalog/pg_proc.sql
+++ b/src/include/catalog/pg_proc.sql
@@ -5243,6 +5243,8 @@
CREATE FUNCTION gp_persistent_set_relation_bufpool_kind_all() RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_persistent_set_relation_bufpool_kind_all' WITH (OID=7182, DESCRIPTION="Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1");
+ CREATE FUNCTION gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_relfile_insert_for_register' WITH (OID=7185, DESCRIPTION="insert record into gp_persistent_relfile_node and gp_relfile_node");
+
-- GIN array support
CREATE FUNCTION ginarrayextract(anyarray, internal) RETURNS internal LANGUAGE internal IMMUTABLE STRICT AS 'ginarrayextract' WITH (OID=2743, DESCRIPTION="GIN array support");
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/utils/builtins.h
----------------------------------------------------------------------
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 0b7e30d..64b251b 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1106,6 +1106,7 @@ Datum gp_persistent_build_all(PG_FUNCTION_ARGS);
Datum gp_persistent_reset_all(PG_FUNCTION_ARGS);
Datum gp_persistent_repair_delete(PG_FUNCTION_ARGS);
Datum gp_persistent_set_relation_bufpool_kind_all(PG_FUNCTION_ARGS);
+Datum gp_relfile_insert_for_register(PG_FUNCTION_ARGS);
/* utils/error/elog.c */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
index bb5e970..196f2c3 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
@@ -51,6 +51,8 @@ void TestHawqRegister::runYamlCaseTableExistsPartition(std::string casename, std
util.query("select * from nt;", 100);
} else {
util.query("select * from nt;", checknum);
+ util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+ util.query("select * from nt;", checknum + 100);
}
EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
@@ -91,6 +93,8 @@ void TestHawqRegister::runYamlCaseTableNotExistsPartition(std::string casename,
util.query("select * from pg_class where relname = 'nt';", 0);
} else {
util.query("select * from nt;", checknum);
+ util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+ util.query("select * from nt;", checknum + 100);
}
EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
@@ -164,6 +168,9 @@ void TestHawqRegister::runYamlCaseForceModePartition(std::string casename, std::
EXPECT_EQ(result1_2_1 + result1_2_2, result2_2);
EXPECT_EQ(result1_3_1 + result1_3_2, result2_3);
+ util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;");
+ util.query("select * from nt;", checknum + 100);
+
EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str())));
util.execute("drop table t;");
util.execute("drop table nt;");
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
----------------------------------------------------------------------
diff --git a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
index eb40c01..13d61db 100644
--- a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
+++ b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
@@ -61,6 +61,24 @@ is 'Populate the gp_persistent_relation_node table''s relation_bufpool_kind colu
select gp_persistent_set_relation_bufpool_kind_all();
select gp_persistent_set_relation_bufpool_kind_all() from gp_dist_random('gp_version_at_initdb');
+create function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.cstring,
+ @gpupgradeschemaname@.char,
+ @gpupgradeschemaname@.char,
+ @gpupgradeschemaname@.oid)
+returns @gpupgradeschemaname@.int4
+language internal AS 'gp_relfile_insert_for_register' volatile
+with (oid = 7185);
+select @gpupgradeschemaname@.addProcPIN(7185);
+comment on function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.oid,
+ @gpupgradeschemaname@.cstring,
+ @gpupgradeschemaname@.char,
+ @gpupgradeschemaname@.char,
+ @gpupgradeschemaname@.oid)
+is 'insert record into gp_persistent_relfile_node and gp_relfile_node';
+
update @gpupgradeschemaname@.pg_class set relhaspkey = true where oid = 5094;
select catDML('update @gpupgradeschemaname@.pg_class set relhaspkey = true where oid = 5094') from gp_dist_random('gp_version_at_initdb');
--
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 5f1c47c..4adbdcc 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -348,6 +348,18 @@ class GpRegisterAccessor(object):
query = "select pg_encoding_to_char(%s);" % encoding_indx
return self.exec_query(query)[0]['pg_encoding_to_char']
+ def get_metadata_for_relfile_insert(self, database, tablename):
+ _, tablename = tablename_handler(tablename)
+ query = "select reltablespace from pg_class where relname = '%s';" % tablename
+ tablespace_oid = int(self.exec_query(query)[0]['reltablespace'])
+ if tablespace_oid == 0:
+ query = "select oid from pg_tablespace where spcname='dfs_default';"
+ tablespace_oid = int(self.exec_query(query)[0]['oid'])
+ query = "select oid from pg_database where datname='%s';" % database
+ database_oid = int(self.exec_query(query)[0]['oid'])
+ query = "select oid, relfilenode, relname, relkind, relstorage, relam from pg_class where relname='%s';" % tablename
+ return [tablespace_oid, database_oid, int(self.exec_query(query)[0]['oid']), int(self.exec_query(query)[0]['relfilenode']), str(self.exec_query(query)[0]['relname']), str(self.exec_query(query)[0]['relkind']), str(self.exec_query(query)[0]['relstorage']), int(self.exec_query(query)[0]['relam'])]
+
def update_catalog(self, query):
self.conn.query(query)
@@ -430,6 +442,9 @@ class HawqRegister(object):
self.failure_handler.rollback()
sys.exit(1)
+ def _get_metadata_for_relfile_insert(self):
+ return self.accessor.get_metadata_for_relfile_insert(self.database, self.tablename)
+
def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, file_locations,\
bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, \
@@ -885,7 +900,9 @@ class HawqRegister(object):
same_path_varblockcounts = self.varblockcounts_same_path
update_segno_lst = [f.split('/')[-1] for f in self.files_update]
same_path_segno_lst = [seg for seg in self.segnos_same_path]
+ relfile_data = self._get_metadata_for_relfile_insert()
query = ""
+ insert_relfile_segs = []
if mode == 'force':
query += "delete from pg_aoseg.%s;" % (self.seg_name)
@@ -900,17 +917,21 @@ class HawqRegister(object):
query += ';'
if len(same_path_eofs) > 0:
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_eofuncompresseds[0])
+ insert_relfile_segs.append(int(same_path_segno_lst[0]));
k = 0
for same_path_eof, same_path_tupcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_eofuncompresseds[1:]):
query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_eofuncompressed)
+ insert_relfile_segs.append(int(same_path_segno_lst[k + 1]));
k += 1
query += ';'
segno += len(same_path_eofs)
if len(append_eofs) > 0:
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_eofuncompresseds[0])
+ insert_relfile_segs.append(segno);
k = 0
for append_eof, append_tupcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]):
query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_eofuncompressed)
+ insert_relfile_segs.append(segno + k + 1);
k += 1
query += ';'
else:
@@ -923,20 +944,27 @@ class HawqRegister(object):
query += ';'
if len(same_path_eofs) > 0:
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_varblockcounts[0], same_path_eofuncompresseds[0])
+ insert_relfile_segs.append(int(same_path_segno_lst[0]));
k = 0
for same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_varblockcounts[1:], same_path_eofuncompresseds[1:]):
query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed)
+ insert_relfile_segs.append(int(same_path_segno_lst[k + 1]));
k += 1
query += ';'
segno += len(same_path_eofs)
if len(append_eofs) > 0:
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_varblockcounts[0], append_eofuncompresseds[0])
+ insert_relfile_segs.append(segno);
k = 0
for append_eof, append_tupcount, append_varblockcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_varblockcounts[1:], append_eofuncompresseds[1:]):
query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_varblockcount, append_eofuncompressed)
+ insert_relfile_segs.append(segno + k + 1);
k += 1
query += ';'
self.queries += query
+ for seg in insert_relfile_segs:
+ self.queries += "select gp_relfile_insert_for_register(%d, %d, %d, %d, %d, '%s', '%s', '%s', %d);" % (relfile_data[0], relfile_data[1], relfile_data[2], relfile_data[3], seg, relfile_data[4], relfile_data[5], relfile_data[6], relfile_data[7])
+
def _modify_metadata(self):
try:
@@ -1016,6 +1044,15 @@ class HawqRegisterPartition(HawqRegister):
logger.error('Multi-level partition table is not supported!')
sys.exit(1)
+ parent_tablename = self.tablename
+ parent_files = self.files
+ parent_sizes = self.sizes
+ parent_tupcounts = self.tupcounts
+ parent_eofuncompresseds = self.eofuncompresseds
+ parent_varblockcounts = self.varblockcounts
+ if self.yml:
+ self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
+ self._check_file_not_folder()
for k, pn in enumerate(self.partitions_name):
self.tablename = pn
self.files = self.partitions_filepaths[k]
@@ -1034,6 +1071,14 @@ class HawqRegisterPartition(HawqRegister):
self.queries = "set allow_system_table_mods='dml';"
self.queries += "begin transaction;"
self._check_duplicate_constraint()
+ self.tablename = parent_tablename
+ self.files = parent_files
+ self.sizes = parent_sizes
+ self.tupcounts = parent_tupcounts
+ self.eofuncompresseds = parent_eofuncompresseds
+ self.varblockcounts = parent_varblockcounts
+ self._do_check()
+ self._prepare_register()
schemaname, _ = tablename_handler(self.dst_table_name)
for k, pn in enumerate(self.partitions_name):
self.constraint = self.partitions_constraint[k]