You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/09/14 02:24:44 UTC

incubator-hawq git commit: HAWQ-1033. Combine update and insert into one transaction, fix hadoop warning.

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 49ceb69e6 -> be201dcc6


HAWQ-1033. Combine update and insert into one transaction, fix hadoop warning.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/be201dcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/be201dcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/be201dcc

Branch: refs/heads/master
Commit: be201dcc6848eb5f65cf8afabfc6de377ae98cd8
Parents: 49ceb69
Author: xunzhang <xu...@gmail.com>
Authored: Tue Sep 13 14:41:45 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Wed Sep 14 10:24:16 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 72 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 52 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/be201dcc/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index b3e3493..bbdc946 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -115,7 +115,7 @@ def option_parser_yml(yml_file):
     with open(yml_file, 'r') as f:
         params = yaml.load(f)
     register_yaml_dict_check(params)
-    if params['FileFormat'] == 'Parquet':
+    if params['FileFormat'].lower() == 'parquet':
         if not len(params['Parquet_FileLocations']['Files']):
             return 'Parquet', [], [], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum']
         files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
@@ -259,12 +259,12 @@ def get_files_in_hdfs(filepath):
     '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
     files = []
     sizes = []
-    hdfscmd = "hadoop fs -test -e %s" % filepath
+    hdfscmd = "hdfs dfs -test -e %s" % filepath
     result = local_ssh(hdfscmd, logger)
     if result != 0:
         logger.error("Path '%s' does not exist in hdfs" % filepath)
         sys.exit(1)
-    hdfscmd = "hadoop fs -ls -R %s" % filepath
+    hdfscmd = "hdfs dfs -ls -R %s" % filepath
     result, out, err = local_ssh_output(hdfscmd)
     outlines = out.splitlines()
     # recursively search all the files under path 'filepath'
@@ -282,13 +282,13 @@ def get_files_in_hdfs(filepath):
 def check_parquet_format(files):
     '''Check whether the file to be registered is parquet format'''
     for f in files:
-        hdfscmd = 'hadoop fs -du -h %s | head -c 1' % f
+        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
         rc, out, err = local_ssh_output(hdfscmd)
         if out == '0':
             continue
-        hdfscmd = 'hadoop fs -cat %s | head -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
         result1 = local_ssh(hdfscmd, logger)
-        hdfscmd = 'hadoop fs -cat %s | tail -c 4 | grep PAR1' % f
+        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
         result2 = local_ssh(hdfscmd, logger)
         if result1 or result2:
             logger.error('File %s is not parquet format' % f)
@@ -304,7 +304,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             dstfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -317,7 +317,7 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
             srcfile = tabledir + str(segno)
             segno += 1
             if srcfile != dstfile:
-                hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
+                hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
                 result = local_ssh(hdfscmd, logger)
                 if result != 0:
@@ -365,6 +365,35 @@ def update_metadata_into_database(dburl, seg_name, files, eofs):
         sys.exit(1)
 
 
+def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs):
+    '''Insert new rows into and update existing rows of pg_aoseg.<seg_name> in a single transaction (--force case); on DatabaseError, move files back and exit(1).'''
+    try:
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        # One row per new file, numbered from firstsegno: Parquet rows take two
+        # trailing -1 columns after (segno, eof), every other format takes three.
+        pad = ', -1, -1' if fmt == 'Parquet' else ', -1, -1, -1'
+        if eofs:  # nothing to insert when no new files are being registered
+            values = ','.join('(%d, %d%s)' % (firstsegno + k, eof, pad) for k, eof in enumerate(eofs))
+            query += 'insert into pg_aoseg.%s values%s;' % (seg_name, values)
+
+        # The last path component of each file to update is used as its segno.
+        for f, eof in zip(update_files, update_eofs):
+            query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, f.split('/')[-1])
+        query += "end transaction;"
+        conn = dbconn.connect(dburl, True)
+        dbconn.execSQL(conn, query)
+        conn.commit()
+        conn.close()
+    except DatabaseError, ex:
+        logger.error('Failed to execute query "%s"' % query)
+        # Reverse the HDFS moves (last argument False). 'database' is this
+        # function's parameter. NOTE(review): 'files' is not a parameter; it
+        # presumably resolves to the module-level list set in __main__ -- confirm.
+        move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, False)
+        sys.exit(1)
+
+
 if __name__ == '__main__':
 
     parser = option_parser()
@@ -373,8 +402,8 @@ if __name__ == '__main__':
     if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
         parser.print_help(sys.stderr)
         sys.exit(1)
-    if local_ssh('hadoop', logger):
-        logger.error('command "hadoop" is not available.')
+    if local_ssh('hdfs', logger):
+        logger.error('command "hdfs" is not available.')
         sys.exit(1)
 
     dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
@@ -431,12 +460,13 @@ if __name__ == '__main__':
                 do_not_move, files_update, sizes_update = True, files, sizes
                 files, sizes = [], []
         else:
-            for k, f in enumerate(files):
+            files_old, sizes_old = [f for f in files], [sz for sz in sizes]
+            for k, f in enumerate(files_old):
                 if f in existed_files:
-                    files_update.append(files[k])
-                    sizes_update.append(sizes[k])
-                    files.remove(files[k])
-                    sizes.remove(sizes[k])
+                    files_update.append(files_old[k])
+                    sizes_update.append(sizes_old[k])
+                    files.remove(files_old[k])
+                    sizes.remove(sizes_old[k])
 
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
 
@@ -458,10 +488,12 @@ if __name__ == '__main__':
     if not do_not_move:
         move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
 
-    # update catalog table
-    if not do_not_move:
-        insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
-    if force_mode:
-        update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+    if (not do_not_move) and force_mode:
+        update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update)
+    else:
+        if force_mode:
+            update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
+        else:
+            insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
 
     logger.info('Hawq Register Succeed.')