Posted to commits@hawq.apache.org by wl...@apache.org on 2016/10/10 09:44:47 UTC
incubator-hawq git commit: HAWQ-991. Refactor HAWQ Register code for partition table.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 28d192d23 -> c7d6a7f52
HAWQ-991. Refactor HAWQ Register code for partition table.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/c7d6a7f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/c7d6a7f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/c7d6a7f5
Branch: refs/heads/master
Commit: c7d6a7f52b03ece32d10b1dd9088a91e14565384
Parents: 28d192d
Author: Chunling Wang <wa...@126.com>
Authored: Sat Oct 1 22:06:51 2016 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Mon Oct 10 17:44:06 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqregister | 461 +++++++++++++++++++++-----------------------
1 file changed, 225 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
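For context on the diff below: the refactor pulls the nested helper functions out into private methods and, instead of issuing each pg_aoseg catalog update in its own "set allow_system_table_mods" / "begin transaction" block, it accumulates all statements in a single self.queries buffer that is executed once. The following is a minimal sketch of that pattern only; Register, prepare_inserts, and accessor.update_catalog are simplified stand-ins, not the actual hawqregister classes.

    class Register(object):
        def __init__(self, accessor, seg_name):
            self.accessor = accessor
            self.seg_name = seg_name
            # one buffer for all catalog statements, executed in one transaction
            self.queries = "set allow_system_table_mods='dml';"
            self.queries += "begin transaction;"

        def prepare_inserts(self, firstsegno, eofs):
            # queue the pg_aoseg inserts instead of running each one immediately
            query = "insert into pg_aoseg.%s values(%d, %d, %d, %d)" % (
                self.seg_name, firstsegno, eofs[0], -1, -1)
            for k, eof in enumerate(eofs[1:]):
                query += ",(%d, %d, %d, %d)" % (firstsegno + k + 1, eof, -1, -1)
            self.queries += query + ";"

        def commit(self):
            # single round trip: either every file is registered or none is
            self.queries += "end transaction;"
            self.accessor.update_catalog(self.queries)

In the actual change, _init() seeds self.queries, _set_modify_metadata() appends the insert/update/delete statements, and _modify_metadata() finally passes self.queries to update_catalog().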
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c7d6a7f5/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 69809f7..29b3a30 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -252,11 +252,6 @@ class GpRegisterAccessor(object):
rows = self.exec_query(query)
return rows[0]['attrnums']
- def get_partition_info(self, tablename):
- ''' Get partition information from pg_partitions, return a constraint-tablename dictionary '''
- query = "SELECT partitiontablename, partitionboundary FROM pg_partitions WHERE tablename = '%s'" % tablename
- return self.exec_query(query)
-
def get_bucket_number(self, tablename):
query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
rows = self.exec_query(query)
@@ -299,17 +294,20 @@ class HawqRegister(object):
self.yml = options.yml_config
self.filepath = options.filepath
self.database = options.database
+ self.dst_table_name = table.lower()
self.tablename = table.lower()
self.filesize = options.filesize
self.accessor = GpRegisterAccessor(conn)
self.utility_accessor = GpRegisterAccessor(utility_conn)
self.failure_handler = failure_handler
self.mode = self._init_mode(options.force, options.repair)
+ self.srcfiles = []
+ self.dstfiles = []
self._init()
def _init_mode(self, force, repair):
def table_existed():
- return self.accessor.get_table_existed(self.tablename)
+ return self.accessor.get_table_existed(self.dst_table_name)
if self.yml:
if force:
@@ -325,231 +323,221 @@ class HawqRegister(object):
return 'usage2_table_not_exist'
else:
if not table_existed():
- logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.tablename)
+ logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.dst_table_name)
sys.exit(1)
else:
return 'usage1'
- def _init(self):
- def check_hash_type():
- self.accessor.check_hash_type(self.tablename)
-
- # check conflicting distributed policy
- def check_distribution_policy():
- if self.distribution_policy.startswith('DISTRIBUTED BY'):
- if len(self.files) % self.bucket_number != 0:
- logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
- self.failure_handler.rollback()
- sys.exit(1)
-
- def check_database_encoding():
- encoding_indx = self.accessor.get_database_encoding_indx(self.database)
- encoding = self.accessor.get_database_encoding(encoding_indx)
- if self.encoding.strip() != encoding:
- logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
- sys.exit(1)
+ def _check_hash_type(self):
+ self.accessor.check_hash_type(self.dst_table_name)
- def create_table():
- try:
- (ret, query) = self.accessor.do_create_table(self.src_table_name, self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
+ def _create_table(self):
+ try:
+ (ret, query) = self.accessor.do_create_table(self.src_table_name, self.dst_table_name, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
self.partitionby, self.partitions_constraint, self.partitions_name)
- except pg.DatabaseError as e:
- print e
+ except pg.DatabaseError as e:
+ print e
+ sys.exit(1)
+ if ret:
+ self.failure_handler.commit(('SQL', query))
+ return ret
+
+ def _check_database_encoding(self):
+ encoding_indx = self.accessor.get_database_encoding_indx(self.database)
+ encoding = self.accessor.get_database_encoding(encoding_indx)
+ if self.encoding.strip() != encoding:
+ logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
+ sys.exit(1)
+
+ def _check_policy_consistency(self):
+ policy = self._get_distribution_policy() # "" or "{1,3}"
+ if policy is None:
+ return
+ if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
+ logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
+ self.failure_handler.rollback()
+ sys.exit(1)
+ tmp_dict = {}
+ for i, d in enumerate(self.schema):
+ tmp_dict[d['name']] = i + 1
+ # 'DISTRIBUETD BY (1,3)' -> {1,3}
+ cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+ original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+ if policy.strip('{').strip('}') != original_policy:
+ logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.dst_table_name)
+ self.failure_handler.rollback()
+ sys.exit(1)
+
+ def _set_yml_dataa(self, file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
+ bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
+ partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
+ self.file_format = file_format
+ self.files = files
+ self.sizes = sizes
+ self.src_table_name = tablename
+ self.schema = schema
+ self.distribution_policy = distribution_policy
+ self.file_locations = file_locations
+ self.bucket_number = bucket_number
+ self.partitionby = partitionby
+ self.partitions_constraint = partitions_constraint
+ self.partitions_name = partitions_name
+ self.partitions_compression_level = partitions_compression_level
+ self.partitions_compression_type = partitions_compression_type
+ self.partitions_checksum = partitions_checksum
+ self.partitions_filepaths = partitions_filepaths
+ self.partitions_filesizes = partitions_filesizes
+ self.encoding = encoding
+
+ def _option_parser_yml(self, yml_file):
+ import yaml
+ try:
+ with open(yml_file, 'r') as f:
+ params = yaml.load(f)
+ except yaml.scanner.ScannerError as e:
+ print e
+ self.failure_handler.rollback()
+ sys.exit(1)
+ table_column_num = self.accessor.get_table_column_num(self.tablename)
+ register_yaml_dict_check(params, table_column_num, self.tablename)
+ partitions_filepaths = []
+ partitions_filesizes = []
+ partitions_constraint = []
+ partitions_name = []
+ partitions_checksum = []
+ partitions_compression_level = []
+ partitions_compression_type = []
+ files, sizes = [], []
+
+ if params['FileFormat'].lower() == 'parquet':
+ Format = 'Parquet'
+ else: #AO format
+ Format = 'AO'
+ Format_FileLocations = '%s_FileLocations' % Format
+ partitionby = params.get(Format_FileLocations).get('PartitionBy')
+ if partitionby:
+ logger.info('Partition table is not supported in current release of hawq register.')
+ sys.exit(0)
+ if params.get(Format_FileLocations).get('Partitions') and len(params[Format_FileLocations]['Partitions']):
+ partitions_checksum = [d['Checksum'] for d in params[Format_FileLocations]['Partitions']]
+ partitions_compression_level = [d['CompressionLevel'] for d in params[Format_FileLocations]['Partitions']]
+ partitions_compression_type = [d['CompressionType'] for d in params[Format_FileLocations]['Partitions']]
+ partitions_constraint = [d['Constraint'] for d in params[Format_FileLocations]['Partitions']]
+ partitions_files = [d['Files'] for d in params[Format_FileLocations]['Partitions']]
+ if len(partitions_files):
+ for pfile in partitions_files:
+ partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
+ partitions_filesizes.append([item['size'] for item in pfile])
+ partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']]
+ if len(params[Format_FileLocations]['Files']):
+ files, sizes = [params['DFS_URL'] + d['path'] for d in params[Format_FileLocations]['Files']], [d['size'] for d in params[Format_FileLocations]['Files']]
+ encoding = params['Encoding']
+ self._set_yml_dataa(Format, files, sizes, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], params['Bucketnum'], partitionby,\
+ partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
+
+
+ # check conflicting distributed policy
+ def _check_distribution_policy(self):
+ if self.distribution_policy.startswith('DISTRIBUTED BY'):
+ if len(self.files) % self.bucket_number != 0:
+ logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
+ self.failure_handler.rollback()
sys.exit(1)
- if ret:
- self.failure_handler.commit(('SQL', query))
- return ret
- def get_seg_name():
- return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
+ def _get_seg_name(self):
+ return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
- def get_metadata():
- return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
+ def _get_metadata(self):
+ return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
- def get_metadata_from_table():
- return self.accessor.get_metadata_from_seg_name(self.seg_name)
+ def _get_metadata_from_table(self):
+ return self.accessor.get_metadata_from_seg_name(self.seg_name)
- def get_distribution_policy():
- return self.accessor.get_distribution_policy_info(self.tablename)
+ def _get_distribution_policy(self):
+ return self.accessor.get_distribution_policy_info(self.tablename)
- def check_policy_consistency():
- policy = get_distribution_policy() # "" or "{1,3}"
- if policy is None:
- return
- if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
- logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
- self.failure_handler.rollback()
- sys.exit(1)
- tmp_dict = {}
- for i, d in enumerate(self.schema):
- tmp_dict[d['name']] = i + 1
- # 'DISTRIBUETD BY (1,3)' -> {1,3}
- cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
- original_policy = ','.join([str(tmp_dict[col]) for col in cols])
- if policy.strip('{').strip('}') != original_policy:
- logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
- self.failure_handler.rollback()
+ def _check_bucket_number(self):
+ def get_bucket_number():
+ return self.accessor.get_bucket_number(self.tablename)
+
+ if self.bucket_number != get_bucket_number():
+ logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+ self.failure_handler.rollback()
+ sys.exit(1)
+
+ def _check_file_not_folder(self):
+ for fn in self.files:
+ hdfscmd = 'hadoop fs -test -f %s' % fn
+ if local_ssh(hdfscmd, logger):
+ logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
sys.exit(1)
- def check_bucket_number():
- def get_bucket_number():
- return self.accessor.get_bucket_number(self.tablename)
+ def _is_folder(self, filepath):
+ hdfscmd = 'hadoop fs -test -d %s' % filepath
+ if local_ssh(hdfscmd, logger):
+ return False
+ else:
+ return True
- if self.bucket_number != get_bucket_number():
- logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+ def _check_sizes_valid(self):
+ for sz in self.sizes:
+ if type(sz) != type(1):
+ logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
self.failure_handler.rollback()
sys.exit(1)
-
- def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
- bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
- partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
- self.file_format = file_format
- self.files = files
- self.sizes = sizes
- self.src_table_name = tablename
- self.schema = schema
- self.distribution_policy = distribution_policy
- self.file_locations = file_locations
- self.bucket_number = bucket_number
- self.partitionby = partitionby
- self.partitions_constraint = partitions_constraint
- self.partitions_name = partitions_name
- self.partitions_compression_level = partitions_compression_level
- self.partitions_compression_type = partitions_compression_type
- self.partitions_checksum = partitions_checksum
- self.partitions_filepaths = partitions_filepaths
- self.partitions_filesizes = partitions_filesizes
- self.encoding = encoding
-
- def option_parser_yml(yml_file):
- import yaml
- try:
- with open(yml_file, 'r') as f:
- params = yaml.load(f)
- except yaml.scanner.ScannerError as e:
- print e
+ if sz < 0:
+ logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+ self.failure_handler.rollback()
+ sys.exit(1)
+ for k, fn in enumerate(self.files):
+ hdfscmd = 'hadoop fs -du %s' % fn
+ _, out, _ = local_ssh_output(hdfscmd)
+ if self.sizes[k] > int(out.strip().split()[0]):
+ logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
self.failure_handler.rollback()
sys.exit(1)
-
- table_column_num = self.accessor.get_table_column_num(self.tablename)
- register_yaml_dict_check(params, table_column_num, self.tablename)
- partitions_filepaths = []
- partitions_filesizes = []
- partitions_constraint = []
- partitions_name = []
- partitions_checksum = []
- partitions_compression_level = []
- partitions_compression_type = []
- files, sizes = [], []
-
- if params['FileFormat'].lower() == 'parquet':
- partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
- if partitionby:
- logger.info('Partition table is not supported in current release of hawq register.')
- sys.exit(0)
- if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
- partitions_checksum = [d['Checksum'] for d in params['Parquet_FileLocations']['Partitions']]
- partitions_compression_level = [d['CompressionLevel'] for d in params['Parquet_FileLocations']['Partitions']]
- partitions_compression_type = [d['CompressionType'] for d in params['Parquet_FileLocations']['Partitions']]
- partitions_constraint = [d['Constraint'] for d in params['Parquet_FileLocations']['Partitions']]
- partitions_files = [d['Files'] for d in params['Parquet_FileLocations']['Partitions']]
- if len(partitions_files):
- for pfile in partitions_files:
- partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
- partitions_filesizes.append([item['size'] for item in pfile])
- partitions_name = [d['Name'] for d in params['Parquet_FileLocations']['Partitions']]
- if len(params['Parquet_FileLocations']['Files']):
- files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
- encoding = params['Encoding']
- set_yml_dataa('Parquet', files, sizes, params['TableName'], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum'], partitionby,\
- partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
- else: #AO format
- partitionby = params.get('AO_FileLocations').get('PartitionBy')
- if partitionby:
- logger.info('Partition table is not supported in current release of hawq register.')
- sys.exit(0)
- if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
- partitions_checksum = [d['Checksum'] for d in params['AO_FileLocations']['Partitions']]
- partitions_compressionLevel = [d['CompressionLevel'] for d in params['AO_FileLocations']['Partitions']]
- partitions_compressionType = [d['CompressionType'] for d in params['AO_FileLocations']['Partitions']]
- partitions_constraint = [d['Constraint'] for d in params['AO_FileLocations']['Partitions']]
- partitions_files = [d['Files'] for d in params['AO_FileLocations']['Partitions']]
- if len(partitions_files):
- for pfile in partitions_files:
- partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
- partitions_filesizes.append([item['size'] for item in pfile])
- partitions_name = [d['Name'] for d in params['AO_FileLocations']['Partitions']]
- if len(params['AO_FileLocations']['Files']):
- files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
- encoding = params['Encoding']
- set_yml_dataa('AO', files, sizes, params['TableName'], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
- partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
- def check_file_not_folder():
- for fn in self.files:
- hdfscmd = 'hadoop fs -test -f %s' % fn
- if local_ssh(hdfscmd, logger):
- logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
- sys.exit(1)
-
- def is_folder(filepath):
- hdfscmd = 'hadoop fs -test -d %s' % filepath
- if local_ssh(hdfscmd, logger):
- return False
- else:
- return True
- def check_sizes_valid():
- for sz in self.sizes:
- if type(sz) != type(1):
- logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
- self.failure_handler.rollback()
- sys.exit(1)
- if sz < 0:
- logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+ def _check_no_regex_filepath(self, files):
+ for fn in files:
+ tmp_lst = fn.split('/')
+ for v in tmp_lst:
+ if v == '.':
+ logger.error('Hawq register does not support file path with regex: %s.' % fn)
self.failure_handler.rollback()
sys.exit(1)
- for k, fn in enumerate(self.files):
- hdfscmd = 'hadoop fs -du %s' % fn
- _, out, _ = local_ssh_output(hdfscmd)
- if self.sizes[k] > int(out.strip().split()[0]):
- logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
+ for ch in ['..', '*']:
+ if fn.find(ch) != -1:
+ logger.error('Hawq register does not support file path with regex: %s.' % fn)
self.failure_handler.rollback()
sys.exit(1)
- def check_no_regex_filepath(files):
- for fn in files:
- tmp_lst = fn.split('/')
- for v in tmp_lst:
- if v == '.':
- logger.error('Hawq register does not support file path with regex: %s.' % fn)
- self.failure_handler.rollback()
- sys.exit(1)
- for ch in ['..', '*']:
- if fn.find(ch) != -1:
- logger.error('Hawq register does not support file path with regex: %s.' % fn)
- self.failure_handler.rollback()
- sys.exit(1)
-
+ def _init(self):
if self.yml:
- option_parser_yml(options.yml_config)
+ self._option_parser_yml(options.yml_config)
self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
- check_file_not_folder()
- check_database_encoding()
+ self._check_file_not_folder()
+ self._check_database_encoding()
if self.mode != 'repair':
- if not create_table() and self.mode != 'force':
+ if not self._create_table() and self.mode != 'force':
self.mode = 'usage2_table_exist'
- check_bucket_number()
- check_distribution_policy()
- check_policy_consistency()
- check_no_regex_filepath(self.files)
else:
- if is_folder(self.filepath) and self.filesize:
+ if self._is_folder(self.filepath) and self.filesize:
logger.error('-e option is only supported with single file case.')
sys.exit(1)
self.file_format = 'Parquet'
- check_hash_type() # Usage1 only support randomly distributed table
+ self._check_hash_type() # Usage1 only support randomly distributed table
+ self.queries = "set allow_system_table_mods='dml';"
+ self.queries += "begin transaction;"
+ self._do_check()
+ self._prepare_register()
+ self.queries += "end transaction;"
+
+ def _do_check(self):
+ if self.yml:
+ self._check_bucket_number()
+ self._check_distribution_policy()
+ self._check_policy_consistency()
+ self._check_no_regex_filepath(self.files)
if not self.filepath:
if self.mode == 'usage1':
logger.info('Please specify filepath with -f option.')
@@ -557,18 +545,18 @@ class HawqRegister(object):
logger.info('Hawq Register Succeed.')
sys.exit(0)
- (self.seg_name, tmp_ret) = get_seg_name()
+ (self.seg_name, tmp_ret) = self._get_seg_name()
if not tmp_ret:
self.failure_handler.rollback()
sys.exit(1)
- self.firstsegno, self.tabledir = get_metadata()
+ self.firstsegno, self.tabledir = self._get_metadata()
if self.mode == 'repair':
if self.tabledir.strip('/') != self.filepath.strip('/'):
logger.error("In repair mode, file path from yaml file should be the same with table's path.")
self.failure_handler.rollback()
sys.exit(1)
- seg_list, existed_sizes = get_metadata_from_table()
+ seg_list, existed_sizes = self._get_metadata_from_table()
existed_files = [self.tabledir + seg for seg in seg_list]
existed_info = {}
for k, fn in enumerate(existed_files):
@@ -590,13 +578,13 @@ class HawqRegister(object):
sys.exit(1)
if not self.yml:
- check_no_regex_filepath([self.filepath])
+ self._check_no_regex_filepath([self.filepath])
self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
self.do_not_move, self.files_update, self.sizes_update = False, [], []
self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
if self.mode == 'force':
- seg_list, _ = get_metadata_from_table()
+ seg_list, _ = self._get_metadata_from_table()
existed_files = [self.tabledir + seg for seg in seg_list]
if len(self.files) == len(existed_files):
if sorted(self.files) != sorted(existed_files):
@@ -641,17 +629,11 @@ class HawqRegister(object):
self.failure_handler.rollback()
sys.exit(1)
self.sizes = [self.filesize]
- check_sizes_valid()
+ self._check_sizes_valid()
if self.file_format == 'Parquet':
self._check_parquet_format(self.files)
- def _get_partition_info(self):
- dic = {}
- for ele in self.accessor.get_partition_info(self.tablename):
- dic[ele['partitionboundary']] = ele['partitiontablename']
- return dic
-
def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
'''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
if not filepath:
@@ -714,13 +696,17 @@ class HawqRegister(object):
self.failure_handler.rollback()
sys.exit(1)
- def _move_files_in_hdfs(self):
- '''Move file(s) in src path into the folder correspoding to the target table'''
+ def _set_move_files_in_hdfs(self):
segno = self.firstsegno
for f in self.newfiles:
- srcfile = f
- dstfile = self.tabledir + str(segno)
+ self.srcfiles.append(f)
+ self.dstfiles.append(self.tabledir + str(segno))
segno += 1
+
+ def _move_files_in_hdfs(self):
+ '''Move file(s) in src path into the folder correspoding to the target table'''
+ for k, srcfile in enumerate(self.srcfiles):
+ dstfile = self.dstfiles[k]
if srcfile != dstfile:
hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
@@ -733,7 +719,7 @@ class HawqRegister(object):
def _delete_files_in_hdfs(self):
for fn in self.files_delete:
- hdfscmd = 'hadoop dfs -rm %s' % fn
+ hdfscmd = 'hadoop fs -rm %s' % fn
sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
result = local_ssh(hdfscmd, logger)
if result != 0:
@@ -741,10 +727,10 @@ class HawqRegister(object):
self.failure_handler.rollback()
sys.exit(1)
- def _modify_metadata(self, mode):
+ def _set_modify_metadata(self, mode):
if mode == 'insert':
eofs = self.sizes
- query = "set allow_system_table_mods='dml';"
+ query = ""
if self.file_format == 'Parquet':
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
for k, eof in enumerate(eofs[1:]):
@@ -754,12 +740,11 @@ class HawqRegister(object):
for k, eof in enumerate(eofs[1:]):
query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
query += ';'
+ self.queries += query
elif mode == 'force':
eofs = self.sizes
- query = "set allow_system_table_mods='dml';"
- query += "begin transaction;"
segno_lst = [f.split('/')[-1] for f in self.files]
- query += "delete from pg_aoseg.%s;" % (self.seg_name)
+ query = "delete from pg_aoseg.%s;" % (self.seg_name)
firstsegno = 1
if self.file_format == 'Parquet':
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1)
@@ -770,11 +755,10 @@ class HawqRegister(object):
for k, eof in enumerate(eofs[1:]):
query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
query += ';'
- query += "end transaction;"
+ self.queries += query
elif mode == 'update':
eofs = self.sizes_update
- query = "set allow_system_table_mods='dml';"
- query += "begin transaction;"
+ query = ""
segno_lst = [f.split('/')[-1] for f in self.files_update]
if self.file_format == 'Parquet':
for i, eof in enumerate(eofs):
@@ -782,11 +766,10 @@ class HawqRegister(object):
else:
for i, eof in enumerate(eofs):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
- query += "end transaction;"
+ self.queries += query
else: # update_and_insert
eofs = self.sizes
- query = "set allow_system_table_mods='dml';"
- query += "begin transaction;"
+ query = ""
if self.file_format == 'Parquet':
query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
for k, eof in enumerate(eofs[1:]):
@@ -804,9 +787,11 @@ class HawqRegister(object):
else:
for i, eof in enumerate(self.sizes_update):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
- query += "end transaction;"
+ self.queries += query
+
+ def _modify_metadata(self, mode):
try:
- self.utility_accessor.update_catalog(query)
+ self.utility_accessor.update_catalog(self.queries)
except pg.DatabaseError as e:
print e
self.failure_handler.rollback()
@@ -821,14 +806,18 @@ class HawqRegister(object):
query += "end transaction;"
return self.utility_accessor.update_catalog(query)
- def _mapping_tablename_from_yml(self, partitions):
- ''' Mapping table name from yml file, return a list of (table_name,(file_path, file_size)) '''
- mappings = []
- for pos, constraint in enumerate(self.partitions_constraint):
- if partitions.has_key(constraint):
- mappings.extend([(partitions[constraint], (self.partitions_filepaths[pos][i], self.partitions_filesizes[pos][i]))
- for i in xrange(len(self.partitions_filepaths[pos]))])
- return mappings
+ def _prepare_register(self):
+ if not self.do_not_move:
+ self._set_move_files_in_hdfs()
+ if (not self.do_not_move) and self.mode == 'force':
+ self._set_modify_metadata('force')
+ else:
+ if self.mode == 'force':
+ self._set_modify_metadata('force')
+ elif self.mode == 'repair':
+ self._set_modify_metadata('update')
+ else:
+ self._set_modify_metadata('insert')
def register(self):
if not self.do_not_move: