Posted to commits@hawq.apache.org by li...@apache.org on 2016/09/18 06:40:16 UTC
[1/2] incubator-hawq git commit: HAWQ-1034. Implement --repair option for hawq register.
Repository: incubator-hawq
Updated Branches:
refs/heads/master 981c0a939 -> ef2aef879
HAWQ-1034. Implement --repair option for hawq register.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ef2aef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ef2aef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ef2aef87
Branch: refs/heads/master
Commit: ef2aef87958941082a016afeea45b7bbcccb9779
Parents: 637f9d5
Author: xunzhang <xu...@gmail.com>
Authored: Sat Sep 17 20:20:43 2016 +0800
Committer: Lili Ma <ic...@gmail.com>
Committed: Sun Sep 18 14:39:54 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqregister | 115 ++++++++++++++++++++++++++++++++++++++------
1 file changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ef2aef87/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index ee5275b..ffae437 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -54,8 +54,8 @@ def option_parser():
parser.add_option('-f', '--filepath', dest='filepath', help='file name in HDFS')
parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, help='eof of the file to be registered')
parser.add_option('-c', '--config', dest='yml_config', default='', help='configuration file in YAML format')
- parser.add_option('--force', action='store_true', default=False)
- parser.add_option('--repair', action='store_true', default=False)
+ parser.add_option('-F', '--force', dest='force', action='store_true', default=False)
+ parser.add_option('-R', '--repair', dest='repair', action='store_true', default=False)
return parser
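
With the short flags added above, both modes can now be selected either way. Illustrative invocations (host, database, and table names are placeholders, not part of this commit):

    hawq register -d postgres -c t1.yml -R public.t1    # equivalent to --repair
    hawq register -d postgres -c t1.yml -F public.t1    # equivalent to --force
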
@@ -166,7 +166,7 @@ class GpRegisterAccessor(object):
qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
rows = self.exec_query(qry)
if len(rows) == 0:
- logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
+ logger.error('Table %s is not an append-only table. There is no record for it in the gp_distribution_policy table.' % tablename)
sys.exit(1)
if rows[0]['attrnums']:
logger.error('Cannot register file(s) to a table which is hash distributed.')
@@ -196,6 +196,14 @@ class GpRegisterAccessor(object):
rows = self.exec_query(query)
return rows[0]['attrnums']
+ def get_bucket_number(self, tablename):
+ query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
+ rows = self.exec_query(query)
+ oid = rows[0]['oid']
+ query = "select * from gp_distribution_policy where localoid = '%s';" % oid
+ rows = self.exec_query(query)
+ return rows[0]['bucketnum']
+
def get_metadata_from_database(self, tablename, seg_name):
query = 'select segno from pg_aoseg.%s;' % seg_name
firstsegno = len(self.exec_query(query)) + 1
@@ -233,7 +241,7 @@ class HawqRegister(object):
return 'force'
elif repair:
if not table_existed():
- logger.error('--repair mode asserts the table is already create.')
+ logger.error('--repair mode asserts the table has already been created.')
sys.exit(1)
return 'repair'
else:
@@ -261,32 +269,76 @@ class HawqRegister(object):
def get_metadata():
return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
+ def get_distribution_policy():
+ return self.accessor.get_distribution_policy_info(self.tablename)
+
+ def check_policy_consistency():
+ policy = get_distribution_policy() # None for randomly distributed tables, or e.g. '{1,3}'
+ if policy is None:
+ if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
+ return
+ else:
+ logger.error('Distribution policy of %s in the yaml file is not consistent with the policy of the existing table.' % self.tablename)
+ sys.exit(1)
+ tmp_dict = {}
+ for i, d in enumerate(self.schema):
+ tmp_dict[d['name']] = i + 1
+ # e.g. 'DISTRIBUTED BY (a,c)' -> '1,3'
+ cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+ original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+ if policy.strip('{').strip('}') != original_policy:
+ logger.error('Distribution policy of %s in the yaml file is not consistent with the policy of the existing table.' % self.tablename)
+ sys.exit(1)
+
+ def check_bucket_number():
+ def get_bucket_number():
+ return self.accessor.get_bucket_number(self.tablename)
+
+ if self.bucket_number != get_bucket_number():
+ logger.error('Bucket number of %s is not consistent with the bucket number of the existing table.' % self.tablename)
+ sys.exit(1)
+
if self.yml:
self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
check_distribution_policy()
- if self.mode != 'force':
+ if self.mode != 'force' and self.mode != 'repair':
if not create_table():
self.mode = 'second_exist'
else:
self.file_format = 'Parquet'
check_hash_type() # Usage1 only supports randomly distributed tables
if not self.filepath:
+ if self.mode == 'first':
+ logger.error('Please specify filepath with the -f option.')
+ else:
+ logger.info('Hawq Register Succeed.')
sys.exit(0)
- if self.mode == 'repair':
- # TODO
- # check distribution policy consistency
- # check bucketnum, pagesize, rowgroupsize, etc
- # check filesize smaller
- pass
-
self.seg_name = get_seg_name()
self.firstsegno, self.tabledir = get_metadata()
+ if self.mode == 'repair':
+ if self.tabledir.strip('/') != self.filepath.strip('/'):
+ logger.error("In repair mode, the file path from the yaml file should be the same as the table's path.")
+ sys.exit(1)
+ check_policy_consistency()
+ check_bucket_number()
+ existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
+ existed_info = {}
+ for k, fn in enumerate(existed_files):
+ existed_info[fn] = existed_sizes[k]
+ for k, fn in enumerate(self.files):
+ if fn not in existed_files:
+ logger.error('Cannot register in repair mode: file %s does not exist.' % fn)
+ sys.exit(1)
+ if self.sizes[k] > existed_info[fn]:
+ logger.error('Cannot register in repair mode: given file size %s is larger than the size of the existing file.' % self.sizes[k])
+ sys.exit(1)
+
if self.mode == 'second_exist':
if self.tabledir.strip('/') == self.filepath.strip('/'):
- logger.error('Files to be registeted in this case should not be the same with table path.')
+ logger.error('Files to be registered should not be the same as the table path.')
sys.exit(1)
self.do_not_move, self.files_update, self.sizes_update = False, [], []
@@ -294,7 +346,7 @@ class HawqRegister(object):
existed_files, _ = self._get_files_in_hdfs(self.tabledir)
if len(self.files) == len(existed_files):
if sorted(self.files) != sorted(existed_files):
- logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
+ logger.error('In force mode, you should include the existing table files in the yaml configuration file. Otherwise you should drop the previous table before registering with --force.')
sys.exit(1)
else:
self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
@@ -307,6 +359,14 @@ class HawqRegister(object):
self.sizes_update.append(sizes_old[k])
self.files.remove(files_old[k])
self.sizes.remove(sizes_old[k])
+ elif self.mode == 'repair':
+ self.do_not_move = True
+ self.files_update, self.sizes_update = [fn for fn in self.files], [sz for sz in self.sizes]
+ self.files_delete = []
+ for fn in existed_files:
+ if fn not in self.files:
+ self.files_delete.append(fn)
+ self.files, self.sizes = [], []
self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
@@ -332,13 +392,13 @@ class HawqRegister(object):
# check whether the files to be registered are in hdfs
filesystem = filepath.split('://')
if filesystem[0] != 'hdfs':
- logger.error('Only support to register file(s) in hdfs')
+ logger.error('Only support registering file(s) in hdfs.')
sys.exit(1)
fileroot = filepath.split('/')
tableroot = tabledir.split('/')
# check that both share the same root url. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localhost:8020'
if fileroot[0:3] != tableroot[0:3]:
- logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
+ logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'." % (filepath, tabledir))
sys.exit(1)
def _get_files_in_hdfs(self, filepath):
@@ -393,6 +453,15 @@ class HawqRegister(object):
logger.error('Failed to move %s to %s' % (srcfile, dstfile))
sys.exit(1)
+ def _delete_files_in_hdfs(self):
+ for fn in self.files_delete:
+ hdfscmd = 'hdfs dfs -rm %s' % fn
+ sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
+ result = local_ssh(hdfscmd, logger)
+ if result != 0:
+ logger.error('Failed to delete %s.' % fn)
+ sys.exit(1)
+
def _modify_metadata(self, mode):
if mode == 'insert':
eofs = self.sizes
@@ -434,6 +503,15 @@ class HawqRegister(object):
query += "end transaction;"
return self.utility_accessor.update_catalog(query)
+ def _delete_metadata(self):
+ query = "set allow_system_table_mods='dml';"
+ query += "begin transaction;"
+ segno_lst = [fn.strip().split('/')[-1] for fn in self.files_delete]
+ for seg in segno_lst:
+ query += "delete from pg_aoseg.%s where segno = '%s';" % (self.seg_name, seg)
+ query += "end transaction;"
+ return self.utility_accessor.update_catalog(query)
+
def register(self):
if not self.do_not_move:
self._move_files_in_hdfs()
@@ -442,6 +520,11 @@ class HawqRegister(object):
else:
if self.mode == 'force':
self._modify_metadata('update')
+ elif self.mode == 'repair':
+ self._modify_metadata('update')
+ if self.files_delete:
+ self._delete_files_in_hdfs()
+ self._delete_metadata()
else:
self._modify_metadata('insert')
logger.info('Hawq Register Succeed.')
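
For illustration, the repair path above boils down to plain SQL against the table's pg_aoseg relation. A sketch of what _modify_metadata('update') followed by _delete_metadata would emit, assuming a hypothetical Parquet segment relation pg_paqseg_16393, a repaired eof of 1870 for segno 1, and one stale segment file 2 to drop (relation name, segnos, and sizes are illustrative placeholders):

    set allow_system_table_mods='dml';
    begin transaction;
    update pg_aoseg.pg_paqseg_16393 set eof = '1870' where segno = '1';
    end transaction;

    set allow_system_table_mods='dml';
    begin transaction;
    delete from pg_aoseg.pg_paqseg_16393 where segno = '2';
    end transaction;

The corresponding HDFS file under the table directory is removed with 'hdfs dfs -rm' before the catalog delete runs.
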
[2/2] incubator-hawq git commit: HAWQ-1060. Refactor hawq register with better readability and quality.
Posted by li...@apache.org.
HAWQ-1060. Refactor hawq register with better readability and quality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/637f9d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/637f9d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/637f9d57
Branch: refs/heads/master
Commit: 637f9d5787aec24d72a3159b356d388ba116991e
Parents: 981c0a9
Author: xunzhang <xu...@gmail.com>
Authored: Fri Sep 16 18:48:39 2016 +0800
Committer: Lili Ma <ic...@gmail.com>
Committed: Sun Sep 18 14:39:54 2016 +0800
----------------------------------------------------------------------
tools/bin/hawqregister | 680 ++++++++++++++++++++++----------------------
1 file changed, 335 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/637f9d57/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index bbdc946..ee5275b 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -20,80 +20,81 @@
# Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] [-e eof] <tablename>
# Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] [--force] [--repair] <tablename>
-import os, sys, optparse, getpass, re, urlparse
+import os
+import sys
try:
from gppylib.commands.unix import getLocalHostname, getUserName
from gppylib.db import dbconn
from gppylib.gplog import get_default_logger, setup_tool_logging
from gppylib.gpparseopts import OptParser, OptChecker
from pygresql import pg
- from pygresql.pgdb import DatabaseError
from hawqpylib.hawqlib import local_ssh, local_ssh_output
except ImportError, e:
print e
- sys.stderr.write('cannot import module, please check that you have source greenplum_path.sh\n')
+ sys.stderr.write('Cannot import module, please check that you have sourced greenplum_path.sh\n')
sys.exit(2)
# setup logging
logger = get_default_logger()
EXECNAME = os.path.split(__file__)[-1]
-setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
+setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
def option_parser():
+ '''option parser'''
parser = OptParser(option_class=OptChecker,
usage='usage: %prog [options] table_name',
version='%prog version $Revision: #1 $')
parser.remove_option('-h')
- parser.add_option('-?', '--help', action = 'help')
- parser.add_option('-h', '--host', help = 'host of the target DB')
- parser.add_option('-p', '--port', help = 'port of the target DB', type = 'int', default = 0)
- parser.add_option('-U', '--user', help = 'username of the target DB')
- parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
- parser.add_option('-f', '--filepath', dest = 'filepath', help = 'file name in HDFS')
- parser.add_option('-e', '--eof', dest = 'filesize', type = 'int', default = 0, help = 'eof of the file to be registered')
- parser.add_option('-c', '--config', dest = 'yml_config', default = '', help = 'configuration file in YAML format')
- parser.add_option('--force', action = 'store_true', default = False)
- parser.add_option('--repair', action = 'store_true', default = False)
+ parser.add_option('-?', '--help', action='help')
+ parser.add_option('-h', '--host', help='host of the target DB')
+ parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0)
+ parser.add_option('-U', '--user', help='username of the target DB')
+ parser.add_option('-d', '--database', default='postgres', dest='database', help='database name')
+ parser.add_option('-f', '--filepath', dest='filepath', help='file name in HDFS')
+ parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, help='eof of the file to be registered')
+ parser.add_option('-c', '--config', dest='yml_config', default='', help='configuration file in YAML format')
+ parser.add_option('--force', action='store_true', default=False)
+ parser.add_option('--repair', action='store_true', default=False)
return parser
def register_yaml_dict_check(D):
- # check exists
+ '''check that required attributes exist'''
check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName', 'Bucketnum']
for attr in check_list:
- if D.get(attr) == None:
+ if D.get(attr) is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
sys.exit(1)
if D['FileFormat'] in ['Parquet', 'AO']:
prefix = D['FileFormat']
local_check_list = ['%s_FileLocations' % prefix, '%s_Schema' % prefix]
for attr in local_check_list:
- if D.get(attr) == None:
+ if D.get(attr) is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
sys.exit(1)
- if D['%s_FileLocations' % prefix].get('Files') == None:
+ if D['%s_FileLocations' % prefix].get('Files') is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files' % prefix)
sys.exit(1)
for d in D['%s_FileLocations' % prefix]['Files']:
- if d.get('path') == None:
+ if d.get('path') is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.path' % prefix)
sys.exit(1)
- if d.get('size') == None:
+ if d.get('size') is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.size' % prefix)
sys.exit(1)
else:
logger.error('hawq register only supports Parquet and AO formats. Format %s is not supported.' % D['FileFormat'])
sys.exit(1)
prefix = D['FileFormat']
- if D.get('%s_Schema' % prefix) == None:
+ if D.get('%s_Schema' % prefix) is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema' % prefix)
sys.exit(1)
for d in D['%s_Schema' % prefix]:
- if d.get('name') == None:
+ if d.get('name') is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema.name' % prefix)
sys.exit(1)
- if d.get('type') == None:
+ if d.get('type') is None:
logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema.type' % prefix)
sys.exit(1)
if D['FileFormat'] == 'Parquet':
@@ -126,182 +127,263 @@ def option_parser_yml(yml_file):
return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
-def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
- try:
- query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
- conn = dbconn.connect(dburl, False)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- for row in rows:
- if row[0] != 0:
- return False
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
-
- try:
+class GpRegisterAccessor(object):
+ def __init__(self, conn):
+ self.conn = conn
+ rows = self.exec_query("""
+ SELECT oid, datname, dat2tablespace,
+ pg_encoding_to_char(encoding) encoding
+ FROM pg_database WHERE datname=current_database()""")
+ self.dbid = rows[0]['oid']
+ self.dbname = rows[0]['datname']
+ self.spcid = rows[0]['dat2tablespace']
+ self.dbencoding = rows[0]['encoding']
+ self.dbversion = self.exec_query('select version()')[0]['version']
+
+ def exec_query(self, sql):
+ '''execute query and return dict result'''
+ return self.conn.query(sql).dictresult()
+
+ def get_table_existed(self, tablename):
+ qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
+ return self.exec_query(qry)[0]['count'] == 1
+
+ def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
+ if self.get_table_existed(tablename):
+ return False
schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
fmt = 'ROW' if fmt == 'AO' else fmt
if fmt == 'ROW':
query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
- % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
+ % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
else: # Parquet
query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
- % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
- conn = dbconn.connect(dburl, False)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
+ % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
+ self.conn.query(query)
return True
- except DatabaseError, ex:
- print DatabaseError, ex
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
+ def check_hash_type(self, tablename):
+ qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
+ rows = self.exec_query(qry)
+ if len(rows) == 0:
+ logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
+ sys.exit(1)
+ if rows[0]['attrnums']:
+ logger.error('Cannot register file(s) to a table which is hash distributed.')
+ sys.exit(1)
-def get_seg_name(dburl, tablename, database, fmt):
- try:
- relname = ''
+ # pg_paqseg_#
+ def get_seg_name(self, tablename, database, fmt):
tablename = tablename.split('.')[-1]
query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
"where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
- conn = dbconn.connect(dburl, True)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- if not rows.rowcount:
+ rows = self.exec_query(query)
+ if len(rows) == 0:
logger.error('table "%s" not found in db "%s"' % (tablename, database))
sys.exit(1)
- for row in rows:
- relname = row[0]
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
- sys.exit(1)
- if fmt == 'Parquet':
- if relname.find("paq") == -1:
- logger.error("table '%s' is not parquet format" % tablename)
- sys.exit(1)
-
- return relname
-
-
-def check_hash_type(dburl, tablename):
- '''Check whether target table is hash distributed, in that case simple insertion does not work'''
- try:
- query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
- conn = dbconn.connect(dburl, False)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- if not rows.rowcount:
- logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
- sys.exit(1)
- for row in rows:
- if row[0]:
- logger.error('Cannot register file(s) to a table which is hash distribuetd.')
+ relname = rows[0]['relname']
+ if fmt == 'Parquet':
+ if relname.find('paq') == -1:
+ logger.error("table '%s' is not in parquet format" % tablename)
sys.exit(1)
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
+ return relname
+ def get_distribution_policy_info(self, tablename):
+ query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
+ rows = self.exec_query(query)
+ oid = rows[0]['oid']
+ query = "select * from gp_distribution_policy where localoid = '%s';" % oid
+ rows = self.exec_query(query)
+ return rows[0]['attrnums']
-def get_metadata_from_database(dburl, tablename, seg_name):
- '''Get the metadata to be inserted from hdfs'''
- try:
+ def get_metadata_from_database(self, tablename, seg_name):
query = 'select segno from pg_aoseg.%s;' % seg_name
- conn = dbconn.connect(dburl, False)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
-
- firstsegno = rows.rowcount + 1
-
- try:
+ firstsegno = len(self.exec_query(query)) + 1
# get the full path of the corresponding file for the target table
query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
"gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
"gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
"and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
- conn = dbconn.connect(dburl, False)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
- for row in rows:
- tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), ''])
- return firstsegno, tabledir
-
-
-def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
- '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
- if not filepath:
- return
- # check whether the files to be registered is in hdfs
- filesystem = filepath.split('://')
- if filesystem[0] != 'hdfs':
- logger.error('Only support to register file(s) in hdfs')
- sys.exit(1)
- fileroot = filepath.split('/')
- tableroot = tabledir.split('/')
- # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
- if fileroot[0:3] != tableroot[0:3]:
- logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
- sys.exit(1)
+ D = self.exec_query(query)[0]
+ tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
+ return firstsegno, tabledir
+
+ def update_catalog(self, query):
+ self.conn.query(query)
+
+
+class HawqRegister(object):
+ def __init__(self, options, table, utility_conn, conn):
+ self.yml = options.yml_config
+ self.filepath = options.filepath
+ self.database = options.database
+ self.tablename = table
+ self.filesize = options.filesize
+ self.accessor = GpRegisterAccessor(conn)
+ self.utility_accessor = GpRegisterAccessor(utility_conn)
+ self.mode = self._init_mode(options.force, options.repair)
+ self._init()
+
+ def _init_mode(self, force, repair):
+ def table_existed():
+ return self.accessor.get_table_existed(self.tablename)
+
+ if self.yml:
+ if force:
+ return 'force'
+ elif repair:
+ if not table_existed():
+ logger.error('--repair mode asserts the table is already create.')
+ sys.exit(1)
+ return 'repair'
+ else:
+ return 'second_normal'
+ else:
+ return 'first'
+ def _init(self):
+ def check_hash_type():
+ self.accessor.check_hash_type(self.tablename)
-def get_files_in_hdfs(filepath):
- '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
- files = []
- sizes = []
- hdfscmd = "hdfs dfs -test -e %s" % filepath
- result = local_ssh(hdfscmd, logger)
- if result != 0:
- logger.error("Path '%s' does not exist in hdfs" % filepath)
- sys.exit(1)
- hdfscmd = "hdfs dfs -ls -R %s" % filepath
- result, out, err = local_ssh_output(hdfscmd)
- outlines = out.splitlines()
- # recursively search all the files under path 'filepath'
- for line in outlines:
- lineargs = line.split()
- if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
- files.append(lineargs[7])
- sizes.append(int(lineargs[4]))
- if len(files) == 0:
- logger.error("Dir '%s' is empty" % filepath)
- sys.exit(1)
- return files, sizes
-
-
-def check_parquet_format(files):
- '''Check whether the file to be registered is parquet format'''
- for f in files:
- hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
- rc, out, err = local_ssh_output(hdfscmd)
- if out == '0':
- continue
- hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
- result1 = local_ssh(hdfscmd, logger)
- hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
- result2 = local_ssh(hdfscmd, logger)
- if result1 or result2:
- logger.error('File %s is not parquet format' % f)
+ # check conflicting distributed policy
+ def check_distribution_policy():
+ if self.distribution_policy.startswith('DISTRIBUTED BY'):
+ if len(self.files) % self.bucket_number != 0:
+ logger.error('The number of files to be registered must be a multiple of the bucket number of the hash table.')
+ sys.exit(1)
+
+ def create_table():
+ return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
+
+ def get_seg_name():
+ return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
+
+ def get_metadata():
+ return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
+
+ if self.yml:
+ self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
+ self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
+ check_distribution_policy()
+ if self.mode != 'force':
+ if not create_table():
+ self.mode = 'second_exist'
+ else:
+ self.file_format = 'Parquet'
+ check_hash_type() # Usage1 only supports randomly distributed tables
+ if not self.filepath:
+ sys.exit(0)
+
+ if self.mode == 'repair':
+ # TODO
+ # check distribution policy consistency
+ # check bucketnum, pagesize, rowgroupsize, etc
+ # check filesize smaller
+ pass
+
+ self.seg_name = get_seg_name()
+ self.firstsegno, self.tabledir = get_metadata()
+
+ if self.mode == 'second_exist':
+ if self.tabledir.strip('/') == self.filepath.strip('/'):
+ logger.error('Files to be registeted in this case should not be the same with table path.')
+ sys.exit(1)
+
+ self.do_not_move, self.files_update, self.sizes_update = False, [], []
+ if self.mode == 'force':
+ existed_files, _ = self._get_files_in_hdfs(self.tabledir)
+ if len(self.files) == len(existed_files):
+ if sorted(self.files) != sorted(existed_files):
+ logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
+ sys.exit(1)
+ else:
+ self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
+ self.files, self.sizes = [], []
+ else:
+ files_old, sizes_old = [f for f in self.files], [sz for sz in self.sizes]
+ for k, f in enumerate(files_old):
+ if f in existed_files:
+ self.files_update.append(files_old[k])
+ self.sizes_update.append(sizes_old[k])
+ self.files.remove(files_old[k])
+ self.sizes.remove(sizes_old[k])
+
+ self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
+
+ if not self.yml:
+ self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
+ print 'New file(s) to be registered: ', self.files
+ if self.files_update:
+ print 'Catalog info needs to be updated for these files: ', self.files_update
+
+ if self.filesize:
+ if len(self.files) != 1:
+ logger.error('-e option is only supported with the single file case.')
+ sys.exit(1)
+ self.sizes = [self.filesize]
+
+ if self.file_format == 'Parquet':
+ self._check_parquet_format(self.files)
+
+ def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
+ '''Check whether all the files referred to by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
+ if not filepath:
+ return
+ # check whether the files to be registered are in hdfs
+ filesystem = filepath.split('://')
+ if filesystem[0] != 'hdfs':
+ logger.error('Only support to register file(s) in hdfs')
+ sys.exit(1)
+ fileroot = filepath.split('/')
+ tableroot = tabledir.split('/')
+ # check that both share the same root url. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localhost:8020'
+ if fileroot[0:3] != tableroot[0:3]:
+ logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
sys.exit(1)
+ def _get_files_in_hdfs(self, filepath):
+ '''Get all the files referred to by 'filepath', which could be a file or a directory containing all the files'''
+ files, sizes = [], []
+ hdfscmd = "hdfs dfs -test -e %s" % filepath
+ result = local_ssh(hdfscmd, logger)
+ if result != 0:
+ logger.error("Path '%s' does not exist in hdfs" % filepath)
+ sys.exit(1)
+ hdfscmd = "hdfs dfs -ls -R %s" % filepath
+ result, out, err = local_ssh_output(hdfscmd)
+ outlines = out.splitlines()
+ # recursively search all the files under path 'filepath'
+ for line in outlines:
+ lineargs = line.split()
+ if len(lineargs) == 8 and lineargs[0].find("d") == -1:
+ files.append(lineargs[7])
+ sizes.append(int(lineargs[4]))
+ if len(files) == 0:
+ logger.error("Dir '%s' is empty" % filepath)
+ sys.exit(1)
+ return files, sizes
-def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, normal):
- '''Move file(s) in src path into the folder correspoding to the target table'''
- if normal:
- segno = firstsegno
+ def _check_parquet_format(self, files):
+ '''Check whether the files to be registered are in parquet format'''
for f in files:
+ hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
+ rc, out, err = local_ssh_output(hdfscmd)
+ if out == '0':
+ continue
+ hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
+ result1 = local_ssh(hdfscmd, logger)
+ hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
+ result2 = local_ssh(hdfscmd, logger)
+ if result1 or result2:
+ logger.error('File %s is not in parquet format' % f)
+ sys.exit(1)
+
+ def _move_files_in_hdfs(self):
+ '''Move file(s) in src path into the folder corresponding to the target table'''
+ segno = self.firstsegno
+ for f in self.files:
srcfile = f
- dstfile = tabledir + str(segno)
+ dstfile = self.tabledir + str(segno)
segno += 1
if srcfile != dstfile:
hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
@@ -310,190 +392,98 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor
if result != 0:
logger.error('Failed to move %s to %s' % (srcfile, dstfile))
sys.exit(1)
- else:
- segno = firstsegno
- for f in files:
- dstfile = f
- srcfile = tabledir + str(segno)
- segno += 1
- if srcfile != dstfile:
- hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile)
- sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
- result = local_ssh(hdfscmd, logger)
- if result != 0:
- logger.error('Fail to move "%s" to "%s"' % (srcfile, dstfile))
- sys.exit(1)
-
-def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs, fmt):
- '''Insert the metadata into database'''
- try:
- query = "set allow_system_table_mods='dml';"
- if fmt == 'Parquet':
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
+ def _modify_metadata(self, mode):
+ if mode == 'insert':
+ eofs = self.sizes
+ query = "set allow_system_table_mods='dml';"
+ if self.file_format == 'Parquet':
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
+ for k, eof in enumerate(eofs[1:]):
+ query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
+ else:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
+ for k, eof in enumerate(eofs[1:]):
+ query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
+ query += ';'
+ elif mode == 'update':
+ eofs = self.sizes_update
+ query = "set allow_system_table_mods='dml';"
+ query += "begin transaction;"
+ segno_lst = [f.split('/')[-1] for f in self.files_update]
+ for i, eof in enumerate(eofs):
+ query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
+ query += "end transaction;"
+ else: # update_and_insert
+ eofs = self.sizes
+ query = "set allow_system_table_mods='dml';"
+ query += "begin transaction;"
+ if self.file_format == 'Parquet':
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1)
+ for k, eof in enumerate(eofs[1:]):
+ query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1)
+ else:
+ query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1)
+ for k, eof in enumerate(eofs[1:]):
+ query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1)
+ query += ';'
+
+ segno_lst = [f.split('/')[-1] for f in self.files_update]
+ for i, eof in enumerate(self.sizes_update):
+ query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
+ query += "end transaction;"
+ return self.utility_accessor.update_catalog(query)
+
+ def register(self):
+ if not self.do_not_move:
+ self._move_files_in_hdfs()
+ if (not self.do_not_move) and self.mode == 'force':
+ self._modify_metadata('update_and_insert')
else:
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
- query += ';'
- conn = dbconn.connect(dburl, True)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
- sys.exit(1)
-
-def update_metadata_into_database(dburl, seg_name, files, eofs):
- '''Update the catalog table in --force case'''
- try:
- query = "set allow_system_table_mods='dml';"
- query += "begin transaction;"
- segno_lst = [f.split('/')[-1] for f in files]
- for i, eof in enumerate(eofs):
- query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
- query += "end transaction;"
- conn = dbconn.connect(dburl, True)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- sys.exit(1)
-
-
-def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs):
- '''Insert and update the catalog table in --force case'''
+ if self.mode == 'force':
+ self._modify_metadata('update')
+ else:
+ self._modify_metadata('insert')
+ logger.info('Hawq Register Succeed.')
+
+
+def main(options, args):
+ def connectdb(options):
+ '''
+ Try to connect to the database and return a connection object.
+ Raise pg.InternalError if the connection fails.
+ '''
+ url = dbconn.DbURL(hostname=options.host, port=options.port,
+ dbname=options.database, username=options.user)
+ logger.info('trying to connect to database %s:%s %s' % (url.pghost, url.pgport, url.pgdb))
+ utility_conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
+ user=url.pguser, passwd=url.pgpass, opt='-c gp_session_role=utility')
+ conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport,
+ user=url.pguser, passwd=url.pgpass)
+ return utility_conn, conn
+
+ # connect db
try:
- query = "set allow_system_table_mods='dml';"
- query += "begin transaction;"
- if fmt == 'Parquet':
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
- else:
- query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1)
- for k, eof in enumerate(eofs[1:]):
- query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1)
- query += ';'
-
- segno_lst = [f.split('/')[-1] for f in update_files]
- for i, eof in enumerate(update_eofs):
- query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i])
- query += "end transaction;"
- conn = dbconn.connect(dburl, True)
- rows = dbconn.execSQL(conn, query)
- conn.commit()
- conn.close()
- except DatabaseError, ex:
- logger.error('Failed to execute query "%s"' % query)
- move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False)
- sys.exit(1)
+ utility_conn, conn = connectdb(options)
+ except pg.InternalError:
+ logger.error('Failed to connect to database; this script can only be run when the database is up.')
+ return 1
+ # register
+ ins = HawqRegister(options, args[0], utility_conn, conn)
+ ins.register()
+ conn.close()
if __name__ == '__main__':
-
parser = option_parser()
options, args = parser.parse_args()
-
- if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair):
+ if len(args) != 1 or (options.force and options.repair):
parser.print_help(sys.stderr)
sys.exit(1)
- if local_ssh('hdfs', logger):
- logger.error('command "hdfs" is not available.')
+ if (options.yml_config or options.force or options.repair) and options.filepath:
+ parser.print_help(sys.stderr)
sys.exit(1)
-
- dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database)
- filepath, database, tablename = options.filepath, options.database, args[0]
-
- second_normal_mode, second_exist_mode, force_mode, repair_mode = False, False, False, False
- if options.yml_config: # Usage2
- if options.force:
- force_mode = True
- elif options.repair:
- repair_mode = True
- else:
- second_normal_mode = True
- fileformat, files, sizes, schema, distribution_policy, file_locations, bucket_number = option_parser_yml(options.yml_config)
- filepath = files[0][:files[0].rfind('/')] if files else ''
- # check conflicting distributed policy
- if distribution_policy.startswith('DISTRIBUTED BY'):
- if len(files) % bucket_number != 0:
- logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
- sys.exit(1)
- if not force_mode:
- if not create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number):
- second_normal_mode, second_exist_mode = False, True
- else:
- fileformat = 'Parquet'
- check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
-
- if repair_mode:
- # check distribution policy consistency
- # check bucketnum, pagesize, rowgroupsize, etc
- # check filesize smaller
- pass
-
- # check filepath
- if not filepath:
- sys.exit(0)
-
- seg_name = get_seg_name(dburl, tablename, database, fileformat)
- firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
-
- if second_exist_mode:
- if tabledir.strip('/') == filepath.strip('/'):
- logger.error('Files to be registered in this case should not be the same with table path.')
- sys.exit(1)
-
- do_not_move, files_update, sizes_update = False, [], []
- if force_mode:
- existed_files, _ = get_files_in_hdfs(tabledir)
- if len(files) == len(existed_files):
- if sorted(files) != sorted(existed_files):
- logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
- sys.exit(1)
- else:
- do_not_move, files_update, sizes_update = True, files, sizes
- files, sizes = [], []
- else:
- files_old, sizes_old = [f for f in files], [sz for sz in sizes]
- for k, f in enumerate(files_old):
- if f in existed_files:
- files_update.append(files_old[k])
- sizes_update.append(sizes_old[k])
- files.remove(files_old[k])
- sizes.remove(sizes_old[k])
-
- check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
-
- if not options.yml_config:
- files, sizes = get_files_in_hdfs(filepath)
- print 'New file(s) to be registered:', files
- if files_update:
- print 'Files(s) catalog info need to be update:', files_update
-
- # set specified eofs
- if options.filesize:
- if len(files) != 1:
- logger.error('-e option is only supported with single file case.')
- sys.exit(1)
- sizes = [options.filesize]
-
- if fileformat == 'Parquet':
- check_parquet_format(files)
- if not do_not_move:
- move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
-
- if (not do_not_move) and force_mode:
- update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update)
- else:
- if force_mode:
- update_metadata_into_database(dburl, seg_name, files_update, sizes_update)
- else:
- insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat)
-
- logger.info('Hawq Register Succeed.')
+ if local_ssh('hdfs'):
+ logger.error('Command "hdfs" is not available.')
+ sys.exit(1)
+ main(options, args)
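
For reference, a minimal YAML configuration accepted by register_yaml_dict_check above might look like the following sketch (table name, paths, and sizes are illustrative placeholders, not taken from the commit):

    TableName: public.t1
    DFS_URL: hdfs://localhost:8020
    Distribution_Policy: DISTRIBUTED RANDOMLY
    FileFormat: Parquet
    Bucketnum: 6
    Parquet_FileLocations:
      CompressionType: none
      CompressionLevel: 0
      PageSize: 1048576
      RowGroupSize: 8388608
      Files:
        - path: /hawq_default/16385/16387/24580/1
          size: 1870
    Parquet_Schema:
      - name: a
        type: int4
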