Posted to commits@hawq.apache.org by wl...@apache.org on 2016/10/10 09:44:47 UTC

incubator-hawq git commit: HAWQ-991. Refactor HAWQ Register code for partition table.

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 28d192d23 -> c7d6a7f52


HAWQ-991. Refactor HAWQ Register code for partition table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/c7d6a7f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/c7d6a7f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/c7d6a7f5

Branch: refs/heads/master
Commit: c7d6a7f52b03ece32d10b1dd9088a91e14565384
Parents: 28d192d
Author: Chunling Wang <wa...@126.com>
Authored: Sat Oct 1 22:06:51 2016 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Mon Oct 10 17:44:06 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 461 +++++++++++++++++++++-----------------------
 1 file changed, 225 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
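The bulk of the refactoring below collapses the two near-identical YAML parsing branches (Parquet_FileLocations vs. AO_FileLocations) into a single code path keyed by a format prefix, promotes the nested helper functions of _init() into class methods, and accumulates the pg_aoseg catalog statements into one self.queries string executed inside a single transaction. The following is a minimal, self-contained sketch of the key-prefix pattern only; the SAMPLE_YML document and the parse() helper are hypothetical illustrations, and only the key names follow the diff.

import yaml

# Hypothetical, reduced YAML document; key names mirror those read by hawqregister.
SAMPLE_YML = """
FileFormat: Parquet
DFS_URL: hdfs://localhost:8020
Encoding: UTF8
TableName: public.t1
Bucketnum: 6
Distribution_Policy: DISTRIBUTED RANDOMLY
Parquet_Schema:
  - name: id
    type: int4
Parquet_FileLocations:
  Files:
    - path: /hawq_default/16385/16387/16388/1
      size: 1024
"""

def parse(params):
    # Resolve the file format once, then build every later key from the same
    # prefix instead of duplicating the whole branch for Parquet and AO.
    fmt = 'Parquet' if params['FileFormat'].lower() == 'parquet' else 'AO'
    locations = params['%s_FileLocations' % fmt]
    files = [params['DFS_URL'] + d['path'] for d in locations.get('Files', [])]
    sizes = [d['size'] for d in locations.get('Files', [])]
    return {'format': fmt,
            'schema': params['%s_Schema' % fmt],
            'files': files,
            'sizes': sizes,
            'bucket_number': params['Bucketnum'],
            'distribution_policy': params['Distribution_Policy'],
            'encoding': params['Encoding']}

if __name__ == '__main__':
    print(parse(yaml.safe_load(SAMPLE_YML)))

Resolving the format once means a new storage format needs only another prefix, not another copy of the parsing branch.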


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c7d6a7f5/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 69809f7..29b3a30 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -252,11 +252,6 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         return rows[0]['attrnums']
 
-    def get_partition_info(self, tablename):
-        ''' Get partition information from pg_partitions, return a constraint-tablename dictionary '''
-        query = "SELECT partitiontablename, partitionboundary FROM pg_partitions WHERE tablename = '%s'" % tablename
-        return self.exec_query(query)
-
     def get_bucket_number(self, tablename):
         query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
         rows = self.exec_query(query)
@@ -299,17 +294,20 @@ class HawqRegister(object):
         self.yml = options.yml_config
         self.filepath = options.filepath
         self.database = options.database
+        self.dst_table_name = table.lower()
         self.tablename = table.lower()
         self.filesize = options.filesize
         self.accessor = GpRegisterAccessor(conn)
         self.utility_accessor = GpRegisterAccessor(utility_conn)
         self.failure_handler = failure_handler
         self.mode = self._init_mode(options.force, options.repair)
+        self.srcfiles = []
+        self.dstfiles = []
         self._init()
 
     def _init_mode(self, force, repair):
         def table_existed():
-            return self.accessor.get_table_existed(self.tablename)
+            return self.accessor.get_table_existed(self.dst_table_name)
 
         if self.yml:
             if force:
@@ -325,231 +323,221 @@ class HawqRegister(object):
                 return 'usage2_table_not_exist'
         else:
             if not table_existed():
-                logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.tablename)
+                logger.error('Table %s does not exist.\nYou should create table before registering the data.' % self.dst_table_name)
                 sys.exit(1)
             else:
                 return 'usage1'
 
-    def _init(self):
-        def check_hash_type():
-            self.accessor.check_hash_type(self.tablename)
-
-        # check conflicting distributed policy
-        def check_distribution_policy():
-            if self.distribution_policy.startswith('DISTRIBUTED BY'):
-                if len(self.files) % self.bucket_number != 0:
-                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
-                    self.failure_handler.rollback()
-                    sys.exit(1)
-
-        def check_database_encoding():
-            encoding_indx = self.accessor.get_database_encoding_indx(self.database)
-            encoding = self.accessor.get_database_encoding(encoding_indx)
-            if self.encoding.strip() != encoding:
-                logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
-                sys.exit(1)
+    def _check_hash_type(self):
+        self.accessor.check_hash_type(self.dst_table_name)
 
-        def create_table():
-            try:
-                (ret, query) = self.accessor.do_create_table(self.src_table_name, self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
+    def _create_table(self):
+        try:
+            (ret, query) = self.accessor.do_create_table(self.src_table_name, self.dst_table_name, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number,
                                                              self.partitionby, self.partitions_constraint, self.partitions_name)
-            except pg.DatabaseError as e:
-                print e
+        except pg.DatabaseError as e:
+            print e
+            sys.exit(1)
+        if ret:
+            self.failure_handler.commit(('SQL', query))
+        return ret
+
+    def _check_database_encoding(self):
+        encoding_indx = self.accessor.get_database_encoding_indx(self.database)
+        encoding = self.accessor.get_database_encoding(encoding_indx)
+        if self.encoding.strip() != encoding:
+            logger.error('Database encoding from yaml configuration file(%s) is not consistent with encoding from input args(%s).' % (self.encoding, encoding))
+            sys.exit(1)
+
+    def _check_policy_consistency(self):
+        policy = self._get_distribution_policy() # "" or "{1,3}"
+        if policy is None:
+            return
+        if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
+            logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
+            self.failure_handler.rollback()
+            sys.exit(1)
+        tmp_dict = {}
+        for i, d in enumerate(self.schema):
+            tmp_dict[d['name']] = i + 1
+        # 'DISTRIBUTED BY (1,3)' -> {1,3}
+        cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+        original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+        if policy.strip('{').strip('}') != original_policy:
+            logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.dst_table_name)
+            self.failure_handler.rollback()
+            sys.exit(1)
+
+    def _set_yml_data(self, file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
+                      bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
+                      partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
+        self.file_format = file_format
+        self.files = files
+        self.sizes = sizes
+        self.src_table_name = tablename
+        self.schema = schema
+        self.distribution_policy = distribution_policy
+        self.file_locations = file_locations
+        self.bucket_number = bucket_number
+        self.partitionby = partitionby
+        self.partitions_constraint = partitions_constraint
+        self.partitions_name = partitions_name
+        self.partitions_compression_level = partitions_compression_level
+        self.partitions_compression_type = partitions_compression_type
+        self.partitions_checksum = partitions_checksum
+        self.partitions_filepaths = partitions_filepaths
+        self.partitions_filesizes = partitions_filesizes
+        self.encoding = encoding
+
+    def _option_parser_yml(self, yml_file):
+        import yaml
+        try:
+            with open(yml_file, 'r') as f:
+                params = yaml.load(f)
+        except yaml.scanner.ScannerError as e:
+            print e
+            self.failure_handler.rollback()
+            sys.exit(1)
+        table_column_num = self.accessor.get_table_column_num(self.tablename)
+        register_yaml_dict_check(params, table_column_num, self.tablename)
+        partitions_filepaths = []
+        partitions_filesizes = []
+        partitions_constraint = []
+        partitions_name = []
+        partitions_checksum = []
+        partitions_compression_level = []
+        partitions_compression_type = []
+        files, sizes = [], []
+
+        if params['FileFormat'].lower() == 'parquet':
+            Format = 'Parquet'
+        else: #AO format
+            Format = 'AO'
+        Format_FileLocations = '%s_FileLocations' % Format
+        partitionby = params.get(Format_FileLocations).get('PartitionBy')
+        if partitionby:
+            logger.info('Partition table is not supported in the current release of hawq register.')
+            sys.exit(0)
+        if params.get(Format_FileLocations).get('Partitions') and len(params[Format_FileLocations]['Partitions']):
+            partitions_checksum = [d['Checksum'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_compression_level = [d['CompressionLevel'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_compression_type = [d['CompressionType'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_constraint = [d['Constraint'] for d in params[Format_FileLocations]['Partitions']]
+            partitions_files = [d['Files'] for d in params[Format_FileLocations]['Partitions']]
+            if len(partitions_files):
+                for pfile in partitions_files:
+                    partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
+                    partitions_filesizes.append([item['size'] for item in pfile])
+            partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']]
+        if len(params[Format_FileLocations]['Files']):
+            files, sizes = [params['DFS_URL'] + d['path'] for d in params[Format_FileLocations]['Files']], [d['size'] for d in params[Format_FileLocations]['Files']]
+        encoding = params['Encoding']
+        self._set_yml_data(Format, files, sizes, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], params['Bucketnum'], partitionby,\
+                      partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
+
+
+    # check conflicting distributed policy
+    def _check_distribution_policy(self):
+        if self.distribution_policy.startswith('DISTRIBUTED BY'):
+            if len(self.files) % self.bucket_number != 0:
+                logger.error('The number of files to be registered must be a multiple of the bucket number of the hash table.')
+                self.failure_handler.rollback()
                 sys.exit(1)
-            if ret:
-                self.failure_handler.commit(('SQL', query))
-            return ret
 
-        def get_seg_name():
-            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
+    def _get_seg_name(self):
+        return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
 
-        def get_metadata():
-            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
+    def _get_metadata(self):
+        return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
 
-        def get_metadata_from_table():
-            return self.accessor.get_metadata_from_seg_name(self.seg_name)
+    def _get_metadata_from_table(self):
+        return self.accessor.get_metadata_from_seg_name(self.seg_name)
 
-        def get_distribution_policy():
-            return self.accessor.get_distribution_policy_info(self.tablename)
+    def _get_distribution_policy(self):
+        return self.accessor.get_distribution_policy_info(self.tablename)
 
-        def check_policy_consistency():
-            policy = get_distribution_policy() # "" or "{1,3}"
-            if policy is None:
-                return
-            if self.distribution_policy == 'DISTRIBUTED RANDOMLY':
-                logger.error('Distribution policy of %s from yaml is not consistent with the policy of existing table.' % self.tablename)
-                self.failure_handler.rollback()
-                sys.exit(1)
-            tmp_dict = {}
-            for i, d in enumerate(self.schema):
-                tmp_dict[d['name']] = i + 1
-            # 'DISTRIBUETD BY (1,3)' -> {1,3}
-            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
-            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
-            if policy.strip('{').strip('}') != original_policy:
-                logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
-                self.failure_handler.rollback()
+    def _check_bucket_number(self):
+        def get_bucket_number():
+            return self.accessor.get_bucket_number(self.tablename)
+
+        if self.bucket_number != get_bucket_number():
+            logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+            self.failure_handler.rollback()
+            sys.exit(1)
+
+    def _check_file_not_folder(self):
+        for fn in self.files:
+            hdfscmd = 'hadoop fs -test -f %s' % fn
+            if local_ssh(hdfscmd, logger):
+                logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
                 sys.exit(1)
 
-        def check_bucket_number():
-            def get_bucket_number():
-                return self.accessor.get_bucket_number(self.tablename)
+    def _is_folder(self, filepath):
+        hdfscmd = 'hadoop fs -test -d %s' % filepath
+        if local_ssh(hdfscmd, logger):
+            return False
+        else:
+            return True
 
-            if self.bucket_number != get_bucket_number():
-                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+    def _check_sizes_valid(self):
+        for sz in self.sizes:
+            if type(sz) != type(1):
+                logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
                 self.failure_handler.rollback()
                 sys.exit(1)
-
-        def set_yml_dataa(file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\
-                          bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\
-                          partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding):
-            self.file_format = file_format
-            self.files = files
-            self.sizes = sizes
-            self.src_table_name = tablename
-            self.schema = schema
-            self.distribution_policy = distribution_policy
-            self.file_locations = file_locations
-            self.bucket_number = bucket_number
-            self.partitionby = partitionby
-            self.partitions_constraint = partitions_constraint
-            self.partitions_name = partitions_name
-            self.partitions_compression_level = partitions_compression_level
-            self.partitions_compression_type = partitions_compression_type
-            self.partitions_checksum = partitions_checksum
-            self.partitions_filepaths = partitions_filepaths
-            self.partitions_filesizes = partitions_filesizes
-            self.encoding = encoding
-
-        def option_parser_yml(yml_file):
-            import yaml
-            try:
-                with open(yml_file, 'r') as f:
-                    params = yaml.load(f)
-            except yaml.scanner.ScannerError as e:
-                print e
+            if sz < 0:
+                logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+                self.failure_handler.rollback()
+                sys.exit(1)
+        for k, fn in enumerate(self.files):
+            hdfscmd = 'hadoop fs -du %s' % fn
+            _, out, _ = local_ssh_output(hdfscmd)
+            if self.sizes[k] > int(out.strip().split()[0]):
+                logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
                 self.failure_handler.rollback()
                 sys.exit(1)
-            
-            table_column_num = self.accessor.get_table_column_num(self.tablename)
-            register_yaml_dict_check(params, table_column_num, self.tablename)
-            partitions_filepaths = []
-            partitions_filesizes = []
-            partitions_constraint = []
-            partitions_name = []
-            partitions_checksum = []
-            partitions_compression_level = []
-            partitions_compression_type = []
-            files, sizes = [], []
-
-            if params['FileFormat'].lower() == 'parquet':
-                partitionby = params.get('Parquet_FileLocations').get('PartitionBy')
-                if partitionby:
-                    logger.info('Partition table is not supported in current release of hawq register.')
-                    sys.exit(0)
-                if params.get('Parquet_FileLocations').get('Partitions') and len(params['Parquet_FileLocations']['Partitions']):
-                    partitions_checksum = [d['Checksum'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_compression_level = [d['CompressionLevel'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_compression_type = [d['CompressionType'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_constraint = [d['Constraint'] for d in params['Parquet_FileLocations']['Partitions']]
-                    partitions_files = [d['Files'] for d in params['Parquet_FileLocations']['Partitions']]
-                    if len(partitions_files):
-                        for pfile in partitions_files:
-                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
-                            partitions_filesizes.append([item['size'] for item in pfile])
-                    partitions_name = [d['Name'] for d in params['Parquet_FileLocations']['Partitions']]
-                if len(params['Parquet_FileLocations']['Files']):
-                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['Parquet_FileLocations']['Files']], [d['size'] for d in params['Parquet_FileLocations']['Files']]
-                encoding = params['Encoding']
-                set_yml_dataa('Parquet', files, sizes, params['TableName'], params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations'], params['Bucketnum'], partitionby,\
-                              partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
-            else: #AO format
-                partitionby = params.get('AO_FileLocations').get('PartitionBy')
-                if partitionby:
-                    logger.info('Partition table is not supported in current release of hawq register.')
-                    sys.exit(0)
-                if params.get('AO_FileLocations').get('Partitions') and len(params['AO_FileLocations']['Partitions']):
-                    partitions_checksum = [d['Checksum'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_compressionLevel = [d['CompressionLevel'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_compressionType = [d['CompressionType'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_constraint = [d['Constraint'] for d in params['AO_FileLocations']['Partitions']]
-                    partitions_files = [d['Files'] for d in params['AO_FileLocations']['Partitions']]
-                    if len(partitions_files):
-                        for pfile in partitions_files:
-                            partitions_filepaths.append([params['DFS_URL'] + item['path'] for item in pfile])
-                            partitions_filesizes.append([item['size'] for item in pfile])
-                    partitions_name = [d['Name'] for d in params['AO_FileLocations']['Partitions']]
-                if len(params['AO_FileLocations']['Files']):
-                    files, sizes = [params['DFS_URL'] + d['path'] for d in params['AO_FileLocations']['Files']], [d['size'] for d in params['AO_FileLocations']['Files']]
-                encoding = params['Encoding']
-                set_yml_dataa('AO', files, sizes, params['TableName'], params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
-                              partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)
-
-        def check_file_not_folder():
-            for fn in self.files:
-                hdfscmd = 'hadoop fs -test -f %s' % fn
-                if local_ssh(hdfscmd, logger):
-                    logger.info('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
-                    sys.exit(1)
-
-        def is_folder(filepath):
-            hdfscmd = 'hadoop fs -test -d %s' % filepath
-            if local_ssh(hdfscmd, logger):
-                return False
-            else:
-                return True
 
-        def check_sizes_valid():
-            for sz in self.sizes:
-                if type(sz) != type(1):
-                    logger.error('File size(%s) in yaml configuration file should be int type.' % sz)
-                    self.failure_handler.rollback()
-                    sys.exit(1)
-                if sz < 0:
-                    logger.error('File size(%s) in yaml configuration file should not be less than 0.' % sz)
+    def _check_no_regex_filepath(self, files):
+        for fn in files:
+            tmp_lst = fn.split('/')
+            for v in tmp_lst:
+                if v == '.':
+                    logger.error('Hawq register does not support file path with regex: %s.' % fn)
                     self.failure_handler.rollback()
                     sys.exit(1)
-            for k, fn in enumerate(self.files):
-                hdfscmd = 'hadoop fs -du %s' % fn
-                _, out, _ = local_ssh_output(hdfscmd)
-                if self.sizes[k] > int(out.strip().split()[0]):
-                    logger.error('File size(%s) in yaml configuration file should not exceed actual length(%s) of file %s.' % (self.sizes[k], out.strip().split()[0], fn))
+            for ch in ['..', '*']:
+                if fn.find(ch) != -1:
+                    logger.error('Hawq register does not support file path with regex: %s.' % fn)
                     self.failure_handler.rollback()
                     sys.exit(1)
 
-        def check_no_regex_filepath(files):
-            for fn in files:
-                tmp_lst = fn.split('/')
-                for v in tmp_lst:
-                    if v == '.':
-                        logger.error('Hawq register does not support file path with regex: %s.' % fn)
-                        self.failure_handler.rollback()
-                        sys.exit(1)
-                for ch in ['..', '*']:
-                    if fn.find(ch) != -1:
-                        logger.error('Hawq register does not support file path with regex: %s.' % fn)
-                        self.failure_handler.rollback()
-                        sys.exit(1)
-
+    def _init(self):
         if self.yml:
-            option_parser_yml(options.yml_config)
+            self._option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
-            check_file_not_folder()
-            check_database_encoding()
+            self._check_file_not_folder()
+            self._check_database_encoding()
             if self.mode != 'repair':
-                if not create_table() and self.mode != 'force':
+                if not self._create_table() and self.mode != 'force':
                     self.mode = 'usage2_table_exist'
-            check_bucket_number()
-            check_distribution_policy()
-            check_policy_consistency()
-            check_no_regex_filepath(self.files)
         else:
-            if is_folder(self.filepath) and self.filesize:
+            if self._is_folder(self.filepath) and self.filesize:
                 logger.error('-e option is only supported with single file case.')
                 sys.exit(1)
             self.file_format = 'Parquet'
-            check_hash_type() # Usage1 only support randomly distributed table
+            self._check_hash_type() # Usage1 only support randomly distributed table
+        self.queries = "set allow_system_table_mods='dml';"
+        self.queries += "begin transaction;"
+        self._do_check()
+        self._prepare_register()
+        self.queries += "end transaction;"
+
+    def _do_check(self):
+        if self.yml:
+            self._check_bucket_number()
+            self._check_distribution_policy()
+            self._check_policy_consistency()
+            self._check_no_regex_filepath(self.files)
         if not self.filepath:
             if self.mode == 'usage1':
                 logger.info('Please specify filepath with -f option.')
@@ -557,18 +545,18 @@ class HawqRegister(object):
                 logger.info('Hawq Register Succeed.')
             sys.exit(0)
 
-        (self.seg_name, tmp_ret) = get_seg_name()
+        (self.seg_name, tmp_ret) = self._get_seg_name()
         if not tmp_ret:
             self.failure_handler.rollback()
             sys.exit(1)
-        self.firstsegno, self.tabledir = get_metadata()
+        self.firstsegno, self.tabledir = self._get_metadata()
 
         if self.mode == 'repair':
             if self.tabledir.strip('/') != self.filepath.strip('/'):
                 logger.error("In repair mode, file path from yaml file should be the same with table's path.")
                 self.failure_handler.rollback()
                 sys.exit(1)
-            seg_list, existed_sizes = get_metadata_from_table()
+            seg_list, existed_sizes = self._get_metadata_from_table()
             existed_files = [self.tabledir + seg for seg in seg_list]
             existed_info = {}
             for k, fn in enumerate(existed_files):
@@ -590,13 +578,13 @@ class HawqRegister(object):
                 sys.exit(1)
 
         if not self.yml:
-            check_no_regex_filepath([self.filepath])
+            self._check_no_regex_filepath([self.filepath])
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
 
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
         self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
         if self.mode == 'force':
-            seg_list, _ = get_metadata_from_table()
+            seg_list, _ = self._get_metadata_from_table()
             existed_files = [self.tabledir + seg for seg in seg_list]
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
@@ -641,17 +629,11 @@ class HawqRegister(object):
                 self.failure_handler.rollback()
                 sys.exit(1)
             self.sizes = [self.filesize]
-        check_sizes_valid()
+        self._check_sizes_valid()
 
         if self.file_format == 'Parquet':
             self._check_parquet_format(self.files)
 
-    def _get_partition_info(self):
-        dic = {}
-        for ele in self.accessor.get_partition_info(self.tablename):
-            dic[ele['partitionboundary']] = ele['partitiontablename']
-        return dic
-
     def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
         '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
         if not filepath:
@@ -714,13 +696,17 @@ class HawqRegister(object):
                 self.failure_handler.rollback()
                 sys.exit(1)
 
-    def _move_files_in_hdfs(self):
-        '''Move file(s) in src path into the folder correspoding to the target table'''
+    def _set_move_files_in_hdfs(self):
         segno = self.firstsegno
         for f in self.newfiles:
-            srcfile = f
-            dstfile = self.tabledir + str(segno)
+            self.srcfiles.append(f)
+            self.dstfiles.append(self.tabledir + str(segno))
             segno += 1
+
+    def _move_files_in_hdfs(self):
+        '''Move file(s) in src path into the folder corresponding to the target table'''
+        for k, srcfile in enumerate(self.srcfiles):
+            dstfile = self.dstfiles[k]
             if srcfile != dstfile:
                 hdfscmd = 'hadoop fs -mv %s %s' % (srcfile, dstfile)
                 sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
@@ -733,7 +719,7 @@ class HawqRegister(object):
 
     def _delete_files_in_hdfs(self):
         for fn in self.files_delete:
-            hdfscmd = 'hadoop dfs -rm %s' % fn
+            hdfscmd = 'hadoop fs -rm %s' % fn
             sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
             result = local_ssh(hdfscmd, logger)
             if result != 0:
@@ -741,10 +727,10 @@ class HawqRegister(object):
                 self.failure_handler.rollback()
                 sys.exit(1)
 
-    def _modify_metadata(self, mode):
+    def _set_modify_metadata(self, mode):
         if mode == 'insert':
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
+            query = ""
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
                 for k, eof in enumerate(eofs[1:]):
@@ -754,12 +740,11 @@ class HawqRegister(object):
                 for k, eof in enumerate(eofs[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
             query += ';'
+            self.queries += query
         elif mode == 'force':
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
             segno_lst = [f.split('/')[-1] for f in self.files]
-            query += "delete from pg_aoseg.%s;" % (self.seg_name)
+            query = "delete from pg_aoseg.%s;" % (self.seg_name)
             firstsegno = 1
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1)
@@ -770,11 +755,10 @@ class HawqRegister(object):
                 for k, eof in enumerate(eofs[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
             query += ';'
-            query += "end transaction;"
+            self.queries += query
         elif mode == 'update':
             eofs = self.sizes_update
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
+            query = ""
             segno_lst = [f.split('/')[-1] for f in self.files_update]
             if self.file_format == 'Parquet':
                 for i, eof in enumerate(eofs):
@@ -782,11 +766,10 @@ class HawqRegister(object):
             else:
                 for i, eof in enumerate(eofs):
                     query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            query += "end transaction;"
+            self.queries += query
         else: # update_and_insert
             eofs = self.sizes
-            query = "set allow_system_table_mods='dml';"
-            query += "begin transaction;"
+            query = ""
             if self.file_format == 'Parquet':
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
                 for k, eof in enumerate(eofs[1:]):
@@ -804,9 +787,11 @@ class HawqRegister(object):
             else:
                 for i, eof in enumerate(self.sizes_update):
                     query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            query += "end transaction;"
+            self.queries += query
+    
+    def _modify_metadata(self, mode):
         try:
-            self.utility_accessor.update_catalog(query)
+            self.utility_accessor.update_catalog(self.queries)
         except pg.DatabaseError as e:
             print e
             self.failure_handler.rollback()
@@ -821,14 +806,18 @@ class HawqRegister(object):
         query += "end transaction;"
         return self.utility_accessor.update_catalog(query)
 
-    def _mapping_tablename_from_yml(self, partitions):
-        ''' Mapping table name from yml file, return a list of (table_name,(file_path, file_size)) '''
-        mappings = []
-        for pos, constraint in enumerate(self.partitions_constraint):
-            if partitions.has_key(constraint):
-                mappings.extend([(partitions[constraint], (self.partitions_filepaths[pos][i], self.partitions_filesizes[pos][i]))
-                                for i in xrange(len(self.partitions_filepaths[pos]))])
-        return mappings
+    def _prepare_register(self):
+        if not self.do_not_move:
+            self._set_move_files_in_hdfs()
+        if (not self.do_not_move) and self.mode == 'force':
+            self._set_modify_metadata('force')
+        else:
+            if self.mode == 'force':
+                self._set_modify_metadata('force')
+            elif self.mode == 'repair':
+                self._set_modify_metadata('update')
+            else:
+                self._set_modify_metadata('insert')
 
     def register(self):
         if not self.do_not_move: