Posted to commits@hawq.apache.org by rl...@apache.org on 2016/09/02 02:54:09 UTC

[3/3] incubator-hawq git commit: HAWQ-1025. Add bucket number in the yaml file of hawq extract, modify to use actual eof for usage1.

HAWQ-1025. Add bucket number in the yaml file of hawq extract, modify to use actual eof for usage1.
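
For readers who have not seen the extract output: below is a minimal sketch of
the YAML that 'hawq extract' emits once a Bucketnum field is included. All
values (table name, paths, sizes) are hypothetical; the Bucketnum key is the
part this commit adds.

    # Hypothetical excerpt of 'hawq extract' output after this change;
    # every value below is made up for illustration.
    import yaml

    doc = yaml.safe_load("""
    TableName: public.t1
    FileFormat: AO
    DFS_URL: hdfs://localhost:8020
    Distribution_Policy: DISTRIBUTED RANDOMLY
    Bucketnum: 6
    AO_FileLocations:
      Files:
      - path: /hawq_default/16385/16387/16388/1
        size: 979
    """)
    print(doc['Bucketnum'])  # -> 6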


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2c4f25c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2c4f25c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2c4f25c8

Branch: refs/heads/master
Commit: 2c4f25c8e1259356f02513b696271da5dc269042
Parents: 25a4ab5
Author: xunzhang <xu...@gmail.com>
Authored: Tue Aug 30 16:03:42 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Fri Sep 2 10:53:27 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqextract  | 20 ++++++++++++++++----
 tools/bin/hawqregister | 29 ++++++++++++++++++-----------
 2 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2c4f25c8/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
index f3ffe5b..28f55d6 100644
--- a/tools/bin/hawqextract
+++ b/tools/bin/hawqextract
@@ -259,6 +259,17 @@ class GpMetadataAccessor:
             cols_list = [cols[int(k)-1] for k in policy.strip('{}').split(',')]
             return 'DISTRIBUTED BY (' + ','.join(cols_list) + ')'
 
+    def get_bucket_number(self, oid):
+        '''
+        Get table's bucket number from gp_distribution_policy view.
+        '''
+        qry = """
+        SELECT bucketnum
+        FROM gp_distribution_policy
+        WHERE localoid = '%s'
+        """ % oid
+        return self.exec_query(qry)[0]['bucketnum']
+
 
 def connectdb(options):
     '''
@@ -405,12 +416,12 @@ def extract_metadata(conn, tbname):
                 }
                 file_locations['Partitions'].append(par_info)
         metadata['AO_FileLocations'] = file_locations
-
         logger.info('-- extract AO_Schema')
         metadata['AO_Schema'] = accessor.get_schema(relid)
-
         logger.info('-- extract Distribution_Policy')
         metadata['Distribution_Policy'] = accessor.get_distribution_policy_info(rel_pgclass['oid'], relid)
+        logger.info('-- extract bucket number')
+        metadata['Bucketnum'] = accessor.get_bucket_number(rel_pgclass['oid'])
 
     def extract_Parquet_metadata():
         relid = rel_pgclass['oid']
@@ -455,12 +466,13 @@ def extract_metadata(conn, tbname):
                 }
                 file_locations['Partitions'].append(par_info)
         metadata['Parquet_FileLocations'] = file_locations
-
         logger.info('-- extract Parquet_Schema')
         metadata['Parquet_Schema'] = accessor.get_schema(relid)
-
         logger.info('-- extract Distribution_Policy')
         metadata['Distribution_Policy'] = accessor.get_distribution_policy_info(rel_pgclass['oid'], relid)
+        logger.info('-- extract bucket number')
+        metadata['Bucketnum'] = accessor.get_bucket_number(rel_pgclass['oid'])
+
 
     # extract AO/Parquet specific metadata
     cases = { 'AO': extract_AO_metadata,

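The hunk above ends inside the dispatch table that routes extraction by
storage format. For orientation, here is a self-contained sketch of that
pattern; the stub bodies and the 'fmt' lookup are stand-ins, not code from
this commit:

    # Sketch of the format dispatch closing the hunk above; the real
    # extractors populate `metadata` (now including 'Bucketnum').
    def extract_AO_metadata():
        print('extract AO metadata')

    def extract_Parquet_metadata():
        print('extract Parquet metadata')

    cases = {'AO': extract_AO_metadata, 'Parquet': extract_Parquet_metadata}
    fmt = 'AO'    # hawqextract derives the format from the pg_class row
    cases[fmt]()  # -> extract AO metadata
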
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2c4f25c8/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 10bdf0b..6b2c918 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -50,13 +50,14 @@ def option_parser():
     parser.add_option('-U', '--user', help='username of the target DB')
     parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
     parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS')
+    parser.add_option('-e', '--eof', dest = 'filesizes', default = '', help = 'comma-separated eof list of files to be registered')
     parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format')
     return parser
 
 
 def register_yaml_dict_check(D):
     # check exists
-    check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName']
+    check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName', 'Bucketnum']
     for attr in check_list:
         if D.get(attr) == None:
             logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
@@ -113,19 +114,19 @@ def option_parser_yml(yml_file):
     register_yaml_dict_check(params)
     if params['FileFormat'] == 'Parquet':
         if not len(params['Parquet_FileLocations']['Files']):
-            return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
+            return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
         offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
         filepath = (params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset]
                     if len(params['Parquet_FileLocations']['Files']) != 1
                     else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'])
-        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
+        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
     if not len(params['AO_FileLocations']['Files']):
-        return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
+        return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
     filepath = (params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset]
                 if len(params['AO_FileLocations']['Files']) != 1
                 else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'])
-    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
+    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
 
 
 def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations):
@@ -297,8 +298,8 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
     '''Move file(s) in src path into the folder correspoding to the target table'''
     if normal:
         segno = firstsegno
-        for file in files:
-            srcfile = file
+        for f in files:
+            srcfile = f
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
@@ -310,8 +311,8 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
                     sys.exit(1)
     else:
         segno = firstsegno
-        for file in files:
-            dstfile = file
+        for f in files:
+            dstfile = f
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
@@ -355,7 +356,7 @@ if __name__ == '__main__':
     filepath, database, tablename = options.filepath, options.database, args[0]
 
     if options.yml_config: # Usage2
-        fileformat, filepath, schema, distribution_policy, file_locations = option_parser_yml(options.yml_config)
+        fileformat, filepath, schema, distribution_policy, file_locations, _ = option_parser_yml(options.yml_config)
         create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations)
     else:
         fileformat = 'Parquet'
@@ -368,9 +369,15 @@ if __name__ == '__main__':
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
     files, sizes = get_files_in_hdfs(filepath)
     print 'File(s) to be registered:', files
+    # apply user-specified eofs, overriding the sizes read from HDFS
+    if options.filesizes:
+        if len(options.filesizes.split(',')) != len(files):
+            logger.error('Specified eof list is incomplete.')
+            sys.exit(1)
+        sizes = [int(s) for s in options.filesizes.split(',')]
+
     if fileformat == 'Parquet':
         check_parquet_format(files)
-    print files
     move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
     insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
     logger.info('Hawq Register Succeed.')
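
As a closing note, the eof override for usage 1 can be read in isolation; the
helper below is a hedged, standalone rendering (the function name and the
comma-separated flag format are assumptions, matching the option as written
above), with a usage line showing the override:

    # Standalone sketch of the --eof override: user-supplied sizes replace
    # the HDFS-reported ones only when exactly one eof is given per file.
    def override_sizes(files, sizes, eof_csv):
        if not eof_csv:
            return sizes
        eofs = [int(s) for s in eof_csv.split(',')]
        if len(eofs) != len(files):
            raise SystemExit('Specified eof list is incomplete.')
        return eofs

    print(override_sizes(['/d/a', '/d/b'], [979, 981], '1024,2048'))
    # -> [1024, 2048]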