You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by li...@apache.org on 2016/09/22 01:54:27 UTC
incubator-hawq git commit: HAWQ-1024. Rollback if hawq register failed in process.
Repository: incubator-hawq
Updated Branches:
refs/heads/master b5c77addd -> 107b79dfe
HAWQ-1024. Rollback if hawq register failed in process.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/107b79df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/107b79df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/107b79df
Branch: refs/heads/master
Commit: 107b79dfe1deaf3098be37be4b10581a9743a0e9
Parents: b5c77ad
Author: xunzhang <xu...@gmail.com>
Authored: Wed Sep 21 20:10:00 2016 +0800
Committer: Lili Ma <ic...@gmail.com>
Committed: Thu Sep 22 09:54:19 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqregister | 101 +++++++++++++++++++++++++++++++++-----------
1 file changed, 76 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/107b79df/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index fa23a1a..bef1460 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -112,6 +112,42 @@ def register_yaml_dict_check(D):
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % 'AO_FileLocations.%s' % attr)
sys.exit(1)
+
+class FailureHandler(object):
+ def __init__(self, conn):
+ self.operations = []
+ self.conn = conn
+
+ def commit(self, cmd):
+ self.operations.append(cmd)
+
+ def assemble_SQL(self, cmd):
+ return 'DROP TABLE %s' % cmd[cmd.find('table')+6:cmd.find('(')]
+
+ def assemble_hdfscmd(self, cmd):
+ lst = cmd.strip().split()
+ return ' '.join(lst[:-2] + [lst[-1], lst[-2]])
+
+ def rollback(self):
+ for (typ, cmd) in reversed(self.operations):
+ if typ == 'SQL':
+ sql = self.assemble_SQL(cmd)
+ try:
+ self.conn.query(sql)
+ except pg.DatabaseError as e:
+ logger.error('Rollback failure: %s.' % sql)
+ print e
+ sys.exit(1)
+ if typ == 'HDFSCMD':
+ hdfscmd = self.assemble_hdfscmd(cmd)
+ sys.stdout.write('Rollback hdfscmd: "%s"\n' % hdfscmd)
+ result = local_ssh(hdfscmd, logger)
+ if result != 0:
+ logger.error('Fail to rollback: %s.' % hdfscmd)
+ sys.exit(1)
+ logger.info('Hawq Register Rollback Succeed.')
+
+
class GpRegisterAccessor(object):
def __init__(self, conn):
self.conn = conn
@@ -135,21 +171,21 @@ class GpRegisterAccessor(object):
def do_create_table(self, src_table_name, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number, partitionby, partitions_constraint, partitions_name):
if self.get_table_existed(tablename):
- return False
+ return False, ''
schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
partlist = ""
for index in range(len(partitions_constraint)):
- if index > 0:
- partlist += ", "
- partition_refine_name = partitions_name[index]
- splitter = src_table_name.split(".")[-1] + '_1_prt_'
- partition_refine_name = partition_refine_name.split(splitter)[-1]
- #in some case, constraint contains "partition XXX" but in other case, it doesn't contain. we need to treat them separately.
- if partitions_constraint[index].strip().startswith("DEFAULT PARTITION") or partitions_constraint[index].strip().startswith("PARTITION") or (len(partition_refine_name) > 0 and partition_refine_name[0].isdigit()):
- partlist = partlist + " " + partitions_constraint[index]
- else:
- partlist = partlist + "PARTITION " + partition_refine_name + " " + partitions_constraint[index]
-
+ if index > 0:
+ partlist += ", "
+ partition_refine_name = partitions_name[index]
+ splitter = src_table_name.split(".")[-1] + '_1_prt_'
+ partition_refine_name = partition_refine_name.split(splitter)[-1]
+ #in some case, constraint contains "partition XXX" but in other case, it doesn't contain. we need to treat them separately.
+ if partitions_constraint[index].strip().startswith("DEFAULT PARTITION") or partitions_constraint[index].strip().startswith("PARTITION") or (len(partition_refine_name) > 0 and partition_refine_name[0].isdigit()):
+ partlist = partlist + " " + partitions_constraint[index]
+ else:
+ partlist = partlist + "PARTITION " + partition_refine_name + " " + partitions_constraint[index]
+
fmt = 'ROW' if fmt == 'AO' else fmt
if fmt == 'ROW':
if partitionby is None:
@@ -165,9 +201,8 @@ class GpRegisterAccessor(object):
else:
query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s %s (%s);'
% (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy, partitionby, partlist))
- print query
self.conn.query(query)
- return True
+ return True, query
def check_hash_type(self, tablename):
qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
@@ -241,7 +276,7 @@ class GpRegisterAccessor(object):
class HawqRegister(object):
- def __init__(self, options, table, utility_conn, conn):
+ def __init__(self, options, table, utility_conn, conn, failure_handler):
self.yml = options.yml_config
self.filepath = options.filepath
self.database = options.database
@@ -249,6 +284,7 @@ class HawqRegister(object):
self.filesize = options.filesize
self.accessor = GpRegisterAccessor(conn)
self.utility_accessor = GpRegisterAccessor(utility_conn)
+ self.failure_handler = failure_handler
self.mode = self._init_mode(options.force, options.repair)
self._init()
@@ -288,8 +324,14 @@ class HawqRegister(object):
sys.exit(1)
def create_table():
- return self.accessor.do_create_table(self.src_table_name, self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
- self.partitionby, self.partitions_constraint, self.partitions_name)
+ try:
+ (ret, query) = self.accessor.do_create_table(self.src_table_name, self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
+ self.partitionby, self.partitions_constraint, self.partitions_name)
+ except pg.DatabaseError as e:
+ print e
+ sys.exit(1)
+ self.failure_handler.commit(('SQL', query))
+ return ret
def get_seg_name():
return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
@@ -325,7 +367,7 @@ class HawqRegister(object):
if self.bucket_number != get_bucket_number():
logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
sys.exit(1)
-
+
def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
@@ -339,11 +381,11 @@ class HawqRegister(object):
self.bucket_number = bucket_number
self.partitionby = partitionby
self.partitions_constraint = partitions_constraint
- self.partitions_name = partitions_name
+ self.partitions_name = partitions_name
self.partitions_compression_level = partitions_compression_level
self.partitions_compression_type = partitions_compression_type
self.partitions_checksum = partitions_checksum
- self.partitions_filepaths = partitions_filepaths
+ self.partitions_filepaths = partitions_filepaths
self.partitions_filesizes = partitions_filesizes
self.encoding = encoding
@@ -360,7 +402,7 @@ class HawqRegister(object):
partitions_compression_level = []
partitions_compression_type = []
files, sizes = [], []
-
+
if params['FileFormat'].lower() == 'parquet':
partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
@@ -379,7 +421,7 @@ class HawqRegister(object):
encoding = params['Encoding']
set_yml_dataa('Parquet', files, sizes, params['TableName'], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum'], partitionby,\
partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
+
else: #AO format
partitionby = params.get('AO_FileLocations').get('PartitionBy')
if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
@@ -398,7 +440,7 @@ class HawqRegister(object):
encoding = params['Encoding']
set_yml_dataa('AO', files, sizes, params['TableName'], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
+
def check_file_not_folder():
for fn in self.files:
hdfscmd = 'hdfs dfs -test -f %s' % fn
@@ -576,7 +618,9 @@ class HawqRegister(object):
result = local_ssh(hdfscmd, logger)
if result != 0:
logger.error('Fail to move %s to %s' % (srcfile, dstfile))
+ self.failure_handler.rollback()
sys.exit(1)
+ self.failure_handler.commit(('HDFSCMD', hdfscmd))
def _delete_files_in_hdfs(self):
for fn in self.files_delete:
@@ -634,7 +678,12 @@ class HawqRegister(object):
for i, eof in enumerate(self.sizes_update):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
query += "end transaction;"
- return self.utility_accessor.update_catalog(query)
+ try:
+ self.utility_accessor.update_catalog(query)
+ except pg.DatabaseError as e:
+ print e
+ self.failure_handler.rollback()
+ sys.exit(1)
def _delete_metadata(self):
query = "set allow_system_table_mods='dml';"
@@ -693,8 +742,10 @@ def main(options, args):
except pg.InternalError:
logger.error('Fail to connect to database, this script can only be run when database is up.')
return 1
+
+ failure_handler = FailureHandler(conn)
# register
- ins = HawqRegister(options, args[0], utility_conn, conn)
+ ins = HawqRegister(options, args[0], utility_conn, conn, failure_handler)
ins.register()
conn.close()