Posted to commits@hawq.apache.org by rl...@apache.org on 2015/11/05 04:10:03 UTC
[7/8] incubator-hawq git commit: HAWQ-121. Remove legacy command line tools.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9932786b/tools/bin/gpexpand
----------------------------------------------------------------------
diff --git a/tools/bin/gpexpand b/tools/bin/gpexpand
deleted file mode 100755
index eda97d6..0000000
--- a/tools/bin/gpexpand
+++ /dev/null
@@ -1,2931 +0,0 @@
-#!/usr/bin/env python
-# Line too long - pylint: disable=C0301
-# Invalid name - pylint: disable=C0103
-#
-# Copyright (c) Pivotal Inc 2014. All Rights Reserved.
-#
-from gppylib.mainUtils import getProgramName
-
-import copy
-import datetime
-import os
-import sys
-import socket
-import signal
-import traceback
-from time import strftime, sleep
-
-try:
- from gppylib.commands.unix import *
- from gppylib.fault_injection import inject_fault
- from gppylib.commands.gp import *
- from gppylib.commands.pg import PgControlData
- from gppylib.gparray import GpArray, MODE_CHANGELOGGING, STATUS_DOWN, GpDB
- from gppylib.gpparseopts import OptParser, OptChecker
- from gppylib.gplog import *
- from gppylib.db import catalog
- from gppylib.db import dbconn
- from gppylib.userinput import *
- from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
- from gppylib.system import configurationInterface, configurationImplGpdb
- from gppylib.system.environment import GpMasterEnvironment
- from pygresql.pgdb import DatabaseError
- from pygresql import pg
- from gppylib.gpcoverage import GpCoverage
- from gppylib.gpcatalog import MASTER_ONLY_TABLES
- from gppylib.operations.package import SyncPackages
- from gppylib.operations.utils import ParallelOperation
- from gppylib.parseutils import line_reader, parse_gpexpand_segment_line, \
- canonicalize_address
- from gppylib.operations.filespace import PG_SYSTEM_FILESPACE, GP_TRANSACTION_FILES_FILESPACE,\
- GP_TEMPORARY_FILES_FILESPACE, GetCurrentFilespaceEntries, GetFilespaceEntries, GetFilespaceEntriesDict,\
- RollBackFilespaceChanges, GetMoveOperationList, FileType, UpdateFlatFiles
-
-except ImportError, e:
- sys.exit('ERROR: Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))
-
-
-# constants
-MAX_PARALLEL_EXPANDS=16
-MAX_BATCH_SIZE=128
-
-GPDB_STOPPED=1
-GPDB_STARTED=2
-GPDB_UTILITY=3
-
-FILE_SPACES_INPUT_FILENAME_SUFFIX = ".fs"
-SEGMENT_CONFIGURATION_BACKUP_FILE = "gpexpand.gp_segment_configuration"
-FILE_SPACES_INPUT_FILE_LINE_1_PREFIX = "filespaceOrder"
-
-description = ("""
-Adds additional segments to a pre-existing GPDB Array.
-""")
-
-_help = ["""
-The input file should be a plain text file with a line for each segment
-to add with the format:
-
- <hostname>:<port>:<data_directory>:<dbid>:<content>:<definedprimary>
-""",
-"""
-If an input file is not specified, gpexpand will ask a series of questions
-and create one.
-""",
-]
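-# For illustration only -- a hypothetical input line in the format described
-# above (hostname, port, data directory, dbid, content and definedprimary are
-# all made-up values):
-#
-#   sdw1:40001:/data1/expand/gpseg2:5:2:true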
-
-_TODO = ["""
-
-Remaining TODO items:
-====================
-""",
-
-"""* smarter heuristics on setting ranks. """,
-
-"""* make sure system isn't in "readonly mode" during setup. """,
-
-"""* need a startup validation where we check the status detail
-    with gp_distribution_policy and make sure that our bookkeeping
-    matches reality. we don't have a perfect transactional
- model since the tables can be in a different database from
- where the gpexpand schema is kept. """,
-
-"""* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of
- the system. should get rid of this requirement. """
-]
-
-_usage = """[-f hosts_file] [-D database_name]
-
-gpexpand -i input_file [-D database_name] [-B batch_size] [-V]
-
-gpexpand [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']]
- [-a] [-n parallel_processes] [-D database_name]
-
-gpexpand -r [-D database_name]
-
-gpexpand -c [-D database_name]
-
-gpexpand -? | -h | --help | --verbose | -v"""
-
-EXECNAME = os.path.split(__file__)[-1]
-
-
-
-#----------------------- Command line option parser ----------------------
-
-def parseargs():
- parser = OptParser(option_class=OptChecker,
- description=' '.join(description.split()),
- version='%prog version $Revision: #174 $')
- parser.setHelp(_help)
- parser.set_usage('%prog ' + _usage)
- parser.remove_option('-h')
-
- parser.add_option('-c','--clean', action='store_true',
- help='remove the expansion schema.')
- parser.add_option('-r', '--rollback', action='store_true',
- help='rollback failed expansion setup.')
- parser.add_option('-a', '--analyze', action='store_true',
- help='Analyze the expanded table after redistribution.')
- parser.add_option('-d','--duration', type='duration', metavar='[h][:m[:s]]',
- help='duration from beginning to end.')
- parser.add_option('-e','--end', type='datetime', metavar='datetime',
- help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.")
- parser.add_option('-i','--input', dest="filename",
- help="input expansion configuration file.", metavar="FILE")
- parser.add_option('-f', '--hosts-file', metavar='<hosts_file>',
- help='file containing new host names used to generate input file')
- parser.add_option('-D','--database', dest='database',
- help='Database to create the gpexpand schema and tables in. If this ' \
- 'option is not given, PGDATABASE will be used. The template1, ' \
- 'template0 and postgres databases cannot be used.')
- parser.add_option('-B', '--batch-size', type='int', default=16, metavar="<batch_size>",
- help='Expansion configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
- parser.add_option('-n', '--parallel', type="int", default=1, metavar="<parallel_processes>",
- help='number of tables to expand at a time. Valid values are 1-%d.' % MAX_PARALLEL_EXPANDS)
- parser.add_option('-v','--verbose', action='store_true',
- help='debug output.')
- parser.add_option('-h', '-?', '--help', action='help',
- help='show this help message and exit.')
- parser.add_option('-s', '--silent', action='store_true',
- help='Do not prompt for confirmation to proceed on warnings')
- parser.add_option('--usage', action="briefhelp")
-
- parser.set_defaults(verbose=False,filters=[], slice=(None, None))
-
- # Parse the command line arguments
- (options, args) = parser.parse_args()
-
- if len(args) > 0:
- logger.error('Unknown argument %s' % args[0])
- parser.exit(1)
-
- #-n sanity check
- if options.parallel > MAX_PARALLEL_EXPANDS or options.parallel < 1:
- logger.error('Invalid argument. parallel value must be >= 1 and <= %d' % MAX_PARALLEL_EXPANDS)
- parser.print_help()
- parser.exit(1)
-
- proccount=os.environ.get('GP_MGMT_PROCESS_COUNT')
- if options.batch_size == 16 and proccount is not None:
- options.batch_size = int(proccount)
-
-    if options.batch_size < 1 or options.batch_size > MAX_BATCH_SIZE:
- logger.error('Invalid argument. -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
- parser.print_help()
- parser.exit(1)
-
- # OptParse can return date instead of datetime so we might need to convert
- if options.end and not isinstance(options.end, datetime.datetime):
- options.end = datetime.datetime.combine(options.end, datetime.time(0))
-
- if options.end and options.end < datetime.datetime.now():
- logger.error('End time occurs in the past')
- parser.print_help()
- parser.exit(1)
-
- if options.end and options.duration:
- logger.warn('Both end and duration options were given.')
- # Both a duration and an end time were given.
- if options.end > datetime.datetime.now() + options.duration:
- logger.warn('The duration argument will be used for the expansion end time.')
- options.end = datetime.datetime.now() + options.duration
- else:
- logger.warn('The end argument will be used for the expansion end time.')
- elif options.duration:
- options.end = datetime.datetime.now() + options.duration
-
- # -c and -r options are mutually exclusive
- if options.rollback and options.clean:
- rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r"
- cleanOpt = "--clean" if "--clean" in sys.argv else "-c"
- logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt))
- parser.exit(1)
-
- try:
- options.master_data_directory = get_masterdatadir()
- options.gphome = get_gphome()
- except GpError, msg:
- logger.error(msg)
- parser.exit(1)
-
- if not os.path.exists(options.master_data_directory):
- logger.error('Master data directory does not exist.')
- parser.exit(1)
-
- if options.database and (options.database.lower() == 'template0'
- or options.database.lower() == 'template1'
- or options.database.lower() == 'postgres'):
- logger.error('%s cannot be used to store the gpexpand schema and tables' % options.database)
- parser.exit(1)
- elif not options.database:
- options.database = os.getenv('PGDATABASE', os.getenv('LOGNAME'))
-
- options.pgport = int(os.getenv('PGPORT', 5432))
-
- return options, args
-
-#-------------------------------------------------------------------------
-# process information functions
-def create_pid_file(master_data_directory):
- """Creates gpexpand pid file"""
-    fp = None
-    try:
-        fp = open(master_data_directory + '/gpexpand.pid', 'w')
-        fp.write(str(os.getpid()))
-    except IOError:
-        raise
-    finally:
-        if fp: fp.close()
-
-def remove_pid_file(master_data_directory):
- """Removes gpexpand pid file"""
- try:
- os.unlink(master_data_directory + '/gpexpand.pid')
- except:
- pass
-
-def is_gpexpand_running(master_data_directory):
- """Checks if there is another instance of gpexpand running"""
- is_running = False
- try:
- fp = open(master_data_directory + '/gpexpand.pid', 'r')
- pid = int(fp.readline().strip())
- fp.close()
- is_running = check_pid(pid)
- except IOError:
- pass
- except Exception, msg:
- raise
-
- return is_running
-
-
-def gpexpand_status_file_exists(master_data_directory):
- """Checks if gpexpand.pid exists"""
- return os.path.exists(master_data_directory + '/gpexpand.status')
-
-
-#-------------------------------------------------------------------------
-#expansion schema
-
-undone_status = "NOT STARTED"
-start_status = "IN PROGRESS"
-done_status = "COMPLETED"
-does_not_exist_status = 'NO LONGER EXISTS'
-
-gpexpand_schema='gpexpand'
-create_schema_sql = "CREATE SCHEMA " + gpexpand_schema
-drop_schema_sql = "DROP schema IF EXISTS %s CASCADE" % gpexpand_schema
-
-status_table='status'
-status_table_sql="""CREATE TABLE %s.%s
- ( status text,
- updated timestamp ) """ % (gpexpand_schema,status_table)
-
-status_detail_table='status_detail'
-status_detail_table_sql="""CREATE TABLE %s.%s
- ( dbname text,
- fq_name text,
- schema_oid oid,
- table_oid oid,
- distribution_policy smallint[],
- distribution_policy_names text,
- distribution_policy_coloids text,
- storage_options text,
- rank int,
- status text,
- expansion_started timestamp,
- expansion_finished timestamp,
- source_bytes numeric,
- updated timestamp) """ % (gpexpand_schema, status_detail_table)
-
-class StatusDetailCols(object):
- """
- StatusDetailCols represents columns for status_detail table.
-    When updating the table definition, this class also needs to be updated.
- """
-
- class Column(object):
- def __init__(self, colorder, default):
- self.order = colorder
- self.value = default
-
- def set_value(self, val):
- self.value = val
-
- def get_value(self):
- return self.value
-
- def __init__(self, **kwargs):
- """
-        @param kwargs: keys are column names and values are select expressions.
- """
- cols = [
- 'dbname',
- 'fq_name',
- 'schema_oid',
- 'table_oid',
- 'distribution_policy',
- 'distribution_policy_names',
- 'distribution_policy_coloids',
- 'storage_options',
- 'rank',
- 'status',
- 'expansion_started',
- 'expansion_finished',
- 'source_bytes',
- 'updated'
- ]
- colmap = {}
- for i, col in enumerate(cols):
- colmap[col] = StatusDetailCols.Column(i, col)
- # default value for updated. We need to use python datetime.now()
-        # instead of database now(), as we compare it with user input elsewhere.
- colmap['updated'].set_value(quote(datetime.datetime.now()))
- for k, v in kwargs.iteritems():
- colmap[k].set_value(v)
- self.cols = cols
- self.colmap = colmap
-
- def view_list(self):
- """
-        view_list returns a comma-separated list of columns used in the view definition.
- """
- return ', '.join(filter(lambda x: x != 'updated', self.cols))
-
- def into_list(self):
- """
-        into_list returns a comma-separated list of column names used in INSERT INTO.
- """
- return ', '.join(self.cols)
-
- def value_list(self):
- """
-        value_list returns a comma-separated list of values referenced in SELECT targets.
- """
- return ', '.join(map(lambda x: self.colmap[x].get_value(), self.cols))
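-
-    # Usage sketch (hypothetical, not called anywhere in this file): the
-    # three helpers above combine into an INSERT ... SELECT statement, e.g.
-    #   cols = StatusDetailCols(status=quote(undone_status))
-    #   sql = 'INSERT INTO %s.%s (%s) SELECT %s FROM ...' % (
-    #       gpexpand_schema, status_detail_table,
-    #       cols.into_list(), cols.value_list())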
-
-def quote(arg):
- """Does someone care escapes?"""
- return "'" + str(arg) + "'"
-
-
-logical_status_view='logical_status'
-logical_status_view_sql="""CREATE VIEW {schema}.{viewname} AS
-SELECT
- {select}
-FROM(
- SELECT
- *,
- max(updated) OVER (PARTITION BY dbname, schema_oid, table_oid) AS last_updated
- FROM
- {schema}.{table}
-)s
-WHERE updated = last_updated
-""".format(
- schema=gpexpand_schema,
- viewname=logical_status_view,
- table=status_detail_table,
- select=StatusDetailCols().view_list())
-
-# gpexpand views
-progress_view='expansion_progress'
-progress_view_sql="""CREATE VIEW %s.%s AS
-SELECT
- CASE status
- WHEN '%s' THEN 'Tables Expanded'
- WHEN '%s' THEN 'Tables Left'
- WHEN '%s' THEN 'Tables In Progress'
- END AS Name,
- count(*)::text AS Value
-FROM %s.%s GROUP BY status
-
-UNION
-
-SELECT
- CASE status
- WHEN '%s' THEN 'Bytes Done'
- WHEN '%s' THEN 'Bytes Left'
- WHEN '%s' THEN 'Bytes In Progress'
- END AS Name,
- SUM(source_bytes)::text AS Value
-FROM %s.%s GROUP BY status
-
-UNION
-
-SELECT
- 'Estimated Expansion Rate' AS Name,
- (SUM(source_bytes) / (1 + extract(epoch FROM (max(expansion_finished) - min(expansion_started)))) / 1024 / 1024)::text || ' MB/s' AS Value
-FROM %s.%s
-WHERE status = '%s'
-AND
-expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY updated DESC LIMIT 1)
-
-UNION
-
-SELECT
-'Estimated Time to Completion' AS Name,
-CAST((SUM(source_bytes) / (
-SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(expansion_finished) - min(expansion_started)))))
-FROM %s.%s
-WHERE status = '%s'
-AND
-expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY
-updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value
-FROM %s.%s
-WHERE status = '%s'
- OR status = '%s'""" % (gpexpand_schema, progress_view,
- done_status, undone_status, start_status,
- gpexpand_schema, logical_status_view,
- done_status, undone_status, start_status,
- gpexpand_schema, logical_status_view,
- gpexpand_schema, logical_status_view,
- done_status,
- gpexpand_schema, status_table,
- 'EXPANSION STARTED',
- gpexpand_schema, logical_status_view,
- done_status,
- gpexpand_schema, status_table,
- 'EXPANSION STARTED',
- gpexpand_schema, logical_status_view,
- start_status, undone_status)
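-
-# Worked example for the 'Estimated Expansion Rate' row above (hypothetical
-# numbers): 10 GiB of source bytes redistributed over 500 seconds gives
-# (10 * 1024**3) / (1 + 500) / 1024 / 1024 ~= 20.4 MB/s.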
-
-unalterable_table_sql = """
-SELECT
- current_database() AS database,
- pg_catalog.quote_ident(nspname) || '.' ||
- pg_catalog.quote_ident(relname) AS table,
- attnum,
- attlen,
- attbyval,
- attstorage,
- attalign,
- atttypmod,
- attndims,
- reltoastrelid != 0 AS istoasted
-FROM
- pg_catalog.pg_attribute,
- pg_catalog.pg_class,
- pg_catalog.pg_namespace
-WHERE
- attisdropped
- AND attnum >= 0
- AND attrelid = pg_catalog.pg_class.oid
- AND relnamespace = pg_catalog.pg_namespace.oid
- AND (attlen, attbyval, attalign, attstorage) NOT IN
- (SELECT typlen, typbyval, typalign, typstorage
- FROM pg_catalog.pg_type
- WHERE typisdefined AND typtype='b' )
-ORDER BY
- attrelid, attnum
-"""
-
-has_unique_index_sql = """
-SELECT
- current_database() || '.' || pg_catalog.quote_ident(nspname) || '.' || pg_catalog.quote_ident(relname) AS table
-FROM
- pg_class c,
- pg_namespace n,
- pg_index i
-WHERE
- i.indrelid = c.oid
- AND c.relnamespace = n.oid
- AND i.indisunique
- AND n.nspname not in ('pg_catalog', 'information_schema', 'pg_toast',
- 'pg_bitmapindex', 'pg_aoseg')
-"""
-
-#-------------------------------------------------------------------------
-class InvalidStatusError(Exception): pass
-class ValidationError(Exception): pass
-
-#-------------------------------------------------------------------------
-class GpExpandStatus():
- """Class that manages gpexpand status file.
-
- The status file is placed in the master data directory on both the master and
-    the standby master. It's used to keep track of where we are in the expansion process.
- """
- def __init__(self, logger, master_data_directory, master_mirror=None):
- self.logger=logger
-
- self._status_values = { 'UNINITIALIZED': 1,
- 'EXPANSION_PREPARE_STARTED': 2,
- 'BUILD_SEGMENT_TEMPLATE_STARTED': 3,
- 'BUILD_SEGMENT_TEMPLATE_DONE': 4,
- 'BUILD_SEGMENTS_STARTED': 5,
- 'BUILD_SEGMENTS_DONE': 6,
- 'UPDATE_OLD_SEGMENTS_STARTED': 7,
- 'UPDATE_OLD_SEGMENTS_DONE': 8,
- 'UPDATE_CATALOG_STARTED': 9,
- 'UPDATE_CATALOG_DONE': 10,
- 'SETUP_EXPANSION_SCHEMA_STARTED': 11,
- 'SETUP_EXPANSION_SCHEMA_DONE': 12,
- 'PREPARE_EXPANSION_SCHEMA_STARTED': 13,
- 'PREPARE_EXPANSION_SCHEMA_DONE': 14,
- 'EXPANSION_PREPARE_DONE': 15
- }
- self._status = []
- self._status_info = []
- self._master_data_directory = master_data_directory
- self._master_mirror = master_mirror
- self._status_filename = master_data_directory + '/gpexpand.status'
- self._status_standby_filename = master_data_directory + '/gpexpand.standby.status'
- self._fp = None
- self._fp_standby = None
- self._temp_dir = None
- self._input_filename = None
- self._original_primary_count = None
- self._gp_segment_configuration_backup = None
-
- if os.path.exists(self._status_filename):
- self._read_status_file()
-
- def _read_status_file(self):
- """Reads in an existing gpexpand status file"""
- self.logger.debug("Trying to read in a pre-existing gpexpand status file")
- try:
- self._fp = open(self._status_filename, 'a+')
- self._fp.seek(0)
-
- for line in self._fp:
-            (status, status_info) = line.rstrip().split(':', 1)
- if status == 'BUILD_SEGMENT_TEMPLATE_STARTED':
- self._temp_dir = status_info
- elif status == 'BUILD_SEGMENTS_STARTED':
- self._tar_filename = status_info
- elif status == 'BUILD_SEGMENTS_DONE':
- self._number_new_segments = status_info
- elif status == 'EXPANSION_PREPARE_STARTED':
- self._input_filename = status_info
- elif status == 'UPDATE_OLD_SEGMENTS_STARTED':
- self._original_primary_count = int(status_info)
- elif status == 'UPDATE_CATALOG_STARTED':
- self._gp_segment_configuration_backup = status_info
-
- self._status.append(status)
- self._status_info.append(status_info)
- except IOError:
- raise
-
- if not self._status_values.has_key(self._status[-1]):
-            raise InvalidStatusError('Invalid status file. Unknown status %s' % self._status[-1])
-
- def create_status_file(self):
- """Creates a new gpexpand status file"""
- try:
- self._fp = open(self._status_filename, 'w')
- if self._master_mirror:
- self._fp_standby = open(self._status_standby_filename, 'w')
- self._fp_standby.write('UNINITIALIZED:None\n')
- self._fp_standby.flush()
- self._fp.write('UNINITIALIZED:None\n')
- self._fp.flush()
- self._status.append('UNINITIALIZED')
- self._status_info.append('None')
- except IOError:
- raise
-
- self._sync_status_file()
-
-
- def _sync_status_file(self):
- """Syncs the gpexpand status file with the master mirror"""
- if self._master_mirror:
- cpCmd = RemoteCopy('gpexpand copying status file to master mirror',
- self._status_standby_filename, self._master_mirror.getSegmentHostName(),
- self._status_filename)
- cpCmd.run(validateAfter=True)
-
- def set_status(self, status, status_info=None):
- """Sets the current status. gpexpand status must be set in
- proper order. Any out of order status result in an
- InvalidStatusError exception"""
- self.logger.debug("Transitioning from %s to %s" % (self._status[-1],status))
-
- if not self._fp:
- raise InvalidStatusError('The status file is invalid and cannot be written to')
- if not self._status_values.has_key(status):
- raise InvalidStatusError('%s is an invalid gpexpand status' % status)
-        # Only allow state transitions forward by exactly one step
- if self._status and \
- self._status_values[status] != self._status_values[self._status[-1]] + 1:
- raise InvalidStatusError('Invalid status transition from %s to %s' % (self._status[-1], status))
- if self._master_mirror:
- self._fp_standby.write('%s:%s\n' % (status, status_info))
- self._fp_standby.flush()
- self._sync_status_file()
- self._fp.write('%s:%s\n' % (status, status_info))
- self._fp.flush()
- self._status.append(status)
- self._status_info.append(status_info)
-
- def get_current_status(self):
- """Gets the current status that has been written to the gpexpand
- status file"""
- if (len(self._status) > 0 and len(self._status_info) > 0):
- return (self._status[-1], self._status_info[-1])
- else:
- return (None, None)
-
- def get_status_history(self):
- """Gets the full status history"""
- return zip(self._status, self._status_info)
-
- def remove_status_file(self):
- """Closes and removes the gpexand status file"""
- if self._fp:
- self._fp.close()
- self._fp = None
- if self._fp_standby:
- self._fp_standby.close()
- self._fp_standby = None
- if os.path.exists(self._status_filename):
- os.unlink(self._status_filename)
- if os.path.exists(self._status_standby_filename):
- os.unlink(self._status_standby_filename)
- if self._master_mirror:
- RemoveFiles.remote('gpexpand master mirror status file cleanup',
- self._master_mirror.getSegmentHostName(),
- self._status_filename)
-
- def remove_segment_configuration_backup_file(self):
- """ Remove the segment configuration backup file """
- self.logger.debug("Removing segment configuration backup file")
-        if self._gp_segment_configuration_backup is not None and os.path.exists(self._gp_segment_configuration_backup):
- os.unlink(self._gp_segment_configuration_backup)
-
- def get_temp_dir(self):
- """Gets temp dir that was used during template creation"""
- return self._temp_dir
-
- def get_input_filename(self):
- """Gets input file that was used by expansion setup"""
- return self._input_filename
-
- def get_tar_filename(self):
- """Gets tar file that was used during template creation"""
- return self._tar_filename
-
- def get_number_new_segments(self):
- """ Gets the number of new segments added """
- return self._number_new_segments
-
- def get_original_primary_count(self):
- """Returns the original number of primary segments"""
- return self._original_primary_count
-
- def get_gp_segment_configuration_backup(self):
- """Gets the filename of the gp_segment_configuration backup file
- created during expansion setup"""
- return self._gp_segment_configuration_backup
-
- def set_gp_segment_configuration_backup(self, filename):
- """Sets the filename of the gp_segment_configuration backup file"""
- self._gp_segment_configuration_backup = filename
-
- def is_standby(self):
- """Returns True if running on standby"""
-        # Note: _status_standby_filename already includes the master data directory.
-        return os.path.exists(self._status_standby_filename)
-
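-# Illustrative gpexpand.status contents, one 'STATUS:status_info' pair per
-# line (the paths below are hypothetical):
-#
-#   UNINITIALIZED:None
-#   EXPANSION_PREPARE_STARTED:/home/gpadmin/gpexpand_inputfile_20140101_120000
-#   BUILD_SEGMENT_TEMPLATE_STARTED:/data/master/gpexpand.abc123
-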
-#-------------------------------------------------------------------------
-
-class ExpansionError(Exception): pass
-class SegmentTemplateError(Exception): pass
-
-#-------------------------------------------------------------------------
-class SegmentTemplate:
- """Class for creating, distributing and deploying new segments to an
- existing GPDB array"""
-
- def __init__(self, logger, statusLogger, pool,
- gparray, masterDataDirectory,
- dburl,conn, tempDir,
- schemaTarFile='gpexpand_schema.tar'):
- self.logger = logger
- self.statusLogger = statusLogger
- self.pool = pool
- self.gparray = gparray
- self.tempDir = tempDir
- self.dburl = dburl
-        self.conn = conn
-        self.masterDataDirectory = masterDataDirectory
- self.schema_tar_file = schemaTarFile
- self.maxDbId = self.gparray.get_max_dbid()
-
- hosts = []
- for seg in gparray.getExpansionSegDbList():
- hosts.append(seg.getSegmentHostName())
- self.hosts = SegmentTemplate.consolidate_hosts(pool, hosts)
- logger.debug('Hosts: %s' % self.hosts)
-
- @staticmethod
- def consolidate_hosts(pool, hosts):
- tmpHosts = {}
- consolidatedHosts = []
-
- for host in hosts:
- tmpHosts[host] = 0
-
- for host in tmpHosts.keys():
- hostnameCmd = Hostname('gpexpand associating hostnames with segments', ctxt=REMOTE, remoteHost=host)
- pool.addCommand(hostnameCmd)
-
- pool.join()
-
- finished_cmds = pool.getCompletedItems()
-
- for cmd in finished_cmds:
- if not cmd.was_successful():
- raise SegmentTemplateError(cmd.get_results())
- if cmd.get_hostname() not in consolidatedHosts:
- logger.debug('Adding %s to host list' % cmd.get_hostname())
- consolidatedHosts.append(cmd.get_hostname())
-
- return consolidatedHosts
-
-
- def build_segment_template(self):
- """Builds segment template tar file"""
- self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_STARTED', self.tempDir)
- self._create_template()
- self._fixup_template()
- self._tar_template()
- self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_DONE')
-
-
- def build_new_segments(self):
- """Deploys the template tar file and configures the new segments"""
- self.statusLogger.set_status('BUILD_SEGMENTS_STARTED', os.path.abspath(self.schema_tar_file))
- self._distribute_template()
- self._configure_new_segments()
- numNewSegments = len(self.gparray.getExpansionSegDbList())
- self.statusLogger.set_status('BUILD_SEGMENTS_DONE', numNewSegments)
-
-
- def _create_template(self):
- """Creates the schema template that is used by new segments"""
- self.logger.info('Creating segment template')
-
- MakeDirectory.local('gpexpand create temp dir',self.tempDir)
-
- self.oldSegCount = self.gparray.get_segment_count()
-
- GpStop.local('gpexpand _create_template stop gpdb', masterOnly=True, fast=True)
-
- # Verify that we actually stopped
- self.logger.debug('Validating array state')
- pgControlDataCmd = PgControlData('Validate stopped', self.masterDataDirectory)
- state = None
- try:
- pgControlDataCmd.run(validateAfter=True)
- except Exception, e:
- raise SegmentTemplateError(e)
- state = pgControlDataCmd.get_value('Database cluster state')
- if state != 'shut down':
-            raise SegmentTemplateError('Failed to stop the array. pg_controldata returned state of %s' % state)
-
- try:
- srcSeg = self._select_src_segment()
- srcSeg.createTemplate(dstDir=self.tempDir)
- except Exception, msg:
- raise SegmentTemplateError(msg)
-
-
- def _select_src_segment(self):
- """Gets a segment to use as a source for pg_hba.conf
- and postgresql.conf files"""
- seg = self.gparray.segments[0]
- if seg.primaryDB.valid:
- self.srcSegHostname = seg.primaryDB.getSegmentHostName()
- self.srcSegDataDir = seg.primaryDB.getSegmentDataDirectory()
- return seg.primaryDB
- else:
- raise SegmentTemplateError("no valid segdb for content=0 to use as a template")
-
-
- def _distribute_template(self):
- """Distributes the template tar file to the new segments and expands it"""
- self.logger.info('Distributing template tar file to new hosts')
-
- self._distribute_tarfile()
-
- def _distribute_tarfile(self):
- """Distributes template tar file to hosts"""
- for host in self.hosts:
- logger.debug('Copying tar file to %s' % host)
- cpCmd = RemoteCopy('gpexpand distribute tar file to new hosts', self.schema_tar_file, host, '.')
- self.pool.addCommand(cpCmd)
-
- self.pool.join()
- self.pool.check_results()
-
-
- def _configure_new_segments(self):
- """Configures new segments. This includes modifying the postgresql.conf file
- and setting up the gp_id table"""
-
- self.logger.info('Configuring new segments (primary)')
- new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(), primaryMirror = 'primary')
- for host in iter(new_segment_info):
- segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
- tarFile=self.schema_tar_file, newSegments=True,
- verbose=gplog.logging_is_verbose(), batchSize=options.batch_size,
- ctxt=REMOTE, remoteHost=host)
- self.pool.addCommand(segCfgCmd)
-
- self.pool.join()
- self.pool.check_results()
-
- def _get_transaction_filespace_dir(self, transaction_flat_file):
- filespace_dir = None
-
-        with open(transaction_flat_file) as flat_file:
-            for line in flat_file:
- fs_info = line.split()
- if len(fs_info) != 2:
- continue
- filespace_dir = fs_info[1]
-
- return filespace_dir
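-
-    # The transaction flat file parsed above is expected to hold
-    # whitespace-separated pairs; illustrative contents (hypothetical oid
-    # and path):
-    #
-    #   16385 /data/filespaces/fs_txn/gpseg0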
-
- def _fixup_template(self):
- """Copies postgresql.conf and pg_hba.conf files from a valid segment on the system.
- Then modifies the template copy of pg_hba.conf"""
-
- self.logger.info('Copying postgresql.conf from existing segment into template')
-
- localHostname = self.gparray.master.getSegmentHostName()
-        cpCmd = RemoteCopy('gpexpand copying postgresql.conf from %s:%s/postgresql.conf' % (self.srcSegHostname, self.srcSegDataDir),
- self.srcSegDataDir + '/postgresql.conf', localHostname,
- self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
- cpCmd.run(validateAfter=True)
-
-
- self.logger.info('Copying pg_hba.conf from existing segment into template')
-        cpCmd = RemoteCopy('gpexpand copy pg_hba.conf from %s:%s/pg_hba.conf' % (self.srcSegHostname, self.srcSegDataDir),
- self.srcSegDataDir + '/pg_hba.conf', localHostname,
- self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
- cpCmd.run(validateAfter=True)
-
- #Copy the transaction directories into template
- pg_system_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray,
- PG_SYSTEM_FILESPACE).run()).run()
- transaction_flat_file = os.path.join(pg_system_filespace_entries[1][2], GP_TRANSACTION_FILES_FILESPACE)
- filespace_dir = None
- if os.path.exists(transaction_flat_file):
- filespace_dir = self._get_transaction_filespace_dir(transaction_flat_file)
- logger.debug('Filespace location = %s' % filespace_dir)
-
- if filespace_dir:
- transaction_files_dir = ['pg_xlog', 'pg_multixact', 'pg_subtrans', 'pg_clog',
- 'pg_distributedlog', 'pg_distributedxidmap']
- for directory in transaction_files_dir:
- dst_dir = os.path.join(self.tempDir, directory)
- src_dir = os.path.join(filespace_dir, directory)
-
- mkCmd = MakeDirectory('gpexpand creating transaction directories in template', dst_dir)
- mkCmd.run(validateAfter=True)
- cpCmd = LocalDirCopy('gpexpand copying dir %s' % src_dir, src_dir, dst_dir)
- cpCmd.run(validateAfter=True)
-
- # Don't need log files and gpperfmon files in template.
-        rmCmd = RemoveFiles('gpexpand remove gpperfmon data from template',
- self.tempDir + '/gpperfmon/data')
- rmCmd.run(validateAfter=True)
- rmCmd = RemoveFiles('gpexpand remove logs from template',
- self.tempDir + '/pg_log/*')
- rmCmd.run(validateAfter=True)
-
-        # other files not needed
-        rmCmd = RemoveFiles('gpexpand remove postmaster.opts from template',
- self.tempDir + '/postmaster.opts')
- rmCmd.run(validateAfter=True)
- rmCmd = RemoveFiles('gpexpand remove postmaster.pid from template',
- self.tempDir + '/postmaster.pid')
- rmCmd.run(validateAfter=True)
- rmCmd = RemoveFiles('gpexpand remove gpexpand files from template',
- self.tempDir + '/gpexpand.*')
- rmCmd.run(validateAfter=True)
-
-        # We don't need the flat files
- rmCmd = RemoveFiles('gpexpand remove transaction flat file from template',
- self.tempDir + '/' + GP_TRANSACTION_FILES_FILESPACE)
- rmCmd.run(validateAfter=True)
- rmCmd = RemoveFiles('gpexpand remove temporary flat file from template',
- self.tempDir + '/' + GP_TEMPORARY_FILES_FILESPACE)
- rmCmd.run(validateAfter=True)
-
- self.logger.info('Adding new segments into template pg_hba.conf')
- try:
- fp = open(self.tempDir + '/pg_hba.conf', 'a')
- try:
- new_host_set = set()
- for newSeg in self.gparray.getExpansionSegDbList():
- address = newSeg.getSegmentAddress()
- host = newSeg.getSegmentHostName()
- new_host_set.add(host)
- addrinfo = socket.getaddrinfo(address, None)
- ipaddrlist = list(set([ (ai[0], ai[4][0]) for ai in addrinfo]))
- fp.write('# %s\n' % address)
- for addr in ipaddrlist:
- fp.write('host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))
-
- for new_host in new_host_set:
- addrinfo = socket.getaddrinfo(new_host, None)
- ipaddrlist = list(set([ (ai[0], ai[4][0]) for ai in addrinfo]))
- fp.write('# %s\n' % new_host)
- for addr in ipaddrlist:
- fp.write('host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))
-
- finally:
- fp.close()
-        except IOError, msg:
-            raise SegmentTemplateError('Failed to open %s/pg_hba.conf: %s' % (self.tempDir, msg))
-        except Exception, msg:
-            raise SegmentTemplateError('Failed to add new segments to template pg_hba.conf: %s' % msg)
-
-
- def _tar_template(self):
- """Tars up the template files"""
- self.logger.info('Creating schema tar file')
- tarCmd = CreateTar('gpexpand tar segment template', self.tempDir, self.schema_tar_file)
- tarCmd.run(validateAfter=True)
-
-
- @staticmethod
- def cleanup_build_segment_template(tarFile, tempDir):
- """Reverts the work done by build_segment_template. Deletes the temp
- directory and local tar file"""
- rmCmd = RemoveFiles('gpexpand remove temp dir: %s' % tempDir, tempDir)
- rmCmd.run(validateAfter=True)
- rmCmd = RemoveFiles('gpexpand remove segment template file', tarFile)
- rmCmd.run(validateAfter=True)
-
-
- @staticmethod
- def cleanup_build_new_segments(pool, tarFile, gparray, hosts=None, removeDataDirs=False):
- """Cleans up the work done by build_new_segments. Deletes remote tar files and
- and removes remote data directories"""
-
- if not hosts:
- hosts = []
- for seg in gparray.getExpansionSegDbList():
- hosts.append(seg.getSegmentHostName())
-
- # Remove template tar file
- for host in hosts:
- rmCmd = RemoveFiles('gpexpand remove segment template file on host: %s' % host,
- tarFile, ctxt=REMOTE, remoteHost=host)
- pool.addCommand(rmCmd)
-
- if removeDataDirs:
- for seg in gparray.getExpansionSegDbList():
- hostname = seg.getSegmentHostName()
- filespaces = seg.getSegmentFilespaces()
- for oid in filespaces:
- datadir = filespaces[oid]
- rmCmd = RemoveFiles('gpexpand remove new segment data directory: %s:%s' % (hostname, datadir),
- datadir, ctxt=REMOTE, remoteHost=hostname)
- pool.addCommand(rmCmd)
- pool.join()
- pool.check_results()
-
-
- def cleanup(self):
- """Cleans up temporary files from the local system and new segment hosts"""
-
- self.logger.info('Cleaning up temporary template files')
- SegmentTemplate.cleanup_build_segment_template(self.schema_tar_file, self.tempDir)
- SegmentTemplate.cleanup_build_new_segments(self.pool, self.schema_tar_file, self.gparray, self.hosts)
-
-
-
-
-
-#------------------------------------------------------------------------------------------------------
-#------------------------------------------------------------------------------------------------------
-class NewSegmentInput:
-
- def __init__(self, hostname, address, port, datadir, dbid, contentId, role, replicationPort = None, fileSpaces = None):
- self.hostname = hostname
- self.address = address
- self.port = port
- self.datadir = datadir
- self.dbid = dbid
- self.contentId = contentId
- self.role = role
- self.replicationPort = replicationPort
- self.fileSpaces = fileSpaces
-
-
-#------------------------------------------------------------------------------------------------------
-#------------------------------------------------------------------------------------------------------
-class gpexpand:
-
- def __init__(self,logger,gparray,dburl,parallel=1):
- self.pastThePointOfNoReturn = False
- self.logger = logger
- self.dburl = dburl
- self.numworkers=parallel
- self.gparray = gparray
- self.unique_index_tables = {}
- self.conn = dbconn.connect(self.dburl,utility=True, encoding='UTF8', allowSystemTableMods = 'dml')
- self.old_segments = gparray.getSegDbList()
-
- if dburl.pgdb == 'template0' or dburl.pgdb == 'template1' or dburl.pgdb == 'postgres':
- raise ExpansionError("Invalid database '%s' specified. Cannot use a template database.\n"
- "Please set the environment variable PGDATABASE to a different "
- "database or use the -D option to specify a database and re-run" % dburl.pgdb)
-
-
- datadir = gparray.master.getSegmentDataDirectory()
- self.statusLogger = GpExpandStatus(logger, datadir, gparray.standbyMaster)
-
- # Adjust batch size if it's too high given the number of segments
- seg_count = len(gparray.getSegDbList())
- if options.batch_size > seg_count:
- options.batch_size = seg_count
- self.pool = WorkerPool(numWorkers=options.batch_size)
-
- self.tempDir = self.statusLogger.get_temp_dir()
- if not self.tempDir:
- self.tempDir = createTempDirectoryName(options.master_data_directory, "gpexpand")
-        self.queue = None
-        self.segTemplate = None
-
- @staticmethod
- def prepare_gpdb_state(logger, dburl):
- """ Gets GPDB in the appropriate state for an expansion.
- This state will depend on if this is a new expansion setup,
- a continuation of a previous expansion or a rollback """
- # Get the database in the expected state for the expansion/rollback
- status_file_exists = os.path.exists(options.master_data_directory + '/gpexpand.status')
- gpexpand_db_status = None
-
- if status_file_exists:
- # gpexpand status file exists so the last run of gpexpand didn't finish properly
- gpexpand.get_gpdb_in_state(GPDB_UTILITY)
- else:
- gpexpand.get_gpdb_in_state(GPDB_STARTED)
-
- logger.info('Querying gpexpand schema for current expansion state')
- try:
- gpexpand_db_status = gpexpand.get_status_from_db(dburl)
- except Exception, e:
- raise Exception('Error while trying to query the gpexpand schema: %s' % e)
- logger.debug('Expansion status returned is %s' % gpexpand_db_status)
-
- if (not gpexpand_db_status and options.filename) and not options.clean:
- # New expansion, need to be in master only
- logger.info('Readying HAWQ for a new expansion')
- gpexpand.get_gpdb_in_state(GPDB_UTILITY)
-
- return gpexpand_db_status
-
- @staticmethod
- def get_gpdb_in_state(state):
- runningStatus = chk_local_db_running(options.master_data_directory, options.pgport)
- gpdb_running = runningStatus[0] and runningStatus[1] and runningStatus[2] and runningStatus[3]
- if gpdb_running:
- gpdb_mode = get_local_db_mode(options.master_data_directory)
-
- if state == GPDB_STARTED:
- if gpdb_running:
- if gpdb_mode != 'UTILITY':
- return
- else:
- GpStop.local('Stop GPDB', masterOnly=True, fast=True)
- GpStart.local('Start GPDB')
- elif state == GPDB_STOPPED:
- if gpdb_running:
- if gpdb_mode != 'UTILITY':
- GpStop.local('Stop GPDB', fast=True)
- else:
- GpStop.local('Stop GPDB', masterOnly=True, fast=True)
- elif state == GPDB_UTILITY:
- if gpdb_running:
- if gpdb_mode == 'UTILITY':
- return
- GpStop.local('Stop GPDB', fast=True)
- GpStart.local('Start GPDB in master only mode', masterOnly=True)
- else:
-            raise Exception('Unknown gpdb state')
-
- @staticmethod
- def get_status_from_db(dburl):
- """Gets gpexpand status from the gpexpand schema"""
- status_conn = None
- gpexpand_db_status = None
- if get_local_db_mode(options.master_data_directory) == 'NORMAL':
- try:
- status_conn = dbconn.connect(dburl, encoding='UTF8')
- # Get the last status entry
- cursor = dbconn.execSQL(status_conn,'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
- if cursor.rowcount == 1:
- gpexpand_db_status = cursor.fetchone()[0]
-
- except Exception, e:
-                # expansion schema doesn't exist or there was a connection failure.
- pass
- finally:
- if status_conn: status_conn.close()
-
- # make sure gpexpand schema doesn't exist since it wasn't in DB provided
- if not gpexpand_db_status:
- """
-            MPP-14145 - If there's no discernible status, the schema must not exist.
-
- The checks in get_status_from_db claim to look for existence of the 'gpexpand' schema, but more accurately they're
- checking for non-emptiness of the gpexpand.status table. If the table were empty, but the schema did exist, gpexpand would presume
- a new expansion was taking place and it would try to CREATE SCHEMA later, which would fail. So, here, if this is the case, we error out.
-
- Note: -c/--clean will not necessarily work either, as it too has assumptions about the non-emptiness of the gpexpand schema.
- """
- with dbconn.connect(dburl, encoding='UTF8', utility=True) as conn:
- count = dbconn.execSQLForSingleton(conn, "select count(n.nspname) from pg_catalog.pg_namespace n where n.nspname = 'gpexpand'")
- if count > 0:
- raise ExpansionError("Existing expansion state could not be determined, but a gpexpand schema already exists. Cannot proceed.")
-
- # now determine whether gpexpand schema merely resides in another DB
- status_conn = dbconn.connect(dburl, encoding='UTF8')
- db_list = catalog.getDatabaseList(status_conn)
- status_conn.close()
-
- for db in db_list:
- dbname=db[0]
- if dbname in ['template0', 'template1', 'postgres', dburl.pgdb]:
- continue
- logger.debug('Looking for gpexpand schema in %s' % dbname.decode('utf-8'))
- test_url = copy.deepcopy(dburl)
- test_url.pgdb = dbname
- c = dbconn.connect(test_url, encoding='UTF8')
- try:
- cursor = dbconn.execSQL(c, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
- except:
- # Not in here
- pass
- else:
- raise ExpansionError("""gpexpand schema exists in database %s, not in %s.
-Set PGDATABASE or use the -D option to specify the correct database to use.""" % (dbname.decode('utf-8'), options.database))
- finally:
- if c:
- c.close()
-
- return gpexpand_db_status
-
- def validate_max_connections(self):
- try:
- conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
- max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
- except DatabaseError,ex:
- if options.verbose:
- logger.exception(ex)
- logger.error('Failed to check max_connections GUC')
- if conn: conn.close()
- raise ex
-
- if max_connections < options.parallel * 2 + 1:
- self.logger.error('max_connections is too small to expand %d tables at' % options.parallel)
- self.logger.error('a time. This will lead to connection errors. Either')
- self.logger.error('reduce the value for -n passed to gpexpand or raise')
- self.logger.error('max_connections in postgresql.conf')
- return False
-
- return True
-
- def validate_unalterable_tables(self):
- conn = None
- max_connections = 0
- unalterable_tables = []
-
- try:
- conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
- databases = catalog.getDatabaseList(conn)
- conn.close()
-
- tempurl = copy.deepcopy(self.dburl)
- for db in databases:
- if db[0] == 'template0':
- continue
- self.logger.info('Checking database %s for unalterable tables...' % db[0].decode('utf-8'))
- tempurl.pgdb = db[0]
- conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
- cursor = dbconn.execSQL(conn, unalterable_table_sql)
- for row in cursor:
- unalterable_tables.append(row)
- cursor.close()
- conn.close()
-
- except DatabaseError,ex:
- if options.verbose:
- logger.exception(ex)
- logger.error('Failed to check for unalterable tables.')
- if conn: conn.close()
- raise ex
-
- if len(unalterable_tables) > 0:
- self.logger.error('The following tables cannot be altered because they contain')
- self.logger.error('dropped columns of user defined types:')
- for t in unalterable_tables:
- self.logger.error('\t%s.%s' % (t[0].decode('utf-8'), t[1].decode('utf-8')))
- self.logger.error('Please consult the documentation for instructions on how to')
- self.logger.error('correct this issue, then run gpexpand again')
- return False
-
- return True
-
- def check_unique_indexes(self):
- """ Checks if there are tables with unique indexes.
- Returns true if unique indexes exist"""
-
- conn = None
- has_unique_indexes = False
-
- try:
- conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
- databases = catalog.getDatabaseList(conn)
- conn.close()
-
- tempurl = copy.deepcopy(self.dburl)
- for db in databases:
- if db[0] == 'template0':
- continue
- self.logger.info('Checking database %s for tables with unique indexes...' % db[0].decode('utf-8'))
- tempurl.pgdb = db[0]
- conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
- cursor = dbconn.execSQL(conn, has_unique_index_sql)
- for row in cursor:
- has_unique_indexes = True
- self.unique_index_tables[row[0]] = True
- cursor.close()
- conn.close()
-
- except DatabaseError,ex:
- if options.verbose:
- logger.exception(ex)
- logger.error('Failed to check for unique indexes.')
- if conn: conn.close()
- raise ex
-
- return has_unique_indexes
-
- def rollback(self):
- """Rolls back and expansion setup that didn't successfully complete"""
- cleanSchema = False
- status_history = self.statusLogger.get_status_history()
- if not status_history:
- raise ExpansionError('No status history to rollback.')
-
- if (status_history[-1])[0] == 'EXPANSION_PREPARE_DONE':
- raise ExpansionError('Expansion preparation complete. Nothing to rollback')
-
- for status in reversed(status_history):
- if status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
- if self.statusLogger.is_standby():
- self.logger.info('Running on standby master, skipping segment template rollback')
- continue
- self.logger.info('Rolling back segment template build')
- SegmentTemplate.cleanup_build_segment_template('gpexpand_schema.tar', status[1])
-
- elif status[0] == 'BUILD_SEGMENTS_STARTED':
- self.logger.info('Rolling back building of new segments')
- newSegList = self.read_input_files(self.statusLogger.get_input_filename())
- self.addNewSegments(newSegList)
- SegmentTemplate.cleanup_build_new_segments(self.pool, self.statusLogger.get_tar_filename(),
- self.gparray, removeDataDirs=True)
-
- elif status[0] == 'UPDATE_OLD_SEGMENTS_STARTED':
- self.logger.info('Rolling back update of original segments')
- self.restore_original_segments()
-
- elif status[0] == 'UPDATE_CATALOG_STARTED':
- self.logger.info('Rolling back master update')
- self.restore_master()
- self.gparray = GpArray.initFromCatalog(dburl, utility=True, useAllSegmentFileSpaces=True)
-
- elif status[0] == 'SETUP_EXPANSION_SCHEMA_STARTED':
- cleanSchema = True
- else:
- self.logger.debug('Skipping %s' % status[0])
-
- GpStop.local('gpexpand rollback', masterOnly=True, fast=True)
-
- if cleanSchema:
- GpStart.local('gpexpand rollback start database restricted', restricted=True)
-            self.logger.info('Dropping expansion schema')
- schema_conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
- try:
- dbconn.execSQL(schema_conn, drop_schema_sql)
- schema_conn.commit()
- schema_conn.close()
- except:
- pass # schema wasn't created yet.
-            GpStop.local('gpexpand rollback stop database', fast=True)
-
- self.statusLogger.remove_status_file()
- self.statusLogger.remove_segment_configuration_backup_file()
-
- def get_state(self):
- """Returns expansion state from status logger"""
- return self.statusLogger.get_current_status()[0]
-
- def generate_inputfile(self):
- """Writes a gpexpand input file based on expansion segments
- added to gparray by the gpexpand interview"""
- outputfile = 'gpexpand_inputfile_' + strftime("%Y%m%d_%H%M%S")
- outfile = open(outputfile, 'w')
-
- logger.info("Generating input file...")
-
- for db in gparray.getExpansionSegDbList():
- tempStr = "%s:%s:%d:%s:%d:%d:%s" % ( canonicalize_address( db.getSegmentHostName() )
- , canonicalize_address( db.getSegmentAddress() )
- , db.getSegmentPort()
- , db.getSegmentDataDirectory()
- , db.getSegmentDbId()
- , db.getSegmentContentId()
- , db.getSegmentPreferredRole()
- )
- if db.getSegmentReplicationPort() != None:
- tempStr = tempStr + ':' + str(db.getSegmentReplicationPort())
- outfile.write(tempStr + "\n")
-
- outfile.close()
-
- return outputfile
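-
-    # An output line from generate_inputfile looks like the following
-    # (hypothetical values; the trailing :<replication_port> field is only
-    # present when a replication port is set):
-    #   sdw1:sdw1-1:40001:/data1/gpseg2:5:2:p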
-
- #------------------------------------------------------------------------
- def generate_filespaces_inputfile(self, outFileNamePrefix):
- """
- Writes a gpexpand filespace input file based on expansion segments
- added to gparray by the gpexpand interview. If the new segments
- contain filespaces, then return the name of the file, else return
- None.
- """
- filespaces = self.gparray.getNonSystemFilespaces()
- if filespaces != None and len(filespaces) > 0:
- outputfile = outFileNamePrefix + FILE_SPACES_INPUT_FILENAME_SUFFIX
- else:
- outputfile = None
-
- if outputfile != None:
- outfileFD = open(outputfile, 'w')
-
- logger.info("Generating filespaces input file...")
-
- firstLine = FILE_SPACES_INPUT_FILE_LINE_1_PREFIX + "="
- firstFs = True
- for fs in filespaces:
- if firstFs == True:
- firstLine = firstLine + fs.getName()
- firstFs = False
- else:
- firstLine = firstLine + ":" + fs.getName()
- outfileFD.write(firstLine + '\n')
-
- for db in gparray.getExpansionSegDbList():
- dbid = db.getSegmentDbId()
- outLine = str(dbid)
- segmentFilespaces = db.getSegmentFilespaces()
- for fs in filespaces:
- oid = fs.getOid()
- path = segmentFilespaces[oid]
- outLine = outLine + "|" + path
- outfileFD.write(outLine + '\n')
-
- outfileFD.close()
-
- return outputfile
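-
-    # Illustrative .fs file produced above (hypothetical filespace names,
-    # dbids and paths):
-    #   filespaceOrder=fs_a:fs_b
-    #   5|/data/fs_a/gpseg2|/data/fs_b/gpseg2
-    #   6|/data/fs_a/gpseg3|/data/fs_b/gpseg3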
-
-
- def addNewSegments(self, inputFileEntryList):
- for seg in inputFileEntryList:
- self.gparray.addExpansionSeg( content = int(seg.contentId)
- , preferred_role = seg.role
- , dbid = int(seg.dbid)
- , role = seg.role
- , hostname = seg.hostname.strip()
- , address = seg.address.strip()
- , port = int(seg.port)
- , datadir = os.path.abspath(seg.datadir.strip())
- , replication_port = seg.replicationPort
- , fileSpaces = seg.fileSpaces
- )
- try:
- self.gparray.validateExpansionSegs()
- except Exception, e:
- raise ExpansionError('Invalid input file: %s' % e)
-
-
- def read_input_files(self, inputFilename=None):
- """Reads and validates line format of the input file passed
- in on the command line via the -i arg"""
-
- retValue = []
-
- if not options.filename and not inputFilename:
- raise ExpansionError('Missing input file')
-
- if options.filename:
- inputFilename = options.filename
- fsInputFilename = inputFilename + FILE_SPACES_INPUT_FILENAME_SUFFIX
- fsOidList = []
- fsDictionary = {}
- f = None
- try:
- existsCmd = FileDirExists(name = "gpexpand see if .fs file exists", directory = fsInputFilename)
- existsCmd.run(validateAfter = True)
- exists = existsCmd.filedir_exists()
- if exists == False and len(self.gparray.getNonSystemFilespaces()) != 0:
- raise ExpansionError("Expecting filespaces input file: " + fsInputFilename)
- if exists == True:
-                # We'll save the filespace paths for segment 0 to verify that the filespace paths in the input .fs file
-                # match the corresponding shared filesystem filespace location prefixes.
-                # E.g. Seg 0 filespace location -> /gpsql/gpseg0
-                # If the input file contains /gpsql/ as the filespace location for a new segment, that's correct,
-                # but if it contains /xyz/ as the filespace location, that is incorrect.
- fsPaths = []
- filespaceDict = gparray.segments[0].primaryDB.getSegmentFilespaces()
-
- f = open(fsInputFilename, 'r')
- for lineNumber, l in line_reader(f):
- if lineNumber == 1:
- fsNameString = l.strip().split("=")
- fsNameList = fsNameString[1].strip().split(":")
- for name in fsNameList:
- oid = self.gparray.getFileSpaceOid(name)
- if oid == None:
- raise ExpansionError("Unknown filespace name: " + str(name))
- fsOidList.append(oid)
-
- # These prefix paths will be used for matching prefix paths given in the '.fs' file
- fsPaths.append(GpDB.getDataDirPrefix(filespaceDict[oid]))
-
-                            # Make sure all the filespace names are specified.
- if len(fsNameList) != len(self.gparray.getNonSystemFilespaces()):
- missingFsNames = []
- filespaces = self.gparray.getAllFilespaces()
- for fs in filespaces:
- if fs.getName() not in fsNameList:
- missingFsNames.append(fs.getName())
- raise ExpansionError("Missing filespaces: " + str(missingFsNames))
-
- else:
- fsLine = l.strip().split("|")
-
-                        # Verify that filespace path prefixes match input file entries
- for idx in range(1, len(fsLine)):
- if (fsPaths[idx - 1] != GpDB.getDataDirPrefix(fsLine[idx])):
- raise ExpansionError("Prefix mismatch for input filespace location "
- "(Expected = " + fsPaths[idx-1] +
- " Actual = " + fsLine[idx] + ")")
- try:
- fsDbid = int(fsLine[0])
- fsDictionary[fsLine[0]] = fsLine[1:]
- except Exception, e:
- raise ExpansionError("Problem with inputfile %s, line number %s, exceptin %s." % \
- (fsInputFilename, str(lineNumber), str(e)))
-
- except IOError, ioe:
- raise ExpansionError('Problem with filespace input file: %s. Exception: %s' % (fsInputFilename, str(ioe)))
- finally:
- if f != None:
- f.close()
-
- try:
- f = open(inputFilename, 'r')
- try:
- for line, l in line_reader(f):
-
- hostname, address, port, datadir, dbid, contentId, role, replicationPort \
- = parse_gpexpand_segment_line(inputFilename, line, l)
-
- filespaces = {}
- if len(fsDictionary) > 0:
- fileSpacesPathList = fsDictionary[dbid]
- else:
- fileSpacesPathList = []
- index = 0
- for oid in fsOidList:
- filespaces[oid] = fileSpacesPathList[index]
- index = index + 1
-
- # Check that input values look reasonable.
- if hostname == None or len(hostname) == 0:
- raise ExpansionError("Invalid host name on line " + str(line))
- if address == None or len(address) == 0:
- raise ExpansionError("Invaid address on line " + str(line))
- if port == None or str(port).isdigit() == False or int(port) < 0:
- raise ExpansionError("Invalid port number on line " + str(line))
- if datadir == None or len(datadir) == 0:
- raise ExpansionError("Invalid data directory on line " + str(line))
- if dbid == None or str(dbid).isdigit() == False or int(dbid) < 0:
- raise ExpansionError("Invalid dbid on line " + str(line))
- if contentId == None or str(contentId).isdigit() == False or int(contentId) < 0:
- raise ExpansionError("Invalid contentId on line " + str(line))
- if role == None or len(role) > 1 or (role != 'p' and role != 'm'):
- raise ExpansionError("Invalid role on line " + str(line))
- if replicationPort != None and int(replicationPort) < 0:
- raise ExpansionError("Invalid replicationPort on line " + str(line))
-
- retValue.append(NewSegmentInput( hostname = hostname
- , port = port
- , address = address
- , datadir = datadir
- , dbid = dbid
- , contentId = contentId
- , role = role
- , replicationPort = replicationPort
- , fileSpaces = filespaces
- ) )
- except ValueError:
- raise ExpansionError('Missing or invalid value on line %d.' % line)
- except Exception, e:
- raise ExpansionError('Invalid input file on line %d: %s' % (line, str(e)))
- finally:
- f.close()
- return retValue
- except IOError:
- raise ExpansionError('Input file %s not found' % options.filename)
-
-
- def add_segments(self):
- """Starts the process of adding the new segments to the array"""
- self.segTemplate = SegmentTemplate(self.logger, self.statusLogger, self.pool,
- self.gparray, options.master_data_directory,
- self.dburl,self.conn, self.tempDir)
- try:
- self.segTemplate.build_segment_template()
- self.segTemplate.build_new_segments()
- except SegmentTemplateError, msg:
- raise ExpansionError(msg)
-
-
- def update_original_segments(self):
- """Updates the pg_hba.conf file and updates the gp_id catalog table
- of existing hosts"""
- self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_STARTED', self.gparray.get_primary_count())
-
- self.logger.info('Backing up pg_hba.conf file on original segments')
-
- # backup pg_hba.conf file on original segments
- for seg in self.old_segments:
- if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
- continue
-
- hostname = seg.getSegmentHostName()
- datadir = seg.getSegmentDataDirectory()
-
- srcFile = datadir + '/pg_hba.conf'
- dstFile = datadir + '/pg_hba.gpexpand.bak'
- cpCmd = RemoteCopy('gpexpand back up pg_hba.conf file on original segments',
- srcFile, hostname, dstFile, ctxt=REMOTE, remoteHost=hostname)
-
- self.pool.addCommand(cpCmd)
-
- self.pool.join()
-
- try:
- self.pool.check_results()
- except ExecutionError, msg:
- raise ExpansionError('Failed to configure original segments: %s' % msg)
-
-
- # Copy the new pg_hba.conf file to original segments
- self.logger.info('Copying new pg_hba.conf file to original segments')
- for seg in self.old_segments:
- if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
- continue
-
- hostname = seg.getSegmentHostName()
- datadir = seg.getSegmentDataDirectory()
-
- cpCmd = RemoteCopy('gpexpand copy new pg_hba.conf file to original segments',
- self.tempDir + '/pg_hba.conf', hostname, datadir)
-
- self.pool.addCommand(cpCmd)
-
- self.pool.join()
-
- try:
- self.pool.check_results()
- except ExecutionError, msg:
- raise ExpansionError('Failed to configure original segments: %s' % msg)
-
- # Update the gp_id of original segments
-        self.newPrimaryCount = 0
- for seg in self.gparray.getExpansionSegDbList():
- if seg.isSegmentPrimary(False):
- self.newPrimaryCount += 1
-
- self.newPrimaryCount += self.gparray.get_primary_count()
-
- self.logger.info('Configuring original segments')
-
- orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getSegDbList())
-
- if self.segTemplate:
- self.segTemplate.cleanup()
-
- self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_DONE')
-
-
- def restore_original_segments(self):
- """ Restores the original segments back to their state prior the expansion
- setup. This is only possible if the expansion setup has not completed
- successfully."""
- self.logger.info('Restoring original segments')
- gp_segment_configuration_backup_file = self.statusLogger.get_gp_segment_configuration_backup()
- if gp_segment_configuration_backup_file:
- originalArray = GpArray.initFromFile(gp_segment_configuration_backup_file)
- else:
- originalArray = self.gparray
-
- originalPrimaryCount = self.statusLogger.get_original_primary_count()
-
- # Restore pg_hba.conf file from backup
- self.logger.info('Restoring pg_hba.conf file on original segments')
- for seg in originalArray.getSegDbList():
-
- datadir = seg.getSegmentDataDirectory()
- hostname = seg.getSegmentHostName()
-
- srcFile = datadir + '/pg_hba.gpexpand.bak'
- dstFile = datadir + '/pg_hba.conf'
- cpCmd = RemoteCopy('gpexpand restore of pg_hba.conf file on original segments',
- srcFile, hostname, dstFile, ctxt=REMOTE,
- remoteHost=hostname)
-
- self.pool.addCommand(cpCmd)
-
- self.pool.join()
-
- try:
- self.pool.check_results()
- except:
- # Setup didn't get this far so no backup to restore.
- self.pool.empty_completed_items()
-
- # note: this code may not be needed -- it will NOT change gp_id.
- # However, the call to gpconfigurenewsegment may still be doing some needed work
- # (stopping the segment), which could be unnecessary or could be moved here.
- self.logger.info('Restoring original segments catalog tables')
- orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(originalArray.getSegDbList())
- for host in iter(orig_segment_info):
- segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', orig_segment_info[host],
- verbose=gplog.logging_is_verbose(), batchSize=options.batch_size,
- ctxt=REMOTE, remoteHost=host)
- self.pool.addCommand(segCfgCmd)
-
- self.pool.join()
-
- try:
- self.pool.check_results()
- except ExecutionError:
- raise ExpansionError('Failed to restore original segments')
-
-
- def _construct_filespace_parameter(self, seg, gpFSobjList):
- """ return a string containing a filespace parameter appropriate for use in sql functions. """
- filespaces = []
- segFilespaces = seg.getSegmentFilespaces()
- filespaceNames = []
- filespaceLocations = []
- for entry in gpFSobjList:
- name = entry.getName()
- oid = entry.getOid()
- location = segFilespaces[oid]
- filespaceNames.append(name)
- filespaceLocations.append(location)
- for name, location in zip(filespaceNames, filespaceLocations):
- filespaces.append([name, location])
- return str(filespaces)
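- # Illustrative sketch (hypothetical names): for a single filespace
- # 'fs_data' whose oid maps to '/data/seg1/fs_data' on this segment,
- # the returned string is "[['fs_data', '/data/seg1/fs_data']]".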
-
-
- def update_catalog(self):
- """
- Starts the database, calls updateSystemConfig() to setup
- the catalog tables and get the actual dbid and content id
- for the new segments.
- """
- self.statusLogger.set_gp_segment_configuration_backup(options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
- self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
- self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())
-
- self.logger.info('Starting HAWQ in restricted mode')
- startCmd = GpStart('gpexpand update master start database restricted mode', restricted = True, verbose = True)
- startCmd.run(validateAfter=True)
-
- # Update the catalog
- configurationInterface.getConfigurationProvider().updateSystemConfig(
- self.gparray,
- "%s: segment config for resync" % getProgramName(),
- dbIdToForceMirrorRemoveAdd = {},
- useUtilityMode = True,
- allowPrimary = True
- )
-
- # The content IDs may have changed, so we must make sure the array is in proper order.
- self.gparray.reOrderExpansionSegs()
-
- # Issue checkpoint due to forced shutdown below
- self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
- dbconn.execSQL(self.conn, "CHECKPOINT")
- self.conn.close()
-
- self.logger.info('Stopping database')
- stopCmd = GpStop('gpexpand update master stop database', verbose = True, ctxt = LOCAL, force=True)
- # We do not check the results of GpStop because we will get errors for all the new segments.
- stopCmd.run(validateAfter = False)
-
- self.statusLogger.set_status('UPDATE_CATALOG_DONE')
-
- #--------------------------------------------------------------------------
- def restore_master(self):
- """Restores the gp_segment_configuration catalog table for rollback"""
- originalPrimaryCount = self.statusLogger.get_original_primary_count()
- backupFile = self.statusLogger.get_gp_segment_configuration_backup()
-
- if not os.path.exists(backupFile):
- raise ExpansionError('gp_segment_configuration backup file %s does not exist' % backupFile)
-
- # Create a new gpArray from the backup file
- array = GpArray.initFromFile(backupFile)
- arrayLen = len(array.getSegDbList())
-
- originalDbIds = ""
- originalDbIdsList = []
- first = True
- for seg in array.getDbList():
- originalDbIdsList.append(int(seg.getSegmentDbId()))
- if first == False:
- originalDbIds += ", "
- first = False
- originalDbIds += str(seg.getSegmentDbId())
-
- if len(originalDbIds) > 0:
- # Update the catalog with the contents of the backup
- restore_conn = None
- try:
- restore_conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods='dml')
-
- # Get a list of all the expand primary segments
- sqlStr = "select dbid from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'p'" % str(originalDbIds)
- curs = dbconn.execSQL(restore_conn, sqlStr)
- deleteDbIdList = []
- rows = curs.fetchall()
- for row in rows:
- deleteDbIdList.append(int(row[0]))
-
- #
- # The following is a sanity check to make sure we don't do something bad here.
- #
- if len(originalDbIdsList) < 2:
- self.logger.error("The original DB DIS list is to small to be correct: %s " % str(len(originalDbIdsList)))
- raise Exception("Unable to complete rollback")
-
- totalToDelete = len(deleteDbIdList)
- if int(totalToDelete) > int(self.statusLogger.get_number_new_segments()):
- self.logger.error("There was a discrepancy between the number of expand segments to rollback (%s), and the expected number of segment to rollback (%s)" \
- % (str(totalToDelete), str(self.statusLogger.get_number_new_segments())))
- self.logger.error(" Expanded segment dbids = %s", str(deleteDbIdList))
- raise Exception("Unable to complete rollback")
-
- for dbid in deleteDbIdList:
- sqlStr = "select * from gp_remove_segment(%s::smallint)" % str(dbid)
- dbconn.execSQL(restore_conn, sqlStr)
-
- restore_conn.commit()
- except Exception, e:
- raise Exception("Unable to restore master. Exception: " + str(e))
- finally:
- if restore_conn != None:
- restore_conn.close()
-
- def start_prepare(self):
- """Inserts into gpexpand.status that expansion preparation has started."""
- if options.filename:
- self.statusLogger.create_status_file()
- self.statusLogger.set_status('EXPANSION_PREPARE_STARTED', os.path.abspath(options.filename))
-
- def finalize_prepare(self):
- """Removes the gpexpand status file and segment configuration backup file"""
- self.statusLogger.remove_status_file()
- self.statusLogger.remove_segment_configuration_backup_file()
- self.pastThePointOfNoReturn = True
-
- def setup_schema(self):
- """Used to setup the gpexpand schema"""
- startCmd = GpStart('gpexpand update master start database restricted', restricted=True, verbose=True)
- startCmd.run(validateAfter=True)
-
- # Need to restore the connection used by the expansion
- self.conn = dbconn.connect(self.dburl, encoding='UTF8')
- self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_STARTED')
- self.logger.info('Creating expansion schema')
- dbconn.execSQL(self.conn,create_schema_sql)
- dbconn.execSQL(self.conn,status_table_sql)
- dbconn.execSQL(self.conn,status_detail_table_sql)
-
- # views
- dbconn.execSQL(self.conn, logical_status_view_sql)
- dbconn.execSQL(self.conn,progress_view_sql)
-
- self.conn.commit()
-
- self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_DONE')
-
- def prepare_schema(self):
- """Prepares the gpexpand schema"""
- self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_STARTED')
-
- if not self.conn:
- self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
- self.gparray = GpArray.initFromCatalog(self.dburl, useAllSegmentFileSpaces=True)
-
- nowStr = datetime.datetime.now()
- statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP', '%s' ) " % (gpexpand_schema,status_table,nowStr)
-
- dbconn.execSQL(self.conn,statusSQL)
-
- db_list = catalog.getDatabaseList(self.conn)
-
- for db in db_list:
- dbname=db[0]
- if dbname == 'template0':
- continue
- self.logger.info('Populating %s.%s with data from database %s' % (gpexpand_schema, status_detail_table, dbname.decode('utf-8')) )
- self._populate_regular_tables(dbname)
- self._populate_partitioned_tables(dbname)
- inject_fault('gpexpand MPP-14620 fault injection')
- self._update_distribution_policy(dbname)
-
- nowStr = datetime.datetime.now()
- statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP DONE', '%s' ) " % (gpexpand_schema,status_table,nowStr)
- dbconn.execSQL(self.conn, statusSQL)
-
- self.conn.commit()
-
-
- self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_DONE')
- self.statusLogger.set_status('EXPANSION_PREPARE_DONE')
-
- # At this point no rollback is possible, and the system including the
- # new segments has been started once before, so finalize.
- self.finalize_prepare()
-
- self.logger.info('Stopping HAWQ')
- GpStop.local('gpexpand setup complete', fast=True)
-
-
- def _populate_regular_tables(self,dbname):
- """ we don't do 3.2+ style partitioned tables here, but we do
- all other table types.
- """
-
- sql="""SELECT
- n.nspname || '.' || c.relname as fq_name,
- n.oid as schemaoid,
- c.oid as tableoid,
- p.attrnums as distribution_policy,
- now() as last_updated,
- pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))
-FROM
- pg_class c
- JOIN pg_namespace n ON (c.relnamespace=n.oid)
- JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
-WHERE
- c.oid NOT IN ( SELECT parrelid as oid FROM pg_partition
- UNION
- SELECT parchildrelid as oid FROM pg_partition_rule
- )
- AND n.nspname != 'gpexpand'
- AND n.nspname != 'pg_bitmapindex'
- AND c.relstorage != 'x';
-
- """
- self.logger.debug(sql)
- table_conn = self.connect_database(dbname)
- curs = dbconn.execSQL(table_conn, sql)
- rows = curs.fetchall()
- fp = None  # guard the finally clause below in case open() fails
- try:
- sql_file = os.path.abspath('./%s.dat' % status_detail_table)
- self.logger.debug('status_detail data file: %s' % sql_file)
- fp = open(sql_file, 'w')
- for row in rows:
- fqname = row[0]
- schema_oid = row[1]
- table_oid = row[2]
- if row[3]:
- self.logger.debug("dist policy raw: %s " % row[3].decode('utf-8'))
- else:
- self.logger.debug("dist policy raw: NULL")
- dist_policy = row[3]
- (policy_name,policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
- ts = datetime.datetime.now()
- rel_bytes = int(row[5])
-
- if dist_policy is None:
- dist_policy = 'NULL'
-
- full_name = '%s.%s' % (dbname, fqname)
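- # Tables with a unique index get rank 1 so they sort ahead of the
- # rank-2 default when tables are ordered for redistribution.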
- rank = 1 if self.unique_index_tables.has_key(full_name) else 2
-
- fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\t%s\n""" % (dbname,fqname,schema_oid,table_oid,
- dist_policy,policy_name,policy_oids,
- rank, undone_status, rel_bytes, ts))
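- # Each fp.write emits one tab-delimited row for the COPY into
- # gpexpand.status_detail below; the literal string 'NULL' (see the
- # NULL AS 'NULL' clause) marks columns left unset until redistribution.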
- except Exception, e:
- raise ExpansionError(e)
- finally:
- if fp: fp.close()
-
- try:
- copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)
-
- self.logger.debug(copySQL)
- dbconn.execSQL(self.conn,copySQL)
- except Exception, e:
- raise ExpansionError(e)
- finally:
- os.unlink(sql_file)
-
- table_conn.commit()
- table_conn.close()
-
-
- def _populate_partitioned_tables(self,dbname):
- """population of status_detail for partitioned tables. """
- sql="""
-SELECT
- p.partitionschemaname || '.' || p.partitiontablename as fq_name,
- n.oid as schemaoid,
- c2.oid as tableoid,
- d.attrnums as distributed_policy,
- now() as last_updated,
- pg_relation_size(quote_ident(p.partitionschemaname) || '.' || quote_ident(p.partitiontablename)),
- partitiontype,partitionlevel,partitionrank,partitionposition,
- partitionrangestart
-FROM
- pg_partitions p,
- pg_class c,
- pg_class c2,
- pg_namespace n,
- pg_namespace n2,
- gp_distribution_policy d
-WHERE
- quote_ident(p.tablename) = quote_ident(c.relname)
- AND d.localoid = c2.oid
- AND quote_ident(p.schemaname) = quote_ident(n.nspname)
- AND c.relnamespace = n.oid
- AND p.partitionlevel = (select max(parlevel) FROM pg_partition WHERE parrelid = c.oid)
- AND quote_ident(p.partitionschemaname) = quote_ident(n2.nspname)
- AND quote_ident(p.partitiontablename) = quote_ident(c2.relname)
- AND c2.relnamespace = n2.oid
-ORDER BY tablename, c2.oid desc;
- """
- self.logger.debug(sql)
- table_conn = self.connect_database(dbname)
- curs = dbconn.execSQL(table_conn, sql)
- rows = curs.fetchall()
-
- fp = None
- try:
- sql_file = os.path.abspath('./%s.dat' % status_detail_table)
- self.logger.debug('status_detail data file: %s' % sql_file)
- fp = open(sql_file, 'w')
-
- for row in rows:
- fqname = row[0]
- schema_oid = row[1]
- table_oid = row[2]
- if row[3]:
- self.logger.debug("dist policy raw: %s " % row[3])
- else:
- self.logger.debug("dist policy raw: NULL")
- dist_policy = row[3]
- (policy_name,policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
- ts = datetime.datetime.now()
- rel_bytes = int(row[5])
-
- if dist_policy is None:
- dist_policy = 'NULL'
-
- full_name = '%s.%s' % (dbname, fqname)
- rank = 1 if self.unique_index_tables.has_key(full_name) else 2
-
- fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\t%s\n""" % (dbname,fqname,schema_oid,table_oid,
- dist_policy,policy_name,policy_oids,
- rank, undone_status, rel_bytes, ts))
- except Exception:
- raise
- finally:
- if fp: fp.close()
-
- try:
- copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)
-
- self.logger.debug(copySQL)
- dbconn.execSQL(self.conn,copySQL)
- except Exception, e:
- raise ExpansionError(e)
- finally:
- os.unlink(sql_file)
-
- table_conn.commit()
- table_conn.close()
-
-
- def _update_distribution_policy(self, dbname):
- """ NULL out the distribution policy for both
- regular and partitioned tables before expansion
- """
-
- table_conn = self.connect_database(dbname)
- # null out the distribution policies
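- # (With attrnums set to NULL the table is treated as randomly distributed,
- # which keeps the policy accurate while rows have not yet been
- # redistributed across the new segments.)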
- sql = """
-UPDATE gp_distribution_policy
- SET attrnums = NULL
-FROM pg_class c
- JOIN pg_namespace n ON (c.relnamespace=n.oid)
-WHERE
- localoid = c.oid
- AND c.oid NOT IN ( SELECT parrelid as oid FROM pg_partition
- UNION
- SELECT parchildrelid as oid FROM pg_partition_rule
- )
- AND n.nspname != 'gpexpand';
- """
-
- self.logger.debug(sql)
- dbconn.execSQL(table_conn,sql)
-
- sql = """
-UPDATE gp_distribution_policy
- SET attrnums = NULL
- FROM
- ( SELECT pp.parrelid AS table
<TRUNCATED>