You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2016/10/24 03:44:53 UTC

incubator-hawq git commit: HAWQ-1113. Fix bug where hawq register reports an error in force mode when files in yaml are disordered.

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 3bcc55791 -> e6fdfd3e3


HAWQ-1113. Fix bug where hawq register reports an error in force mode when files in yaml are disordered.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e6fdfd3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e6fdfd3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e6fdfd3e

Branch: refs/heads/master
Commit: e6fdfd3e3817368317c7c72cf8f112ce9d22da37
Parents: 3bcc557
Author: Chunling Wang <wa...@126.com>
Authored: Fri Oct 21 13:36:32 2016 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Mon Oct 24 11:43:34 2016 +0800

----------------------------------------------------------------------
 .../test_hawq_register_usage2_case2.cpp         |  10 +-
 tools/bin/hawqregister                          | 189 ++++++++++---------
 2 files changed, 103 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
index 4cce61f..492dbd3 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp
@@ -246,12 +246,12 @@ TEST_F(TestHawqRegister, TestUsage2Case2TableExistNoData) {
     util.execute("drop table nt;");
 }
 
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlConfig) {
+TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlConfig) {
     runYamlCaseForceMode("testusage2case2normalyamlconfig", "usage2case2/normal_yaml_config", 0, 50, 150);
 }
 
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlNoUpdateConfig) {
-    runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 150);
+TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlNoUpdateConfig) {
+    runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 100);
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case2FileNotIncludedInYamlConfig) {
@@ -270,8 +270,8 @@ TEST_F(TestHawqRegister, TestUsage2Case2HDFSFilePathContainErrorSymbol) {
     runYamlCaseForceMode("testusage2case2hdfsfilepathcontainerrorsymbol", "usage2case2/contain_error_symbol");
 }
 
-TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2ZeroEof) {
-    runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 150);
+TEST_F(TestHawqRegister, TestUsage2Case2ZeroEof) {
+    runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 143);
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case2LargerEof) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 245d4d8..27b1e94 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -392,6 +392,9 @@ class HawqRegister(object):
         self.partitions_filepaths = partitions_filepaths
         self.partitions_filesizes = partitions_filesizes
         self.encoding = encoding
+        self.files_same_path = []
+        self.sizes_same_path = []
+        self.segnos_same_path = []
 
     def _option_parser_yml(self, yml_file):
         import yaml
@@ -558,14 +561,21 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
         self.firstsegno, self.tabledir = self._get_metadata()
-
+        
+        seg_list, existed_sizes = self._get_metadata_from_table()
+        existed_files = [self.tabledir + seg for seg in seg_list]
+        # check if file numbers in hdfs is consistent with the record count of pg_aoseg.
+        hdfs_file_no_lst = [f.split('/')[-1] for f in existed_files]
+        for k in range(1, self.firstsegno - 1):
+            if self.firstsegno - 1 > len(existed_files) or str(k) not in hdfs_file_no_lst:
+                logger.error("Hawq aoseg metadata doesn't consistent with file numbers in hdfs.")
+                self.failure_handler.rollback()
+                sys.exit(1)
         if self.mode == 'repair':
             if self.tabledir.strip('/') != self.filepath.strip('/'):
                 logger.error("In repair mode, file path from yaml file should be the same with table's path.")
                 self.failure_handler.rollback()
                 sys.exit(1)
-            seg_list, existed_sizes = self._get_metadata_from_table()
-            existed_files = [self.tabledir + seg for seg in seg_list]
             existed_info = {}
             for k, fn in enumerate(existed_files):
                 existed_info[fn] = existed_sizes[k]
@@ -590,10 +600,8 @@ class HawqRegister(object):
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
 
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
-        self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes]
+        self.files_append, self.sizes_append = [f for f in self.files], [sz for sz in self.sizes]
         if self.mode == 'force':
-            seg_list, _ = self._get_metadata_from_table()
-            existed_files = [self.tabledir + seg for seg in seg_list]
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
@@ -601,6 +609,7 @@ class HawqRegister(object):
                     sys.exit(1)
                 else:
                     self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
+                self.files_append, self.sizes_append = [],[]
             elif len(self.files) < len(existed_files):
                 logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
                 self.failure_handler.rollback()
@@ -610,8 +619,8 @@ class HawqRegister(object):
                     if f in existed_files:
                         self.files_update.append(self.files[k])
                         self.sizes_update.append(self.sizes[k])
-                        self.newfiles.remove(self.files[k])
-                        self.newsizes.remove(self.sizes[k])
+                        self.files_append.remove(self.files[k])
+                        self.sizes_append.remove(self.sizes[k])
                 if sorted(self.files_update) != sorted(existed_files):
                     logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
                     self.failure_handler.rollback()
@@ -623,11 +632,11 @@ class HawqRegister(object):
             for fn in existed_files:
                 if fn not in self.files:
                     self.files_delete.append(fn)
-            self.files, self.sizes = [], []
+            self.files_append, self.sizes_append = [], []
 
         self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
 
-        print 'New file(s) to be registered: ', self.newfiles
+        print 'New file(s) to be registered: ', self.files_append
         if self.files_update:
             print 'Catalog info need to be updated for these files: ', self.files_update
 
@@ -636,6 +645,7 @@ class HawqRegister(object):
                 logger.error('-e option is only supported with single file case.')
                 self.failure_handler.rollback()
                 sys.exit(1)
+            self.sizes_append = [self.filesize]
             self.sizes = [self.filesize]
         self._check_sizes_valid()
 
@@ -706,7 +716,31 @@ class HawqRegister(object):
 
     def _set_move_files_in_hdfs(self):
         segno = self.firstsegno
-        for f in self.newfiles:
+        # set self.files_same_path, self.sizes_same_path and self.segnos_same_path, which are for files existed in HDFS but not in catalog metadata
+        update_segno_lst = [f.split('/')[-1] for f in self.files_update]
+        catalog_lst = [str(i) for i in range(1, segno)]
+        new_catalog_lst = [str(i) for i in range(segno, len(self.files_update) + 1)]
+        exist_catalog_lst = []
+        for k, seg in enumerate(update_segno_lst):
+            if seg not in catalog_lst:
+                self.files_same_path.append(self.files_update[k])
+                self.sizes_same_path.append(self.sizes_update[k])
+            if seg in new_catalog_lst:
+                exist_catalog_lst.append(seg)
+        for seg in update_segno_lst:
+            if seg not in catalog_lst:
+                if seg in exist_catalog_lst:
+                    self.segnos_same_path.append(int(seg))
+                else:
+                    while (str(segno) in exist_catalog_lst):
+                        segno += 1
+                    self.segnos_same_path.append(segno)
+
+        for k, f in enumerate(self.files_same_path):
+            self.srcfiles.append(f)
+            self.dstfiles.append(self.tabledir + str(self.segnos_same_path[k]))
+
+        for f in self.files_append:
             self.srcfiles.append(f)
             self.dstfiles.append(self.tabledir + str(segno))
             segno += 1
@@ -736,68 +770,53 @@ class HawqRegister(object):
                 sys.exit(1)
 
     def _set_modify_metadata(self, mode):
-        if mode == 'insert':
-            eofs = self.sizes
-            query = ""
-            if self.file_format == 'Parquet':
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
-            else:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
-            query += ';'
-            self.queries += query
-        elif mode == 'force':
-            eofs = self.sizes
-            segno_lst = [f.split('/')[-1] for f in self.files]
-            query = "delete from pg_aoseg.%s;" % (self.seg_name)
-            firstsegno = 1
-            if self.file_format == 'Parquet':
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
-            else:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
-            query += ';'
-            self.queries += query
-        elif mode == 'update':
-            eofs = self.sizes_update
-            query = ""
-            segno_lst = [f.split('/')[-1] for f in self.files_update]
-            if self.file_format == 'Parquet':
-                for i, eof in enumerate(eofs):
-                    query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
-            else:
-                for i, eof in enumerate(eofs):
-                    query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            self.queries += query
-        else: # update_and_insert
-            eofs = self.sizes
-            query = ""
-            if self.file_format == 'Parquet':
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
-            else:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
-                for k, eof in enumerate(eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
-            query += ';'
-
-            segno_lst = [f.split('/')[-1] for f in self.files_update]
-            if self.file_format == 'Parquet':
-                for i, eof in enumerate(self.sizes_update):
-                    query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
-            else:
-                for i, eof in enumerate(self.sizes_update):
-                    query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
-            self.queries += query
+        segno = self.firstsegno
+        append_eofs = self.sizes_append
+        update_eofs = self.sizes_update
+        same_path_eofs = self.sizes_same_path
+        update_segno_lst = [f.split('/')[-1] for f in self.files_update]
+        same_path_segno_lst = [seg for seg in self.segnos_same_path]
+        query = ""
+        if mode == 'force':
+            query += "delete from pg_aoseg.%s;" % (self.seg_name)
+        
+        if self.file_format == 'Parquet': 
+            if len(update_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1)
+                for k, update_eof in enumerate(update_eofs[1:]):
+                    query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1)
+                query += ';'
+            if len(same_path_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1)
+                for k, same_path_eof in enumerate(same_path_eofs[1:]):
+                    query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1)
+                query += ';'
+            segno += len(same_path_eofs)
+            if len(append_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1)
+                for k, append_eof in enumerate(append_eofs[1:]):
+                    query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1)
+                query += ';'
+        else:
+            if len(update_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1, -1)
+                for k, update_eof in enumerate(update_eofs[1:]):
+                    query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1, -1)
+                query += ';'
+            if len(same_path_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1)
+                for k, same_path_eof in enumerate(same_path_eofs[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1, -1)
+                query += ';'
+            segno += len(same_path_eofs)
+            if len(append_eofs) > 0:
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1, -1)
+                for k, append_eof in enumerate(append_eofs[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1, -1)
+                query += ';'
+        self.queries += query
     
-    def _modify_metadata(self, mode):
+    def _modify_metadata(self):
         try:
             self.utility_accessor.update_catalog(self.queries)
         except pg.DatabaseError as e:
@@ -817,31 +836,19 @@ class HawqRegister(object):
     def _prepare_register(self):
         if not self.do_not_move:
             self._set_move_files_in_hdfs()
-        if (not self.do_not_move) and self.mode == 'force':
+        if self.mode == 'force' or self.mode == 'repair':
             self._set_modify_metadata('force')
         else:
-            if self.mode == 'force':
-                self._set_modify_metadata('force')
-            elif self.mode == 'repair':
-                self._set_modify_metadata('update')
-            else:
-                self._set_modify_metadata('insert')
+            self._set_modify_metadata('insert')
 
     def register(self):
         if not self.do_not_move:
             self._move_files_in_hdfs()
-        if (not self.do_not_move) and self.mode == 'force':
-            self._modify_metadata('force')
-        else:
-            if self.mode == 'force':
-                self._modify_metadata('force')
-            elif self.mode == 'repair':
-                self._modify_metadata('update')
-                if self.files_delete:
-                    self._delete_files_in_hdfs()
-                    self._delete_metadata()
-            else:
-                self._modify_metadata('insert')
+        self._modify_metadata()
+        if self.mode == 'repair':
+            if self.files_delete:
+                self._delete_files_in_hdfs()
+                self._delete_metadata()
         logger.info('Hawq Register Succeed.')