You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/09/02 02:54:09 UTC
[3/3] incubator-hawq git commit: HAWQ-1025. Add bucket number in the
yaml file of hawq extract, modify to use actual eof for usage1.
HAWQ-1025. Add bucket number in the yaml file of hawq extract, modify to use actual eof for usage1.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2c4f25c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2c4f25c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2c4f25c8
Branch: refs/heads/master
Commit: 2c4f25c8e1259356f02513b696271da5dc269042
Parents: 25a4ab5
Author: xunzhang <xu...@gmail.com>
Authored: Tue Aug 30 16:03:42 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Fri Sep 2 10:53:27 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqextract | 20 ++++++++++++++++----
tools/bin/hawqregister | 29 ++++++++++++++++++-----------
2 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2c4f25c8/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
index f3ffe5b..28f55d6 100644
--- a/tools/bin/hawqextract
+++ b/tools/bin/hawqextract
@@ -259,6 +259,17 @@ class GpMetadataAccessor:
cols_list = [cols[int(k)-1] for k in policy.strip('{}').split(',')]
return 'DISTRIBUTED BY (' + ','.join(cols_list) + ')'
+ def get_bucket_number(self, oid):
+ '''
+ Get table's bucket number from gp_distribution_policy view.
+ '''
+ qry = """
+ SELECT bucketnum
+ FROM gp_distribution_policy
+ WHERE localoid = '%s'
+ """ % oid
+ return self.exec_query(qry)[0]['bucketnum']
+
def connectdb(options):
'''
@@ -405,12 +416,12 @@ def extract_metadata(conn, tbname):
}
file_locations['Partitions'].append(par_info)
metadata['AO_FileLocations'] = file_locations
-
logger.info('-- extract AO_Schema')
metadata['AO_Schema'] = accessor.get_schema(relid)
-
logger.info('-- extract Distribution_Policy')
metadata['Distribution_Policy'] = accessor.get_distribution_policy_info(rel_pgclass['oid'], relid)
+ logger.info('-- extract bucket number')
+ metadata['Bucketnum'] = accessor.get_bucket_number(rel_pgclass['oid'])
def extract_Parquet_metadata():
relid = rel_pgclass['oid']
@@ -455,12 +466,13 @@ def extract_metadata(conn, tbname):
}
file_locations['Partitions'].append(par_info)
metadata['Parquet_FileLocations'] = file_locations
-
logger.info('-- extract Parquet_Schema')
metadata['Parquet_Schema'] = accessor.get_schema(relid)
-
logger.info('-- extract Distribution_Policy')
metadata['Distribution_Policy'] = accessor.get_distribution_policy_info(rel_pgclass['oid'], relid)
+ logger.info('-- extract bucket number')
+ metadata['Bucketnum'] = accessor.get_bucket_number(rel_pgclass['oid'])
+
# extract AO/Parquet specific metadata
cases = { 'AO': extract_AO_metadata,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2c4f25c8/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 10bdf0b..6b2c918 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -50,13 +50,14 @@ def option_parser():
parser.add_option('-U', '--user', help='username of the target DB')
parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
parser.add_option('-f', '--filepath', dest = 'filepath', help='file name in HDFS')
+ parser.add_option('-e', '--eof', dest = 'filesizes', type='list', default = [], help = 'eof list of files to be registered')
parser.add_option('-c', '--config', dest = 'yml_config', default = '', help='configuration file in YAML format')
return parser
def register_yaml_dict_check(D):
# check exists
- check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName']
+ check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName', 'Bucketnum']
for attr in check_list:
if D.get(attr) == None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
@@ -113,19 +114,19 @@ def option_parser_yml(yml_file):
register_yaml_dict_check(params)
if params['FileFormat'] == 'Parquet':
if not len(params['Parquet_FileLocations']['Files']):
- return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
+ return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
filepath = (params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset]
if len(params['Parquet_FileLocations']['Files']) != 1
else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'])
- return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
+ return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
if not len(params['AO_FileLocations']['Files']):
- return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
+ return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
filepath = (params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset]
if len(params['AO_FileLocations']['Files']) != 1
else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'])
- return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
+ return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations):
@@ -297,8 +298,8 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
'''Move file(s) in src path into the folder corresponding to the target table'''
if normal:
segno = firstsegno
- for file in files:
- srcfile = file
+ for f in files:
+ srcfile = f
dstfile = tabledir + str(segno)
segno += 1
if srcfile != dstfile:
@@ -310,8 +311,8 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
sys.exit(1)
else:
segno = firstsegno
- for file in files:
- dstfile = file
+ for f in files:
+ dstfile = f
srcfile = tabledir + str(segno)
segno += 1
if srcfile != dstfile:
@@ -355,7 +356,7 @@ if __name__ == '__main__':
filepath, database, tablename = options.filepath, options.database, args[0]
if options.yml_config: # Usage2
- fileformat, filepath, schema, distribution_policy, file_locations = option_parser_yml(options.yml_config)
+ fileformat, filepath, schema, distribution_policy, file_locations, _ = option_parser_yml(options.yml_config)
create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations)
else:
fileformat = 'Parquet'
@@ -368,9 +369,15 @@ if __name__ == '__main__':
check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
files, sizes = get_files_in_hdfs(filepath)
print 'File(s) to be registered:', files
+ # set specified eofs
+ if options.filesizes:
+ if len(options.filesizes) != len(files):
+ logger.error('Specified eof list is incomplete.')
+ sys.exit(1)
+            sizes = options.filesizes
+
if fileformat == 'Parquet':
check_parquet_format(files)
- print files
move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
logger.info('Hawq Register Succeed.')