You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2016/10/24 03:44:53 UTC
incubator-hawq git commit: HAWQ-1113. Fix hawq register error in force mode
when the files listed in the yaml file are out of order.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 3bcc55791 -> e6fdfd3e3
HAWQ-1113. Fix hawq register error in force mode when the files listed in the yaml file are out of order.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e6fdfd3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e6fdfd3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e6fdfd3e
Branch: refs/heads/master
Commit: e6fdfd3e3817368317c7c72cf8f112ce9d22da37
Parents: 3bcc557
Author: Chunling Wang <wa...@126.com>
Authored: Fri Oct 21 13:36:32 2016 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Mon Oct 24 11:43:34 2016 +0800
----------------------------------------------------------------------
.../test_hawq_register_usage2_case2.cpp | 10 +-
tools/bin/hawqregister | 189 ++++++++++---------
2 files changed, 103 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
index 4cce61f..492dbd3 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
@@ -246,12 +246,12 @@ TEST_F(TestHawqRegister, TestUsage2Case2TableExistNoData) {
util.execute("drop table nt;");
}
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlConfig) {
+TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlConfig) {
runYamlCaseForceMode("testusage2case2normalyamlconfig", "usage2case2/normal_yaml_config", 0, 50, 150);
}
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlNoUpdateConfig) {
- runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 150);
+TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlNoUpdateConfig) {
+ runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 100);
}
TEST_F(TestHawqRegister, TestUsage2Case2FileNotIncludedInYamlConfig) {
@@ -270,8 +270,8 @@ TEST_F(TestHawqRegister, TestUsage2Case2HDFSFilePathContainErrorSymbol) {
runYamlCaseForceMode("testusage2case2hdfsfilepathcontainerrorsymbol", "usage2case2/contain_error_symbol");
}
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2ZeroEof) {
- runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 150);
+TEST_F(TestHawqRegister, TestUsage2Case2ZeroEof) {
+ runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 143);
}
TEST_F(TestHawqRegister, TestUsage2Case2LargerEof) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 245d4d8..27b1e94 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -392,6 +392,9 @@ class HawqRegister(object):
self.partitions_filepaths = partitions_filepaths
self.partitions_filesizes = partitions_filesizes
self.encoding = encoding
+ self.files_same_path = []
+ self.sizes_same_path = []
+ self.segnos_same_path = []
def _option_parser_yml(self, yml_file):
import yaml
@@ -558,14 +561,21 @@ class HawqRegister(object):
self.failure_handler.rollback()
sys.exit(1)
self.firstsegno, self.tabledir = self._get_metadata()
-
+
+ seg_list, existed_sizes = self._get_metadata_from_table()
+ existed_files = [self.tabledir + seg for seg in seg_list]
+ # check if file numbers in hdfs is consistent with the record count of pg_aoseg.
+ hdfs_file_no_lst = [f.split('/')[-1] for f in existed_files]
+ for k in range(1, self.firstsegno - 1):
+ if self.firstsegno - 1 > len(existed_files) or str(k) not in hdfs_file_no_lst:
+ logger.error("Hawq aoseg metadata doesn't consistent with file numbers in hdfs.")
+ self.failure_handler.rollback()
+ sys.exit(1)
if self.mode == 'repair':
if self.tabledir.strip('/') != self.filepath.strip('/'):
logger.error("In repair mode, file path from yaml file should be the same with table's path.")
self.failure_handler.rollback()
sys.exit(1)
- seg_list, existed_sizes = self._get_metadata_from_table()
- existed_files = [self.tabledir + seg for seg in seg_list]
existed_info = {}
for k, fn in enumerate(existed_files):
existed_info[fn] = existed_sizes[k]
@@ -590,10 +600,8 @@ class HawqRegister(object):
self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
self.do_not_move, self.files_update, self.sizes_update = False, [], []
- self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
+ self.files_append, self.sizes_append = [f for f in self.files], [sz for sz in self.sizes]
if self.mode == 'force':
- seg_list, _ = self._get_metadata_from_table()
- existed_files = [self.tabledir + seg for seg in seg_list]
if len(self.files) == len(existed_files):
if sorted(self.files) != sorted(existed_files):
logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
@@ -601,6 +609,7 @@ class HawqRegister(object):
sys.exit(1)
else:
self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
+ self.files_append, self.sizes_append = [],[]
elif len(self.files) < len(existed_files):
logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
self.failure_handler.rollback()
@@ -610,8 +619,8 @@ class HawqRegister(object):
if f in existed_files:
self.files_update.append(self.files[k])
self.sizes_update.append(self.sizes[k])
- self.newfiles.remove(self.files[k])
- self.newsizes.remove(self.sizes[k])
+ self.files_append.remove(self.files[k])
+ self.sizes_append.remove(self.sizes[k])
if sorted(self.files_update) != sorted(existed_files):
logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
self.failure_handler.rollback()
@@ -623,11 +632,11 @@ class HawqRegister(object):
for fn in existed_files:
if fn not in self.files:
self.files_delete.append(fn)
- self.files, self.sizes = [], []
+ self.files_append, self.sizes_append = [], []
self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
- print 'New file(s) to be registered: ', self.newfiles
+ print 'New file(s) to be registered: ', self.files_append
if self.files_update:
print 'Catalog info need to be updated for these files: ', self.files_update
@@ -636,6 +645,7 @@ class HawqRegister(object):
logger.error('-e option is only supported with single file case.')
self.failure_handler.rollback()
sys.exit(1)
+ self.sizes_append = [self.filesize]
self.sizes = [self.filesize]
self._check_sizes_valid()
@@ -706,7 +716,31 @@ class HawqRegister(object):
def _set_move_files_in_hdfs(self):
segno = self.firstsegno
- for f in self.newfiles:
+ # set self.files_same_path, self.sizes_same_path and self.segnos_same_path, which are for files existed in HDFS but not in catalog metadata
+ update_segno_lst = [f.split('/')[-1] for f in self.files_update]
+ catalog_lst = [str(i) for i in range(1, segno)]
+ new_catalog_lst = [str(i) for i in range(segno, len(self.files_update) + 1)]
+ exist_catalog_lst = []
+ for k, seg in enumerate(update_segno_lst):
+ if seg not in catalog_lst:
+ self.files_same_path.append(self.files_update[k])
+ self.sizes_same_path.append(self.sizes_update[k])
+ if seg in new_catalog_lst:
+ exist_catalog_lst.append(seg)
+ for seg in update_segno_lst:
+ if seg not in catalog_lst:
+ if seg in exist_catalog_lst:
+ self.segnos_same_path.append(int(seg))
+ else:
+ while (str(segno) in exist_catalog_lst):
+ segno += 1
+ self.segnos_same_path.append(segno)
+
+ for k, f in enumerate(self.files_same_path):
+ self.srcfiles.append(f)
+ self.dstfiles.append(self.tabledir + str(self.segnos_same_path[k]))
+
+ for f in self.files_append:
self.srcfiles.append(f)
self.dstfiles.append(self.tabledir + str(segno))
segno += 1
@@ -736,68 +770,53 @@ class HawqRegister(object):
sys.exit(1)
def _set_modify_metadata(self, mode):
- if mode == 'insert':
- eofs = self.sizes
- query = ""
- if self.file_format == 'Parquet':
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
- else:
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
- query += ';'
- self.queries += query
- elif mode == 'force':
- eofs = self.sizes
- segno_lst = [f.split('/')[-1] for f in self.files]
- query = "delete from pg_aoseg.%s;" % (self.seg_name)
- firstsegno = 1
- if self.file_format == 'Parquet':
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
- else:
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
- query += ';'
- self.queries += query
- elif mode == 'update':
- eofs = self.sizes_update
- query = ""
- segno_lst = [f.split('/')[-1] for f in self.files_update]
- if self.file_format == 'Parquet':
- for i, eof in enumerate(eofs):
- query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
- else:
- for i, eof in enumerate(eofs):
- query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
- self.queries += query
- else: # update_and_insert
- eofs = self.sizes
- query = ""
- if self.file_format == 'Parquet':
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
- else:
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
- query += ';'
-
- segno_lst = [f.split('/')[-1] for f in self.files_update]
- if self.file_format == 'Parquet':
- for i, eof in enumerate(self.sizes_update):
- query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
- else:
- for i, eof in enumerate(self.sizes_update):
- query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
- self.queries += query
+ segno = self.firstsegno
+ append_eofs = self.sizes_append
+ update_eofs = self.sizes_update
+ same_path_eofs = self.sizes_same_path
+ update_segno_lst = [f.split('/')[-1] for f in self.files_update]
+ same_path_segno_lst = [seg for seg in self.segnos_same_path]
+ query = ""
+ if mode == 'force':
+ query += "delete from pg_aoseg.%s;" % (self.seg_name)
+
+ if self.file_format == 'Parquet':
+ if len(update_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1)
+ for k, update_eof in enumerate(update_eofs[1:]):
+ query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1)
+ query += ';'
+ if len(same_path_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1)
+ for k, same_path_eof in enumerate(same_path_eofs[1:]):
+ query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1)
+ query += ';'
+ segno += len(same_path_eofs)
+ if len(append_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1)
+ for k, append_eof in enumerate(append_eofs[1:]):
+ query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1)
+ query += ';'
+ else:
+ if len(update_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1, -1)
+ for k, update_eof in enumerate(update_eofs[1:]):
+ query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1, -1)
+ query += ';'
+ if len(same_path_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1)
+ for k, same_path_eof in enumerate(same_path_eofs[1:]):
+ query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1, -1)
+ query += ';'
+ segno += len(same_path_eofs)
+ if len(append_eofs) > 0:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1, -1)
+ for k, append_eof in enumerate(append_eofs[1:]):
+ query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1, -1)
+ query += ';'
+ self.queries += query
- def _modify_metadata(self, mode):
+ def _modify_metadata(self):
try:
self.utility_accessor.update_catalog(self.queries)
except pg.DatabaseError as e:
@@ -817,31 +836,19 @@ class HawqRegister(object):
def _prepare_register(self):
if not self.do_not_move:
self._set_move_files_in_hdfs()
- if (not self.do_not_move) and self.mode == 'force':
+ if self.mode == 'force' or self.mode == 'repair':
self._set_modify_metadata('force')
else:
- if self.mode == 'force':
- self._set_modify_metadata('force')
- elif self.mode == 'repair':
- self._set_modify_metadata('update')
- else:
- self._set_modify_metadata('insert')
+ self._set_modify_metadata('insert')
def register(self):
if not self.do_not_move:
self._move_files_in_hdfs()
- if (not self.do_not_move) and self.mode == 'force':
- self._modify_metadata('force')
- else:
- if self.mode == 'force':
- self._modify_metadata('force')
- elif self.mode == 'repair':
- self._modify_metadata('update')
- if self.files_delete:
- self._delete_files_in_hdfs()
- self._delete_metadata()
- else:
- self._modify_metadata('insert')
+ self._modify_metadata()
+ if self.mode == 'repair':
+ if self.files_delete:
+ self._delete_files_in_hdfs()
+ self._delete_metadata()
logger.info('Hawq Register Succeed.')