Posted to commits@hawq.apache.org by rl...@apache.org on 2016/09/14 02:24:44 UTC
incubator-hawq git commit: HAWQ-1033. Combine update and insert into one transaction, fix hadoop warning.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 49ceb69e6 -> be201dcc6
HAWQ-1033. Combine update and insert into one transaction, fix hadoop warning.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/be201dcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/be201dcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/be201dcc
Branch: refs/heads/master
Commit: be201dcc6848eb5f65cf8afabfc6de377ae98cd8
Parents: 49ceb69
Author: xunzhang <xu...@gmail.com>
Authored: Tue Sep 13 14:41:45 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Wed Sep 14 10:24:16 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqregister | 72 ++++++++++++++++++++++++++++++++-------------
1 file changed, 52 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/be201dcc/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index b3e3493..bbdc946 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -115,7 +115,7 @@ def option_parser_yml(yml_file):
     with open(yml_file, 'r') as f:
         params = yaml.load(f)
     register_yaml_dict_check(params)
-    if params['FileFormat'] == 'Parquet':
+    if params['FileFormat'].lower() == 'parquet':
         if not len(params['Parquet_FileLocations']['Files']):
             return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
         files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
@@ -259,12 +259,12 @@ def get_files_in_hdfs(filepath):
     '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
     files = []
     sizes = []
-    hdfscmd = "hadoop fs -test -e %s" % filepath
+    hdfscmd = "hdfs dfs -test -e %s" % filepath
     result = local_ssh(hdfscmd, logger)
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    hdfscmd = "hdfs dfs -ls -R %s" % filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
     # recursively search all the files under path 'filepath'
@@ -282,13 +282,13 @@ def get_files_in_hdfs(filepath):
 def check_parquet_format(files):
     '''Check whether the file to be registered is parquet format'''
     for f in files:
-        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
         rc, out, err = local_ssh_output(hdfscmd)
         if out == '0':
             continue
-        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd, logger)
-        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd, logger)
         if result1 or result2:
             logger.error('File %s is not parquet format' % f)
@@ -304,7 +304,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -317,7 +317,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -365,6 +365,35 @@ def update_metadata_into_database(dburl, seg_name, files, eofs):
         sys.exit(1)
+def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs):
+    '''Insert and update the catalog table in --force case'''
+    try:
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        if fmt == 'Parquet':
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+        else:
+            query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
+            for k, eof in enumerate(eofs[1:]):
+                query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
+        query += ';'
+
+        segno_lst = [f.split('/')[-1] for f in update_files]
+        for i, eof in enumerate(update_eofs):
+            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
+        query += "end transaction;"
+        conn = dbconn.connect(dburl, True)
+        rows = dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query "%s"' % query)
+        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, False)
+        sys.exit(1)
+
+
 if __name__ == '__main__':
     parser = option_parser()
@@ -373,8 +402,8 @@ if __name__ == '__main__':
     if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
         parser.print_help(sys.stderr)
         sys.exit(1)
-    if local_ssh('hadoop', logger):
-        logger.error('command "hadoop" is not available.')
+    if local_ssh('hdfs', logger):
+        logger.error('command "hdfs" is not available.')
         sys.exit(1)
     dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
@@ -431,12 +460,13 @@ if __name__ == '__main__':
             do_not_move, files_update, sizes_update = True, files, sizes
             files, sizes = [], []
         else:
-            for k, f in enumerate(files):
+            files_old, sizes_old = [f for f in files], [sz for sz in sizes]
+            for k, f in enumerate(files_old):
                 if f in existed_files:
-                    files_update.append(files[k])
-                    sizes_update.append(sizes[k])
-                    files.remove(files[k])
-                    sizes.remove(sizes[k])
+                    files_update.append(files_old[k])
+                    sizes_update.append(sizes_old[k])
+                    files.remove(files_old[k])
+                    sizes.remove(sizes_old[k])
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
@@ -458,10 +488,12 @@ if __name__ == '__main__':
     if not do_not_move:
         move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
-    # update catalog table
-    if not do_not_move:
-        insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
-    if force_mode:
-        update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+    if (not do_not_move) and force_mode:
+        update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update)
+    else:
+        if force_mode:
+            update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+        else:
+            insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
     logger.info('Hawq Register Succeed.')
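
Editor's note: for readers skimming this patch without the rest of hawqregister, the sketch below illustrates how the new helper assembles a single force-mode statement -- insert rows for newly registered files, then eof updates for files already in the table directory -- so that both land in one begin/end transaction. The catalog relation name, segment numbers, paths, and eof values here are made-up placeholders for illustration only, not values from a real HAWQ catalog.

    # Illustrative only: placeholder catalog relation and file metadata.
    seg_name = 'pg_paqseg_16385'        # hypothetical parquet segment relation
    firstsegno, eofs = 3, [1024, 2048]  # two newly registered files
    update_files = ['hdfs://localhost:8020/hawq_default/16385/16387/16388/1']
    update_eofs = [4096]                # new eof for the file that already exists

    query = "set allow_system_table_mods='dml';"
    query += "begin transaction;"
    # one inserted row per new file (four values per row in the parquet branch)
    query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
    for k, eof in enumerate(eofs[1:]):
        query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
    query += ';'
    # eof updates for existing files, keyed by the trailing segment number of each path
    segno_lst = [f.split('/')[-1] for f in update_files]
    for i, eof in enumerate(update_eofs):
        query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
    query += "end transaction;"
    print(query)

Because the inserts and updates travel in one statement wrapped in begin/end transaction, a failure leaves the pg_aoseg entries unchanged, which appears to be the point of combining the two steps in this commit.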