Posted to issues@hawq.apache.org by xunzhang <gi...@git.apache.org> on 2016/09/16 10:50:57 UTC

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

GitHub user xunzhang opened a pull request:

    https://github.com/apache/incubator-hawq/pull/904

    HAWQ-1060. Refactor hawq register with better readability and quality.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xunzhang/incubator-hawq HAWQ-1060

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hawq/pull/904.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #904
    
----
commit 399fc4b5a42d8ff1a19c809fe44e5b20137a70f7
Author: xunzhang <xu...@gmail.com>
Date:   2016-09-16T10:48:39Z

    HAWQ-1060. Refactor hawq register with better readability and quality.

----
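
The refactor, visible in the review diffs quoted below, replaces free functions that each opened and closed their own dbconn connection with a GpRegisterAccessor class that holds a single connection and routes every catalog query through one exec_query helper. A minimal sketch of that pattern, reduced from the diff (illustrative only, not the full merged code):

```
class GpRegisterAccessor(object):
    """Hold one open connection and funnel all catalog queries through
    it, instead of reconnecting inside every helper function."""

    def __init__(self, conn):
        self.conn = conn

    def exec_query(self, sql):
        """Execute a query and return the result as a list of dicts."""
        return self.conn.query(sql).dictresult()

    def get_table_existed(self, tablename):
        qry = "select count(*) from pg_class where relname = '%s';" \
            % tablename.split('.')[-1].lower()
        return self.exec_query(qry)[0]['count'] == 1
```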


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    Auxiliary Info from Pylint:
    
    ```
    wuhong@pivotal-2:~/Desktop/local/bin$ pylint -E hawqregister
    No config file found, using default configuration
    
    wuhong@pivotal-2:~/Desktop/local/bin$ pylint hawqregister
    No config file found, using default configuration
    ************* Module hawqregister
    C: 20, 0: Line too long (110/100) (line-too-long)
    C: 21, 0: Line too long (120/100) (line-too-long)
    C: 55, 0: Line too long (117/100) (line-too-long)
    C: 56, 0: Line too long (112/100) (line-too-long)
    C: 67, 0: Line too long (144/100) (line-too-long)
    C: 74, 0: Line too long (148/100) (line-too-long)
    C: 77, 0: Line too long (173/100) (line-too-long)
    C: 81, 0: Line too long (182/100) (line-too-long)
    C: 84, 0: Line too long (182/100) (line-too-long)
    C: 87, 0: Line too long (120/100) (line-too-long)
    C: 91, 0: Line too long (156/100) (line-too-long)
    C: 95, 0: Line too long (165/100) (line-too-long)
    C: 98, 0: Line too long (165/100) (line-too-long)
    C:104, 0: Line too long (177/100) (line-too-long)
    C:110, 0: Line too long (172/100) (line-too-long)
    C:121, 0: Line too long (147/100) (line-too-long)
    C:122, 0: Line too long (167/100) (line-too-long)
    C:123, 0: Line too long (149/100) (line-too-long)
    C:125, 0: Line too long (128/100) (line-too-long)
    C:126, 0: Line too long (153/100) (line-too-long)
    C:127, 0: Line too long (130/100) (line-too-long)
    C:148, 0: Line too long (106/100) (line-too-long)
    C:151, 0: Line too long (110/100) (line-too-long)
    C:157, 0: Line too long (147/100) (line-too-long)
    C:158, 0: Line too long (182/100) (line-too-long)
    C:160, 0: Line too long (164/100) (line-too-long)
    C:161, 0: Line too long (214/100) (line-too-long)
    C:166, 0: Line too long (167/100) (line-too-long)
    C:178, 0: Line too long (109/100) (line-too-long)
    C:179, 0: Line too long (145/100) (line-too-long)
    C:203, 0: Line too long (152/100) (line-too-long)
    C:204, 0: Line too long (127/100) (line-too-long)
    C:205, 0: Line too long (157/100) (line-too-long)
    C:206, 0: Line too long (139/100) (line-too-long)
    C:208, 0: Line too long (129/100) (line-too-long)
    C:252, 0: Line too long (117/100) (line-too-long)
    C:256, 0: Line too long (162/100) (line-too-long)
    C:259, 0: Line too long (102/100) (line-too-long)
    C:265, 0: Line too long (162/100) (line-too-long)
    C:289, 0: Line too long (107/100) (line-too-long)
    C:297, 0: Line too long (164/100) (line-too-long)
    C:300, 0: Line too long (105/100) (line-too-long)
    C:329, 0: Line too long (136/100) (line-too-long)
    C:339, 0: Line too long (117/100) (line-too-long)
    C:341, 0: Line too long (179/100) (line-too-long)
    C:345, 0: Line too long (116/100) (line-too-long)
    C:401, 0: Line too long (125/100) (line-too-long)
    C:405, 0: Line too long (133/100) (line-too-long)
    C:415, 0: Line too long (117/100) (line-too-long)
    C:422, 0: Line too long (125/100) (line-too-long)
    C:426, 0: Line too long (133/100) (line-too-long)
    C:433, 0: Line too long (117/100) (line-too-long)
    C:460, 0: Line too long (103/100) (line-too-long)
    C:469, 0: Line too long (101/100) (line-too-long)
    C:  1, 0: Missing module docstring (missing-docstring)
    C: 32,20: Invalid variable name "e" (invalid-name)
    C: 38, 0: Invalid constant name "logger" (invalid-name)
    W: 45, 4: Redefining name 'parser' from outer scope (line 478) (redefined-outer-name)
    C: 62, 0: Invalid argument name "D" (invalid-name)
    C: 79,12: Invalid variable name "d" (invalid-name)
    C: 93, 8: Invalid variable name "d" (invalid-name)
    R: 62, 0: Too many branches (20/12) (too-many-branches)
    C:114, 0: Missing function docstring (missing-docstring)
    C:116,32: Invalid variable name "f" (invalid-name)
    C:130, 0: Missing class docstring (missing-docstring)
    C:147, 4: Missing method docstring (missing-docstring)
    C:151, 4: Missing method docstring (missing-docstring)
    R:151, 4: Too many arguments (7/5) (too-many-arguments)
    C:165, 4: Missing method docstring (missing-docstring)
    C:176, 4: Missing method docstring (missing-docstring)
    C:191, 4: Missing method docstring (missing-docstring)
    C:199, 4: Missing method docstring (missing-docstring)
    C:207, 8: Invalid variable name "D" (invalid-name)
    C:211, 4: Missing method docstring (missing-docstring)
    C:215, 0: Missing class docstring (missing-docstring)
    R:215, 0: Too many instance attributes (21/7) (too-many-instance-attributes)
    W:216,23: Redefining name 'options' from outer scope (line 479) (redefined-outer-name)
    C:228, 8: Missing function docstring (missing-docstring)
    C:245, 8: Missing function docstring (missing-docstring)
    C:249, 8: Missing function docstring (missing-docstring)
    C:255, 8: Missing function docstring (missing-docstring)
    C:258, 8: Missing function docstring (missing-docstring)
    C:261, 8: Missing function docstring (missing-docstring)
    C:304,23: Invalid variable name "f" (invalid-name)
    R:244, 4: Too many branches (20/12) (too-many-branches)
    R:244, 4: Too many statements (51/50) (too-many-statements)
    C:328, 4: Invalid method name "_check_files_and_table_in_same_hdfs_cluster" (invalid-name)
    R:328, 4: Method could be a function (no-self-use)
    W:353,21: Unused variable 'err' (unused-variable)
    R:344, 4: Method could be a function (no-self-use)
    C:368,12: Invalid variable name "f" (invalid-name)
    C:370,12: Invalid variable name "rc" (invalid-name)
    W:370,21: Unused variable 'err' (unused-variable)
    W:370,12: Unused variable 'rc' (unused-variable)
    R:366, 4: Method could be a function (no-self-use)
    C:384,12: Invalid variable name "f" (invalid-name)
    R:396, 4: Too many branches (13/12) (too-many-branches)
    C:437, 4: Missing method docstring (missing-docstring)
    R:215, 0: Too few public methods (1/2) (too-few-public-methods)
    W:450,18: Redefining name 'args' from outer scope (line 479) (redefined-outer-name)
    W:450, 9: Redefining name 'options' from outer scope (line 479) (redefined-outer-name)
    C:450, 0: Missing function docstring (missing-docstring)
    W:451,18: Redefining name 'options' from outer scope (line 479) (redefined-outer-name)
    C:478, 4: Invalid constant name "parser" (invalid-name)
    C:479, 4: Invalid constant name "options" (invalid-name)
    C:479,13: Invalid constant name "args" (invalid-name)
    
    
    Report
    ======
    366 statements analysed.
    
    Statistics by type
    ------------------
    
    +---------+-------+-----------+-----------+------------+---------+
    |type     |number |old number |difference |%documented |%badname |
    +=========+=======+===========+===========+============+=========+
    |module   |1      |1          |=          |0.00        |0.00     |
    +---------+-------+-----------+-----------+------------+---------+
    |class    |2      |2          |=          |0.00        |0.00     |
    +---------+-------+-----------+-----------+------------+---------+
    |method   |18     |18         |=          |55.56       |5.56     |
    +---------+-------+-----------+-----------+------------+---------+
    |function |11     |11         |=          |27.27       |0.00     |
    +---------+-------+-----------+-----------+------------+---------+
    
    
    
    External dependencies
    ---------------------
    ::
    
        yaml (hawqregister)
    
    
    
    Raw metrics
    -----------
    
    +----------+-------+------+---------+-----------+
    |type      |number |%     |previous |difference |
    +==========+=======+======+=========+===========+
    |code      |391    |79.63 |391      |=          |
    +----------+-------+------+---------+-----------+
    |docstring |15     |3.05  |15       |=          |
    +----------+-------+------+---------+-----------+
    |comment   |36     |7.33  |36       |=          |
    +----------+-------+------+---------+-----------+
    |empty     |49     |9.98  |49       |=          |
    +----------+-------+------+---------+-----------+
    
    
    
    Duplication
    -----------
    
    +-------------------------+------+---------+-----------+
    |                         |now   |previous |difference |
    +=========================+======+=========+===========+
    |nb duplicated lines      |0     |0        |=          |
    +-------------------------+------+---------+-----------+
    |percent duplicated lines |0.000 |0.000    |=          |
    +-------------------------+------+---------+-----------+
    
    
    
    Messages by category
    --------------------
    
    +-----------+-------+---------+-----------+
    |type       |number |previous |difference |
    +===========+=======+=========+===========+
    |convention |88     |88       |=          |
    +-----------+-------+---------+-----------+
    |refactor   |10     |10       |=          |
    +-----------+-------+---------+-----------+
    |warning    |8      |8        |=          |
    +-----------+-------+---------+-----------+
    |error      |0      |0        |=          |
    +-----------+-------+---------+-----------+
    
    
    
    Messages
    --------
    
    +-----------------------------+------------+
    |message id                   |occurrences |
    +=============================+============+
    |line-too-long                |54          |
    +-----------------------------+------------+
    |missing-docstring            |19          |
    +-----------------------------+------------+
    |invalid-name                 |15          |
    +-----------------------------+------------+
    |redefined-outer-name         |5           |
    +-----------------------------+------------+
    |unused-variable              |3           |
    +-----------------------------+------------+
    |too-many-branches            |3           |
    +-----------------------------+------------+
    |no-self-use                  |3           |
    +-----------------------------+------------+
    |too-many-statements          |1           |
    +-----------------------------+------------+
    |too-many-instance-attributes |1           |
    +-----------------------------+------------+
    |too-many-arguments           |1           |
    +-----------------------------+------------+
    |too-few-public-methods       |1           |
    +-----------------------------+------------+
    
    
    
    Global evaluation
    -----------------
    Your code has been rated at 7.10/10 (previous run: 7.10/10, +0.00)
    ```
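
    Most of the weight above is line-too-long (54 occurrences), followed by missing-docstring (19) and invalid-name (15). A hedged sketch of the conventional fixes for the latter two, using hypothetical names rather than the actual patch:

    ```
    """Register existing HDFS data files into a HAWQ table (a module
    docstring like this clears the module-level missing-docstring hit)."""
    import logging

    # pylint treats module-level assignments as constants (invalid-name),
    # so rename them to UPPER_CASE or add an inline "# pylint: disable" pragma.
    LOGGER = logging.getLogger('hawqregister')

    def check_parquet_format(files):
        """Per-function docstrings clear the remaining missing-docstring hits."""
        for hdfs_file in files:  # descriptive name instead of 'f'
            LOGGER.info('checking %s', hdfs_file)
    ```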


---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    cc @ictmalili @zhangh43 @linwen 


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79296000
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of correspoding file for target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts the table is already create.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    +                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
    +                    return
    +                else:
    +                    logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
    +                    sys.exit(1)
    +            tmp_dict = {}
    +            for i, d in enumerate(self.schema):
    +                tmp_dict[d['name']] = i + 1
    +            # 'DISTRIBUETD BY (1,3)' -> {1,3}
    +            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
    +            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
    +            if policy.strip('{').strip('}') != original_policy:
    +                logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
                     sys.exit(1)
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +        def check_bucket_number():
    +            def get_bucket_number():
    +                return self.accessor.get_bucket_number(self.tablename)
     
    -def get_metadata_from_database(dburl, tablename, seg_name):
    -    '''Get the metadata to be inserted from hdfs'''
    -    try:
    -        query = 'select segno from pg_aoseg.%s;' % seg_name
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    +            if self.bucket_number != get_bucket_number():
    +                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
    +                sys.exit(1)
     
    -    firstsegno = rows.rowcount + 1
    +        if self.yml:
    +            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
    +            self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
    +            check_distribution_policy()
    +            if self.mode != 'force' and self.mode != 'repair':
    +                if not create_table():
    +                    self.mode = 'second_exist'
    +        else:
    +            self.file_format = 'Parquet'
    +            check_hash_type() # Usage1 only support randomly distributed table
    +        if not self.filepath:
    +            sys.exit(0)
     
    -    try:
    -        # get the full path of correspoding file for target table
    -        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    -                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    -                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    -                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -    for row in rows:
    -        tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), ''])
    -    return firstsegno, tabledir
    -
    -
    -def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
    -    '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
    -    if not filepath:
    -        return
    -    # check whether the files to be registered is in hdfs
    -    filesystem = filepath.split('://')
    -    if filesystem[0] != 'hdfs':
    -        logger.error('Only support to register file(s) in hdfs')
    -        sys.exit(1)
    -    fileroot = filepath.split('/')
    -    tableroot = tabledir.split('/')
    -    # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
    -    if fileroot[0:3] != tableroot[0:3]:
    -        logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
    -        sys.exit(1)
    +        self.seg_name = get_seg_name()
    +        self.firstsegno, self.tabledir = get_metadata()
     
    +        if self.mode == 'repair':
    +            if self.tabledir.strip('/') != self.filepath.strip('/'):
    +                logger.error('In repair mode, tablename in yml file should be the same with input args')
    +                sys.exit(1)
    +            check_policy_consistency()
    +            check_bucket_number()
    +            existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
    +            existed_info = {}
    +            for k, fn in enumerate(existed_files):
    +                existed_info[fn] = existed_sizes[k]
    +            for k, fn in enumerate(self.files):
    +                if fn not in existed_files:
    +                    logger.error('Can not register --repair since giving non-existed files.')
    +                    sys.exit(1)
    +                if self.sizes[k] > existed_info[fn]:
    +                    logger.error('Can not register --repair since giving larger file size.')
    +                    sys.exit(1)
     
    -def get_files_in_hdfs(filepath):
    -    '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
    -    files = []
    -    sizes = []
    -    hdfscmd = "hdfs dfs -test -e %s" % filepath
    -    result = local_ssh(hdfscmd, logger)
    -    if result != 0:
    -        logger.error("Path '%s' does not exist in hdfs" % filepath)
    -        sys.exit(1)
    -    hdfscmd = "hdfs dfs -ls -R %s" % filepath
    -    result, out, err = local_ssh_output(hdfscmd)
    -    outlines = out.splitlines()
    -    # recursively search all the files under path 'filepath'
    -    for line in outlines:
    -        lineargs = line.split()
    -        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
    -            files.append(lineargs[7])
    -            sizes.append(int(lineargs[4]))
    -    if len(files) == 0:
    -        logger.error("Dir '%s' is empty" % filepath)
    -        sys.exit(1)
    -    return files, sizes
    -
    -
    -def check_parquet_format(files):
    -    '''Check whether the file to be registered is parquet format'''
    -    for f in files:
    -        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
    -        rc, out, err = local_ssh_output(hdfscmd)
    -        if out == '0':
    -            continue
    -        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
    -        result1 = local_ssh(hdfscmd, logger)
    -        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
    -        result2 = local_ssh(hdfscmd, logger)
    -        if result1 or result2:
    -            logger.error('File %s is not parquet format' % f)
    +        if self.mode == 'second_exist':
    +            if self.tabledir.strip('/') == self.filepath.strip('/'):
    +                logger.error('Files to be registeted in this case should not be the same with table path.')
    +                sys.exit(1)
    +
    +        self.do_not_move, self.files_update, self.sizes_update = False, [], []
    +        if self.mode == 'force':
    +            existed_files, _ = self._get_files_in_hdfs(self.tabledir)
    +            if len(self.files) == len(existed_files):
    +                if sorted(self.files) != sorted(existed_files):
    +                    logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
    --- End diff --
    
    In force mode, you should include the existing table files in the .yml configuration file. Otherwise you should drop ...
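
    A small sketch of the rule being requested here, with names borrowed from the diff (an illustration of the intent, not the merged implementation):

    ```
    import sys

    def check_force_covers_existing(yml_files, existed_files):
        """--force requires the .yml config to list every file already
        registered under the table directory; otherwise the table should
        be dropped before registering with --force."""
        missing = sorted(set(existed_files) - set(yml_files))
        if missing:
            sys.stderr.write('missing existing table files: %s\n' % missing)
            sys.exit(1)
    ```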


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295956
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
    ...
    +        if self.mode == 'second_exist':
    +            if self.tabledir.strip('/') == self.filepath.strip('/'):
    +                logger.error('Files to be registeted in this case should not be the same with table path.')
    --- End diff --
    
    typo. "registered". 
    Also change 'in this case'.


---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    also cc @wengyanqing @ztao1987 @wcl14 


---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    Merged into master, thanks.


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295485
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
    ...
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of correspoding file for target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts the table is already create.')
    --- End diff --
    
    "create" should be "created".


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295812
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of correspoding file for target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts the table is already create.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    --- End diff --
    
    Why can this policy be None?
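    For context, gp_distribution_policy.attrnums is NULL for a randomly distributed table, and dictresult() surfaces that as Python None; only hash-distributed tables carry column numbers such as '{1,3}'. A minimal sketch of the two cases (the helper name is illustrative, not part of the patch):

        # Sketch only: attrnums is None for DISTRIBUTED RANDOMLY tables,
        # and a string such as '{1,3}' for hash-distributed ones.
        def is_randomly_distributed(accessor, tablename):
            policy = accessor.get_distribution_policy_info(tablename)
            return policy is None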


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295679
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    --- End diff --
    
    sure


---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by zhangh43 <gi...@git.apache.org>.
Github user zhangh43 commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    +1


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295805
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of correspoding file for target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts the table is already create.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    +                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
    +                    return
    +                else:
    +                    logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
    --- End diff --
    
    What's the previous policy here? Could we change "previous" to a more specific description?
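    A hedged sketch of what a more specific message could look like, quoting the policy already recorded in gp_distribution_policy next to the one parsed from the YAML file (wording illustrative only, not part of the patch):

        # Illustrative only: show both policies so the user sees the actual conflict.
        existing = get_distribution_policy()  # catalog value, e.g. '{1,3}' or None
        logger.error('Distribution policy of %s in the YAML file (%s) does not match '
                     'the policy recorded in gp_distribution_policy (%s).'
                     % (self.tablename, self.distribution_policy, existing))
        sys.exit(1)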


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by xunzhang <gi...@git.apache.org>.
Github user xunzhang closed the pull request at:

    https://github.com/apache/incubator-hawq/pull/904


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295934
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of correspoding file for target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts the table is already create.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('Files to be registered must be multiple times to the bucket number of hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distribuetd.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    +                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
    +                    return
    +                else:
    +                    logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
    +                    sys.exit(1)
    +            tmp_dict = {}
    +            for i, d in enumerate(self.schema):
    +                tmp_dict[d['name']] = i + 1
    +            # 'DISTRIBUETD BY (1,3)' -> {1,3}
    +            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
    +            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
    +            if policy.strip('{').strip('}') != original_policy:
    +                logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
                     sys.exit(1)
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +        def check_bucket_number():
    +            def get_bucket_number():
    +                return self.accessor.get_bucket_number(self.tablename)
     
    -def get_metadata_from_database(dburl, tablename, seg_name):
    -    '''Get the metadata to be inserted from hdfs'''
    -    try:
    -        query = 'select segno from pg_aoseg.%s;' % seg_name
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    +            if self.bucket_number != get_bucket_number():
    +                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
    +                sys.exit(1)
     
    -    firstsegno = rows.rowcount + 1
    +        if self.yml:
    +            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
    +            self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
    +            check_distribution_policy()
    +            if self.mode != 'force' and self.mode != 'repair':
    +                if not create_table():
    +                    self.mode = 'second_exist'
    +        else:
    +            self.file_format = 'Parquet'
    +            check_hash_type() # Usage1 only support randomly distributed table
    +        if not self.filepath:
    +            sys.exit(0)
     
    -    try:
    -        # get the full path of correspoding file for target table
    -        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    -                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    -                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    -                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -    for row in rows:
    -        tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), ''])
    -    return firstsegno, tabledir
    -
    -
    -def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
    -    '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
    -    if not filepath:
    -        return
    -    # check whether the files to be registered is in hdfs
    -    filesystem = filepath.split('://')
    -    if filesystem[0] != 'hdfs':
    -        logger.error('Only support to register file(s) in hdfs')
    -        sys.exit(1)
    -    fileroot = filepath.split('/')
    -    tableroot = tabledir.split('/')
    -    # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
    -    if fileroot[0:3] != tableroot[0:3]:
    -        logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
    -        sys.exit(1)
    +        self.seg_name = get_seg_name()
    +        self.firstsegno, self.tabledir = get_metadata()
     
    +        if self.mode == 'repair':
    +            if self.tabledir.strip('/') != self.filepath.strip('/'):
    +                logger.error('In repair mode, tablename in yml file should be the same with input args')
    +                sys.exit(1)
    +            check_policy_consistency()
    +            check_bucket_number()
    +            existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
    +            existed_info = {}
    +            for k, fn in enumerate(existed_files):
    +                existed_info[fn] = existed_sizes[k]
    +            for k, fn in enumerate(self.files):
    +                if fn not in existed_files:
    +                    logger.error('Can not register --repair since giving non-existed files.')
    +                    sys.exit(1)
    --- End diff --
    
    Could we print more detailed information about the non-existent files? Also, change "--repair" to "repair mode" in the message.
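    A minimal sketch of how the missing files could be reported in a single message (variable names follow the surrounding diff; the exact wording is illustrative):

        # Sketch: collect every missing file first, then report them together.
        missing = [fn for fn in self.files if fn not in existed_files]
        if missing:
            logger.error('Cannot register in repair mode; %d file(s) not found '
                         'in HDFS: %s' % (len(missing), ', '.join(missing)))
            sys.exit(1)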


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79296004
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distributed.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of the corresponding file for the target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts that the table has already been created.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('The number of files to be registered must be a multiple of the bucket number of the hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distributed.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    +                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
    +                    return
    +                else:
    +                    logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
    +                    sys.exit(1)
    +            tmp_dict = {}
    +            for i, d in enumerate(self.schema):
    +                tmp_dict[d['name']] = i + 1
    +            # 'DISTRIBUTED BY (1,3)' -> {1,3}
    +            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
    +            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
    +            if policy.strip('{').strip('}') != original_policy:
    +                logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
                     sys.exit(1)
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +        def check_bucket_number():
    +            def get_bucket_number():
    +                return self.accessor.get_bucket_number(self.tablename)
     
    -def get_metadata_from_database(dburl, tablename, seg_name):
    -    '''Get the metadata to be inserted from hdfs'''
    -    try:
    -        query = 'select segno from pg_aoseg.%s;' % seg_name
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    +            if self.bucket_number != get_bucket_number():
    +                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
    +                sys.exit(1)
     
    -    firstsegno = rows.rowcount + 1
    +        if self.yml:
    +            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
    +            self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
    +            check_distribution_policy()
    +            if self.mode != 'force' and self.mode != 'repair':
    +                if not create_table():
    +                    self.mode = 'second_exist'
    +        else:
    +            self.file_format = 'Parquet'
    +            check_hash_type() # Usage1 only supports randomly distributed tables
    +        if not self.filepath:
    +            sys.exit(0)
     
    -    try:
    -        # get the full path of the corresponding file for the target table
    -        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    -                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    -                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    -                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -    for row in rows:
    -        tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), ''])
    -    return firstsegno, tabledir
    -
    -
    -def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
    -    '''Check whether all the files referred to by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
    -    if not filepath:
    -        return
    -    # check whether the files to be registered are in hdfs
    -    filesystem = filepath.split('://')
    -    if filesystem[0] != 'hdfs':
    -        logger.error('Only support to register file(s) in hdfs')
    -        sys.exit(1)
    -    fileroot = filepath.split('/')
    -    tableroot = tabledir.split('/')
    -    # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localhost:8020'
    -    if fileroot[0:3] != tableroot[0:3]:
    -        logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
    -        sys.exit(1)
    +        self.seg_name = get_seg_name()
    +        self.firstsegno, self.tabledir = get_metadata()
     
    +        if self.mode == 'repair':
    +            if self.tabledir.strip('/') != self.filepath.strip('/'):
    +                logger.error('In repair mode, the table path in the yml file should be the same as the input args.')
    +                sys.exit(1)
    +            check_policy_consistency()
    +            check_bucket_number()
    +            existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
    +            existed_info = {}
    +            for k, fn in enumerate(existed_files):
    +                existed_info[fn] = existed_sizes[k]
    +            for k, fn in enumerate(self.files):
    +                if fn not in existed_files:
    +                    logger.error('Cannot register with --repair since non-existent files were given.')
    +                    sys.exit(1)
    +                if self.sizes[k] > existed_info[fn]:
    +                    logger.error('Cannot register with --repair since a larger file size was given.')
    +                    sys.exit(1)
     
    -def get_files_in_hdfs(filepath):
    -    '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files'''
    -    files = []
    -    sizes = []
    -    hdfscmd = "hdfs dfs -test -e %s" % filepath
    -    result = local_ssh(hdfscmd, logger)
    -    if result != 0:
    -        logger.error("Path '%s' does not exist in hdfs" % filepath)
    -        sys.exit(1)
    -    hdfscmd = "hdfs dfs -ls -R %s" % filepath
    -    result, out, err = local_ssh_output(hdfscmd)
    -    outlines = out.splitlines()
    -    # recursively search all the files under path 'filepath'
    -    for line in outlines:
    -        lineargs = line.split()
    -        if len(lineargs) == 8 and lineargs[0].find ("d") == -1:
    -            files.append(lineargs[7])
    -            sizes.append(int(lineargs[4]))
    -    if len(files) == 0:
    -        logger.error("Dir '%s' is empty" % filepath)
    -        sys.exit(1)
    -    return files, sizes
    -
    -
    -def check_parquet_format(files):
    -    '''Check whether the file to be registered is parquet format'''
    -    for f in files:
    -        hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f
    -        rc, out, err = local_ssh_output(hdfscmd)
    -        if out == '0':
    -            continue
    -        hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f
    -        result1 = local_ssh(hdfscmd, logger)
    -        hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f
    -        result2 = local_ssh(hdfscmd, logger)
    -        if result1 or result2:
    -            logger.error('File %s is not parquet format' % f)
    +        if self.mode == 'second_exist':
    +            if self.tabledir.strip('/') == self.filepath.strip('/'):
    +                logger.error('Files to be registered in this case should not be the same as the table path.')
    +                sys.exit(1)
    +
    +        self.do_not_move, self.files_update, self.sizes_update = False, [], []
    +        if self.mode == 'force':
    +            existed_files, _ = self._get_files_in_hdfs(self.tabledir)
    +            if len(self.files) == len(existed_files):
    +                if sorted(self.files) != sorted(existed_files):
    +                    logger.error('In this case, you should include the previous table files.\nOtherwise, you should drop the previous table before registering with --force.')
    +                    sys.exit(1)
    +                else:
    +                    self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
    +                    self.files, self.sizes = [], []
    +            else:
    +                files_old, sizes_old = [f for f in self.files], [sz for sz in self.sizes]
    +                for k, f in enumerate(files_old):
    +                    if f in existed_files:
    +                        self.files_update.append(files_old[k])
    +                        self.sizes_update.append(sizes_old[k])
    +                        self.files.remove(files_old[k])
    +                        self.sizes.remove(sizes_old[k])
    +        elif self.mode == 'repair':
    +            self.do_not_move = True
    +            self.files_update, self.sizes_update = [fn for fn in self.files], [sz for sz in self.sizes]
    +            self.files_delete = []
    +            for fn in existed_files:
    +                if fn not in self.files:
    +                    self.files_delete.append(fn)
    +            self.files, self.sizes = [], []
    +
    +        self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
    +
    +        if not self.yml:
    +            self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
    +        print 'New file(s) to be registered: ', self.files
    +        if self.files_update:
    +            print 'Catalog info need to be updated for these files: ', self.files_update
    +
    +        if self.filesize:
    +            if len(self.files) != 1:
    +                logger.error('-e option is only supported in the single file case.')
    +                sys.exit(1)
    +            self.sizes = [self.filesize]
    +
    +        if self.file_format == 'Parquet':
    +            self._check_parquet_format(self.files)
    +
    +    def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir):
    +        '''Check whether all the files referred to by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
    +        if not filepath:
    +            return
    +        # check whether the files to be registered are in hdfs
    +        filesystem = filepath.split('://')
    +        if filesystem[0] != 'hdfs':
    +            logger.error('Only support to register file(s) in hdfs')
    --- End diff --
    
    Only support registering file(s) in hdfs.
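    
    For illustration, a minimal sketch of the scheme check with that wording (the standalone `check_scheme` helper is hypothetical, not part of the patch):
    
    ```python
    import sys
    import logging
    
    logger = logging.getLogger('hawqregister')
    
    def check_scheme(filepath):
        # Sketch only: reject any path whose scheme is not hdfs, and echo
        # the offending path back to the user.
        if filepath.split('://')[0] != 'hdfs':
            logger.error("Only support registering file(s) in hdfs: got '%s'" % filepath)
            sys.exit(1)
    ```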


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295436
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    --- End diff --
    
    Could we make the description more user-friendly? For example, "Table %s is not an append-only table. There's no record in gp_distribution_policy."
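    
    A sketch of that method with the friendlier message, assuming only the first error string changes (the free function and `accessor` parameter are hypothetical; the query is the one from the diff above):
    
    ```python
    import sys
    import logging
    
    logger = logging.getLogger('hawqregister')
    
    def check_hash_type(accessor, tablename):
        # Same lookup as in the patch; only the wording of the first
        # error message follows the suggestion above.
        qry = ("select attrnums from gp_distribution_policy, pg_class "
               "where pg_class.relname = '%s' "
               "and pg_class.oid = gp_distribution_policy.localoid;") % tablename
        rows = accessor.exec_query(qry)
        if len(rows) == 0:
            logger.error("Table %s is not an append-only table. "
                         "There's no record in gp_distribution_policy." % tablename)
            sys.exit(1)
        if rows[0]['attrnums']:
            logger.error('Cannot register file(s) to a table which is hash distributed.')
            sys.exit(1)
    ```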


---

[GitHub] incubator-hawq pull request #904: HAWQ-1060. Refactor hawq register with bet...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/904#discussion_r79295886
  
    --- Diff: tools/bin/hawqregister ---
    @@ -126,182 +127,319 @@ def option_parser_yml(yml_file):
         return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum']
     
     
    -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    -    try:
    -        query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -        for row in rows:
    -            if row[0] != 0:
    -                return False
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -
    -    try:
    +class GpRegisterAccessor(object):
    +    def __init__(self, conn):
    +        self.conn = conn
    +        rows = self.exec_query("""
    +        SELECT oid, datname, dat2tablespace,
    +               pg_encoding_to_char(encoding) encoding
    +        FROM pg_database WHERE datname=current_database()""")
    +        self.dbid = rows[0]['oid']
    +        self.dbname = rows[0]['datname']
    +        self.spcid = rows[0]['dat2tablespace']
    +        self.dbencoding = rows[0]['encoding']
    +        self.dbversion = self.exec_query('select version()')[0]['version']
    +
    +    def exec_query(self, sql):
    +        '''execute query and return dict result'''
    +        return self.conn.query(sql).dictresult()
    +
    +    def get_table_existed(self, tablename):
    +        qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower()
    +        return self.exec_query(qry)[0]['count'] == 1
    +
    +    def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number):
    +        if self.get_table_existed(tablename):
    +            return False
             schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
             fmt = 'ROW' if fmt == 'AO' else fmt
             if fmt == 'ROW':
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy))
             else: # Parquet
                 query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;'
    -                    % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    +                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy))
    +        self.conn.query(query)
             return True
    -    except DatabaseError, ex:
    -        print DatabaseError, ex
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +    def check_hash_type(self, tablename):
    +        qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
    +        rows = self.exec_query(qry)
    +        if len(rows) == 0:
    +            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    +            sys.exit(1)
    +        if rows[0]['attrnums']:
    +            logger.error('Cannot register file(s) to a table which is hash distributed.')
    +            sys.exit(1)
     
    -def get_seg_name(dburl, tablename, database, fmt):
    -    try:
    -        relname = ''
    +    # pg_paqseg_#
    +    def get_seg_name(self, tablename, database, fmt):
             tablename = tablename.split('.')[-1]
             query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 "
                      "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename
    -        conn = dbconn.connect(dburl, True)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    +        rows = self.exec_query(query)
    +        if len(rows) == 0:
                 logger.error('table "%s" not found in db "%s"' % (tablename, database))
                 sys.exit(1)
    -        for row in rows:
    -            relname = row[0]
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to run query "%s" with dbname "%s"' % (query, database))
    -        sys.exit(1)
    -    if fmt == 'Parquet':
    -        if relname.find("paq") == -1:
    -            logger.error("table '%s' is not parquet format" % tablename)
    -            sys.exit(1)
    +        relname = rows[0]['relname']
    +        if fmt == 'Parquet':
    +            if relname.find('paq') == -1:
    +                logger.error("table '%s' is not parquet format" % tablename)
    +                sys.exit(1)
    +        return relname
    +
    +    def get_distribution_policy_info(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['attrnums']
    +
    +    def get_bucket_number(self, tablename):
    +        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
    +        rows = self.exec_query(query)
    +        oid = rows[0]['oid']
    +        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
    +        rows = self.exec_query(query)
    +        return rows[0]['bucketnum']
    +
    +    def get_metadata_from_database(self, tablename, seg_name):
    +        query = 'select segno from pg_aoseg.%s;' % seg_name
    +        firstsegno = len(self.exec_query(query)) + 1
    +        # get the full path of the corresponding file for the target table
    +        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    +                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    +                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    +                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    +        D = self.exec_query(query)[0]
    +        tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), ''])
    +        return firstsegno, tabledir
    +
    +    def update_catalog(self, query):
    +        self.conn.query(query)
    +
    +
    +class HawqRegister(object):
    +    def __init__(self, options, table, utility_conn, conn):
    +        self.yml = options.yml_config
    +        self.filepath = options.filepath
    +        self.database = options.database
    +        self.tablename = table
    +        self.filesize = options.filesize
    +        self.accessor = GpRegisterAccessor(conn)
    +        self.utility_accessor = GpRegisterAccessor(utility_conn)
    +        self.mode = self._init_mode(options.force, options.repair)
    +        self._init()
    +
    +    def _init_mode(self, force, repair):
    +        def table_existed():
    +            return self.accessor.get_table_existed(self.tablename)
    +
    +        if self.yml:
    +            if force:
    +                return 'force'
    +            elif repair:
    +                if not table_existed():
    +                    logger.error('--repair mode asserts that the table has already been created.')
    +                    sys.exit(1)
    +                return 'repair'
    +            else:
    +                return 'second_normal'
    +        else:
    +            return 'first'
     
    -    return relname
    +    def _init(self):
    +        def check_hash_type():
    +            self.accessor.check_hash_type(self.tablename)
     
    +        # check conflicting distributed policy
    +        def check_distribution_policy():
    +            if self.distribution_policy.startswith('DISTRIBUTED BY'):
    +                if len(self.files) % self.bucket_number != 0:
    +                    logger.error('The number of files to be registered must be a multiple of the bucket number of the hash table.')
    +                    sys.exit(1)
     
    -def check_hash_type(dburl, tablename):
    -    '''Check whether target table is hash distributed, in that case simple insertion does not work'''
    -    try:
    -        query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        if not rows.rowcount:
    -            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
    -            sys.exit(1)
    -        for row in rows:
    -            if row[0]:
    -                logger.error('Cannot register file(s) to a table which is hash distributed.')
    +        def create_table():
    +            return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number)
    +
    +        def get_seg_name():
    +            return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format)
    +
    +        def get_metadata():
    +            return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
    +
    +        def get_distribution_policy():
    +            return self.accessor.get_distribution_policy_info(self.tablename)
    +
    +        def check_policy_consistency():
    +            policy = get_distribution_policy() # "" or "{1,3}"
    +            if policy is None:
    +                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
    +                    return
    +                else:
    +                    logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
    +                    sys.exit(1)
    +            tmp_dict = {}
    +            for i, d in enumerate(self.schema):
    +                tmp_dict[d['name']] = i + 1
    +            # 'DISTRIBUTED BY (1,3)' -> {1,3}
    +            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
    +            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
    +            if policy.strip('{').strip('}') != original_policy:
    +                logger.error('Distribution policy of %s is not consistent with previous policy.' % self.tablename)
                     sys.exit(1)
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
     
    +        def check_bucket_number():
    +            def get_bucket_number():
    +                return self.accessor.get_bucket_number(self.tablename)
     
    -def get_metadata_from_database(dburl, tablename, seg_name):
    -    '''Get the metadata to be inserted from hdfs'''
    -    try:
    -        query = 'select segno from pg_aoseg.%s;' % seg_name
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    +            if self.bucket_number != get_bucket_number():
    +                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
    +                sys.exit(1)
     
    -    firstsegno = rows.rowcount + 1
    +        if self.yml:
    +            self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
    +            self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
    +            check_distribution_policy()
    +            if self.mode != 'force' and self.mode != 'repair':
    +                if not create_table():
    +                    self.mode = 'second_exist'
    +        else:
    +            self.file_format = 'Parquet'
    +            check_hash_type() # Usage1 only supports randomly distributed tables
    +        if not self.filepath:
    +            sys.exit(0)
     
    -    try:
    -        # get the full path of the corresponding file for the target table
    -        query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, "
    -                 "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = "
    -                 "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid "
    -                 "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1]
    -        conn = dbconn.connect(dburl, False)
    -        rows = dbconn.execSQL(conn, query)
    -        conn.commit()
    -        conn.close()
    -    except DatabaseError, ex:
    -        logger.error('Failed to execute query "%s"' % query)
    -        sys.exit(1)
    -    for row in rows:
    -        tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), ''])
    -    return firstsegno, tabledir
    -
    -
    -def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
    -    '''Check whether all the files referred to by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
    -    if not filepath:
    -        return
    -    # check whether the files to be registered are in hdfs
    -    filesystem = filepath.split('://')
    -    if filesystem[0] != 'hdfs':
    -        logger.error('Only support to register file(s) in hdfs')
    -        sys.exit(1)
    -    fileroot = filepath.split('/')
    -    tableroot = tabledir.split('/')
    -    # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localhost:8020'
    -    if fileroot[0:3] != tableroot[0:3]:
    -        logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
    -        sys.exit(1)
    +        self.seg_name = get_seg_name()
    +        self.firstsegno, self.tabledir = get_metadata()
     
    +        if self.mode == 'repair':
    +            if self.tabledir.strip('/') != self.filepath.strip('/'):
    +                logger.error('In repair mode, the table path in the yml file should be the same as the input args.')
    --- End diff --
    
    Could we print the input args here? 
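    
    For example, a minimal sketch of that check with both paths in the message (the standalone `check_repair_path` helper is hypothetical, not part of the patch):
    
    ```python
    import sys
    import logging
    
    logger = logging.getLogger('hawqregister')
    
    def check_repair_path(tabledir, filepath):
        # Sketch only: include both the table dir and the input file path
        # so the user can see why repair mode refused to run.
        if tabledir.strip('/') != filepath.strip('/'):
            logger.error("In repair mode, the table path in the yml file should be "
                         "the same as the input args: table dir '%s', file path '%s'."
                         % (tabledir, filepath))
            sys.exit(1)
    ```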


---

[GitHub] incubator-hawq issue #904: HAWQ-1060. Refactor hawq register with better rea...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on the issue:

    https://github.com/apache/incubator-hawq/pull/904
  
    LGTM. +1


---