Posted to commits@hawq.apache.org by rl...@apache.org on 2016/09/02 02:54:08 UTC

[2/3] incubator-hawq git commit: HAWQ-1025. Refactor hawq register: use the actual file sizes from the YAML config, and check that the number of files is a multiple of the bucket number for hash-distributed tables.

HAWQ-1025. Refactor hawq register: use the actual file sizes from the YAML config, and check that the number of files is a multiple of the bucket number for hash-distributed tables.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/af12c6b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/af12c6b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/af12c6b3

Branch: refs/heads/master
Commit: af12c6b3bfb4e676c0d960bd0282b58c1a8221b6
Parents: 2c4f25c
Author: xunzhang <xu...@gmail.com>
Authored: Thu Sep 1 18:12:41 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Fri Sep 2 10:53:27 2016 +0800

----------------------------------------------------------------------
 src/test/feature/ManagementTool/incorrect1.yml  |   1 +
 src/test/feature/ManagementTool/incorrect2.yml  |   1 +
 src/test/feature/ManagementTool/incorrect3.yml  |   1 +
 src/test/feature/ManagementTool/incorrect4.yml  |   1 +
 src/test/feature/ManagementTool/incorrect5.yml  |   1 +
 src/test/feature/ManagementTool/incorrect6.yml  |   1 +
 src/test/feature/ManagementTool/incorrect7.yml  |  29 +++++
 src/test/feature/ManagementTool/incorrect8.yml  |  28 +++++
 .../ManagementTool/test_hawq_register.cpp       |  12 +--
 tools/bin/hawqregister                          | 107 +++++++++++--------
 10 files changed, 128 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
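
At a glance: the YAML code path now returns explicit per-file path and size
lists instead of a single derived directory, CREATE TABLE carries the bucket
number, and hash-distributed tables may only register a file count that is a
multiple of that bucket number. A minimal sketch of the new check, mirroring
the logic added to tools/bin/hawqregister below (the function name is
illustrative, not part of the tool):

    def files_match_bucketnum(files, bucket_number, distribution_policy):
        # Hash-distributed tables ('DISTRIBUTED BY ...') require the file
        # count to be a multiple of the bucket number; randomly distributed
        # tables accept any count.
        if distribution_policy.startswith('DISTRIBUTED BY'):
            return len(files) % bucket_number == 0
        return True

    print(files_match_bucketnum(['/f1', '/f2', '/f3', '/f4', '/f5'], 6, 'DISTRIBUTED BY (i)'))  # False
    print(files_match_bucketnum(['/f%d' % i for i in range(1, 7)], 6, 'DISTRIBUTED BY (i)'))    # True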


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect1.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect1.yml b/src/test/feature/ManagementTool/incorrect1.yml
index 8258a90..cfb2983 100755
--- a/src/test/feature/ManagementTool/incorrect1.yml
+++ b/src/test/feature/ManagementTool/incorrect1.yml
@@ -19,3 +19,4 @@ Parquet_Schema:
   type: int4
 TableName: public.hawqregister
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect2.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect2.yml b/src/test/feature/ManagementTool/incorrect2.yml
index 8258a90..cfb2983 100755
--- a/src/test/feature/ManagementTool/incorrect2.yml
+++ b/src/test/feature/ManagementTool/incorrect2.yml
@@ -19,3 +19,4 @@ Parquet_Schema:
   type: int4
 TableName: public.hawqregister
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect3.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect3.yml b/src/test/feature/ManagementTool/incorrect3.yml
index f45b5e5..ff56f4e 100755
--- a/src/test/feature/ManagementTool/incorrect3.yml
+++ b/src/test/feature/ManagementTool/incorrect3.yml
@@ -19,3 +19,4 @@ Parquet_Schema:
   type: int4
 TableName: public.hawqregister
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect4.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect4.yml b/src/test/feature/ManagementTool/incorrect4.yml
index 3b8b921..aec3a7f 100755
--- a/src/test/feature/ManagementTool/incorrect4.yml
+++ b/src/test/feature/ManagementTool/incorrect4.yml
@@ -19,3 +19,4 @@ Parquet_Schema:
 - name: i
 TableName: public.hawqregister
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect5.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect5.yml b/src/test/feature/ManagementTool/incorrect5.yml
index 42a37ae..540744b 100755
--- a/src/test/feature/ManagementTool/incorrect5.yml
+++ b/src/test/feature/ManagementTool/incorrect5.yml
@@ -19,3 +19,4 @@ Encoding: UTF8
 FileFormat: AO
 TableName: public.t1
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect6.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect6.yml b/src/test/feature/ManagementTool/incorrect6.yml
index 0d572dc..f42d0f7 100755
--- a/src/test/feature/ManagementTool/incorrect6.yml
+++ b/src/test/feature/ManagementTool/incorrect6.yml
@@ -20,3 +20,4 @@ Encoding: UTF8
 FileFormat: AO
 TableName: public.t1
 Version: 1.0.0
+Bucketnum: 6

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect7.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect7.yml b/src/test/feature/ManagementTool/incorrect7.yml
new file mode 100644
index 0000000..d9a9185
--- /dev/null
+++ b/src/test/feature/ManagementTool/incorrect7.yml
@@ -0,0 +1,29 @@
+AO_FileLocations:
+  Blocksize: 32768
+  Checksum: false
+  CompressionLevel: 0
+  CompressionType: null
+  Files:
+  - path: /hawq_default/16385/16387/17015/1
+    size: 16
+  - path: /hawq_default/16385/16387/17015/2
+    size: 0
+  - path: /hawq_default/16385/16387/17015/3
+    size: 0
+  - path: /hawq_default/16385/16387/17015/4
+    size: 16
+  - path: /hawq_default/16385/16387/17015/5
+    size: 16
+AO_Schema:
+- name: i
+  type: int4
+DBVersion: PostgreSQL 8.2.15 (Greenplum Database 4.2.0 build 1) (HAWQ 2.0.1.0 build
+  dev) on x86_64-apple-darwin15.5.0, compiled by GCC Apple LLVM version 7.3.0 (clang-703.0.31)
+  compiled on Aug 31 2016 11:10:07
+DFS_URL: hdfs://localhost:8020
+Distribution_Policy: DISTRIBUTED BY (i)
+Encoding: UTF8
+FileFormat: AO
+TableName: public.t10
+Version: 1.0.0
+Bucketnum: 6
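
Why this fixture must fail: it describes a hash table (DISTRIBUTED BY (i))
with Bucketnum: 6 but lists only five files, and 5 % 6 != 0, so the new
divisibility check rejects it (exercised by TestDismatchFileNumber below).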

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/incorrect8.yml
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/incorrect8.yml b/src/test/feature/ManagementTool/incorrect8.yml
new file mode 100644
index 0000000..32d3b0d
--- /dev/null
+++ b/src/test/feature/ManagementTool/incorrect8.yml
@@ -0,0 +1,28 @@
+AO_FileLocations:
+  Blocksize: 32768
+  Checksum: false
+  CompressionLevel: 0
+  CompressionType: null
+  Files:
+  - path: /hawq_default/16385/16387/17015/1
+    size: 16
+  - path: /hawq_default/16385/16387/17015/2
+    size: 0
+  - path: /hawq_default/16385/16387/17015/3
+    size: 0
+  - path: /hawq_default/16385/16387/17015/4
+    size: 16
+  - path: /hawq_default/16385/16387/17015/5
+    size: 16
+AO_Schema:
+- name: i
+  type: int4
+DBVersion: PostgreSQL 8.2.15 (Greenplum Database 4.2.0 build 1) (HAWQ 2.0.1.0 build
+  dev) on x86_64-apple-darwin15.5.0, compiled by GCC Apple LLVM version 7.3.0 (clang-703.0.31)
+  compiled on Aug 31 2016 11:10:07
+DFS_URL: hdfs://localhost:8020
+Distribution_Policy: DISTRIBUTED BY (i)
+Encoding: UTF8
+FileFormat: AO
+TableName: public.t10
+Version: 1.0.0
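
This fixture repeats the same AO layout but deliberately omits the Bucketnum
key, so registration should fail while parsing the config (presumably in
register_yaml_dict_check or at the subsequent Bucketnum lookup) rather than
at the divisibility check.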

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/src/test/feature/ManagementTool/test_hawq_register.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register.cpp b/src/test/feature/ManagementTool/test_hawq_register.cpp
index f7f67c0..97489b8 100644
--- a/src/test/feature/ManagementTool/test_hawq_register.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register.cpp
@@ -341,15 +341,11 @@ TEST_F(TestHawqRegister, TestIncorrectYaml) {
   EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "incorrect4.yml xx"));
   EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "incorrect5.yml xx"));
   EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "incorrect6.yml xx"));
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "incorrect8.yml xx"));
 }
 
-TEST_F(TestHawqRegister, TestCreateExistedTable) {
+TEST_F(TestHawqRegister, TestDismatchFileNumber) {
   SQLUtility util;
-  util.execute("drop table if exists t10;");
-  util.execute("create table t10(i int) with (appendonly=true, orientation=row) distributed by (i);");
-  util.execute("insert into t10 values(1), (2), (3);");
-  EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o t10.yml testhawqregister_testcreateexistedtable.t10"));
-  auto tmp = Command::getCommandOutput("hawq register -d " + (string) HAWQ_DB + " -c t10.yml testhawqregister_testcreateexistedtable.t10");
-  auto out = hawq::test::trim(hawq::test::trimNewLine(tmp));
-  EXPECT_EQ(1, hawq::test::endsWith(out, "has already existed."));
+  string filePath = util.getTestRootPath() + "/ManagementTool/";
+  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "incorrect7.yml xx"));
 }
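
TestCreateExistedTable is retired because create_table no longer aborts when
the target table already exists (that branch is now a TODO in the
hawqregister diff below); its replacement, TestDismatchFileNumber, feeds
incorrect7.yml (five files against Bucketnum: 6) and expects a non-zero exit.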

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/af12c6b3/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 6b2c918..91f087a 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -17,8 +17,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] tablename
-# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] tablename
+# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] [-e eof] <tablename>
+# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] [--force] [--repair] <tablename>
+
 import os, sys, optparse, getpass, re, urlparse
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
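
Illustrative invocations of the two modes after this change (database name,
paths and sizes are examples only):

    hawq register -d postgres -f hdfs://localhost:8020/some/dir/file -e 16 my_table
    hawq register -d postgres -c my_table.yml --force my_table

Note that --force and --repair are mutually exclusive, and neither may be
combined with -f (enforced in the __main__ block below).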
@@ -44,14 +45,16 @@ def option_parser():
                        usage='usage: %prog [options] table_name',
                        version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
-    parser.add_option('-?', '--help', action='help')
-    parser.add_option('-h', '--host', help='host of the target DB')
-    parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0)
-    parser.add_option('-U', '--user', help='username of the target DB')
+    parser.add_option('-?', '--help', action = 'help')
+    parser.add_option('-h', '--host', help = 'host of the target DB')
+    parser.add_option('-p', '--port', help = 'port of the target DB', type = 'int', default = 0)
+    parser.add_option('-U', '--user', help = 'username of the target DB')
     parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
-    parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS')
-    parser.add_option('-e', '--eof', dest = 'filesizes', type='list', default = [], help = 'eof list of files to be registered')
-    parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format')
+    parser.add_option('-f', '--filepath', dest = 'filepath', help = 'file name in HDFS')
+    parser.add_option('-e', '--eof', dest = 'filesize', type = 'int', default = 0, help = 'eof of the file to be registered')
+    parser.add_option('-c', '--config', dest = 'yml_config', default = '', help = 'configuration file in YAML format')
+    parser.add_option('--force', action = 'store_true', default = False)
+    parser.add_option('--repair', action = 'store_true', default = False)
     return parser
 
 
@@ -114,22 +117,16 @@ def option_parser_yml(yml_file):
     register_yaml_dict_check(params)
     if params['FileFormat'] == 'Parquet':
         if not len(params['Parquet_FileLocations']['Files']):
-            return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
-        offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
-        filepath = (params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset]
-                    if len(params['Parquet_FileLocations']['Files']) != 1
-                    else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'])
-        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
+            return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
+        files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
+        return 'Parquet', files, sizes, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
     if not len(params['AO_FileLocations']['Files']):
-        return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
-    offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
-    filepath = (params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset]
-                if len(params['AO_FileLocations']['Files']) != 1
-                else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'])
-    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
+        return 'AO', [], [], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
+    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
+    return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
 
 
-def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations):
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
     try:
         query = "select count(*) from pg_class where relname = '%s'" % tablename.split('.')[-1].lower()
         conn = dbconn.connect(dburl, False)
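
option_parser_yml no longer reconstructs a single parent directory with
rfind('/'); it returns the full path list plus the per-file sizes straight
from the YAML. Fed the Files section of incorrect7.yml above, trimmed to two
of the five files (a dict literal stands in for the parsed YAML):

    params = {'DFS_URL': 'hdfs://localhost:8020',
              'AO_FileLocations': {'Files': [
                  {'path': '/hawq_default/16385/16387/17015/1', 'size': 16},
                  {'path': '/hawq_default/16385/16387/17015/2', 'size': 0}]}}
    files = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']]
    sizes = [d['size'] for d in params['AO_FileLocations']['Files']]
    # files == ['hdfs://localhost:8020/hawq_default/16385/16387/17015/1',
    #           'hdfs://localhost:8020/hawq_default/16385/16387/17015/2']
    # sizes == [16, 0]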
@@ -137,24 +134,26 @@ def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_lo
         conn.commit()
         for row in rows:
             if row[0] != 0:
-                logger.error("Register failed: table %s has already existed." % tablename)
-                sys.exit(1)
+                # TODO
+                pass
     except DatabaseError, ex:
         logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
+
     try:
         schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
         fmt = 'ROW' if fmt == 'AO' else fmt
         if fmt == 'ROW':
-            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s) %s;'
-                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], distrbution_policy))
+            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
+                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
         else: # Parquet
-            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s) %s;'
-                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], distrbution_policy))
+            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
+                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
     except DatabaseError, ex:
+        print DatabaseError, ex
         logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
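
With the bucket number folded into the WITH clause, the ROW branch above
formats DDL like the following (the compression values here are illustrative,
not taken from the fixtures):

    # Reproducing the query string built by create_table for a ROW table:
    query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
             % ('public.t10', 'i int4', 'ROW', 'zlib', 5, False, 6, 'DISTRIBUTED BY (i)'))
    # query == "create table public.t10(i int4) with (appendonly=true,
    #           orientation=ROW, compresstype=zlib, compresslevel=5,
    #           checksum=False, bucketnum=6) DISTRIBUTED BY (i);"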
 
@@ -222,9 +221,9 @@ def get_metadata_from_database(dburl, tablename, seg_name):
     try:
         # get the full path of correspoding file for target table
         query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
-             "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
-             "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
-             "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
+                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
+                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
+                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
@@ -328,56 +327,72 @@ def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firs
     '''Insert the metadata into database'''
     try:
         query = "SET allow_system_table_mods='dml';"
-        segno = firstsegno
-        for eof in eofs:
-            query += "insert into pg_aoseg.%s values(%d, %d, %d, %d);" % (seg_name, segno, eof, -1, -1)
-            segno += 1
+        query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
+        for k, eof in enumerate(eofs[1:]):
+            query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+        query += ';'
         conn = dbconn.connect(dburl, True)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
         conn.close()
     except DatabaseError, ex:
         logger.error('Failed to connect to database, this script can only be run when the database is up')
-        move_files_in_hdfs(options.database, options.tablename, files, firstsegno, tabledir, False)
+        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, False)
         sys.exit(1)
 
 
 if __name__ == '__main__':
+
     parser = option_parser()
     options, args = parser.parse_args()
-    if len(args) != 1 or (options.yml_config and options.filepath):
+
+    if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
         parser.print_help(sys.stderr)
         sys.exit(1)
     if local_ssh('hadoop', logger):
         logger.error('command "hadoop" is not available.')
         sys.exit(1)
 
-    dburl = dbconn.DbURL(hostname=options.host, port=options.port, username=options.user, dbname=options.database)
+    dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
     filepath, database, tablename = options.filepath, options.database, args[0]
 
     if options.yml_config: # Usage2
-        fileformat, filepath, schema, distribution_policy, file_locations, _ = option_parser_yml(options.yml_config)
-        create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations)
+        fileformat, files, sizes, schema, distribution_policy, file_locations, bucket_number = option_parser_yml(options.yml_config)
+        filepath = files[0][:files[0].rfind('/')] if files else ''
+        if distribution_policy.startswith('DISTRIBUTED BY'):
+            if len(files) % bucket_number != 0:
+                logger.error('Files to be registered must match the bucket number of hash table.')
+                sys.exit(1)
+        create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number)
     else:
         fileformat = 'Parquet'
         check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
+
+    # check filepath
     if not filepath:
         sys.exit(0)
+
     seg_name = get_seg_name(dburl, tablename, database, fileformat)
     firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
-    sizes = 0
+
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
-    files, sizes = get_files_in_hdfs(filepath)
+
+    if not options.yml_config:
+        files, sizes = get_files_in_hdfs(filepath)
     print 'File(s) to be registered:', files
+
     # set specified eofs
-    if options.filesizes:
-        if len(options.filesizes) != len(files):
-            logger.error('Specified eof list is incomplete.')
+    if options.filesize:
+        if options.filesize != len(files):
+            logger.error('-e option is only supported with single file case.')
             sys.exit(1)
-        sizes = options.sizes
+        sizes = [options.filesize]
 
     if fileformat == 'Parquet':
         check_parquet_format(files)
     move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
+
+    # update catalog table
     insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
+
     logger.info('Hawq Register Succeed.')
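
For reference, insert_metadata_into_database now assembles one multi-row
INSERT instead of one statement per file. Rebuilding the exact string it
produces for illustrative inputs (seg_name, firstsegno and eofs are examples):

    seg_name, firstsegno, eofs = 'pg_aoseg_17015', 1, [16, 0, 16]
    query = "SET allow_system_table_mods='dml';"
    query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
    for k, eof in enumerate(eofs[1:]):
        query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
    query += ';'
    # query == "SET allow_system_table_mods='dml';insert into
    #           pg_aoseg.pg_aoseg_17015 values(1, 16, -1, -1),
    #           (2, 0, -1, -1),(3, 16, -1, -1);"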