You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2015/12/29 10:29:52 UTC
[1/2] incubator-hawq git commit: HAWQ-288. Support hawqfaultinjector
as an internal utility for fault injection purpose - client side
Repository: incubator-hawq
Updated Branches:
refs/heads/master 424699998 -> 8cdae4145
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/programs/clsInjectFault.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/clsInjectFault.py b/tools/bin/hawqpylib/programs/clsInjectFault.py
new file mode 100644
index 0000000..4cf8e36
--- /dev/null
+++ b/tools/bin/hawqpylib/programs/clsInjectFault.py
@@ -0,0 +1,441 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# Used to inject faults into the file replication code
+#
+
+#
+# THIS IMPORT MUST COME FIRST
+#
+# import mainUtils FIRST to get python version check
+from hawqpylib.mainUtils import *
+
+from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE
+
+from gppylib.gpparseopts import OptParser, OptChecker
+from gppylib.utils import toNonNoneString
+from gppylib import gplog
+from hawqpylib import hawqarray
+from gppylib.commands import base
+from gppylib.commands import unix
+from gppylib.commands import gp
+from gppylib.commands import pg
+from gppylib.db import catalog
+from gppylib.db import dbconn
+from gppylib.system import configurationInterface, fileSystemInterface, osInterface
+from gppylib import pgconf
+from gppylib.testold.testUtils import testOutput
+from gppylib.system.environment import GpMasterEnvironment
+
+# Module-level logger (from gppylib.gplog) used by HAWQInjectFaultProgram below.
+logger = gplog.get_default_logger()
+
+#-------------------------------------------------------------------------
class HAWQInjectFaultProgram:
    """
    Client side of the internal-use HAWQ fault injector.

    Turns the parsed command-line options into a fault request, resolves the
    target databases from the master's configuration, sends the request to
    each target through gp.SendFilerepTransitionMessage and, when
    "--mode sync" is given, polls the targets until each reports the fault
    as done.
    """

    # --role value -> role code compared against hawqdb.getRole()
    _ROLE_CODES = {"master": 'm', "standby": 's', "primary": 'p'}

    # accepted values of --mode
    _SYNC_MODES = ("sync", "async")

    def __init__(self, options):
        """
        @param options the options as returned by the options parser
        """
        self.options = options

    def buildMessage(self):
        """
        Build the fault-injection request message.

        The message is the keyword "faultInject" followed by the fault name,
        type, DDL statement, database name, table name, occurrence count and
        sleep time, one per line (None values become empty strings via
        toNonNoneString).  Fault options are NOT validated by the client --
        the server validates them when it receives the request.  A value
        containing a newline would break the line-based protocol, and the
        server will report an error for it.

        @return the newline-joined request text
        """
        fields = [
            self.options.faultName,
            self.options.type,
            self.options.ddlStatement,
            self.options.databaseName,
            self.options.tableName,
            self.options.numOccurrences,
            self.options.sleepTimeSeconds,
        ]
        result = ["faultInject"] + [toNonNoneString(field) for field in fields]
        return '\n'.join(result)

    def buildGetStatusMessage(self):
        """
        Build the message that asks a target for the status of the fault.

        The fault name is not validated here; the server reports an error
        for an invalid one.

        @return the newline-joined status-query text
        """
        return '\n'.join(["getFaultInjectStatus", toNonNoneString(self.options.faultName)])

    def isMatchingRole(self, role, hawqdb):
        """
        Return True if hawqdb has the given role, False otherwise.

        @param role one of "master", "standby" or "primary"
        @param hawqdb the database object whose getRole() is checked
        @raise ProgramArgumentValidationException if role is not recognised
        """
        expectedCode = self._ROLE_CODES.get(role)
        if expectedCode is None:
            raise ProgramArgumentValidationException("Invalid role specified: %s" % role)
        return hawqdb.getRole() == expectedCode

    def loadTargetSegments(self):
        """
        Load the cluster configuration from the master and return the
        databases the fault should be sent to.

        Exactly one of --host and --registration_order must be given.
        --role is required with --host and must not be given with
        --registration_order.

        @return list of hawqdb objects to target (empty if nothing matches)
        @raise ProgramArgumentValidationException on invalid option values
                or combinations
        @raise ExceptionNoStackTraceNeeded if any targeted database is not up
        """
        targetHost = self.options.targetHost
        targetRole = self.options.targetRole
        targetRegistrationOrder = self.options.targetRegistrationOrder

        if targetHost is None and targetRegistrationOrder is None:
            raise ProgramArgumentValidationException(
                "neither --host nor --registration_order is specified. "
                "Exactly one should be specified.")
        if targetHost is not None and targetRegistrationOrder is not None:
            raise ProgramArgumentValidationException(
                "both --host and --registration_order are specified. "
                "Exactly one should be specified.")
        if targetHost is not None and targetRole is None:
            raise ProgramArgumentValidationException(
                "--role is not specified when --host is specified. "
                "Role is required when targeting a host.")
        if targetRegistrationOrder is not None and targetRole is not None:
            raise ProgramArgumentValidationException(
                "--role is specified when --registration_order is specified. "
                "Role should not be specified when targeting a single registration_order.")

        # Validate the remaining option values before contacting the master.
        # Otherwise a bad --role would go unnoticed whenever no database matched.
        if targetRole is not None and targetRole not in self._ROLE_CODES:
            raise ProgramArgumentValidationException("Invalid role specified: %s" % targetRole)

        regorder = None
        if targetRegistrationOrder is not None:
            try:
                regorder = int(targetRegistrationOrder)
            except ValueError:
                raise ProgramArgumentValidationException(
                    "Invalid registration_order specified: %s" % targetRegistrationOrder)

        #
        # load from master db
        #
        masterPort = self.options.masterPort
        if masterPort is None:
            gpEnv = GpMasterEnvironment(self.options.masterDataDirectory, False)
            masterPort = gpEnv.getMasterPort()
        conf = configurationInterface.getConfigurationProvider().initializeProvider(masterPort)
        hawqArray = conf.loadSystemConfig(useUtilityMode=True)

        #
        # prune the database list according to the filter settings
        #
        if regorder is not None:
            # Registration-order targeting searches the full database list,
            # not only the entries for which isSegment() is true.
            hawqdbs = [hdb for hdb in hawqArray.getDbList()
                       if hdb.getRegistrationOrder() == regorder]
        else:
            hawqdbs = [hdb for hdb in hawqArray.getDbList() if hdb.isSegment()]
            if targetHost != "ALL":
                hawqdbs = [hdb for hdb in hawqdbs if hdb.getHostName() == targetHost]

        if targetRole is not None:
            hawqdbs = [hdb for hdb in hawqdbs if self.isMatchingRole(targetRole, hdb)]

        # Refuse to inject if ANY targeted database is not up ('u').
        downhawqdbs = [hdb for hdb in hawqdbs if hdb.getStatus() != 'u']
        if downhawqdbs:
            downhawqdbStr = "\n Down Segment: "
            raise ExceptionNoStackTraceNeeded(
                "Unable to inject fault. At least one segment is marked as down in the database.%s%s" %
                (downhawqdbStr, downhawqdbStr.join([str(downhdb) for downhdb in downhawqdbs])))

        logger.debug("Fault injection target segments: %s", hawqdbs)
        return hawqdbs

    def writeToTempFile(self, str):
        """
        Write the given text to a temporary file and return the open file.

        The file comes from the file-system provider's
        createNamedTemporaryFile(); callers must close() it when done.

        @param str the text to write
        @return the open, flushed temporary file object
        """
        inputFile = fileSystemInterface.getFileSystemProvider().createNamedTemporaryFile()
        inputFile.write(str)
        inputFile.flush()
        return inputFile

    def injectFaults(self, segments, messageText):
        """
        Send the fault request to every target segment, one at a time.

        For the "status" fault type, the server's status text is logged
        with any leading "Success: " removed.

        @param segments the target hawqdb objects
        @param messageText the request built by buildMessage()
        @raise ExceptionNoStackTraceNeeded if any segment rejects the request
        """
        inputFile = self.writeToTempFile(messageText)
        try:
            logger.info("Injecting fault on %d segment(s)", len(segments))
            testOutput("Injecting fault on %d segment(s)" % len(segments))

            # run the command in serial to each target
            for segment in segments:
                logger.info("Injecting fault on %s", segment)
                cmd = gp.SendFilerepTransitionMessage("Fault Injector", inputFile.name,
                                                      segment.getPort(), base.LOCAL,
                                                      segment.getHostName())
                logger.debug("Running fault injection command: %s", cmd)
                # validate the result ourselves so the error text can be reported
                cmd.run(validateAfter=False)

                if cmd.results.rc != 0:
                    raise ExceptionNoStackTraceNeeded("Injection Failed: %s" % cmd.results.stderr)

                if self.options.type == "status":
                    # the server prints readable status messages on success, so relay them
                    statusText = cmd.results.stderr
                    if statusText.startswith("Success: "):
                        statusText = statusText.replace("Success: ", "", 1)
                    logger.info("%s", statusText)
        finally:
            # always remove the temporary request file, even on failure
            inputFile.close()

    def waitForFaults(self, segments, statusQueryText):
        """
        Poll the target segments until each reports the fault as done.

        Segments answering "Success: waitMore" are polled again on the next
        pass. There is no overall timeout.

        @param segments the target hawqdb objects
        @param statusQueryText the query built by buildGetStatusMessage()
        @raise Exception if a segment returns any other response
        """
        inputFile = self.writeToTempFile(statusQueryText)
        try:
            pending = list(segments)
            # Exponential backoff. The sleep grows 1.5x per pass until it
            # exceeds 7s, which leaves it at ~10s
            # (0.115610199 * 1.5**11 ~= 10).
            sleepTimeSec = 0.115610199
            sleepTimeMultiplier = 1.5

            logger.info("Awaiting fault on %d segment(s)", len(pending))
            while pending:
                logger.info("Sleeping %.2f seconds ", sleepTimeSec)
                osInterface.getOsProvider().sleep(sleepTimeSec)

                segmentsForNextPass = []
                for segment in pending:
                    logger.info("Checking for fault completion on %s", segment)
                    cmd = gp.SendFilerepTransitionMessage.local("Fault Injector Status Check", inputFile.name,
                                                                segment.getPort(), segment.getHostName())
                    resultStr = cmd.results.stderr.strip()
                    if resultStr == "Success: waitMore":
                        segmentsForNextPass.append(segment)
                    elif resultStr != "Success: done":
                        raise Exception("Unexpected result from server %s" % resultStr)

                pending = segmentsForNextPass
                if sleepTimeSec <= 7:
                    sleepTimeSec *= sleepTimeMultiplier
        finally:
            inputFile.close()

    def isSyncableFaultType(self):
        """
        Return True if the client can wait for this fault type to complete.

        "reset" and "status" take effect immediately and have nothing to
        wait for.
        """
        return self.options.type not in ("reset", "status")

    def run(self):
        """
        Program entry point: validate options, inject the fault, and
        optionally wait for it.

        In "sync" mode the client waits for syncable faults to be triggered.
        In "async" mode (the default) it only sets the fault request on the
        servers, as documented for --mode.

        @return 0 on success
        @raise ProgramArgumentValidationException on invalid options
        """
        if self.options.masterPort is not None and self.options.masterDataDirectory is not None:
            raise ProgramArgumentValidationException("both master port and master data directory options are specified;" \
                    " at most one should be specified, or specify none to use MASTER_DATA_DIRECTORY environment variable")
        if self.options.syncMode not in self._SYNC_MODES:
            raise ProgramArgumentValidationException(
                "Invalid sync mode specified: %s; must be 'sync' or 'async'" % self.options.syncMode)

        messageText = self.buildMessage()
        segments = self.loadTargetSegments()

        # inject, then wait only when synchronous mode was requested
        self.injectFaults(segments, messageText)
        if self.options.syncMode == "sync" and self.isSyncableFaultType():
            statusQueryText = self.buildGetStatusMessage()
            self.waitForFaults(segments, statusQueryText)

        logger.info("DONE")
        return 0  # success -- exit code 0!

    def cleanup(self):
        """Nothing to release; temporary files are closed where they are created."""
        pass

    #-------------------------------------------------------------------------
    @staticmethod
    def createParser():
        """
        Build the command-line parser for the fault injector.

        @return an OptParser with target-segment, master-connection,
                client-polling and fault options
        """
        description = ("""
        This utility is NOT SUPPORTED and is for internal-use only.

        Used to inject faults into the file replication code.
        """)

        helpText = ["""

        Return codes:
          0 - Fault injected
          non-zero: Error or invalid options
        """]

        parser = OptParser(option_class=OptChecker,
                           description=' '.join(description.split()),
                           version='%prog version $Revision$')
        parser.setHelp(helpText)

        addStandardLoggingAndHelpOptions(parser, False)

        # these options are used to determine the target segments
        addTo = OptionGroup(parser, 'Target Segment Options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-r', '--role', dest="targetRole", type='string', metavar="<role>",
                         help="Role of segments to target: master, standby, primary")
        addTo.add_option("-s", "--registration_order", dest="targetRegistrationOrder", type="string",
                         metavar="<registration_order>",
                         help="The segment registration_order on which fault should be set and triggered.")
        addTo.add_option("-H", "--host", dest="targetHost", type="string", metavar="<host>",
                         help="The hostname on which fault should be set and triggered; pass ALL to target all hosts")

        addTo = OptionGroup(parser, 'Master Connection Options')
        parser.add_option_group(addTo)

        addMasterDirectoryOptionForSingleClusterProgram(addTo)
        addTo.add_option("-p", "--master_port", dest="masterPort", type="int", default=None,
                         metavar="<masterPort>",
                         help="DEPRECATED, use MASTER_DATA_DIRECTORY environment variable or -d option. "
                              "The port number of the master database on localhost, "
                              "used to fetch the segment configuration.")

        addTo = OptionGroup(parser, 'Client Polling Options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-m', '--mode', dest="syncMode", type='string', default="async",
                         metavar="<syncMode>",
                         help="Synchronization mode : sync (client waits for fault to occur)"
                              " or async (client only sets fault request on server)")

        # these options are used to build the message for the segments
        addTo = OptionGroup(parser, 'Fault Options: ')
        parser.add_option_group(addTo)
        addTo.add_option('-y', '--type', dest="type", type='string', metavar="<type>",
                         help="fault type: sleep (insert sleep), fault (report fault to postmaster and fts prober), "
                              "fatal (inject FATAL error), panic (inject PANIC error), error (inject ERROR), "
                              "infinite_loop, data_curruption (corrupt data in memory and persistent media), "
                              "suspend (suspend execution), resume (resume execution that was suspended), "
                              "skip (inject skip i.e. skip checkpoint), "
                              "memory_full (all memory is consumed when injected), "
                              "reset (remove fault injection), status (report fault injection status), "
                              "panic_suppress (inject suppressed PANIC in critical section), "
                              "segv (inject a SEGV), "
                              "interrupt (inject an Interrupt) ")
        addTo.add_option("-z", "--sleep_time_s", dest="sleepTimeSeconds", type="int", default=10,
                         metavar="<sleepTime>",
                         help="For 'sleep' faults, the amount of time for the sleep. Defaults to %default. "
                              "Min Max Range is [0, 7200 sec] ")
        addTo.add_option('-f', '--fault_name', dest="faultName", type='string', metavar="<name>",
                         help="fault name: "
                              "postmaster (inject fault when new connection is accepted in postmaster), "
                              "pg_control (inject fault when global/pg_control file is written), "
                              "pg_xlog (inject fault when files in pg_xlog directory are written), "
                              "start_prepare (inject fault during start prepare transaction), "
                              "filerep_consumer (inject fault before data are processed, i.e. if mirror "
                              "then before file operation is issued to file system, if primary "
                              "then before mirror file operation is acknowledged to backend processes), "
                              "filerep_consumer_verificaton (inject fault before ack verification data are processed on primary), "
                              "filerep_change_tracking_compacting (inject fault when compacting change tracking log files), "
                              "filerep_sender (inject fault before data are sent to network), "
                              "filerep_receiver (inject fault after data are received from network), "
                              "filerep_flush (inject fault before fsync is issued to file system), "
                              "filerep_resync (inject fault while in resync when first relation is ready to be resynchronized), "
                              "filerep_resync_in_progress (inject fault while resync is in progress), "
                              "filerep_resync_worker (inject fault after write to mirror), "
                              "filerep_resync_worker_read (inject fault before read required for resync), "
                              "filerep_transition_to_resync (inject fault during transition to InResync before mirror re-create), "
                              "filerep_transition_to_resync_mark_recreate (inject fault during transition to InResync before marking re-created), "
                              "filerep_transition_to_resync_mark_completed (inject fault during transition to InResync before marking completed), "
                              "filerep_transition_to_sync_begin (inject fault before transition to InSync begin), "
                              "filerep_transition_to_sync (inject fault during transition to InSync), "
                              "filerep_transition_to_sync_before_checkpoint (inject fault during transition to InSync before checkpoint is created), "
                              "filerep_transition_to_sync_mark_completed (inject fault during transition to InSync before marking completed), "
                              "filerep_transition_to_change_tracking (inject fault during transition to InChangeTracking), "
                              "checkpoint (inject fault before checkpoint is taken), "
                              "change_tracking_compacting_report (report if compacting is in progress), "
                              "change_tracking_disable (inject fault before fsync to Change Tracking log files), "
                              "transaction_abort_after_distributed_prepared (abort prepared transaction), "
                              "transaction_commit_pass1_from_create_pending_to_created, "
                              "transaction_commit_pass1_from_drop_in_memory_to_drop_pending, "
                              "transaction_commit_pass1_from_aborting_create_needed_to_aborting_create, "
                              "transaction_abort_pass1_from_create_pending_to_aborting_create, "
                              "transaction_abort_pass1_from_aborting_create_needed_to_aborting_create, "
                              "transaction_commit_pass2_from_drop_in_memory_to_drop_pending, "
                              "transaction_commit_pass2_from_aborting_create_needed_to_aborting_create, "
                              "transaction_abort_pass2_from_create_pending_to_aborting_create, "
                              "transaction_abort_pass2_from_aborting_create_needed_to_aborting_create, "
                              "finish_prepared_transaction_commit_pass1_from_create_pending_to_created, "
                              "finish_prepared_transaction_commit_pass2_from_create_pending_to_created, "
                              "finish_prepared_transaction_abort_pass1_from_create_pending_to_aborting_create, "
                              "finish_prepared_transaction_abort_pass2_from_create_pending_to_aborting_create, "
                              "finish_prepared_transaction_commit_pass1_from_drop_in_memory_to_drop_pending, "
                              "finish_prepared_transaction_commit_pass2_from_drop_in_memory_to_drop_pending, "
                              "finish_prepared_transaction_commit_pass1_aborting_create_needed, "
                              "finish_prepared_transaction_commit_pass2_aborting_create_needed, "
                              "finish_prepared_transaction_abort_pass1_aborting_create_needed, "
                              "finish_prepared_transaction_abort_pass2_aborting_create_needed, "
                              "twophase_transaction_commit_prepared (inject fault before transaction commit is inserted in xlog), "
                              "twophase_transaction_abort_prepared (inject fault before transaction abort is inserted in xlog), "
                              "dtm_broadcast_prepare (inject fault after prepare broadcast), "
                              "dtm_broadcast_commit_prepared (inject fault after commit broadcast), "
                              "dtm_broadcast_abort_prepared (inject fault after abort broadcast), "
                              "dtm_xlog_distributed_commit (inject fault after distributed commit was inserted in xlog), "
                              "fault_before_pending_delete_relation_entry (inject fault before putting pending delete relation entry), "
                              "fault_before_pending_delete_database_entry (inject fault before putting pending delete database entry), "
                              "fault_before_pending_delete_tablespace_entry (inject fault before putting pending delete tablespace entry), "
                              "fault_before_pending_delete_filespace_entry (inject fault before putting pending delete filespace entry), "
                              "dtm_init (inject fault before initializing dtm), "
                              "end_prepare_two_phase_sleep (inject sleep after two phase file creation), "
                              "segment_transition_request (inject fault after segment receives state transition request), "
                              "segment_probe_response (inject fault after segment is probed by FTS), "
                              "sync_persistent_table (inject fault to sync persistent table to disk), "
                              "xlog_insert (inject fault to skip insert record into xlog), "
                              "local_tm_record_transaction_commit (inject fault for local transactions after transaction commit is recorded and flushed in xlog), "
                              "malloc_failure (inject fault to simulate memory allocation failure), "
                              "transaction_abort_failure (inject fault to simulate transaction abort failure), "
                              "update_committed_eof_in_persistent_table (inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files), "
                              "fault_during_exec_dynamic_table_scan (inject fault during scanning of a partition), "
                              "internal_flush_error (inject an error during internal_flush), "
                              "exec_simple_query_end_command (inject fault before EndCommand in exec_simple_query), "
                              "multi_exec_hash_large_vmem (allocate large vmem using palloc inside MultiExecHash to attempt to exceed vmem limit), "
                              "execsort_before_sorting (inject fault in nodeSort after receiving all tuples and before sorting), "
                              "workfile_cleanup_set (inject fault in workfile manager cleanup set), "
                              "execsort_mksort_mergeruns (inject fault in MKSort during the mergeruns phase), "
                              "cdb_copy_start_after_dispatch (inject fault in cdbCopyStart after dispatch), "
                              "fault_in_background_writer_main (inject fault in BackgroundWriterMain), "
                              "exec_hashjoin_new_batch (inject fault before switching to a new batch in Hash Join), "
                              "analyze_subxact_error (inject an error during analyze), "
                              "opt_task_allocate_string_buffer (inject fault while allocating string buffer), "
                              "runaway_cleanup (inject fault before starting the cleanup for a runaway query), "
                              "all (affects all faults injected, used for 'status' and 'reset')")
        addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string",
                         metavar="ddlStatement",
                         help="The DDL statement on which fault should be set and triggered "
                              "(i.e. create_database, drop_database, create_table, drop_table)")
        addTo.add_option("-D", "--database_name", dest="databaseName", type="string",
                         metavar="databaseName",
                         help="The database name on which fault should be set and triggered.")
        addTo.add_option("-t", "--table_name", dest="tableName", type="string",
                         metavar="tableName",
                         help="The table name on which fault should be set and triggered.")
        addTo.add_option("-o", "--occurrence", dest="numOccurrences", type="int", default=1,
                         metavar="numOccurrences",
                         help="The number of occurrences of the DDL statement with the database name "
                              "and the table name before fault is triggered. Defaults to %default. Max is 1000. "
                              "Fault is triggered always if set to '0'. ")
        parser.set_defaults()
        return parser

    @staticmethod
    def createProgram(options, args):
        """
        Create the program object from parsed options.

        @raise ProgramArgumentValidationException if positional arguments were given
        """
        if len(args) > 0:
            raise ProgramArgumentValidationException(
                "too many arguments: only options may be specified")
        return HAWQInjectFaultProgram(options)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py b/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
new file mode 100755
index 0000000..2e53b04
--- /dev/null
+++ b/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
@@ -0,0 +1,442 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Line too long - pylint: disable=C0301
+# Invalid name - pylint: disable=C0103
+
+"""
+ ComputeCatalogUpdate.py
+
+ Used by updateSystemConfig() to compare the db state and
+ goal state of a HAWQArray containing the Greenplum segment
+ configruation details and computes appropriate changes.
+"""
+import copy
+from gppylib.gplog import *
+from hawqpylib.hawqarray import ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY
+from hawqpylib.hawqarray import MASTER_REGISTRATION_ORDER
+from hawqpylib import hawqarray
+
+# Module-level logger; get_default_logger comes from the gppylib.gplog star import above.
+logger = get_default_logger()
+
class ComputeCatalogUpdate:
    """
    Helper class for GpConfigurationProvider.updateSystemConfig().

    This computes seven lists of segment objects (commonly referenced as 'seg')
    from a HAWQArray, reflecting the logical changes that need to be made
    to the database catalog to make it match the system as defined.
    The names of the lists are reasonably descriptive:

      mirror_to_remove         - to be removed (e.g. via gp_remove_segment_mirror())
      primary_to_remove        - to be removed (e.g. via gp_remove_segment())
      primary_to_add           - to be added (e.g. via gp_add_segment())
      mirror_to_add            - to be added (e.g. via gp_add_segment_mirror())
      mirror_to_remove_and_add - change or force list requires this mirror
                                 to be removed and then added back
      segment_to_update        - to be updated (e.g. via SQL)
      segment_unchanged        - needs no update (included for validation)
    """

    def __init__(self, hawqArray, forceMap, useUtilityMode, allowPrimary):
        """
        Compute all seven change lists; the constructor does all the computation.

        @param hawqArray the array containing the goal and db segment states.
        @param forceMap a map of dbid->True for mirrors for which we should force
                        updating via remove/add (None is treated as an empty map)
        @param useUtilityMode True if the operations we're doing are expected to run via utility mode
        @param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
        """
        forceMap = forceMap or {}
        self.useUtilityMode = useUtilityMode
        self.allowPrimary = allowPrimary

        # 'dbsegmap' reflects the current state of the catalog, keyed by dbid
        self.dbsegmap = dict((seg.getSegmentDbId(), seg)
                             for seg in hawqArray.getSegmentsAsLoadedFromDb())

        # 'goalsegmap' reflects the desired state of the catalog, keyed by dbid
        self.goalsegmap = dict((seg.getSegmentDbId(), seg)
                               for seg in hawqArray.getDbList(includeExpansionSegs=True))

        # mirrors / primaries present in the database but absent from the goal
        self.mirror_to_remove = [
            seg for seg in self.dbsegmap.values()
            if seg.isSegmentMirror() and seg.getSegmentDbId() not in self.goalsegmap
        ]
        self.debuglog("mirror_to_remove: %s", self.mirror_to_remove)

        self.primary_to_remove = [
            seg for seg in self.dbsegmap.values()
            if seg.isSegmentPrimary() and seg.getSegmentDbId() not in self.goalsegmap
        ]
        self.debuglog("primary_to_remove: %s", self.primary_to_remove)

        # primaries / mirrors present in the goal but absent from the database
        self.primary_to_add = [
            seg for seg in self.goalsegmap.values()
            if seg.isSegmentPrimary() and seg.getSegmentDbId() not in self.dbsegmap
        ]
        self.debuglog("primary_to_add: %s", self.primary_to_add)

        self.mirror_to_add = [
            seg for seg in self.goalsegmap.values()
            if seg.isSegmentMirror() and seg.getSegmentDbId() not in self.dbsegmap
        ]
        self.debuglog("mirror_to_add: %s", self.mirror_to_add)

        # segments present in both, but with some differing attributes
        initial_segment_to_update = [
            seg for seg in self.goalsegmap.values()
            if seg.getSegmentDbId() in self.dbsegmap
            and seg != self.dbsegmap[seg.getSegmentDbId()]
        ]
        self.debuglog("initial_segment_to_update: %s", initial_segment_to_update)

        # Segments that can't be updated in the ordinary way: either they are in
        # forceMap, or (unless the fault strategy is FAULT_STRATEGY_NONE) they
        # differ in more than what equalIgnoringStatus() ignores.
        removeandaddmap = {}
        for seg in initial_segment_to_update:
            dbid = seg.getSegmentDbId()
            if dbid in forceMap:
                removeandaddmap[dbid] = seg
            elif hawqArray.getFaultStrategy() == hawqarray.FAULT_STRATEGY_NONE:
                # In GPSQL there is no need to remove the original segment;
                # updating the hostname and address is enough.
                pass
            elif not seg.equalIgnoringStatus(self.dbsegmap[dbid]):
                removeandaddmap[dbid] = seg

        # mirrors to update via remove/add
        self.mirror_to_remove_and_add = list(removeandaddmap.values())
        self.debuglog("mirror_to_remove_and_add: %s", self.mirror_to_remove_and_add)

        # segments to update in the ordinary way
        self.segment_to_update = [
            seg for seg in initial_segment_to_update
            if seg.getSegmentDbId() not in removeandaddmap
        ]
        self.debuglog("segment_to_update: %s", self.segment_to_update)

        # segments present in both with all attributes equal
        self.segment_unchanged = [
            seg for seg in self.goalsegmap.values()
            if seg.getSegmentDbId() in self.dbsegmap
            and seg == self.dbsegmap[seg.getSegmentDbId()]
        ]
        self.debuglog("segment_unchanged: %s", self.segment_unchanged)

    def final_segments(self):
        """
        Generate the segments appearing in the final configuration.

        Yields, in order: primary_to_add, mirror_to_add,
        mirror_to_remove_and_add, segment_to_update, segment_unchanged.
        """
        for seglist in (self.primary_to_add,
                        self.mirror_to_add,
                        self.mirror_to_remove_and_add,
                        self.segment_to_update,
                        self.segment_unchanged):
            for seg in seglist:
                yield seg

    def validate(self):
        """
        Check that the operation and new configuration are valid.

        @raise Exception if primaries would be added/removed without
               authorization, if two final segments share a content id and
               preferred role, if mirroring is partial, if a master/standby is
               in the remove/add list, or if a mirror would be changed in
               utility mode.
        """
        # Validate that we're not adding or removing primaries unless authorized
        if not self.allowPrimary:
            if self.primary_to_add:
                p = self.primary_to_add[0]
                raise Exception("Internal error: Operation may not add primary: %s" % repr(p))
            if self.primary_to_remove:
                p = self.primary_to_remove[0]
                raise Exception("Internal error: Operation may not remove primary: %s" % repr(p))

        # NOTE(review): ROLE_MIRROR and MASTER_CONTENT_ID are not imported by this
        # module (only ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY and
        # MASTER_REGISTRATION_ORDER are), so this part raises NameError unless they
        # are defined globally (the __main__ test harness defines ROLE_MIRROR).
        # Confirm which hawqarray constants are intended and import them.

        # Validate that operations do not result in a contentid with a pair of
        # segments in the same preferred role
        final = {ROLE_PRIMARY: {}, ROLE_MIRROR: {}}
        for seg in self.final_segments():
            subset = final[seg.getSegmentPreferredRole()]
            other = subset.get(seg.getSegmentContentId())
            if other is not None:
                raise Exception("Segments sharing a content id may not have same preferred role: %s and %s" % (repr(seg), repr(other)))
            subset[seg.getSegmentContentId()] = seg

        # Validate that if we have any mirrors, all primaries have mirrors.
        # A mirror-role entry for the master content id alone (standby) does
        # not turn this check on.
        check_mirror_sanity = any(contentId != MASTER_CONTENT_ID for contentId in final[ROLE_MIRROR])
        if check_mirror_sanity:
            for contentId in final[ROLE_PRIMARY]:
                if contentId != MASTER_CONTENT_ID and final[ROLE_MIRROR].get(contentId) is None:
                    seg = final[ROLE_PRIMARY][contentId]
                    raise Exception("Primary must have mirror when mirroring enabled: %s" % repr(seg))

        # Validate that the remove/add list contains only qualified mirrors.
        # In particular, we disallow remove/add of the master or standby.
        for seg in self.mirror_to_remove_and_add:
            originalSeg = self.dbsegmap.get(seg.getSegmentDbId())

            # user should not be allowed to change master/standby info
            if seg.isSegmentMaster(current_role=True) or seg.isSegmentStandby():
                raise Exception("Internal error: Can only change core details of segments, not masters" \
                                " (on segment %s) (seg %s vs original %s)" %
                                (seg.getSegmentDbId(), repr(seg), repr(originalSeg)))

            if not seg.isSegmentMirror(current_role=True):
                # Non-mirror segments are tolerated here. This used to be a
                # `return`, which ended validation early and skipped the checks
                # above for the remaining segments. Whether they were checked
                # depended on dict iteration order.
                continue

            if self.useUtilityMode:
                raise Exception("Cannot change core details of mirrors in utility mode")

    def debuglog(self, msg, seglist):
        """
        Write debugging details about the specified segment list.

        @param msg a format string with a single %s placeholder
        @param seglist the segments to describe (count first, then each repr)
        """
        logger.debug(msg, "%s segments" % len(seglist))
        for seg in seglist:
            logger.debug(msg, repr(seg))
+
+
+
+
+# minimal test framework when run from command line
+#
+if __name__ == '__main__':
+
+ ROLE_PRIMARY = 'p'
+ ROLE_MIRROR = 'm'
+
+ MODE_NOT_INITIALIZED = ''
+ MODE_CHANGELOGGING = 'c'
+ MODE_SYNCHRONIZED = 's'
+ MODE_RESYNCHRONIZATION = 'r'
+
+ class GpDb:
+ def __init__(self,dbid,content,pref,mode='',curr=None,status='u',rport=0,misc=None):
+ self.dbid = dbid
+ self.content = content
+ self.preferred_role = pref
+ self.mode = mode
+ self.role = curr or pref
+ self.status = status
+ self.rport = rport
+ self.misc = misc
+ def getSegmentDbId(self): return self.dbid
+ def getSegmentContentId(self): return self.content
+ def getSegmentPreferredRole(self): return self.preferred_role
+ def getSegmentMode(self): return self.mode
+ def getSegmentRole(self): return self.role
+ def getSegmentStatus(self): return self.status
+ def getSegmentReplicationPort(self): return self.rport
+ def setSegmentMode(self,mode): self.mode = mode
+ def setSegmentStatus(self,status): self.status = status
+ def setSegmentReplicationPort(self,rport): self.rport = rport
+ def isSegmentPrimary(self, current_role=False):
+ role = self.role if current_role else self.preferred_role
+ return self.content >= 0 and role == ROLE_PRIMARY
+ def isSegmentMirror(self, current_role=False):
+ role = self.role if current_role else self.preferred_role
+ return self.content >= 0 and role == ROLE_MIRROR
+ def isSegmentMaster(self, current_role=False):
+ role = self.role if current_role else self.preferred_role
+ return self.content < 0 and role == ROLE_PRIMARY
+ def isSegmentStandby(self):
+ role = self.role if current_role else self.preferred_role
+ return self.content < 0 and role == ROLE_MIRROR
+ def __cmp__(self,other): return cmp(repr(self), repr(other))
+ def __repr__(s):
+ return '(%s,%s,%s,%s,%s,%s,%s,%s)' % (s.dbid, s.content, s.preferred_role, s.mode, s.role, s.status, s.rport, s.misc)
+ def equalIgnoringStatus(self, other):
+ tmp = copy.copy(self)
+ tmp.setSegmentMode( other.getSegmentMode() )
+ tmp.setSegmentStatus( other.getSegmentStatus() )
+ tmp.setSegmentReplicationPort( other.getSegmentReplicationPort() )
+ return tmp == other
+
+ class xxx:
+ def xxx():
+ print dbsegmap
+ print goalsegmap
+ print 'db not goal', [seg for seg in dbsegmap.values() if seg.getSegmentDbId() not in goalsegmap]
+ print 'goal not db', [seg for seg in goalsegmap.values() if seg.getSegmentDbId() not in dbsegmap]
+
+ class HAWQArray:
+ def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
+ s.c = ComputeCatalogUpdate(s,forceMap,useUtilityMode,allowPrimary)
+ s.dump()
+ def dump(s):
+ print s.__class__.__name__, s.__class__.__doc__
+ s.c.validate()
+ print " -m", s.c.mirror_to_remove,
+ print " -p", s.c.primary_to_remove,
+ print " +p", s.c.primary_to_add,
+ print " +m", s.c.mirror_to_add,
+ print " +/-m", s.c.mirror_to_remove_and_add,
+ print " u", s.c.segment_to_update,
+ print " n", s.c.segment_unchanged
+ def __repr__(s):
+ return '<%s,%s>' % (s.getDbList(), s.getSegmentsAsLoadedFromDb())
+
+ class HAWQArrayBad(HAWQArray):
+ def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
+ try:
+ HAWQArray.__init__(s,forceMap,useUtilityMode,allowPrimary)
+ print " ERROR: expected exception"
+ except Exception, e:
+ print " EXPECTED: ", str(e)
+
+ class HAWQArray1(HAWQArray):
+ "expect no change"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray1()
+
+ class HAWQArray1a(HAWQArray):
+ "expect update of mirror"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray1a()
+
+ class HAWQArray2(HAWQArray):
+ "expect add mirror"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p','a'), GpDb(2,1,'m','a')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p')]
+ HAWQArray2()
+
+ class HAWQArray3(HAWQArray):
+ "expect remove mirror"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+ HAWQArray3()
+
+ class HAWQArray4(HAWQArray):
+ "expect add primary and mirror"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray4()
+
+ class HAWQArray5(HAWQArray):
+ "expect remove primary/mirror and add/primary mirror"
+ def getDbList(self,includeExpansionSegs): return [GpDb(3,2,'p'), GpDb(4,2,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+ HAWQArray5()
+
+ class HAWQArray6(HAWQArray):
+ "expect update via add/remove"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m',misc='x')]
+ HAWQArray6()
+
+ class HAWQArray7(HAWQArrayBad):
+ "can't rely on remove/add for primary"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p',misc='x'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray7()
+
+ class HAWQArray8(HAWQArrayBad):
+ "can't rely on remove/add for master"
+ def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',misc="x"), GpDb(1,1,'p'), GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray8()
+
+ class HAWQArray9(HAWQArrayBad):
+ "can't rely on remove/add for master"
+ def __init__(s): HAWQArrayBad.__init__(s,[0])
+ def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',rport=2), GpDb(1,1,'p'), GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray9()
+
+ class HAWQArray10(HAWQArray):
+ "expect update"
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray10()
+
+ class HAWQArray11(HAWQArray):
+ "expect update via add/remove"
+ def __init__(s): HAWQArray.__init__(s,[2])
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray11()
+
+ class HAWQArray12(HAWQArrayBad):
+ "can't add primary"
+ def __init__(s): HAWQArrayBad.__init__(s,allowPrimary=False)
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray12()
+
+ class HAWQArray13(HAWQArrayBad):
+ "can't remove primary"
+ def __init__(s): HAWQArrayBad.__init__(s,allowPrimary=False)
+ def getDbList(self,includeExpansionSegs): return [GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray13()
+
+ class HAWQArray14(HAWQArrayBad):
+ "can't have pair in same preferred role"
+ def __init__(s): HAWQArrayBad.__init__(s)
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'p')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray14()
+
+ class HAWQArray15(HAWQArrayBad):
+ "can't have pair in same preferred role"
+ def __init__(s): HAWQArrayBad.__init__(s)
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'m'), GpDb(2,1,'m')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray15()
+
+ class HAWQArray16(HAWQArrayBad):
+ "all primaries must have mirrors when mirroring"
+ def __init__(s): HAWQArrayBad.__init__(s)
+ def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p')]
+ def getSegmentsAsLoadedFromDb(self): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+ HAWQArray16()
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/__init__.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/__init__.py b/tools/bin/hawqpylib/system/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/configurationImplHAWQ.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/configurationImplHAWQ.py b/tools/bin/hawqpylib/system/configurationImplHAWQ.py
new file mode 100644
index 0000000..cf0b656
--- /dev/null
+++ b/tools/bin/hawqpylib/system/configurationImplHAWQ.py
@@ -0,0 +1,474 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This file defines the interface that can be used to fetch and update system
+configuration information.
+"""
+import os, copy
+
+from gppylib.gplog import *
+from gppylib.utils import checkNotNone
+from gppylib.system.configurationInterface import *
+from gppylib.system.ComputeCatalogUpdate import ComputeCatalogUpdate
+from hawqpylib.hawqarray import HAWQArray, HAWQDB, InvalidSegmentConfiguration
+from hawqpylib import hawqarray
+from gppylib.db import dbconn
+from gppylib.commands.gp import get_local_db_mode
+
logger = get_default_logger()


class GpConfigurationProviderUsingHAWQCatalog(GpConfigurationProvider):
    """
    An implementation of GpConfigurationProvider that fetches and updates the
    system configuration stored in the master's catalog
    (pg_catalog.gp_segment_configuration, with every change recorded in
    gp_configuration_history).

    Note that the client of this is assuming that the database data is not
    changed by another party between the time segment data is loaded and when it
    is updated.
    """

    def __init__(self):
        # DbURL of the master; set by initializeProvider() and checked by the
        # public methods before they touch the database.
        self.__masterDbUrl = None

    def initializeProvider(self, masterPort):
        """
        Initialize the provider to get information from the given master db.

        @param masterPort port of the master; connections go to its
                          'template1' database
        @return self
        """
        checkNotNone("masterPort", masterPort)
        self.__masterDbUrl = dbconn.DbURL(port=masterPort, dbname='template1')
        return self

    def loadSystemConfig(self, useUtilityMode):
        """
        Load all segment information from the configuration source.

        @param useUtilityMode passed through to HAWQArray.initFromCatalog()
        @return a new HAWQArray object
        @raise InvalidSegmentConfiguration if the local master is not in
               UTILITY mode and the loaded array fails is_array_valid()
        """
        # ensure initializeProvider() was called
        checkNotNone("masterDbUrl", self.__masterDbUrl)

        logger.info("Obtaining Segment details from master...")

        array = HAWQArray.initFromCatalog(self.__masterDbUrl, useUtilityMode)

        masterDataDirectory = array.master.getDataDirectory()
        # Diagnostic only: goes through the logger instead of polluting stdout.
        logger.debug("Master data directory: %s" % masterDataDirectory)

        if get_local_db_mode(masterDataDirectory) != 'UTILITY':
            logger.debug("Validating configuration...")
            if not array.is_array_valid():
                raise InvalidSegmentConfiguration(array)

        return array

    def sendPgElogFromMaster(self, msg, sendAlerts):
        """
        Send a message from the master database using SELECT GP_ELOG(...).

        A utility-mode connection is opened for the call and always closed.

        @param msg        message text, quoted as a SQL string literal
        @param sendAlerts passed as GP_ELOG's boolean second argument
        """
        # ensure initializeProvider() was called
        checkNotNone("masterDbUrl", self.__masterDbUrl)

        conn = None
        try:
            conn = dbconn.connect(self.__masterDbUrl, utility=True)
            dbconn.execSQL(conn, "SELECT GP_ELOG(" +
                           self.__toSqlCharValue(msg) + "," +
                           ("true" if sendAlerts else "false") + ")")
        finally:
            if conn:
                conn.close()

    def updateSystemConfig(self, hawqArray, textForConfigTable, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary):
        """
        Update the configuration for the given segments in the underlying
        configuration store to match the current values.

        Also resets any dirty bits on saved/updated objects.

        All catalog changes run in a single transaction; the connection is
        closed whether or not the update succeeds.

        @param hawqArray the goal configuration
        @param textForConfigTable label to be used when adding to segment configuration history
        @param dbIdToForceMirrorRemoveAdd a map of dbid -> True for mirrors for which we should force updating the mirror
        @param useUtilityMode True if the operations we're doing are expected to run via utility mode
        @param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
        @raise InvalidSegmentConfiguration if hawqArray fails is_array_valid()
        """
        # ensure initializeProvider() was called
        checkNotNone("masterDbUrl", self.__masterDbUrl)

        logger.debug("Validating configuration changes...")

        if not hawqArray.is_array_valid():
            logger.critical("Configuration is invalid")
            raise InvalidSegmentConfiguration(hawqArray)

        conn = dbconn.connect(self.__masterDbUrl, useUtilityMode, allowSystemTableMods='dml')
        try:
            dbconn.execSQL(conn, "BEGIN")

            # compute what needs to be updated
            update = ComputeCatalogUpdate(hawqArray, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary)
            update.validate()

            # put the new mirrors in a map so that adding a primary can find its
            # mirror and give it the same content id.
            # NOTE(review): keyed (and looked up in __updateSystemConfigAddPrimary)
            # by registration order rather than content id; confirm a primary and
            # its mirror can actually share that key.
            mirror_map = {}
            for seg in update.mirror_to_add:
                mirror_map[seg.getRegistrationOrder()] = seg

            # reset dbId of new primary and mirror segments to -1
            # before invoking the operations which will assign them new ids
            for seg in update.primary_to_add:
                seg.setSegmentDbId(-1)
            for seg in update.mirror_to_add:
                seg.setSegmentDbId(-1)

            # remove mirror segments (e.g. for gpexpand rollback)
            for seg in update.mirror_to_remove:
                self.__updateSystemConfigRemoveMirror(conn, seg, textForConfigTable)

            # remove primary segments (e.g for gpexpand rollback)
            for seg in update.primary_to_remove:
                self.__updateSystemConfigRemovePrimary(conn, seg, textForConfigTable)

            # add new primary segments
            for seg in update.primary_to_add:
                self.__updateSystemConfigAddPrimary(conn, hawqArray, seg, textForConfigTable, mirror_map)

            # add new mirror segments
            for seg in update.mirror_to_add:
                self.__updateSystemConfigAddMirror(conn, hawqArray, seg, textForConfigTable)

            # remove and add mirror segments necessitated by catalog attribute update
            for seg in update.mirror_to_remove_and_add:
                self.__updateSystemConfigRemoveAddMirror(conn, hawqArray, seg, textForConfigTable)

            # apply updates to existing segments
            for seg in update.segment_to_update:
                originalSeg = update.dbsegmap.get(seg.getRegistrationOrder())
                self.__updateSystemConfigUpdateSegment(conn, hawqArray, seg, originalSeg, textForConfigTable)

            # apply update to fault strategy
            if hawqArray.getStrategyAsLoadedFromDb() != hawqArray.getFaultStrategy():
                self.__updateSystemConfigFaultStrategy(conn, hawqArray)

            # commit changes
            logger.debug("Committing configuration table changes")
            dbconn.execSQL(conn, "COMMIT")
        finally:
            # Close on every path. On failure nothing has been committed, so the
            # server discards the open transaction instead of it being leaked
            # together with the connection.
            conn.close()

        hawqArray.setStrategyAsLoadedFromDb([hawqArray.getFaultStrategy()])
        hawqArray.setSegmentsAsLoadedFromDb([seg.copy() for seg in hawqArray.getDbList()])

    def __updateSystemConfigRemoveMirror(self, conn, seg, textForConfigTable):
        """
        Remove a mirror segment currently in gp_segment_configuration
        but not present in the goal configuration and record our action
        in gp_configuration_history.
        """
        dbId = seg.getRegistrationOrder()
        self.__callSegmentRemoveMirror(conn, seg)
        self.__insertConfigHistory(conn, dbId, "%s: removed mirror segment configuration" % textForConfigTable)

    def __updateSystemConfigRemovePrimary(self, conn, seg, textForConfigTable):
        """
        Remove a primary segment currently in gp_segment_configuration
        but not present in the goal configuration and record our action
        in gp_configuration_history.
        """
        # getRegistrationOrder() is the id accessor used everywhere else in this
        # class; the previous getRegistration() call does not match it.
        dbId = seg.getRegistrationOrder()
        self.__callSegmentRemove(conn, seg)
        self.__insertConfigHistory(conn, dbId, "%s: removed primary segment configuration" % textForConfigTable)

    def __updateSystemConfigAddPrimary(self, conn, hawqArray, seg, textForConfigTable, mirror_map):
        """
        Add a primary segment specified in our goal configuration but
        which is missing from the current gp_segment_configuration table
        and record our action in gp_configuration_history.

        The content id assigned by the catalog is copied to seg and, if
        mirror_map has an entry for it, to the matching mirror.
        """
        # lookup the mirror (if any) so that we may correct its content id
        mirrorseg = mirror_map.get(seg.getRegistrationOrder())

        # add the new segment
        dbId = self.__callSegmentAdd(conn, hawqArray, seg)

        # update the segment mode, status and replication port
        self.__updateSegmentModeStatus(conn, seg)

        # get the newly added segment's content id
        # MPP-12393 et al WARNING: there is an unusual side effect going on here.
        # Although gp_add_segment() executed by __callSegmentAdd() above returns
        # the dbId of the new row in gp_segment_configuration, the following
        # select from gp_segment_configuration can return 0 rows if the updates
        # done by __updateSegmentModeStatus() and/or __updateSegmentReplicationPort()
        # are not done first.  Don't change the order of these operations unless you
        # understand why gp_add_segment() behaves as it does.
        sql = "select content from pg_catalog.gp_segment_configuration where dbId = %s" % self.__toSqlIntValue(seg.getRegistrationOrder())
        logger.debug(sql)
        sqlResult = self.__fetchSingleOutputRow(conn, sql)
        contentId = int(sqlResult[0])

        # Set the new content id for the primary as well the mirror if present.
        seg.setSegmentContentId(contentId)
        if mirrorseg is not None:
            mirrorseg.setSegmentContentId(contentId)

        self.__insertConfigHistory(conn, dbId, "%s: inserted primary segment configuration with contentid %s" % (textForConfigTable, contentId))

    def __updateSystemConfigAddMirror(self, conn, hawqArray, seg, textForConfigTable):
        """
        Add a mirror segment specified in our goal configuration but
        which is missing from the current gp_segment_configuration table
        and record our action in gp_configuration_history.
        """
        dbId = self.__callSegmentAddMirror(conn, hawqArray, seg)
        self.__updateSegmentModeStatus(conn, seg)
        self.__insertConfigHistory(conn, dbId, "%s: inserted mirror segment configuration" % textForConfigTable)

    def __updateSystemConfigRemoveAddMirror(self, conn, hawqArray, seg, textForConfigTable):
        """
        We've been asked to update the mirror in a manner that require
        it to be removed and then re-added. Perform the tasks
        and record our action in gp_configuration_history.
        """
        origDbId = seg.getRegistrationOrder()
        self.__callSegmentRemoveMirror(conn, seg)

        dbId = self.__callSegmentAddMirror(conn, hawqArray, seg)

        # now update mode/status since this is not done by gp_add_segment_mirror
        self.__updateSegmentModeStatus(conn, seg)
        # Record history under the id returned for the re-added row, like the
        # other add operations in this class do.
        self.__insertConfigHistory(conn, dbId,
                                   "%s: inserted segment configuration for full recovery or original dbid %s" \
                                   % (textForConfigTable, origDbId))

    def __updateSystemConfigUpdateSegment(self, conn, hawqArray, seg, originalSeg, textForConfigTable):
        """
        Update an existing segment row in place and record the change in
        gp_configuration_history.

        With fault strategy FAULT_STRATEGY_NONE only hostname/address are
        written; otherwise mode and status are, plus the replication port when
        it differs from originalSeg.
        """
        # update mode and status
        # when adding a mirror, the replication port may change as well
        #
        if hawqArray.getFaultStrategy() == hawqarray.FAULT_STRATEGY_NONE:
            what = "%s: segment hostname and address"
            self.__updateSegmentAddress(conn, seg)
        else:
            what = "%s: segment mode and status"
            self.__updateSegmentModeStatus(conn, seg)

            # NOTE(review): confirm the replication-port check belongs only to
            # the mode/status branch.
            if seg.getReplicationPort() != originalSeg.getReplicationPort():
                what = "%s: segment mode, status, and replication port"
                self.__updateSegmentReplicationPort(conn, seg)

        self.__insertConfigHistory(conn, seg.getRegistrationOrder(), what % textForConfigTable)

    def __updateSystemConfigFaultStrategy(self, conn, hawqArray):
        """
        Update the fault strategy.
        """
        fs = hawqArray.getFaultStrategy()
        sql = "UPDATE gp_fault_strategy\n SET fault_strategy = " + self.__toSqlCharValue(fs) + "\n"
        logger.debug(sql)
        dbconn.executeUpdateOrInsert(conn, sql, 1)

    def __callSegmentRemoveMirror(self, conn, seg):
        """
        Call gp_remove_segment_mirror() to remove the mirror.
        """
        sql = "SELECT gp_remove_segment_mirror(%s::int2)" % (self.__toSqlIntValue(seg.getContentId()))
        logger.debug(sql)
        result = self.__fetchSingleOutputRow(conn, sql)
        assert result[0]  # must return True

    def __callSegmentRemove(self, conn, seg):
        """
        Call gp_remove_segment() to remove the primary.
        """
        sql = "SELECT gp_remove_segment(%s::int2)" % (self.__toSqlIntValue(seg.getRegistrationOrder()))
        logger.debug(sql)
        result = self.__fetchSingleOutputRow(conn, sql)
        assert result[0]

    def __callSegmentAdd(self, conn, hawqArray, seg):
        """
        Call gp_add_segment() to add the primary.
        Return the new segment's dbid (also stored on seg).
        """
        logger.debug('callSegmentAdd %s' % repr(seg))
        filespaceMapStr = self.__toSqlFilespaceMapStr(hawqArray, seg)

        sql = "SELECT gp_add_segment(%s, %s, %s, %s)" \
            % (
                self.__toSqlTextValue(seg.getHostName()),
                self.__toSqlTextValue(seg.getAddress()),
                self.__toSqlIntValue(seg.getPort()),
                self.__toSqlTextValue(filespaceMapStr)
            )
        logger.debug(sql)
        sqlResult = self.__fetchSingleOutputRow(conn, sql)
        dbId = int(sqlResult[0])
        seg.setSegmentDbId(dbId)
        return dbId

    def __callSegmentAddMirror(self, conn, hawqArray, seg):
        """
        Call gp_add_segment_mirror() to add the mirror.
        Return the new segment's dbid (also stored on seg).
        """
        logger.debug('callSegmentAddMirror %s' % repr(seg))
        filespaceMapStr = self.__toSqlFilespaceMapStr(hawqArray, seg)

        sql = "SELECT gp_add_segment_mirror(%s::int2, %s, %s, %s, %s, %s)" \
            % (
                self.__toSqlIntValue(seg.getRegistrationOrder()),
                self.__toSqlTextValue(seg.getHostName()),
                self.__toSqlTextValue(seg.getAddress()),
                self.__toSqlIntValue(seg.getPort()),
                self.__toSqlIntValue(seg.getReplicationPort()),
                self.__toSqlTextValue(filespaceMapStr)
            )

        logger.debug(sql)
        sqlResult = self.__fetchSingleOutputRow(conn, sql)
        dbId = int(sqlResult[0])
        seg.setSegmentDbId(dbId)
        return dbId

    def __updateSegmentReplicationPort(self, conn, seg):
        """Write seg's replication port to its gp_segment_configuration row."""
        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
              " SET\n" + \
              " replication_port = " + self.__toSqlIntValue(seg.getReplicationPort()) + "\n" \
              "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
        logger.debug(sql)
        dbconn.executeUpdateOrInsert(conn, sql, 1)

    def __updateSegmentModeStatus(self, conn, seg):
        """Write seg's mode and status to its gp_segment_configuration row."""
        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
              " SET\n" + \
              " mode = " + self.__toSqlCharValue(seg.getMode()) + ",\n" \
              " status = " + self.__toSqlCharValue(seg.getStatus()) + "\n" \
              "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
        logger.debug(sql)
        dbconn.executeUpdateOrInsert(conn, sql, 1)

    def __updateSegmentAddress(self, conn, seg):
        """Write seg's hostname and address to its gp_segment_configuration row."""
        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
              " SET\n" + \
              " hostname = " + self.__toSqlCharValue(seg.getHostName()) + ",\n" \
              " address = " + self.__toSqlCharValue(seg.getAddress()) + "\n" \
              "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
        logger.debug(sql)
        dbconn.executeUpdateOrInsert(conn, sql, 1)

    def __fetchSingleOutputRow(self, conn, sql, retry=False):
        """
        Execute specified SQL command and return what we expect to be a single row.
        Raise an exception when more or fewer than one row is seen and when more
        than one row is seen display up to 10 rows as logger warnings.

        The cursor is closed even if iterating it fails. `retry` is unused.
        """
        cursor = dbconn.execSQL(conn, sql)
        try:
            numrows = cursor.rowcount
            numshown = 0
            res = None
            for row in cursor:
                if numrows != 1:
                    #
                    # if we got back more than one row
                    # we print a few of the rows first
                    # instead of immediately raising an exception
                    #
                    numshown += 1
                    if numshown > 10:
                        break
                    # wrap in a 1-tuple: a multi-column tuple row would
                    # otherwise be spread over the single %s and raise TypeError
                    logger.warning('>>> %s' % (row,))
                else:
                    assert res is None
                    res = row
                    assert res is not None
        finally:
            cursor.close()
        if numrows != 1:
            raise Exception("SQL returned %d rows, not 1 as expected:\n%s" % (numrows, sql))
        return res

    def __insertConfigHistory(self, conn, dbId, msg):
        """Append (now(), dbId, msg) to gp_configuration_history."""
        sql = "INSERT INTO gp_configuration_history (time, dbid, \"desc\") VALUES(\n" \
              "now(),\n " + \
              self.__toSqlIntValue(dbId) + ",\n " + \
              self.__toSqlCharValue(msg) + "\n)"
        logger.debug(sql)
        dbconn.executeUpdateOrInsert(conn, sql, 1)

    def __toSqlFilespaceMapStr(self, hawqArray, seg):
        """
        Return a string representation of the filespace map suitable
        for inclusion into the call to gp_add_segment_mirror().

        Produces '{{"name","path"},...}' with one pair per filespace of
        hawqArray, the path taken from seg's filespace map.
        """
        segFilespaces = seg.getFilespaces()
        filespaceArrayString = []
        for fs in hawqArray.getAllFilespaces():
            path = segFilespaces[fs.getOid()]
            filespaceArrayString.append("{%s,%s}" %
                                        (self.__toSqlArrayStringValue(fs.getName()),
                                         self.__toSqlArrayStringValue(path)))

        return "{" + ",".join(filespaceArrayString) + "}"

    def __toSqlIntValue(self, val):
        """Render val as a SQL integer literal, or null for None."""
        if val is None:
            return "null"
        return str(val)

    def __toSqlArrayStringValue(self, val):
        """
        Render val as a double-quoted element of a PostgreSQL array literal,
        or null for None.

        Backslashes are escaped before double quotes; doing it the other way
        round doubled the backslash just added for a quote, leaving the quote
        itself unescaped.
        """
        if val is None:
            return "null"
        return '"' + val.replace('\\', '\\\\').replace('"', '\\"') + '"'

    def __toSqlCharValue(self, val):
        """Render val as a quoted SQL string literal (same as __toSqlTextValue)."""
        return self.__toSqlTextValue(val)

    def __toSqlTextValue(self, val):
        """Render val as a single-quoted SQL string literal, or null for None."""
        if val is None:
            return "null"
        return "'" + val.replace("'", "''").replace('\\', '\\\\') + "'"
[2/2] incubator-hawq git commit: HAWQ-288. Support hawqfaultinjector
as an internal utility for fault injection purpose - client side
Posted by hu...@apache.org.
HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8cdae414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8cdae414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8cdae414
Branch: refs/heads/master
Commit: 8cdae4145822373f2fe8de0083179bcbdece7a75
Parents: 4246999
Author: Ruilong Huo <rh...@pivotal.io>
Authored: Tue Dec 29 17:27:02 2015 +0800
Committer: Ruilong Huo <rh...@pivotal.io>
Committed: Tue Dec 29 17:29:07 2015 +0800
----------------------------------------------------------------------
src/all_src_files.txt | 1 +
src/backend/postmaster/postmaster.c | 25 +-
src/bin/Makefile | 2 +-
src/bin/gpmirrortransition/.gitignore | 2 +
src/bin/gpmirrortransition/Makefile | 49 ++
src/bin/gpmirrortransition/gpmirrortransition.c | 353 +++++++++
src/include/pg_config_manual.h | 2 +-
tools/bin/Makefile | 6 +-
tools/bin/gppylib/commands/base.py | 8 +
tools/bin/hawqfaultinjector | 27 +
tools/bin/hawqpylib/Makefile | 28 +
tools/bin/hawqpylib/hawqarray.py | 729 +++++++++++++++++++
tools/bin/hawqpylib/mainUtils.py | 655 +++++++++++++++++
tools/bin/hawqpylib/programs/__init__.py | 0
tools/bin/hawqpylib/programs/clsInjectFault.py | 441 +++++++++++
.../hawqpylib/system/ComputeCatalogUpdate.py | 442 +++++++++++
tools/bin/hawqpylib/system/__init__.py | 0
.../hawqpylib/system/configurationImplHAWQ.py | 474 ++++++++++++
18 files changed, 3239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/all_src_files.txt
----------------------------------------------------------------------
diff --git a/src/all_src_files.txt b/src/all_src_files.txt
index 68738d7..f7ca7ad 100644
--- a/src/all_src_files.txt
+++ b/src/all_src_files.txt
@@ -1450,6 +1450,7 @@ bin/gpfusion/common.h
bin/gpfusion/gpbridgeapi.c
bin/gpfusion/gpdbwritableformatter.c
bin/gpfusion/pxf.c
+bin/gpmirrortransition/gpmirrortransition.c
bin/gpupgrade/gpmodcatversion.c
bin/gpupgrade/gpviewcp.c
bin/initdb/initdb.c
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/backend/postmaster/postmaster.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ff954b8..c2fafb3 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2675,6 +2675,30 @@ readNextStringFromString(char *buf, int *offsetInOut, int length)
return result;
}
#ifdef FAULT_INJECTOR
/**
 * Read the next string token from buf and convert it with atoi().
 *
 * When wasRead is non-NULL it is set to whether a token was available.
 * Returns 0 if no token could be read; the token's storage is freed
 * after conversion.
 */
static int
readIntFromString(char *buf, int *offsetInOut, int length, bool *wasRead)
{
	char	   *token = readNextStringFromString(buf, offsetInOut, length);
	int			value = 0;

	if (wasRead != NULL)
		*wasRead = (token != NULL);

	if (token != NULL)
	{
		value = atoi(token);
		pfree(token);
	}

	return value;
}
#endif
+
static void sendPrimaryMirrorTransitionResult( const char *msg)
{
StringInfoData buf;
@@ -2792,7 +2816,6 @@ static void processTransitionRequest_getFaultInjectStatus(void * buf, int *offse
static void
processTransitionRequest_faultInject(void * inputBuf, int *offsetPtr, int length)
{
-#undef FAULT_INJECTOR
#ifdef FAULT_INJECTOR
bool wasRead;
char *faultName = readNextStringFromString(inputBuf, offsetPtr, length);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 73425b9..9d4919d 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
DIRS = initdb ipcclean pg_ctl pg_dump pgbench \
psql scripts pg_config pg_controldata pg_resetxlog \
- gpfilesystem/hdfs gpupgrade \
+ gpfilesystem/hdfs gpmirrortransition gpupgrade \
gpfusion gp_workfile_mgr gpcheckhdfs gpfdist
all install installdirs uninstall distprep clean distclean maintainer-clean:
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/.gitignore
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/.gitignore b/src/bin/gpmirrortransition/.gitignore
new file mode 100644
index 0000000..e999eb8
--- /dev/null
+++ b/src/bin/gpmirrortransition/.gitignore
@@ -0,0 +1,2 @@
+gp_primarymirror
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/Makefile b/src/bin/gpmirrortransition/Makefile
new file mode 100755
index 0000000..93e1e33
--- /dev/null
+++ b/src/bin/gpmirrortransition/Makefile
@@ -0,0 +1,49 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/gpmirrortransition
+#
+# Builds gp_primarymirror, a frontend utility that sends a primary/mirror
+# transition message to a segment (source: gpmirrortransition.c).
+#
+# Portions Copyright (c) 2009 Greenplum Inc
+#
+# This Makefile was copied from the pg_dump makefile and modified accordingly
+#
+# $PostgreSQL: pgsql/src/bin/gpmirrortransition/Makefile,v 1.62 2006/03/05 15:58:50 momjian Exp $
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "gp_primarymirror - inform a segment of a change in primary/mirror status"
+subdir = src/bin/gpmirrortransition
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+# The frontend doesn't need everything that's in LIBS, some are backend only
+LIBS := $(filter-out -llapack -lblas -lf2c -lresolv, $(LIBS))
+# This program isn't interactive, so doesn't need these
+LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses -lcurl -lssl -lcrypto, $(LIBS))
+
+# -Wno-deprecated-declarations was inherited from the pg_dump Makefile, where
+# it silenced a tempnam() warning in pg_backup_tar.c (not part of this build).
+# NOTE(review): confirm whether gpmirrortransition.c still needs it.
+override CPPFLAGS := -Wno-deprecated-declarations -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS=gpmirrortransition.o $(WIN32RES)
+
+# Backend/timezone object files linked directly into this frontend binary.
+# NOTE(review): submake-backend below only (re)builds ip.o; the other two
+# objects are assumed to already exist from the backend/timezone builds.
+EXTRA_OBJS = $(top_builddir)/src/backend/libpq/ip.o $(top_builddir)/src/backend/postmaster/primary_mirror_transition_client.o $(top_builddir)/src/timezone/gptime.o
+
+all: submake-libpq submake-libpgport submake-backend gp_primarymirror
+
+gp_primarymirror: gpmirrortransition.o $(OBJS) $(EXTRA_OBJS) $(libpq_builddir)/libpq.a
+	$(CC) $(CFLAGS) $(OBJS) $(EXTRA_OBJS) $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
+
+# Build the backend ip.o object this program links against
+.PHONY: submake-backend
+submake-backend:
+	$(MAKE) -C $(top_builddir)/src/backend/libpq ip.o
+
+install: all installdirs
+	$(INSTALL_PROGRAM) gp_primarymirror$(X) '$(DESTDIR)$(bindir)'/gp_primarymirror$(X)
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f $(addprefix '$(DESTDIR)$(bindir)'/, gp_primarymirror$(X))
+
+clean distclean maintainer-clean:
+	rm -f gp_primarymirror$(X) $(OBJS) gpmirrortransition.o
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/gpmirrortransition.c
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/gpmirrortransition.c b/src/bin/gpmirrortransition/gpmirrortransition.c
new file mode 100755
index 0000000..06e952a
--- /dev/null
+++ b/src/bin/gpmirrortransition/gpmirrortransition.c
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Utility to contact a segment and issue a primary/mirror mode transition
+ */
+#include "postmaster/primary_mirror_mode.h"
+#include "postmaster/primary_mirror_transition_client.h"
+#include <unistd.h>
+#include "libpq/pqcomm.h"
+#include "libpq/ip.h"
+
+#ifdef HAVE_GETOPT_H
+#include <getopt.h>
+#endif
+
+/* buffer size for message to segment */
+#define SEGMENT_MSG_BUF_SIZE 4096
+
+/**
+ * gpmirrortransition builds a primary/mirror transition message -- either from the
+ * -m/-s/-H/-P/-R/-h/-p/-r parameters, or read verbatim from the -i file (or stdin) --
+ * and transmits it to the target server.
+ */
+
+
+/*
+ * Return true when str is NULL or points to an empty string.
+ *
+ * NOTE(review): not referenced anywhere else in this file.
+ */
+static inline bool
+isEmpty(char *str)
+{
+	return str == NULL || str[0] == '\0';
+}
+
+/*
+ * Installed as client.checkForNeedToExitFn in main().  Always returns false:
+ * this standalone utility never asks the transition client to stop early
+ * (presumably the client polls this while waiting -- confirm in
+ * primary_mirror_transition_client).
+ */
+static bool
+gpCheckForNeedToExitFn(void)
+{
+	return false;
+}
+
+/*
+ * Installed as client.errorLogFn in main(): writes str plus a newline to stderr.
+ */
+static void
+gpMirrorErrorLogFunction(char *str)
+{
+	fprintf(stderr, "%s\n", str);
+}
+
+/*
+ * Installed as client.receivedDataCallbackFn in main(): writes buf plus a
+ * newline to stderr.
+ * NOTE(review): presumably buf is the segment's reply; it goes to stderr,
+ * not stdout -- confirm that is what callers parsing the output expect.
+ */
+static void
+gpMirrorReceivedDataCallbackFunction(char *buf)
+{
+	fprintf(stderr, "%s\n", buf);
+}
+
+/**
+ * Resolve host/port into the stream-socket address list used to contact the
+ * segment.
+ *
+ * On success returns true with *addrList set to the list returned by
+ * pg_getaddrinfo_all() (it is not freed here).  On failure prints a message
+ * to stderr and returns false.
+ *
+ * host/port may not be NULL
+ */
+static bool
+determineTargetHost( struct addrinfo **addrList, char *host, char *port)
+{
+	struct addrinfo hint;
+	int			ret;
+
+	*addrList = NULL;
+
+	/* Look the host up by name: any address family, stream sockets only */
+	MemSet(&hint, 0, sizeof(hint));
+	hint.ai_socktype = SOCK_STREAM;
+	hint.ai_family = AF_UNSPEC;
+
+	/* Use pg_getaddrinfo_all() to resolve the address */
+	ret = pg_getaddrinfo_all(host, port, &hint, addrList);
+	if (ret != 0)
+	{
+		fprintf(stderr,"could not translate host name \"%s\" to address: %s\n", host, gai_strerror(ret));
+		return false;
+	}
+	if (*addrList == NULL)
+	{
+		/* success code but no addresses: gai_strerror(0) would not describe this */
+		fprintf(stderr, "could not translate host name \"%s\" to address: no addresses found\n", host);
+		return false;
+	}
+	return true;
+}
+
+/*
+ * Read stream f to end-of-file into a malloc'd buffer.
+ *
+ * On success returns the buffer (NOT NUL-terminated) and stores the number
+ * of bytes read in *msgLenOut.  On a read error or out-of-memory condition
+ * prints a message to stderr, frees the partial buffer and returns NULL.
+ */
+static char*
+readFully(FILE *f, int *msgLenOut)
+{
+	int			bufSize = 10;
+	int			bufOffset = 0;
+	char	   *buf = malloc(bufSize * sizeof(char));
+
+	if ( buf == NULL )
+	{
+		fprintf(stderr, "Out of memory\n");
+		return NULL;
+	}
+
+	for ( ;; )
+	{
+		size_t		numRead;
+
+		numRead = fread(buf + bufOffset, sizeof(char), bufSize - bufOffset, f);
+		/* always keep what was read, even if this is the final (short) chunk */
+		bufOffset += (int) numRead;
+
+		/*
+		 * fread() may leave errno non-zero even when nothing went wrong, so
+		 * rely on the stream's error/EOF indicators instead.
+		 */
+		if ( ferror(f) )
+		{
+			fprintf( stderr, "Error reading input. Error code %d\n", errno);
+			free(buf);
+			return NULL;
+		}
+		if ( feof(f) )
+			break;
+
+		if ( bufOffset == bufSize )
+		{
+			/* increase size! keep the old block until realloc succeeds */
+			char	   *newBuf;
+
+			bufSize *= 2;
+			newBuf = realloc(buf, bufSize * sizeof(char));
+			if ( newBuf == NULL )
+			{
+				fprintf(stderr, "Out of memory\n");
+				free(buf);
+				return NULL;
+			}
+			buf = newBuf;
+		}
+	}
+
+	*msgLenOut = bufOffset;
+	return buf;
+}
+
+
+/*
+ * Parse a strictly numeric, non-zero integer command-line argument.
+ *
+ * Returns false (leaving *out untouched) when str is not entirely a base-10
+ * number, is out of range for long, or is zero.
+ */
+static bool
+parseIntArg(const char *str, int *out)
+{
+	char	   *end = NULL;
+	long		val;
+
+	errno = 0;				/* strtol() only sets errno on failure */
+	val = strtol(str, &end, 10);
+	if (errno == ERANGE || end == str || *end != '\0' || val == 0)
+		return false;
+	*out = (int) val;
+	return true;
+}
+
+/*
+ * Entry point.  Two modes:
+ *   - without -H: read a ready-made message from -i <file> (or stdin) and send
+ *     it to -h <host> / -p <port>;
+ *   - with -H: build the message from -m/-s/-H/-R/-h/-r/-p and send it to
+ *     -H <host> / -P <port>.
+ * -n and -t override the retry count and transition timeout passed to
+ * sendTransitionMessage().  Returns a TRANS_ERRCODE_* value.
+ */
+int
+main(int argc, char **argv)
+{
+	struct addrinfo *addrList = NULL;
+	PrimaryMirrorTransitionClientInfo client;
+
+	char	   *host = NULL, *port = NULL, *inputFile = NULL;
+
+	char	   *mode = NULL;
+	char	   *status = NULL;
+	char	   *seg_addr = NULL;
+	char	   *seg_pm_port = NULL;
+	char	   *seg_rep_port = NULL;
+	char	   *peer_addr = NULL;
+	char	   *peer_pm_port = NULL;
+	char	   *peer_rep_port = NULL;
+
+	char	   *num_retries_str = NULL;
+	char	   *transition_timeout_str = NULL;
+
+	int			num_retries = 20;
+	int			transition_timeout = 3600; /* 1 hour */
+
+	/*
+	 * getopt() returns an int; storing it in a char breaks the "!= -1" test
+	 * on platforms where char is unsigned (the loop would never end).
+	 */
+	int			opt;
+
+	char		msgBuffer[SEGMENT_MSG_BUF_SIZE];
+	char	   *msg = NULL;
+	int			msgLen = 0;
+
+	while ((opt = getopt(argc, argv, "m:s:H:P:R:h:p:r:i:n:t:")) != -1)
+	{
+		switch (opt)
+		{
+			case 'i':
+				inputFile = optarg;
+				break;
+			case 'm':
+				mode = optarg;
+				break;
+			case 's':
+				status = optarg;
+				break;
+			case 'H':
+				seg_addr = optarg;
+				break;
+			case 'P':
+				seg_pm_port = optarg;
+				break;
+			case 'R':
+				seg_rep_port = optarg;
+				break;
+			case 'h':
+				host = peer_addr = optarg;
+				break;
+			case 'p':
+				port = peer_pm_port = optarg;
+				break;
+			case 'r':
+				peer_rep_port = optarg;
+				break;
+			case 'n':
+				num_retries_str = optarg;
+				break;
+			case 't':
+				transition_timeout_str = optarg;
+				break;
+			case '?':
+				fprintf(stderr, "Unrecognized option: -%c\n", optopt);
+				break;
+		}
+	}
+
+	if (num_retries_str != NULL && !parseIntArg(num_retries_str, &num_retries))
+	{
+		fprintf(stderr, "Invalid num_retries (-n) argument\n");
+		return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+	}
+
+	if (transition_timeout_str != NULL &&
+		!parseIntArg(transition_timeout_str, &transition_timeout))
+	{
+		fprintf(stderr, "Invalid transition_timeout (-t) argument\n");
+		return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+	}
+
+	/* no segment host (-H): send a pre-built message read from -i or stdin */
+	if (seg_addr == NULL)
+	{
+		if ( host == NULL)
+		{
+			fprintf(stderr, "Missing host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if ( port == NULL )
+		{
+			fprintf(stderr, "Missing port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* find the target machine */
+		if ( ! determineTargetHost(&addrList, host, port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+
+		/* load the input message into memory */
+		if ( inputFile == NULL)
+		{
+			msg = readFully(stdin, &msgLen);
+		}
+		else
+		{
+			FILE	   *f = fopen(inputFile, "r");
+
+			if ( f == NULL)
+			{
+				fprintf(stderr, "Unable to open file %s\n", inputFile);
+				return TRANS_ERRCODE_ERROR_READING_INPUT;
+			}
+			msg = readFully(f, &msgLen);
+			fclose(f);
+		}
+	}
+	else
+	{
+		/* build message from passed parameters (seg_addr is known non-NULL here) */
+
+		if (mode == NULL)
+		{
+			fprintf(stderr, "Missing mode (-m) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (status == NULL)
+		{
+			fprintf(stderr, "Missing status (-s) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing segment postmaster port (-P) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing segment replication port (-R) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_addr == NULL)
+		{
+			fprintf(stderr, "Missing peer host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing peer postmaster port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing peer replication port (-r) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* build message */
+		msgLen = snprintf(
+			msgBuffer, sizeof(msgBuffer),
+			"%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
+			mode,
+			status,
+			seg_addr,
+			seg_rep_port,
+			peer_addr,
+			peer_rep_port,
+			peer_pm_port
+			);
+
+		/*
+		 * snprintf returns the length it wanted to write; on truncation that
+		 * exceeds msgBuffer and sending msgLen bytes would read past it.
+		 */
+		if (msgLen < 0 || msgLen >= (int) sizeof(msgBuffer))
+		{
+			fprintf(stderr, "Transition message is too long\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		msg = msgBuffer;
+
+		/* find the target machine */
+		if (!determineTargetHost(&addrList, seg_addr, seg_pm_port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+	}
+
+	/* check for errors while building the message */
+	if ( msg == NULL )
+	{
+		return TRANS_ERRCODE_ERROR_READING_INPUT;
+	}
+
+	/* send the message */
+	client.receivedDataCallbackFn = gpMirrorReceivedDataCallbackFunction;
+	client.errorLogFn = gpMirrorErrorLogFunction;
+	client.checkForNeedToExitFn = gpCheckForNeedToExitFn;
+	return sendTransitionMessage(&client, addrList, msg, msgLen, num_retries, transition_timeout);
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/include/pg_config_manual.h
----------------------------------------------------------------------
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index d97a280..002d92e 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -255,7 +255,7 @@
/*
* Enable injecting faults.
*/
-#define FAULT_INJECTOR 0
+#define FAULT_INJECTOR 1
/*
* Enable tracing of resource consumption during sort operations;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/Makefile b/tools/bin/Makefile
index e8a9847..88b9634 100644
--- a/tools/bin/Makefile
+++ b/tools/bin/Makefile
@@ -179,6 +179,7 @@ unittest2:
PYTHON_FILES=`grep -l --exclude=Makefile --exclude=gplogfilter --exclude=gpcheckos --exclude=gpgenfsmap.py --exclude=throttlingD.py "/bin/env python" *`\
`grep -l "/bin/env python" $(SRC)/../sbin/*`\
`find ./gppylib -name "*.py"`\
+ `find ./hawqpylib -name "*.py"`\
`find $(SRC)/../sbin -name "*.py"`
checkcode: pylint
@@ -244,9 +245,10 @@ install: all
${INSTALL_SCRIPT} -d ${bindir}
for files in `find * -maxdepth 0 -type f | grep -x -v -E "${SKIP_INSTALL}"`; do ${INSTALL_SCRIPT} $${files} ${bindir}; done
${MAKE} -C gppylib $@
+ ${MAKE} -C hawqpylib $@
${MAKE} -C ext $@
- for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
- for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
+ # for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
+ # for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
${INSTALL_SCRIPT} -d ${bindir}/lib
for files in `find lib -type f`; do ${INSTALL_SCRIPT} $${files} ${bindir}/lib; done
unset LIBPATH; ./generate-greenplum-path.sh $(prefix) > ${prefix}/greenplum_path.sh
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/gppylib/commands/base.py
----------------------------------------------------------------------
diff --git a/tools/bin/gppylib/commands/base.py b/tools/bin/gppylib/commands/base.py
index 801d85f..4ca68fa 100755
--- a/tools/bin/gppylib/commands/base.py
+++ b/tools/bin/gppylib/commands/base.py
@@ -697,11 +697,19 @@ class Command:
def run(self,validateAfter=False):
faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
+ print "### DEBUG: ENV[GP_COMMAND_FAULT_POINT] = %s" % (faultPoint if faultPoint else "None")
+ print "### DEBUG: self.name = %s" % ("SomeName" if self.name else "None")
if not faultPoint or (self.name and not self.name.startswith(faultPoint)):
+ print "### DEBUG: EXECUTE name = %s" % ("SomeName" if self.name else "None")
+ print "### DEBUG: EXECUTE cmdStr = %s" % ("SomeCmdStr" if self.cmdStr else "None")
+ print "### DEBUG: EXECUTE context = %s" % ("SomeExecContext" if self.exec_context else "None")
+ print "### DEBUG: EXECUTE remoteHost = %s" % ("SomeRemoteHost" if self.remoteHost else "None")
self.exec_context.execute(self)
else:
# simulate error
+ print "### DEBUG: CommandResult"
self.results = CommandResult(1,'Fault Injection','Fault Injection' ,False,True)
+ print self.results
if validateAfter:
self.validate()
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqfaultinjector
----------------------------------------------------------------------
diff --git a/tools/bin/hawqfaultinjector b/tools/bin/hawqfaultinjector
new file mode 100755
index 0000000..466dddf
--- /dev/null
+++ b/tools/bin/hawqfaultinjector
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# import mainUtils FIRST: importing it performs the python version check
+from hawqpylib.mainUtils import *
+
+# now the rest of the imports
+from hawqpylib.programs.clsInjectFault import *
+
+#-------------------------------------------------------------------------
+if __name__ == '__main__':
+    # presumably simple_main parses options via createParser and runs the
+    # program produced by createProgram -- see hawqpylib.mainUtils
+    simple_main( HAWQInjectFaultProgram.createParser, HAWQInjectFaultProgram.createProgram)
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/Makefile b/tools/bin/hawqpylib/Makefile
new file mode 100644
index 0000000..d66f5d7
--- /dev/null
+++ b/tools/bin/hawqpylib/Makefile
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for the management utilities
+#
+#-------------------------------------------------------------------------
+
+subdir = tools/bin/hawqpylib
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+SKIP_INSTALL=.gitignore|.p4ignore|.rcfile|Makefile|test/
+
+# Install the hawqpylib package under ${libdir}/python/hawqpylib, keeping its
+# directory layout.  Destination directories are created first so files in
+# subpackages (e.g. programs/) have somewhere to go, and every file is then
+# installed exactly once.
+install:
+	${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib
+	@for dirs in `find * -type d | grep -v test` ; \
+	do \
+		${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib/$${dirs}; \
+	done
+	@for file in `find * -type f | grep -v -E "${SKIP_INSTALL}"`; \
+	do \
+		echo "install $${file} into ${libdir}/python/hawqpylib/$${file}" ; \
+		${INSTALL_SCRIPT} $${file} ${libdir}/python/hawqpylib/$${file}; \
+	done
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/hawqarray.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/hawqarray.py b/tools/bin/hawqpylib/hawqarray.py
new file mode 100755
index 0000000..eef59b9
--- /dev/null
+++ b/tools/bin/hawqpylib/hawqarray.py
@@ -0,0 +1,729 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+ hawqarray.py:
+
+ Contains three classes representing configuration information of a
+ Greenplum array:
+
+ HAWQArray - The primary interface - collection of all HAWQDB within an array
+ HAWQDB - represents configuration information for a single registration_order
+ Segment - collection of all HAWQDB with the same registration_order
+"""
+
+# ============================================================================
+from datetime import date
+import copy
+import traceback
+
+from gppylib.utils import checkNotNone, checkIsInt
+from gppylib import gplog
+from gppylib.db import dbconn
+from gppylib.gpversion import GpVersion
+from gppylib.commands.unix import *
+from hawqpylib.hawqlib import HawqXMLParser
+
+
+SYSTEM_FILESPACE = 3052 # oid of the system filespace
+
+logger = gplog.get_default_logger()
+
+DESTINATION_FILE_SPACES_DIRECTORY = "fs_directory"
+
+ROLE_MASTER = 'm'
+ROLE_STANDBY = 's'
+ROLE_PRIMARY = 'p'
+VALID_ROLES = [ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY]
+
+
+STATUS_UP = 'u'
+STATUS_DOWN = 'd'
+VALID_STATUS = [STATUS_UP, STATUS_DOWN]
+
+
+# SegmentState values returned from gp_primarymirror.
+SEGMENT_STATE_NOT_INITIALIZED = "NotInitialized"
+SEGMENT_STATE_INITIALIZATION = "Initialization"
+SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION = "InChangeTrackingTransition"
+SEGMENT_STATE_IN_RESYNCTRANSITION = "InResyncTransition"
+SEGMENT_STATE_IN_SYNC_TRANSITION = "InSyncTransition"
+SEGMENT_STATE_READY = "Ready"
+SEGMENT_STATE_CHANGE_TRACKING_DISABLED = "ChangeTrackingDisabled"
+SEGMENT_STATE_FAULT = "Fault"
+SEGMENT_STATE_SHUTDOWN_BACKENDS = "ShutdownBackends"
+SEGMENT_STATE_SHUTDOWN = "Shutdown"
+SEGMENT_STATE_IMMEDIATE_SHUTDOWN = "ImmediateShutdown"
+
+
+MASTER_REGISTRATION_ORDER = 0
+
+class InvalidSegmentConfiguration(Exception):
+    """Exception raised when an invalid hawqarray configuration is
+    read from gp_segment_configuration or an attempt to save an
+    invalid hawqarray configuration is made.
+
+    The offending configuration is kept in ``self.array`` and is also handed
+    to ``Exception.__init__`` so that ``args`` is populated and the exception
+    can be pickled/copied like any other exception.
+    """
+    def __init__(self, array):
+        Exception.__init__(self, array)
+        self.array = array
+
+    def __str__(self):
+        # wrap in a 1-tuple so a tuple-valued array is formatted, not unpacked
+        return "Invalid HAWQArray: %s" % (self.array,)
+
+# ============================================================================
+# ============================================================================
+class HAWQDB:
+    """
+    HAWQDB class representing configuration information for a single
+    registration_order within a HAWQ cluster.
+    """
+
+    # --------------------------------------------------------------------
+    def __init__(self, registration_order, role, status,
+                 hostname, address, port, datadir):
+
+        self.registration_order = registration_order
+        self.role = role
+        self.status = status
+        self.hostname = hostname
+        self.address = address
+        self.port = port
+        self.datadir = datadir
+        self.catdir = datadir
+
+        # Filespace mappings for a HAWQ DB (oid -> path).  Stays None until
+        # the first addFilespace() call creates the dictionary.
+        self.filespaces = None
+
+        # Pending filespace creation
+        self.pending_filespace = None
+
+        # Check if the status is 'u' up, 'd' for down
+        self.valid = (status == 'u')
+
+    # --------------------------------------------------------------------
+    def __str__(self):
+        """
+        Construct a printable string representation of a HAWQDB
+        """
+        return "%s:%s:registration_order=%s:status=%s" % (
+            self.hostname,
+            self.datadir,
+            self.registration_order,
+            self.status
+            )
+
+    # --------------------------------------------------------------------
+    def __repr__(self):
+        """
+        Construct a complete, deterministic representation of a HAWQDB.
+
+        __cmp__ compares these strings, so every "core" attribute is
+        included.  Filespaces are sorted by oid and a missing mapping is
+        treated like an empty one, so equal configurations always produce
+        identical text.
+        """
+        filespaces = sorted((self.filespaces or {}).items())
+        return ("HAWQDB(registration_order=%r, role=%r, status=%r, "
+                "hostname=%r, address=%r, port=%r, datadir=%r, "
+                "filespaces=%r)" % (self.registration_order, self.role,
+                                    self.status, self.hostname, self.address,
+                                    self.port, self.datadir, filespaces))
+
+    #
+    # Note that this is not an ideal comparison -- it uses the string
+    # representation produced by __repr__ for comparison.  (Without an
+    # explicit __repr__, the default one embeds the object's address, so
+    # two distinct HAWQDB objects could never compare equal.)
+    #
+    def __cmp__(self,other):
+        left = repr(self)
+        right = repr(other)
+        if left < right: return -1
+        elif left > right: return 1
+        else: return 0
+
+    def equalIgnoringStatus(self, other):
+        """
+        Return true if none of the "core" attributes (e.g. filespace)
+        of two segments differ, false otherwise.
+
+        This method is used by updateSystemConfig() to know when a catalog
+        change will cause removing and re-adding a mirror segment.
+        """
+        firstStatus = self.getStatus()
+        try:
+
+            # make the elements we don't want to compare match and see if they are then equal
+            self.setStatus(other.getStatus())
+
+            return self == other
+        finally:
+            # restore status after comparison
+            self.setStatus(firstStatus)
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getDataDirPrefix(datadir):
+        """
+        Return datadir with its last path component removed
+        (everything before the final '/').
+        """
+        return datadir[:datadir.rfind('/')]
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getFileSpaceDirsWithNewSuffix(fileSpaceDictionary, suffix, includeSystemFilespace = True):
+        """
+        Return a new dictionary with the same filespace oids as
+        fileSpaceDictionary, where each path has its last component replaced
+        by suffix.  The system filespace is omitted when
+        includeSystemFilespace is False.
+        """
+        retValue = {}
+
+        for entry in fileSpaceDictionary:
+            if entry == SYSTEM_FILESPACE and includeSystemFilespace == False:
+                continue
+            newDir = HAWQDB.getDataDirPrefix(fileSpaceDictionary[entry])
+            newDir = newDir + "/" + suffix
+            retValue[entry] = newDir
+        return retValue
+
+    # --------------------------------------------------------------------
+    def copy(self):
+        """
+        Creates a copy of the segment, shallow for everything except the filespaces map
+
+        """
+        res = copy.copy(self)
+        res.filespaces = copy.copy(self.filespaces)
+        return res
+
+    # --------------------------------------------------------------------
+    # Simple helper functions to identify what role a segment plays
+    # and what state it is in:
+    #  + QD (Query Dispatcher)
+    #     + master
+    #     + standby master
+    #  + QE (Query Executor)
+    #     + primary
+    #  + status: up / down
+    # --------------------------------------------------------------------
+    def isMaster(self):
+        return self.role == ROLE_MASTER
+
+    def isStandby(self):
+        return self.role == ROLE_STANDBY
+
+    def isSegment(self):
+        return self.role == ROLE_PRIMARY
+
+    def isUp(self):
+        return self.status == STATUS_UP
+
+    def isDown(self):
+        return self.status == STATUS_DOWN
+
+    # --------------------------------------------------------------------
+    # getters
+    # --------------------------------------------------------------------
+    def getRegistrationOrder(self):
+        return checkNotNone("registration_order", self.registration_order)
+
+    def getRole(self):
+        return checkNotNone("role", self.role)
+
+    def getStatus(self):
+        return checkNotNone("status", self.status)
+
+    def getPort(self):
+        """
+        Returns the listening port for the postmaster for this segment.
+        """
+        return checkNotNone("port", self.port)
+
+    def getHostName(self):
+        """
+        Returns the actual `hostname` for the host
+
+        Note: use getAddress for the network address to use
+        """
+        return self.hostname
+
+    def getAddress(self):
+        """
+        Returns the network address to use to contact the segment (i.e. the NIC address).
+
+        """
+        return self.address
+
+    def getDataDirectory(self):
+        """
+        Return the primary datadirectory location for the segment.
+
+        Note: the datadirectory is just one of the filespace locations
+        associated with the segment, calling code should be careful not
+        to assume that this is the only directory location for this segment.
+
+        Todo: evaluate callers of this function to see if they should really
+        be dealing with a list of filespaces.
+        """
+        return checkNotNone("dataDirectory", self.datadir)
+
+    def getFilespaces(self):
+        """
+        Returns the filespace dictionary of oid->path pairs
+        (None if no filespace has been added yet)
+        """
+        return self.filespaces
+
+
+    # --------------------------------------------------------------------
+    # setters
+    # --------------------------------------------------------------------
+    def setRegistrationOrder(self, registration_order):
+        checkNotNone("registration_order", registration_order)
+        checkIsInt("registration_order", registration_order)
+        self.registration_order = registration_order
+
+    def setRole(self, role):
+        checkNotNone("role", role)
+
+        if role not in VALID_ROLES:
+            raise Exception("Invalid role '%s'" % role)
+
+        self.role = role
+
+    def setStatus(self, status):
+        checkNotNone("status", status)
+
+        if status not in VALID_STATUS:
+            raise Exception("Invalid status '%s'" % status)
+
+        self.status = status
+
+    def setPort(self, port):
+        checkNotNone("port", port)
+        checkIsInt("port", port)
+        self.port = port
+
+    def setHostName(self, hostName):
+        # None is allowed -- don't check
+        self.hostname = hostName
+
+    def setAddress(self, address):
+        # None is allowed -- don't check
+        self.address = address
+
+    def setDataDirectory(self, dataDirectory):
+        checkNotNone("dataDirectory", dataDirectory)
+        self.datadir = dataDirectory
+
+    def addFilespace(self, oid, path):
+        """
+        Add a filespace path for this segment.
+
+        Throws:
+            Exception - if a path has already been specified for this segment.
+        """
+
+        # gpfilespace adds a special filespace with oid=None to indicate
+        # the filespace that it is currently building, since the filespace
+        # does not yet exist there is no valid value that could be used.
+        if oid == None:
+            if self.pending_filespace:
+                raise Exception("Duplicate filespace path for registration_order %d" %
+                                self.registration_order)
+            self.pending_filespace = path
+            return
+
+        # oids should always be integer values > 0
+        oid = int(oid)
+        assert(oid > 0)
+
+        # The more usual case just sets the filespace in the filespace
+        # dictionary, creating it on first use (__init__ leaves it None).
+        if self.filespaces is None:
+            self.filespaces = {}
+        if oid in self.filespaces:
+            raise Exception("Duplicate filespace path for "
+                            "registration_order %d filespace %d" % (self.registration_order, oid))
+        self.filespaces[oid] = path
+
+    def getPendingFilespace(self):
+        """
+        Returns the pending filespace location for this segment
+        (called by gpfilespace)
+        """
+        return self.pending_filespace
+
+class HAWQFilesystemObj:
+    """
+    One row of pg_filesystem: the filesystem oid, its name and whether it is
+    shared storage.
+    """
+    def __init__(self, oid, name, shared):
+        self.__oid, self.__name, self.__shared = oid, name, shared
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def isShared(self):
+        # only a value that compares equal to True counts as shared
+        return self.__shared == True
+
+    @staticmethod
+    def getFilesystemObj(filesystemArr, fsoid):
+        """
+        Look up the entry of filesystemArr whose oid equals fsoid.
+
+        fsoid 0 means local storage and yields None; an oid without a
+        matching entry raises Exception.
+        """
+        if fsoid == 0:
+            return None
+        found = next((fsys for fsys in filesystemArr if fsys.getOid() == fsoid), None)
+        if found is not None:
+            return found
+        raise Exception("Error: invalid file system oid %d" % (fsoid))
+
+class HAWQFilespaceObj:
+    """
+    One row of pg_filespace: the filespace oid, its name and the filesystem
+    object (or None) it lives on.
+    """
+    def __init__(self, oid, name, fsys):
+        self.__oid, self.__name, self.__fsys = oid, name, fsys
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def getFsys(self):
+        return self.__fsys
+
+    def isSystemFilespace(self):
+        """Return True when this filespace's oid is SYSTEM_FILESPACE."""
+        return self.__oid == SYSTEM_FILESPACE
+
+
+
+class HAWQArray:
+ """
+ HAWQArray is a python class that describes a HAWQ array.
+
+ A HAWQ array consists of:
+ master - The primary QD for the array
+ standby master - The mirror QD for the array [optional]
+ segment array - an array of segments within the cluster
+
+ Each segment is either a single HAWQDB object, or a primary/mirror pair.
+
+ It can be initialized either from a database connection, in which case
+ it discovers the configuration information by examining the catalog, or
+ via a configuration file.
+ """
+
+ # --------------------------------------------------------------------
+    def __init__(self, hawqdbs):
+        """
+        Build a HAWQArray from an iterable of HAWQDB objects.
+
+        Exactly one master is required and at most one standby master is
+        allowed; every HAWQDB whose role is primary is added as a segment.
+
+        Raises:
+            Exception - on a second master or standby, on a HAWQDB that is
+                        none of master/standby/segment, or when no master
+                        was supplied.
+        """
+
+        self.master = None
+        self.standbyMaster = None
+        self.segments = []
+        self.numSegments = 0
+        self.version = None
+
+        self.setFilespaces([])
+
+        for hdb in hawqdbs:
+
+            # Handle master
+            # ("is not None" avoids HAWQDB.__cmp__, which builds repr strings)
+            if hdb.isMaster():
+                if self.master is not None:
+                    logger.error("multiple master dbs defined")
+                    raise Exception("HAWQArray - multiple master dbs defined")
+                self.master = hdb
+
+            # Handle standby
+            elif hdb.isStandby():
+                if self.standbyMaster is not None:
+                    logger.error("multiple standby master dbs defined")
+                    raise Exception("HAWQArray - multiple standby master dbs defined")
+                self.standbyMaster = hdb
+
+            # Handle segments
+            elif hdb.isSegment():
+                self.addSegment(hdb)
+
+            else:
+                # Not a master, standby master or segment?
+                # shouldn't even be possible.
+                logger.error("FATAL - invalid dbs defined")
+                raise Exception("Error: HAWQArray() - invalid dbs defined")
+
+        # Make sure HAWQ cluster has a master
+        if self.master is None:
+            logger.error("FATAL - no master defined!")
+            raise Exception("Error: HAWQArray() - no master defined")
+
+    def __str__(self):
+        """Summarise master, standby (or 'Not Configured') and one line per segment."""
+        master_text = str(self.master)
+        if self.standbyMaster:
+            standby_text = str(self.standbyMaster)
+        else:
+            standby_text = 'Not Configured'
+        segment_text = "\n".join(str(seg) for seg in self.segments)
+        return "Master: %s\nStandby: %s\nSegments: %s" % (master_text, standby_text, segment_text)
+
+    def addSegment(self, hdb):
+        """Append hdb to the segment list and bump numSegments; reject non-segments."""
+        if not hdb.isSegment():
+            raise Exception("Error: adding invalid segment to HAWQArray")
+        self.segments.append(hdb)
+        self.numSegments += 1
+
+
+ # --------------------------------------------------------------------
@staticmethod
def initFromCatalog(dbURL, utility=False, useAllSegmentFileSpaces=False):
    """
    Factory method, initializes a HAWQArray from provided database URL

    Connects with dbconn.connect(dbURL, utility) and checks that the server
    release is supported (only "2.0"). It then builds one HAWQDB per
    gp_segment_configuration entry, plus the filesystem and non-system
    filespace lists.

    Data directories are not read from the catalog. They come from
    hawq-site.xml (hawq_master_directory / hawq_segment_directory) via
    HawqXMLParser(GPHOME). Rows with registration_order <= 0 get the master
    directory and all others get the segment directory.

    @param dbURL: database URL passed to dbconn.connect()
    @param utility: passed to dbconn.connect() as its second argument
    @param useAllSegmentFileSpaces: currently ignored; kept so existing
        callers keep working
    @return a HAWQArray with version, filespaces and filesystems populated
    @raise Exception if the server's release is not supported
    """
    conn = dbconn.connect(dbURL, utility)
    try:
        # Get the version from the database:
        version_str = None
        for row in dbconn.execSQL(conn, "SELECT version()"):
            version_str = row[0]
        version = GpVersion(version_str)

        # Only HAWQ 2.0 is supported. The one-element tuple matters: ("2.0")
        # is just a string, which turns `in` into a substring test that also
        # accepts releases such as "2" or "0".
        if version.getVersionRelease() not in ("2.0",):
            raise Exception("HAWQ version is invalid: %s" % version)

        hawq_site = HawqXMLParser(GPHOME)
        master_data_directory = hawq_site.get_value_from_name('hawq_master_directory')
        segment_data_directory = hawq_site.get_value_from_name('hawq_segment_directory')

        config_rows = dbconn.execSQL(conn, '''
            SELECT sc.registration_order,
                   sc.role,
                   sc.status,
                   sc.hostname,
                   sc.address,
                   sc.port,
                   fs.oid,
                   CASE
                       WHEN sc.registration_order <= 0 THEN '%s'
                       ELSE '%s'
                   END AS datadir
            FROM pg_catalog.gp_segment_configuration sc,
                 pg_catalog.pg_filespace fs,
                 pg_catalog.pg_filespace_entry fse
            WHERE fse.fsefsoid = fs.oid
            ORDER BY sc.registration_order;''' %
            (master_data_directory, segment_data_directory))

        # All of filesystem is shared storage
        filesystemRows = dbconn.execSQL(conn, '''
            SELECT oid, fsysname, true AS fsysshared
            FROM pg_filesystem
            ORDER BY fsysname
            ''')

        filesystemArr = [HAWQFilesystemObj(fsysRow[0], fsysRow[1], fsysRow[2]) for fsysRow in filesystemRows]

        filespaceRows = dbconn.execSQL(conn, '''
            SELECT oid, fsname, fsfsys AS fsoid
            FROM pg_filespace
            WHERE oid != %d
            ORDER BY fsname;
            ''' % (SYSTEM_FILESPACE))

        filespaceArr = [HAWQFilespaceObj(fsRow[0], fsRow[1], HAWQFilesystemObj.getFilesystemObj(filesystemArr, fsRow[2])) for fsRow in filespaceRows]

        hawqdbs = []
        lastOrder = None
        for row in config_rows:
            # Extract fields from the row
            (registration_order, role, status, hostname,
             address, port, fsoid, datadir) = row

            # The query only constrains fse/fs, so gp_segment_configuration is
            # cross-joined with the filespace entries. A segment can therefore
            # appear once per entry. Rows are ordered by registration_order,
            # so keep only the first row of each run.
            if hawqdbs and registration_order == lastOrder:
                continue
            lastOrder = registration_order

            hawqdbs.append(HAWQDB(registration_order, role, status,
                                  hostname, address, port, datadir))
    finally:
        # Close the connection on every path, including the unsupported-version error.
        conn.close()

    array = HAWQArray(hawqdbs)
    array.version = version
    array.setFilespaces(filespaceArr)
    array.setFilesystem(filesystemArr)

    return array
+
+ # --------------------------------------------------------------------
def is_array_valid(self):
    """
    Return True only if every database in the array is up.

    The master and (when configured) the standby master are checked via
    getStatus(). Segments are checked via their `status` attribute.
    """
    if self.master.getStatus() != STATUS_UP:
        return False
    standby = self.standbyMaster
    if standby and standby.getStatus() != STATUS_UP:
        return False
    return all(seg.status == STATUS_UP for seg in self.segments)
+
+ # --------------------------------------------------------------------
def setFilesystem(self, filesystemArr):
    """
    Store a private list copy of the given filesystem objects
    (HAWQFilesystemObj instances when called from initFromCatalog).
    """
    self.filesystemArr = list(filesystemArr)
+
def getFilesystem(self):
    """
    Return a newly allocated list of the stored filesystem objects, in the
    order they were set (initFromCatalog queries them ORDER BY fsysname).
    """
    return list(self.filesystemArr)
+
def setFilespaces(self, filespaceArr):
    """
    Store a private list copy of the given filespace objects
    (HAWQFilespaceObj instances when called from initFromCatalog).
    """
    self.filespaceArr = list(filespaceArr)
+
def getFilespaces(self, includeSystemFilespace=True):
    """
    Return a newly allocated list of the stored filespace objects.

    @param includeSystemFilespace: when True (the default) every stored
        filespace is returned; when False, filespaces whose
        isSystemFilespace() is true are filtered out.
    @return list in stored order (initFromCatalog queries them ORDER BY fsname)
    """
    # The flag was previously ignored, and only system filespaces were
    # returned. initFromCatalog excludes SYSTEM_FILESPACE, so that was always [].
    if includeSystemFilespace:
        return list(self.filespaceArr)
    return [fs for fs in self.filespaceArr if not fs.isSystemFilespace()]
+
def getNonSystemFilespaces(self):
    """
    Return a newly allocated list of the stored filespaces for which
    isSystemFilespace() is false, in stored order.
    """
    result = []
    for fs in self.filespaceArr:
        if fs.isSystemFilespace():
            continue
        result.append(fs)
    return result
+
def getAllFilespaces(self):
    """
    Return a newly allocated list of every stored filespace object, in stored order.
    """
    return list(self.filespaceArr)
+
+ # --------------------------------------------------------------
def getFileSpaceName(self, filespaceOid):
    """
    Return the name of the first stored filespace whose oid equals
    filespaceOid, or None when there is no match or no filespace list.
    """
    if self.filespaceArr is None:
        return None
    for entry in self.filespaceArr:
        if entry.getOid() == filespaceOid:
            return entry.getName()
    return None
+
+ # --------------------------------------------------------------
def getFileSpaceOid(self, filespaceName):
    """
    Return the oid of the first stored filespace named filespaceName,
    or None when there is no match or no filespace list.
    """
    if self.filespaceArr is None:
        return None
    for entry in self.filespaceArr:
        if entry.getName() == filespaceName:
            return entry.getOid()
    return None
+
+ # --------------------------------------------------------------
def isFileSpaceShared(self, filespaceOid):
    """
    Report whether the first stored filespace with the given oid lives on a
    shared filesystem.

    Returns False when there is no filespace list, no matching oid, or the
    matching filespace has no filesystem. Otherwise it returns the
    filesystem's isShared() result.
    """
    if self.filespaceArr is None:
        return False
    for entry in self.filespaceArr:
        if entry.getOid() != filespaceOid:
            continue
        fsys = entry.getFsys()
        return fsys is not None and fsys.isShared()
    return False
+
+
+ # --------------------------------------------------------------------
def getDbList(self):
    """
    Return a new list of every HAWQDB in the array: the master first, then
    the standby master if one is configured, then all segments.
    """
    head = [self.master]
    if self.standbyMaster:
        head.append(self.standbyMaster)
    return head + self.getSegDbList()
+
+ # --------------------------------------------------------------------
def getHostList(self, includeExpansionSegs = False):
    """
    Return the distinct host names of all databases in the array.

    Hosts are listed in order of first appearance: the master's host, then
    the standby's host (if configured), then the segments' hosts.

    @param includeExpansionSegs: accepted for interface compatibility; not used.
    """
    candidates = [self.master]
    if self.standbyMaster:
        candidates.append(self.standbyMaster)
    candidates.extend(self.getDbList())

    # The master and standby used to be appended unconditionally, so a standby
    # on the master's host produced a duplicate entry. A set keeps every host
    # unique and avoids repeated list scans.
    hostList = []
    seen = set()
    for db in candidates:
        host = db.getSegmentHostName()
        if host not in seen:
            seen.add(host)
            hostList.append(host)

    return hostList
+
def getSegDbList(self):
    """Return a new list holding every segment HAWQDB, in stored order."""
    return list(self.segments)
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/mainUtils.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/mainUtils.py b/tools/bin/hawqpylib/mainUtils.py
new file mode 100644
index 0000000..d0148e9
--- /dev/null
+++ b/tools/bin/hawqpylib/mainUtils.py
@@ -0,0 +1,655 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Line too long - pylint: disable=C0301
+# Invalid name - pylint: disable=C0103
+
+"""
+mainUtils.py
+------------
+
+This file provides a rudimentary framework to support top-level option
+parsing, initialization and cleanup logic common to multiple programs.
+
+It also implements workarounds to make other modules we use like
+GpCoverage() work properly.
+
+The primary interface function is 'simple_main'. For an example of
+how it is expected to be used, see gprecoverseg.
+
+It is anticipated that the functionality of this file will grow as we
+extend common functions of our gp utilities. Please keep this in mind
+and try to avoid placing logic for a specific utility here.
+"""
+
+import os, sys, signal, errno, yaml
+
# Name of the running utility: the last path component of argv[0].
gProgramName = os.path.basename(sys.argv[0])

# Refuse to run on interpreters older than Python 2.5.
if not sys.version_info >= (2, 5, 0):
    sys.exit('''Error: %s is supported on Python versions 2.5 or greater
Please upgrade python installed on this machine.''' % (gProgramName,))
+
+from gppylib import gplog
+from gppylib.commands import gp, unix
+from gppylib.commands.base import ExecutionError
+from gppylib.system import configurationInterface, fileSystemInterface, fileSystemImplOs
+from gppylib.system import osInterface, osImplNative, faultProberInterface, faultProberImplGpdb
+from hawqpylib.system import configurationImplHAWQ
+from optparse import OptionGroup, OptionParser, SUPPRESS_HELP
+from gppylib.gpcoverage import GpCoverage
+from lockfile.pidlockfile import PIDLockFile, LockTimeout
+
+
def getProgramName():
    """
    Return the program name used in log and error messages.

    This is the basename of sys.argv[0], or the programNameOverride main
    option once simple_main has applied it.
    """
    # Only read here, so no `global` declaration is needed.
    return gProgramName
+
+
class SimpleMainLock:
    """
    Pid-file lock that stops a utility (e.g. gprecoverseg) from running
    several instances at once.

    With mainOptions['pidfilename'], the lock is a PIDLockFile of that name
    inside the master data directory (gp.get_masterdatadir()). Without it,
    no locking is done.

    A utility may re-invoke itself (e.g. gprecoverseg -r). In that case
    mainOptions['parentpidvar'] names an environment variable through which
    the lock-holding parent passes its pid, so the child may proceed without
    acquiring the lock again.
    """
    def __init__(self, mainOptions):
        self.pidfilename = mainOptions.get('pidfilename', None)    # lock file name, relative to the master data directory
        self.parentpidvar = mainOptions.get('parentpidvar', None)  # environment variable carrying the lock holder's pid
        self.parentpid = None      # pid read from parentpidvar, if that variable is set
        self.ppath = None          # absolute path of the lock file
        self.pidlockfile = None    # PIDLockFile guarding ppath
        self.pidfilepid = None     # pid most recently read from the lock file
        self.locktorelease = None  # set only once this process acquired the lock itself

        if self.parentpidvar is not None:
            inherited = os.environ.get(self.parentpidvar)
            if inherited is not None:
                self.parentpid = int(inherited)

        if self.pidfilename is not None:
            self.ppath = os.path.join(gp.get_masterdatadir(), self.pidfilename)
            self.pidlockfile = PIDLockFile(self.ppath)


    def acquire(self):
        """
        Try to take the lock this process needs before proceeding.

        @return None when the lock is held (or not needed, or held by our
            parent); otherwise the pid of the process that owns it.
        """
        lockfile = self.pidlockfile
        if lockfile is None:
            # this utility runs without a lock
            return None

        holder = lockfile.read_pid()
        self.pidfilepid = holder
        if holder is not None:
            if holder == self.parentpid:
                # our parent already holds the lock on our behalf
                return None

            # Signal 0 only probes for the process. ESRCH means the holder is
            # gone, so the lock file is stale and is broken.
            try:
                os.kill(holder, 0)
            except OSError:
                if sys.exc_info()[1].errno == errno.ESRCH:
                    lockfile.break_lock()
                    self.pidfilepid = None

        try:
            lockfile.acquire(1)
        except LockTimeout:
            self.pidfilepid = lockfile.read_pid()
            return self.pidfilepid

        # Lock obtained: remember to release it, and publish our pid so that
        # child invocations recognise their parent as the holder.
        self.locktorelease = lockfile
        self.pidfilepid = lockfile.read_pid()
        if self.parentpidvar is not None:
            os.environ[self.parentpidvar] = str(self.pidfilepid)
        return None


    def release(self):
        """
        Release the lock if, and only if, this process acquired it.
        """
        held = self.locktorelease
        if held is None:
            return
        held.release()
        self.locktorelease = None
+
+
+
+#
+# exceptions we handle specially by the simple_main framework.
+#
+
class ProgramArgumentValidationException(Exception):
    """
    Throw this out to main to have the message possibly
    printed with a help suggestion.

    simple_main_locked() catches it, prints the parser help when
    shouldPrintHelp() is true, logs getMessage() and exits with status 2.
    """
    def __init__(self, msg, shouldPrintHelp=False):
        "Store the message and whether the caller should print help."
        # Hand msg to the base class so str(e) and e.args carry the message
        # when the exception reaches a generic handler (it used to be empty).
        Exception.__init__(self, msg)
        self.__shouldPrintHelp = shouldPrintHelp
        self.__msg = msg

    def shouldPrintHelp(self):
        "Return True if the parser's help text should accompany the error."
        return self.__shouldPrintHelp

    def getMessage(self):
        "Return the validation error message."
        return self.__msg
+
+
class ExceptionNoStackTraceNeeded(Exception):
    """
    Raised for an anticipated condition that must end the program at once.

    simple_main_locked() logs it as "<program> error: <message>", without a
    stack trace, and exits with status 2.
    """
+
+
class UserAbortedException(Exception):
    """
    Raised when the user chooses to stop the program (for example by
    answering "n" at a confirmation prompt).

    simple_main_locked() logs "User abort requested, Exiting..." and exits
    with status 4.
    """
+
+
def simple_main(createOptionParserFn, createCommandFn, mainOptions=None):
    """
    Common entry point for command-line utilities.

    createOptionParserFn: no-argument callable returning an option parser
        whose parse_args() yields (options, args).
    createCommandFn: callable taking (options, args) and returning an object
        with run() and cleanup(). run() returns the exit code; cleanup() is
        called before the program exits (e.g. to tear down a worker pool).

    mainOptions may contain: forceQuietOutput (bool), programNameOverride
    (string), suppressStartupLogMessage (bool), useHelperToolLogging (bool),
    setNonuserOnToolLogger (bool, default false), pidfilename (string) and
    parentpidvar (string).

    The whole run is wrapped in GpCoverage; its report is generated even if
    the program raises or exits.
    """
    cov = GpCoverage()
    cov.start()
    try:
        simple_main_internal(createOptionParserFn, createCommandFn, mainOptions)
    finally:
        cov.stop()
        cov.generate_report()
+
+
def simple_main_internal(createOptionParserFn, createCommandFn, mainOptions):
    """
    Run simple_main_locked(), holding the pid-file lock when one is requested.

    When mainOptions contains 'pidfilename', a SimpleMainLock on that file
    (inside the master data directory) is acquired first and released after
    the program finishes. If another instance already holds it, an error is
    logged and this function returns without running the program.
    """
    lock = None
    wantsLock = mainOptions is not None and 'pidfilename' in mainOptions
    if wantsLock:
        lock = SimpleMainLock(mainOptions)
        holder = lock.acquire()
        if holder is not None:
            gplog.get_default_logger().error(
                "An instance of %s is already running (pid %s)" % (getProgramName(), holder))
            return

    # any required lock is held from here on
    try:
        simple_main_locked(createOptionParserFn, createCommandFn, mainOptions)
    finally:
        if lock is not None:
            lock.release()
+
+
def simple_main_locked(createOptionParserFn, createCommandFn, mainOptions):
    """
    Not to be called externally -- use simple_main instead

    Registers the HAWQ configuration, OS filesystem, native OS and fault
    prober providers. It then parses the command line, sets up logging,
    builds the command object via createCommandFn(options, args), runs it
    and exits the process with its return code.

    Known exception types are mapped to exit statuses:
    ProgramArgumentValidationException, ExceptionNoStackTraceNeeded,
    ExecutionError and other Exceptions exit with 2; UserAbortedException
    exits with 4; KeyboardInterrupt exits with the message 'User Interrupted'.
    The command object's cleanup() runs in every case once it exists.
    """
    logger = gplog.get_default_logger()

    # Register the concrete providers behind the gppylib system interfaces.
    configurationInterface.registerConfigurationProvider( configurationImplHAWQ.GpConfigurationProviderUsingHAWQCatalog())
    fileSystemInterface.registerFileSystemProvider( fileSystemImplOs.GpFileSystemProviderUsingOs())
    osInterface.registerOsProvider( osImplNative.GpOsProviderUsingNative())
    faultProberInterface.registerFaultProber( faultProberImplGpdb.GpFaultProberImplGpdb())

    # Initialised before the try so that the except/finally clauses can test them.
    commandObject = None
    parser = None

    forceQuiet = mainOptions is not None and mainOptions.get("forceQuietOutput")
    options = None

    if mainOptions is not None and mainOptions.get("programNameOverride"):
        # Rebinds the module-level name that getProgramName() returns.
        global gProgramName
        gProgramName = mainOptions.get("programNameOverride")
    suppressStartupLogMessage = mainOptions is not None and mainOptions.get("suppressStartupLogMessage")

    useHelperToolLogging = mainOptions is not None and mainOptions.get("useHelperToolLogging")
    nonuser = True if mainOptions is not None and mainOptions.get("setNonuserOnToolLogger") else False

    # NOTE: if this logic is changed then also change test_main in testUtils.py
    try:
        execname = getProgramName()
        hostname = unix.getLocalHostname()
        username = unix.getUserName()

        parser = createOptionParserFn()
        (options, args) = parser.parse_args()

        if useHelperToolLogging:
            gplog.setup_helper_tool_logging(execname, hostname, username)
        else:
            gplog.setup_tool_logging(execname, hostname, username,
                                     logdir=options.ensure_value("logfileDirectory", None), nonuser=nonuser )

        # forceQuietOutput takes precedence over any -v / -q on the command line.
        if forceQuiet:
            gplog.quiet_stdout_logging()
        else:
            if options.ensure_value("verbose", False):
                gplog.enable_verbose_logging()
            if options.ensure_value("quiet", False):
                gplog.quiet_stdout_logging()

        # Expose an absolute path as options.master_data_directory when -d was given.
        if options.ensure_value("masterDataDirectory", None) is not None:
            options.master_data_directory = os.path.abspath(options.masterDataDirectory)

        if not suppressStartupLogMessage:
            logger.info("Starting %s with args: %s" % (gProgramName, ' '.join(sys.argv[1:])))

        commandObject = createCommandFn(options, args)
        exitCode = commandObject.run()
        # In Python 2.5+ SystemExit derives from BaseException, not Exception,
        # so this exit is not intercepted by the handlers below.
        sys.exit(exitCode)

    except ProgramArgumentValidationException, e:
        # NOTE(review): if createOptionParserFn() itself raises this with
        # shouldPrintHelp() true, parser is still None here -- confirm callers never do.
        if e.shouldPrintHelp():
            parser.print_help()
        logger.error("%s: error: %s" %(gProgramName, e.getMessage()))
        sys.exit(2)
    except ExceptionNoStackTraceNeeded, e:
        logger.error( "%s error: %s" % (gProgramName, e))
        sys.exit(2)
    except UserAbortedException, e:
        logger.info("User abort requested, Exiting...")
        sys.exit(4)
    except ExecutionError, e:
        logger.fatal("Error occurred: %s\n Command was: '%s'\n"
                     "rc=%d, stdout='%s', stderr='%s'" %\
                     (e.summary,e.cmd.cmdStr, e.cmd.results.rc, e.cmd.results.stdout,
                      e.cmd.results.stderr ))
        sys.exit(2)
    except Exception, e:
        # Full traceback only if parsing never completed or -v was given.
        if options is None:
            logger.exception("%s failed. exiting...", gProgramName)
        else:
            if options.ensure_value("verbose", False):
                logger.exception("%s failed. exiting...", gProgramName)
            else:
                logger.fatal("%s failed. (Reason='%s') exiting..." % (gProgramName, e))
        sys.exit(2)
    except KeyboardInterrupt:
        sys.exit('\nUser Interrupted')
    finally:
        # Runs on every exit path above, including the sys.exit() calls.
        if commandObject:
            commandObject.cleanup()
+
+
def addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption, includeUsageOption=False):
    """
    Install the standard help and logging options on an option parser.

    The parser's existing -h is replaced by -h/-?/--help. Optionally --usage
    is added (action "briefhelp", which the parser class must support). A
    "Logging Options" group is added with -v/--verbose, -q/--quiet and
    -l <directory> (dest logfileDirectory). When includeNonInteractiveOption
    is true, -a (dest interactive, default True, store_false) joins that group.

    @param parser: optparse-style parser that already defines -h
        (optparse's remove_option raises ValueError otherwise).
    """
    parser.set_usage('%prog [--help] [options] ')
    parser.remove_option('-h')
    parser.add_option('-h', '-?', '--help', action='help',
                      help='show this help message and exit')
    if includeUsageOption:
        parser.add_option('--usage', action="briefhelp")

    loggingGroup = OptionGroup(parser, "Logging Options")
    parser.add_option_group(loggingGroup)
    loggingGroup.add_option('-v', '--verbose', action='store_true',
                            help='debug output.')
    loggingGroup.add_option('-q', '--quiet', action='store_true',
                            help='suppress status messages')
    loggingGroup.add_option("-l", dest="logfileDirectory", metavar="<directory>", type="string",
                            help="Logfile directory")

    if includeNonInteractiveOption:
        loggingGroup.add_option('-a', dest="interactive", action='store_false', default=True,
                                help="quiet mode, do not require user input for confirmations")
+
+
def addMasterDirectoryOptionForSingleClusterProgram(addTo):
    """
    Add the -d master directory option to the specified parser object
    which is intended to provide the value of the master data directory.

    The value is stored as options.masterDataDirectory.

    For programs that operate on multiple clusters at once, this function/option
    is not appropriate.
    """
    # The two help fragments are concatenated, so the first one needs a
    # trailing space (it previously rendered as "value setfor").
    addTo.add_option('-d', '--master_data_directory', type='string',
                     dest="masterDataDirectory",
                     metavar="<master data directory>",
                     help="Optional. The master host data directory. If not specified, the value set "
                          "for $MASTER_DATA_DIRECTORY will be used.")
+
+
+
+#
+# YamlMain
+#
+
def get_yaml(targetclass):
    """
    Return the YAML specification embedded in targetclass's docstring.

    The text from the first '%YAML' marker onward is parsed once and cached
    on targetclass._yaml. Later calls return the cached object.

    @raise AssertionError if the docstring contains no '%YAML' marker
    """
    # doc  - class's doc string
    # pos  - where YAML starts in doc
    # ystr - YAML string extracted from doc

    if getattr(targetclass, '_yaml', None) is None:
        doc = targetclass.__doc__
        pos = doc.find('%YAML')
        assert pos >= 0, "targetclass doc string is missing %YAML plan"
        # un-indent the docstring text so it parses as YAML
        ystr = doc[pos:].replace('\n ','\n')
        # safe_load is sufficient: the spec is consumed as plain mappings,
        # lists and strings. yaml.load() without an explicit Loader is
        # deprecated and is an error in PyYAML >= 6.
        targetclass._yaml = yaml.safe_load(ystr)
    return targetclass._yaml
+
+
class YamlMain:
    """
    Base class for programs whose options and execution plan are described
    by a YAML specification embedded in the subclass's docstring.

    One instance serves as both the option-parser factory and the program
    object for simple_main (see simple()).
    """

    def __init__(self):
        "Build the option parser from the class's YAML spec and parse sys.argv."
        # Populated by run() from the Plan it builds.
        self.current = None
        self.plan = None
        self.scenario_name = None
        self.logger = None
        self.logfilename = None
        self.errmsg = None

        self.parser = YamlOptions(self).parser
        (self.options, self.args) = self.parser.parse_args()
        # simple_main_locked reads options.quiet / options.verbose; the YAML
        # spec presumably defines these flags with dests q and v.
        opts = self.options
        opts.quiet, opts.verbose = opts.q, opts.v


    #
    # simple_main interface
    #
    def __call__(self, *args):
        "Return self, so the instance can act as simple_main's create_parser and create_program callables."
        return self

    def parse_args(self):
        "Return the (options, args) pair parsed in __init__, as simple_main expects from a parser."
        return (self.options, self.args)

    def run(self):
        "Build the Plan for the selected scenario, expose its helpers on self, and run it."
        plan = Plan(self)
        self.plan = plan
        self.scenario_name = plan.name
        self.logger = plan.logger
        self.logfilename = plan.logfilename
        self.errmsg = plan.errmsg
        self.current = []
        plan.run()

    def cleanup(self):
        "Cleanup hook called by simple_main; nothing to do."
        return None

    def simple(self):
        "Hand setup and control over to mainUtils.simple_main."
        simple_main(self, self)
+
+
+#
+# option parsing
+#
+
class YamlOptions:
    """
    optparse.OptionParser built from the YAML option specification found in
    the docstring of a target object's class.
    """

    def __init__(self, target):
        """
        Scan the class doc string of the given object, looking for the %YAML
        containing the option specification. Parse the YAML and setup the
        corresponding OptionParser object.

        The spec supplies 'Description', 'Usage' and 'Options'.
        Options['Groups'] lists the option group names to register, in order.
        """
        spec = get_yaml(target.__class__)
        self.y = spec
        self.parser = OptionParser(description=spec['Description'],
                                   version='%prog version $Revision$')
        # optparse's default -h is removed; presumably a 'Help...' group in the spec re-adds it.
        self.parser.remove_option('-h')
        self.parser.set_usage(spec['Usage'])
        self.opty = spec['Options']
        for groupName in self.opty.get('Groups', []):
            self._register_group(groupName)


    def _register_group(self, gname):
        """
        Register the options of group gname with the parser.

        Groups whose name starts with 'Help' go straight onto the parser.
        Every other group becomes an OptionGroup titled gname.

        Each option key is a comma separated list of flags (e.g. '-h,--help').
        The value is either a help string, meaning a store_true flag, or a
        dict of add_option keyword arguments. A help text starting with
        'hidden' is replaced by SUPPRESS_HELP.
        """
        groupSpec = self.opty.get(gname, None)
        isHelpGroup = gname.startswith('Help')
        group = None if isHelpGroup else OptionGroup(self.parser, gname)
        destination = self.parser if isHelpGroup else group

        for flags, setting in groupSpec.items():
            if isinstance(setting, str):
                # short form: the value is only the help string
                kwargs = {'action': 'store_true', 'help': setting}
            else:
                # long form: the value already holds the add_option arguments
                kwargs = setting

            if kwargs.get('help', '').startswith('hidden'):
                kwargs['help'] = SUPPRESS_HELP

            destination.add_option(*flags.split(','), **kwargs)

        if group is not None:
            self.parser.add_option_group(group)
+
+
+
+#
+# plan execution
+#
+
class Task:
    """
    One step of an execution Plan: a key, a display name, optional subtasks
    and an optional callable that performs the work.
    """

    def __init__(self, key, name, subtasks=None):
        self.Key = key            # leading token of the YAML entry
        self.Name = name          # remainder of the YAML entry
        self.SubTasks = subtasks  # nested Tasks, or None for a leaf
        self.Func = None          # bound method to call; filled in by Plan._subtask


    def _print(self, main, prefix):
        "Print this task as one line of the plan listing."
        print('%s %s %s:' % (prefix, self.Key, self.Name))

    def _debug(self, main, prefix):
        "Log this task as one line of the plan at debug level; parents end with ':'."
        suffix = ':' if self.SubTasks else ''
        main.logger.debug('Execution Plan:%s %s %s%s' % (prefix, self.Key, self.Name, suffix))

    def _run(self, main, prefix):
        "Log that this task is starting, then call its function if it has one."
        main.logger.debug(' Now Executing:%s %s %s' % (prefix, self.Key, self.Name))
        func = self.Func
        if func:
            func()
+
+
class Exit(Exception):
    """
    Raised from a plan task to stop the plan early.

    Plan.run() catches it and calls Plan.exit(code, prm, rc, call_support).
    prm is the raising frame's local variables, so the error message
    template for `code` can refer to them by name.
    """
    def __init__(self, rc, code=None, call_support=False):
        Exception.__init__(self)
        self.rc = rc
        self.code = code
        self.call_support = call_support
        # Frame 1 is whoever constructed this exception; must stay directly in __init__.
        self.prm = sys._getframe(1).f_locals
+
+
class Plan:
    """
    Execution plan for one scenario of a YamlMain-style program.

    Each top-level Task groups subtasks. Each subtask is bound to the method
    of `main` named after the subtask, lower-cased with spaces replaced by
    underscores.
    """

    def __init__(self, main):
        """
        Create cached yaml from class doc string of the given object,
        looking for the %YAML indicating the beginning of the object's YAML plan and parse it.
        Build the plan stages and tasks for the specified scenario.

        The scenario is main.options.scenario, or the YAML 'Default Scenario'
        when that option is empty.
        """
        # main - object with yaml scenarios (input)

        self.logger = gplog.get_default_logger()
        self.logfilename = gplog.get_logfile()

        self.main = main
        self.y = get_yaml(main.__class__)
        self.name = main.options.scenario
        if not self.name:
            self.name = self.y['Default Scenario']
        self.scenario = self.y['Scenarios'][self.name]
        self.errors = self.y['Errors']
        self.Tasks = [ self._task(ty) for ty in self.scenario ]


    def _task(self, ty):
        """
        Invoked by __init__ to build a top-level task from the YAML.

        ty maps "<key> <name>" to a list of subtask strings. If it holds
        several entries, only the Task built for the last one iterated is
        returned.
        """
        # ty  - Task yaml (input)
        # tyk - Task yaml key
        # tyv - Task yaml value
        # sty - Sub Task yaml
        # t   - Task (returned)

        for tyk, tyv in ty.items():
            key, workers = tyk.split(None, 1)
            subtasks = [ self._subtask(sty) for sty in tyv ]
            t = Task(key, workers, subtasks)
        return t

    def _subtask(self, sty):
        """
        Invoked by _task to build a leaf task from a "<key> <name>" string.

        @raise Exception if main has no method named after the subtask
        """
        # sty - Sub Task yaml (input)
        # st  - Sub Task (returned)

        key, rest = sty.split(None, 1)
        st = Task(key, rest)
        fn = st.Name.lower().replace(' ','_')
        try:
            st.Func = getattr(self.main, fn)
        except AttributeError:
            # sys.exc_info() keeps this valid on both old and new Python syntax.
            e = sys.exc_info()[1]
            raise Exception("Failed to lookup '%s' for sub task '%s': %s" % (fn, st.Name, str(e)))
        return st


    def _dotasks(self, subtasks, prefix, action):
        """
        Apply action(task, main, prefix) to each task depth-first.

        main.current holds the path of tasks being visited. Each nesting level
        prepends one space to prefix.
        """
        # st - Sub Task

        for st in subtasks or []:
            self.main.current.append(st)
            action(st, self.main, prefix)
            self._dotasks(st.SubTasks, ' '+prefix, action)
            self.main.current.pop()


    def _print(self):
        "Print in YAML form."

        print('%s:' % self.name)
        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._print(m,p))


    def run(self):
        "Log the plan, then run every task; an Exit raised by a task ends the program via exit()."

        self.logger.debug('Execution Plan: %s' % self.name)
        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._debug(m,p))

        self.logger.debug(' Now Executing: %s' % self.name)
        try:
            self._dotasks(self.Tasks, ' -', lambda t,m,p:t._run(m,p))
        except Exit:
            e = sys.exc_info()[1]
            self.exit(e.code, e.prm, e.rc, e.call_support)


    def errmsg(self, code, prm=None):
        """
        Return the YAML 'Errors' template for code, %-formatted with prm.

        prm defaults to an empty dict. A None default is used instead of a
        shared mutable {} default.
        """
        if prm is None:
            prm = {}
        return self.errors[code] % prm


    def exit(self, code=None, prm=None, rc=1, call_support=False):
        """
        Terminate the application with sys.exit(rc).

        @param code: key into the YAML 'Errors' table; nothing is logged when falsy
        @param prm: mapping used to fill the error message (defaults to {})
        @param rc: process exit status
        @param call_support: also log a request to send the log file to support
        """
        if prm is None:
            prm = {}
        if code:
            msg = self.errmsg(code, prm)
            self.logger.error(msg)
        if call_support:
            self.logger.error('Please send %s to Greenplum support.' % self.logfilename)
        self.logger.debug('exiting with status %(rc)s' % {'rc': rc})
        sys.exit(rc)
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/programs/__init__.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/__init__.py b/tools/bin/hawqpylib/programs/__init__.py
new file mode 100644
index 0000000..e69de29