Posted to commits@hawq.apache.org by hu...@apache.org on 2015/12/29 10:29:52 UTC

[1/2] incubator-hawq git commit: HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 424699998 -> 8cdae4145


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/programs/clsInjectFault.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/clsInjectFault.py b/tools/bin/hawqpylib/programs/clsInjectFault.py
new file mode 100644
index 0000000..4cf8e36
--- /dev/null
+++ b/tools/bin/hawqpylib/programs/clsInjectFault.py
@@ -0,0 +1,441 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# Used to inject faults into the file replication code
+#
+
+#
+# THIS IMPORT MUST COME FIRST
+#
+# import mainUtils FIRST to get python version check
+from hawqpylib.mainUtils import *
+
+from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE
+
+from gppylib.gpparseopts import OptParser, OptChecker
+from gppylib.utils import toNonNoneString
+from gppylib import gplog
+from hawqpylib import hawqarray
+from gppylib.commands import base
+from gppylib.commands import unix
+from gppylib.commands import gp
+from gppylib.commands import pg
+from gppylib.db import catalog
+from gppylib.db import dbconn
+from gppylib.system import configurationInterface, fileSystemInterface, osInterface
+from gppylib import pgconf
+from gppylib.testold.testUtils import testOutput
+from gppylib.system.environment import GpMasterEnvironment
+
+logger = gplog.get_default_logger()
+
+#-------------------------------------------------------------------------
+class HAWQInjectFaultProgram:
+    #
+    # Constructor:
+    #
+    # @param options the options as returned by the options parser
+    #
+    def __init__(self, options):
+        self.options = options
+
+    #
+    # Build the fault transition message.  Fault options themselves will NOT be validated by the
+    #   client -- the server will do that when we send the fault
+    #
+    def buildMessage(self) :
+
+        # note that we don't validate these strings -- if they contain newlines
+        # (and so mess up the transition protocol) then the server will error
+        result = ["faultInject"]
+        result.append(toNonNoneString(self.options.faultName))
+        result.append(toNonNoneString(self.options.type))
+        result.append(toNonNoneString(self.options.ddlStatement))
+        result.append(toNonNoneString(self.options.databaseName))
+        result.append(toNonNoneString(self.options.tableName))
+        result.append(toNonNoneString(self.options.numOccurrences))
+        result.append(toNonNoneString(self.options.sleepTimeSeconds))
+        return '\n'.join(result)
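+
+    # A hypothetical example of the serialized message: requesting a
+    # 'suspend' fault on the 'checkpoint' fault point produces a
+    # newline-separated payload that begins
+    #
+    #     faultInject
+    #     checkpoint
+    #     suspend
+    #
+    # with the remaining option fields appended via toNonNoneString.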
+
+    #
+    # build a message that will get status of the fault
+    #
+    def buildGetStatusMessage(self) :
+        # note that we don't validate this string; if it is malformed, the server may error
+        result = ["getFaultInjectStatus"]
+        result.append(toNonNoneString(self.options.faultName))
+        return '\n'.join(result)
+
+    #
+    # return True if the segment matches the given role, False otherwise
+    #
+    def isMatchingRole(self, role, hawqdb):
+        hdbRole = hawqdb.getRole()
+        if role == "master":
+            return hdbRole == 'm'
+        elif role == "standby":
+            return hdbRole == 's'
+        elif role == "primary":
+            return hdbRole == 'p'
+        else:
+            raise ProgramArgumentValidationException("Invalid role specified: %s" % role)
+
+    #
+    # load the segments and filter to the ones we should target
+    #
+    def loadTargetSegments(self) :
+
+        targetHost              = self.options.targetHost
+        targetRole              = self.options.targetRole
+        targetRegistrationOrder = self.options.targetRegistrationOrder
+
+        if targetHost is None and targetRegistrationOrder is None:
+            raise ProgramArgumentValidationException(\
+                            "neither --host nor --registration_order is specified.  " \
+                            "Exactly one should be specified.")
+        if targetHost is not None and targetRegistrationOrder is not None:
+            raise ProgramArgumentValidationException(\
+                            "both --host and --registration_order are specified.  " \
+                            "Exactly one should be specified.")
+        if targetHost is not None and targetRole is None:
+            raise ProgramArgumentValidationException(\
+                            "--role is not specified when --host is specified.  " \
+                            "Role is required when targeting a host.")
+        if targetRegistrationOrder is not None and targetRole is not None:
+            raise ProgramArgumentValidationException(\
+                            "--role is specified when --registration_order is specified.  " \
+                            "Role should not be specified when targeting a single registration_order.")
+
+        #
+        # load from master db
+        #
+        masterPort = self.options.masterPort
+        if masterPort is None:
+            gpEnv = GpMasterEnvironment(self.options.masterDataDirectory, False)
+            masterPort = gpEnv.getMasterPort()
+        conf = configurationInterface.getConfigurationProvider().initializeProvider(masterPort)
+        hawqArray = conf.loadSystemConfig(useUtilityMode=True)
+        hawqdbs = hawqArray.getDbList()
+        
+        #
+        # prune the segment list according to filter settings
+        #
+        hawqdbs = [hdb for hdb in hawqdbs if hdb.isSegment()]
+        if targetHost is not None and targetHost != "ALL":
+            hawqdbs = [hdb for hdb in hawqdbs if hdb.getHostName() == targetHost]
+
+        if targetRegistrationOrder is not None:
+            # restart from the full db list, then filter by registration order
+            hawqdbs = hawqArray.getDbList()
+            regorder = int(targetRegistrationOrder)
+            hawqdbs = [hdb for hdb in hawqdbs if hdb.getRegistrationOrder() == regorder]
+
+        if targetRole is not None:
+            hawqdbs = [hdb for hdb in hawqdbs if self.isMatchingRole(targetRole, hdb)]
+
+        # any DOWN segments among the targets?  Error out
+        downhawqdbs = [hdb for hdb in hawqdbs if hdb.getStatus() != 'u']
+        if len(downhawqdbs) > 0:
+            downhawqdbStr = "\n     Down Segment: "
+            raise ExceptionNoStackTraceNeeded(
+                "Unable to inject fault.  At least one segment is marked as down in the database.%s%s" % 
+                (downhawqdbStr, downhawqdbStr.join([str(downhdb) for downhdb in downhawqdbs])))
+
+        print "### DEBUG: loadTargetSegments"
+        print "### DEBUG: HAWQDBS "
+        print hawqdbs
+        return hawqdbs
+
+    #
+    # write string to a temporary file that will be deleted on completion
+    #
+    def writeToTempFile(self, text):
+        inputFile = fileSystemInterface.getFileSystemProvider().createNamedTemporaryFile()
+        inputFile.write(text)
+        inputFile.flush()
+        return inputFile
+
+    def injectFaults(self, segments, messageText):
+
+        inputFile = self.writeToTempFile(messageText)
+        logger.info("Injecting fault on %d segment(s)", len(segments))
+        testOutput("Injecting fault on %d segment(s)" % len(segments))
+
+        # run the command in serial to each target
+        for segment in segments :
+            logger.info("Injecting fault on %s", segment)
+            print "### DEBUG: injectFaults -> SendFilerepTransitionMessage inputFile.name = %s" % inputFile.name
+            print "### DEBUG: injectFaults -> SendFilerepTransitionMessage segment.getPort = %s" % segment.getPort()
+            print "### DEBUG: injectFaults -> SendFilerepTransitionMessage base.LOCAL = %s" % base.LOCAL
+            print "### DEBUG: injectFaults -> SendFilerepTransitionMessage segment.getHostName = %s" % segment.getHostName()
+            # if there is an error then an exception is raised by command execution
+            cmd = gp.SendFilerepTransitionMessage("Fault Injector", inputFile.name, \
+                                        segment.getPort(), base.LOCAL, segment.getHostName())
+            
+            print "### DEBUG: injectFaults -> Command = %s ###" % cmd
+            print "### DEBUG: injectFaults -> SendFilerepTransitionMessage ###"
+            cmd.run(validateAfter=False)
+
+ 
+            print "### DEBUG: injectFaults -> Run ###"
+            # validate ourselves
+            if cmd.results.rc != 0:
+                print "### DEBUG: injectFaults -> Injection Failed ###"
+                raise ExceptionNoStackTraceNeeded("Injection Failed: %s" % cmd.results.stderr)
+            elif self.options.type == "status":
+                # server side prints nice success messages on status...so print it
+                print "### DEBUG: injectFaults -> Report Status ###"
+                resultStr = cmd.results.stderr
+                print "### DEBUG: injectFaults -> STDERR = %s ###" % resultStr
+                if resultStr.startswith("Success: "):
+                    resultStr = resultStr.replace("Success: ", "", 1)
+                logger.info("%s", resultStr)
+            
+            print "### DEBUG: injectFaults -> END ###" 
+        inputFile.close()
+
+    def waitForFaults(self, segments, statusQueryText):
+        inputFile = self.writeToTempFile(statusQueryText)
+        segments = [seg for seg in segments]  # work on a copy of the list
+        sleepTimeSec = 0.115610199
+        sleepTimeMultiplier = 1.5  # sleepTimeSec * sleepTimeMultiplier^11 ~= 10
+
+        logger.info("Awaiting fault on %d segment(s)", len(segments))
+        while len(segments) > 0 :
+            logger.info("Sleeping %.2f seconds " % sleepTimeSec)
+            osInterface.getOsProvider().sleep(sleepTimeSec)
+
+            segmentsForNextPass = []
+            for segment in segments:
+                logger.info("Checking for fault completion on %s", segment)
+                cmd = gp.SendFilerepTransitionMessage.local("Fault Injector Status Check", inputFile.name, \
+                                                            segment.getPort(), segment.getHostName())
+                resultStr = cmd.results.stderr.strip()
+                if resultStr == "Success: waitMore":
+                    segmentsForNextPass.append(segment)
+                elif resultStr != "Success: done":
+                    raise Exception("Unexpected result from server %s" % resultStr)
+
+            segments = segmentsForNextPass
+            sleepTimeSec = sleepTimeSec if sleepTimeSec > 7 else sleepTimeSec * sleepTimeMultiplier
+        inputFile.close()
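+
+    # The polling loop above backs off geometrically (~0.12s, 0.17s, 0.26s, ...)
+    # and stops growing once the sleep interval exceeds 7 seconds.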
+
+    def isSyncableFaultType(self):
+        type = self.options.type
+        return type != "reset" and type != "status"
+
+    ######
+    def run(self):
+
+        if self.options.masterPort is not None and self.options.masterDataDirectory is not None:
+            raise ProgramArgumentValidationException("both master port and master data directory options are specified;" \
+                    " at most one should be specified, or specify none to use MASTER_DATA_DIRECTORY environment variable")
+
+        print "### DEBUG: Build Message ###"
+        messageText = self.buildMessage()
+        print "### DEBUG: Load Target Segments ###"
+        segments = self.loadTargetSegments()
+
+        # inject, maybe wait
+        print "### DEBUG: Inject Faults ###"
+        self.injectFaults(segments, messageText)
+        if self.isSyncableFaultType() :
+            statusQueryText = self.buildGetStatusMessage()
+            self.waitForFaults(segments, statusQueryText)
+
+        logger.info("DONE")
+        return 0  # success -- exit code 0!
+
+    def cleanup(self):
+        pass
+
+    #-------------------------------------------------------------------------
+    @staticmethod
+    def createParser():
+        description = ("""
+        This utility is NOT SUPPORTED and is for internal-use only.
+
+        Used to inject faults into the file replication code.
+        """)
+
+        help = ["""
+
+        Return codes:
+          0 - Fault injected
+          non-zero - Error or invalid options
+        """]
+
+        parser = OptParser(option_class=OptChecker,
+                    description='  '.join(description.split()),
+                    version='%prog version $Revision$')
+        parser.setHelp(help)
+
+        addStandardLoggingAndHelpOptions(parser, False)
+
+        # these options are used to determine the target segments
+        addTo = OptionGroup(parser, 'Target Segment Options: ')
+        parser.add_option_group(addTo)
+        addTo.add_option('-r', '--role', dest="targetRole", type='string', metavar="<role>",
+                         help="Role of segments to target: master, standby, primary")
+        addTo.add_option("-s", "--registration_order", dest="targetRegistrationOrder", type="string", metavar="<registration_order>",
+                         help="The segment registration_order on which fault should be set and triggered.")
+        addTo.add_option("-H", "--host", dest="targetHost", type="string", metavar="<host>",
+                         help="The hostname on which fault should be set and triggered; pass ALL to target all hosts")
+
+        addTo = OptionGroup(parser, 'Master Connection Options')
+        parser.add_option_group(addTo)
+
+        addMasterDirectoryOptionForSingleClusterProgram(addTo)
+        addTo.add_option("-p", "--master_port", dest="masterPort", type="int", default=None,
+                         metavar="<masterPort>",
+                         help="DEPRECATED, use MASTER_DATA_DIRECTORY environment variable or -d option.  " \
+                         "The port number of the master database on localhost, " \
+                         "used to fetch the segment configuration.")
+
+        addTo = OptionGroup(parser, 'Client Polling Options: ')
+        parser.add_option_group(addTo)
+        addTo.add_option('-m', '--mode', dest="syncMode", type='string', default="async",
+                         metavar="<syncMode>",
+                         help="Synchronization mode : sync (client waits for fault to occur)" \
+                         " or async (client only sets fault request on server)")
+
+        # these options are used to build the message for the segments
+        addTo = OptionGroup(parser, 'Fault Options: ')
+        parser.add_option_group(addTo)
+        addTo.add_option('-y', '--type', dest="type", type='string', metavar="<type>",
+                         help="fault type: sleep (insert sleep), fault (report fault to postmaster and fts prober), " \
+                  "fatal (inject FATAL error), panic (inject PANIC error), error (inject ERROR), " \
+                  "infinite_loop, data_curruption (corrupt data in memory and persistent media), " \
+                  "suspend (suspend execution), resume (resume execution that was suspended), " \
+                  "skip (inject skip i.e. skip checkpoint), " \
+                  "memory_full (all memory is consumed when injected), " \
+                  "reset (remove fault injection), status (report fault injection status), " \
+                  "panic_suppress (inject suppressed PANIC in critical section), " \
+                  "segv (inject a SEGV), " \
+                  "interrupt (inject an Interrupt) ")
+        addTo.add_option("-z", "--sleep_time_s", dest="sleepTimeSeconds", type="int", default="10" ,
+                            metavar="<sleepTime>",
+                            help="For 'sleep' faults, the amount of time for the sleep.  Defaults to %default." \
+                 "Min Max Range is [0, 7200 sec] ")
+        addTo.add_option('-f', '--fault_name', dest="faultName", type='string', metavar="<name>",
+                         help="fault name: " \
+                  "postmaster (inject fault when new connection is accepted in postmaster), " \
+                  "pg_control (inject fault when global/pg_control file is written), " \
+                  "pg_xlog (inject fault when files in pg_xlog directory are written), " \
+                  "start_prepare (inject fault during start prepare transaction), " \
+                  "filerep_consumer (inject fault before data are processed, i.e. if mirror " \
+                  "then before file operation is issued to file system, if primary " \
+                  "then before mirror file operation is acknowledged to backend processes), " \
+                  "filerep_consumer_verificaton (inject fault before ack verification data are processed on primary), " \
+                  "filerep_change_tracking_compacting (inject fault when compacting change tracking log files), " \
+                  "filerep_sender (inject fault before data are sent to network), " \
+                  "filerep_receiver (inject fault after data are received from network), " \
+                  "filerep_flush (inject fault before fsync is issued to file system), " \
+                  "filerep_resync (inject fault while in resync when first relation is ready to be resynchronized), " \
+                  "filerep_resync_in_progress (inject fault while resync is in progress), " \
+                  "filerep_resync_worker (inject fault after write to mirror), " \
+                  "filerep_resync_worker_read (inject fault before read required for resync), " \
+                  "filerep_transition_to_resync (inject fault during transition to InResync before mirror re-create), " \
+                  "filerep_transition_to_resync_mark_recreate (inject fault during transition to InResync before marking re-created), " \
+                  "filerep_transition_to_resync_mark_completed (inject fault during transition to InResync before marking completed), " \
+                  "filerep_transition_to_sync_begin (inject fault before transition to InSync begin), " \
+                  "filerep_transition_to_sync (inject fault during transition to InSync), " \
+                  "filerep_transition_to_sync_before_checkpoint (inject fault during transition to InSync before checkpoint is created), " \
+                  "filerep_transition_to_sync_mark_completed (inject fault during transition to InSync before marking completed), " \
+                  "filerep_transition_to_change_tracking (inject fault during transition to InChangeTracking), " \
+                  "checkpoint (inject fault before checkpoint is taken), " \
+                  "change_tracking_compacting_report (report if compacting is in progress), " \
+                  "change_tracking_disable (inject fault before fsync to Change Tracking log files), " \
+                  "transaction_abort_after_distributed_prepared (abort prepared transaction), " \
+                  "transaction_commit_pass1_from_create_pending_to_created, " \
+                  "transaction_commit_pass1_from_drop_in_memory_to_drop_pending, " \
+                  "transaction_commit_pass1_from_aborting_create_needed_to_aborting_create, " \
+                  "transaction_abort_pass1_from_create_pending_to_aborting_create, " \
+                  "transaction_abort_pass1_from_aborting_create_needed_to_aborting_create, " \
+                  "transaction_commit_pass2_from_drop_in_memory_to_drop_pending, " \
+                  "transaction_commit_pass2_from_aborting_create_needed_to_aborting_create, " \
+                  "transaction_abort_pass2_from_create_pending_to_aborting_create, " \
+                  "transaction_abort_pass2_from_aborting_create_needed_to_aborting_create, " \
+                  "finish_prepared_transaction_commit_pass1_from_create_pending_to_created, " \
+                  "finish_prepared_transaction_commit_pass2_from_create_pending_to_created, " \
+                  "finish_prepared_transaction_abort_pass1_from_create_pending_to_aborting_create, " \
+                  "finish_prepared_transaction_abort_pass2_from_create_pending_to_aborting_create, " \
+                  "finish_prepared_transaction_commit_pass1_from_drop_in_memory_to_drop_pending, " \
+                  "finish_prepared_transaction_commit_pass2_from_drop_in_memory_to_drop_pending, " \
+                  "finish_prepared_transaction_commit_pass1_aborting_create_needed, " \
+                  "finish_prepared_transaction_commit_pass2_aborting_create_needed, " \
+                  "finish_prepared_transaction_abort_pass1_aborting_create_needed, " \
+                  "finish_prepared_transaction_abort_pass2_aborting_create_needed, " \
+                  "twophase_transaction_commit_prepared (inject fault before transaction commit is inserted in xlog), " \
+                  "twophase_transaction_abort_prepared (inject fault before transaction abort is inserted in xlog), " \
+                  "dtm_broadcast_prepare (inject fault after prepare broadcast), " \
+                  "dtm_broadcast_commit_prepared (inject fault after commit broadcast), " \
+                  "dtm_broadcast_abort_prepared (inject fault after abort broadcast), " \
+                  "dtm_xlog_distributed_commit (inject fault after distributed commit was inserted in xlog), " \
+                  "fault_before_pending_delete_relation_entry (inject fault before putting pending delete relation entry, " \
+                  "fault_before_pending_delete_database_entry (inject fault before putting pending delete database entry, " \
+                  "fault_before_pending_delete_tablespace_entry (inject fault before putting pending delete tablespace entry, " \
+                  "fault_before_pending_delete_filespace_entry (inject fault before putting pending delete filespace entry, " \
+                  "dtm_init (inject fault before initializing dtm), " \
+                  "end_prepare_two_phase_sleep (inject sleep after two phase file creation), " \
+                  "segment_transition_request (inject fault after segment receives state transition request), " \
+                  "segment_probe_response (inject fault after segment is probed by FTS), " \
+                  "sync_persistent_table (inject fault to sync persistent table to disk), " \
+                  "xlog_insert (inject fault to skip insert record into xlog), " \
+                  "local_tm_record_transaction_commit (inject fault for local transactions after transaction commit is recorded and flushed in xlog ), " \
+                  "malloc_failure (inject fault to simulate memory allocation failure), " \
+                  "transaction_abort_failure (inject fault to simulate transaction abort failure), " \
+                  "update_committed_eof_in_persistent_table (inject fault before committed EOF is updated in gp_persistent_relation_node for Append Only segment files), " \
+                  "fault_during_exec_dynamic_table_scan (inject fault during scanning of a partition), " \
+                  "internal_flush_error (inject an error during internal_flush), " \
+                  "exec_simple_query_end_command (inject fault before EndCommand in exec_simple_query), " \
+                  "multi_exec_hash_large_vmem (allocate large vmem using palloc inside MultiExecHash to attempt to exceed vmem limit), " \
+                  "execsort_before_sorting (inject fault in nodeSort after receiving all tuples and before sorting), " \
+                  "workfile_cleanup_set (inject fault in workfile manager cleanup set)" \
+                  "execsort_mksort_mergeruns (inject fault in MKSort during the mergeruns phase), " \
+                  "cdb_copy_start_after_dispatch (inject fault in cdbCopyStart after dispatch), " \
+                  "fault_in_background_writer_main (inject fault in BackgroundWriterMain), " \
+                  "exec_hashjoin_new_batch (inject fault before switching to a new batch in Hash Join), " \
+                  "analyze_subxact_error (inject an error during analyze)," \
+                  "opt_task_allocate_string_buffer (inject fault while allocating string buffer), " \
+                  "runaway_cleanup (inject fault before starting the cleanup for a runaway query)" \
+                  "all (affects all faults injected, used for 'status' and 'reset'), ") 
+        addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string",
+                         metavar="ddlStatement",
+                         help="The DDL statement on which fault should be set and triggered " \
+                         "(i.e. create_database, drop_database, create_table, drop_table)")
+        addTo.add_option("-D", "--database_name", dest="databaseName", type="string",
+                         metavar="databaseName",
+                         help="The database name on which fault should be set and triggered.")
+        addTo.add_option("-t", "--table_name", dest="tableName", type="string",
+                         metavar="tableName",
+                         help="The table name on which fault should be set and triggered.")
+        addTo.add_option("-o", "--occurrence", dest="numOccurrences", type="int", default=1,
+                         metavar="numOccurrences",
+                         help="The number of occurrence of the DDL statement with the database name " \
+                         "and the table name before fault is triggered.  Defaults to %default. Max is 1000. " \
+             "Fault is triggered always if set to '0'. ")
+        parser.set_defaults()
+        return parser
+
+    @staticmethod
+    def createProgram(options, args):
+        if len(args) > 0 :
+            raise ProgramArgumentValidationException(\
+                            "too many arguments: only options may be specified")
+        return HAWQInjectFaultProgram(options)
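
For reference, a minimal sketch of driving this class directly (hypothetical
usage; assumes a reachable cluster and that hawqpylib is on the PYTHONPATH --
the shipped entry point wires the same class through mainUtils):

    from hawqpylib.programs.clsInjectFault import HAWQInjectFaultProgram

    # parse a sample option set: suspend the 'checkpoint' fault point
    # on all primary segments
    parser = HAWQInjectFaultProgram.createParser()
    (options, args) = parser.parse_args(
        ["-f", "checkpoint", "-y", "suspend", "-H", "ALL", "-r", "primary"])

    # createProgram() rejects stray positional arguments
    program = HAWQInjectFaultProgram.createProgram(options, args)
    try:
        program.run()      # builds the message, loads targets, injects
    finally:
        program.cleanup()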

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py b/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
new file mode 100755
index 0000000..2e53b04
--- /dev/null
+++ b/tools/bin/hawqpylib/system/ComputeCatalogUpdate.py
@@ -0,0 +1,442 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Line too long - pylint: disable=C0301
+# Invalid name  - pylint: disable=C0103
+
+"""
+  ComputeCatalogUpdate.py
+
+  Used by updateSystemConfig() to compare the db state and
+  goal state of a HAWQArray containing the Greenplum segment
+  configuration details and to compute the appropriate changes.
+"""
+import copy
+from gppylib.gplog import *
+# ROLE_MIRROR and MASTER_CONTENT_ID are assumed exported by hawqarray;
+# validate() below depends on them
+from hawqpylib.hawqarray import ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY, ROLE_MIRROR, MASTER_CONTENT_ID
+from hawqpylib.hawqarray import MASTER_REGISTRATION_ORDER
+from hawqpylib import hawqarray
+
+logger = get_default_logger()
+
+class ComputeCatalogUpdate:
+    """
+    Helper class for GpConfigurationProvider.updateSystemConfig().
+    
+    This computes seven lists of GpDb objects (commonly referenced as 'seg') 
+    from a HAWQArray, reflecting the logical changes that need to be made
+    to the database catalog to make it match the system as defined.
+    The names of the lists are reasonably descriptive:
+
+    mirror_to_remove         - to be removed (e.g. via gp_remove_segment_mirror())
+    primary_to_remove        - to be removed (e.g. via gp_remove_segment())
+    primary_to_add           - to be added (e.g. via gp_add_segment())
+    mirror_to_add            - to be added (e.g. via gp_add_segment_mirror())
+    mirror_to_remove_and_add - an attribute change, or presence on the force
+                               list, requires this mirror to be removed and
+                               then added back
+    segment_to_update        - to be updated (e.g. via SQL)
+    segment_unchanged        - needs no update (included for validation)
+    """
+
+    def __init__(self, hawqArray, forceMap, useUtilityMode, allowPrimary):
+        """
+        This class holds lists of objects from the underlying hawqArray.
+        The constructor does all of the computation; the remaining methods
+        only validate and report on the computed lists.
+
+        @param hawqArray the array containing the goal and db segment states.
+        @param forceMap a map of dbid->True for mirrors for which we should force updating via remove/add
+        @param useUtilityMode True if the operations we're doing are expected to run via utility mode
+        @param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
+        """
+
+        forceMap = forceMap or {}
+        self.useUtilityMode = useUtilityMode
+        self.allowPrimary = allowPrimary
+
+        # 'dbsegmap' reflects the current state of the catalog
+        self.dbsegmap    = dict([(seg.getSegmentDbId(), seg) for seg in hawqArray.getSegmentsAsLoadedFromDb()])
+
+        # 'goalsegmap' reflects the desired state of the catalog
+        self.goalsegmap  = dict([(seg.getSegmentDbId(), seg) for seg in hawqArray.getDbList(includeExpansionSegs=True)])
+
+        # find mirrors and primaries to remove
+        self.mirror_to_remove = [
+            seg for seg in self.dbsegmap.values()		        # segment in database
+                 if seg.isSegmentMirror()                               # segment is a mirror
+                and (seg.getSegmentDbId() not in self.goalsegmap)       # but not in goal configuration
+        ]
+        self.debuglog("mirror_to_remove:          %s", self.mirror_to_remove)
+
+        self.primary_to_remove = [
+            seg for seg in self.dbsegmap.values()                      # segment in database
+                 if seg.isSegmentPrimary()                      	# segment is a primary
+                and (seg.getSegmentDbId() not in self.goalsegmap)       # but not in goal configuration
+        ]
+        self.debuglog("primary_to_remove:         %s", self.primary_to_remove)
+
+        # find primaries and mirrors to add
+        self.primary_to_add = [
+            seg for seg in self.goalsegmap.values()			# segment in goal configuration
+                 if seg.isSegmentPrimary()				# segment is a primary
+                and (seg.getSegmentDbId() not in self.dbsegmap)		# but not in the database
+        ]
+        self.debuglog("primary_to_add:            %s", self.primary_to_add)
+
+        self.mirror_to_add = [
+            seg for seg in self.goalsegmap.values()			# segment in goal configuration
+                 if seg.isSegmentMirror()				# segment is a mirror
+                and (seg.getSegmentDbId() not in self.dbsegmap)		# but not in the database
+        ]
+        self.debuglog("mirror_to_add:             %s", self.mirror_to_add)
+
+        # find segments to update
+        initial_segment_to_update = [
+            seg for seg in self.goalsegmap.values()			# segment in goal configuration
+                 if (seg.getSegmentDbId() in self.dbsegmap)          	# and also in the database 
+                and (seg != self.dbsegmap[ seg.getSegmentDbId() ])   	# but some attributes differ
+        ]
+        self.debuglog("initial_segment_to_update: %s", initial_segment_to_update)
+
+        # create a map of the segments which we can't update in the 
+        # ordinary way either because they were on the forceMap or
+        # they differ in an attribute other than mode, status or replication port
+        removeandaddmap = {}
+        for seg in initial_segment_to_update:
+            dbid = seg.getSegmentDbId()
+            if dbid in forceMap:
+                removeandaddmap[dbid] = seg
+                continue
+            # In GPSQL, there is no need to remove the original segment; updating the hostname and address is enough.
+            if hawqArray.getFaultStrategy() == hawqarray.FAULT_STRATEGY_NONE:
+                continue
+            if not seg.equalIgnoringStatus(self.dbsegmap[dbid]):
+                removeandaddmap[dbid] = seg
+                continue
+
+        # create list of mirrors to update via remove/add
+        self.mirror_to_remove_and_add = [seg for seg in removeandaddmap.values()]
+        self.debuglog("mirror_to_remove_and_add:  %s", self.mirror_to_remove_and_add)
+
+        # find segments to update in the ordinary way
+        self.segment_to_update = [
+            seg for seg in initial_segment_to_update			# segments to update
+                 if seg.getSegmentDbId() not in removeandaddmap 	# that don't require remove/add
+        ]
+        self.debuglog("segment_to_update:         %s", self.segment_to_update)
+
+        # find segments that don't need change
+        self.segment_unchanged = [
+            seg for seg in self.goalsegmap.values()			# segment in goal configuration
+                 if (seg.getSegmentDbId() in self.dbsegmap)          	# and also in the database 
+                and (seg == self.dbsegmap[ seg.getSegmentDbId() ])     # and attributes are all the same
+        ]
+        self.debuglog("segment_unchanged:         %s", self.segment_unchanged)
+
+
+                
+    def final_segments(self):
+        """
+        Generate a series of segments appearing in the final configuration.
+        """
+        for seg in self.primary_to_add:
+            yield seg
+        for seg in self.mirror_to_add:
+            yield seg
+        for seg in self.mirror_to_remove_and_add:
+            yield seg
+        for seg in self.segment_to_update:
+            yield seg
+        for seg in self.segment_unchanged:
+            yield seg
+
+
+    def validate(self):
+        """
+        Check that the operation and new configuration is valid.
+        """
+
+        # Validate that we're not adding or removing primaries unless authorized
+        #
+        if not self.allowPrimary:
+            if len(self.primary_to_add) > 0:
+                p = self.primary_to_add[0]
+                raise Exception("Internal error: Operation may not add primary: %s" % repr(p))
+
+            if len(self.primary_to_remove) > 0:
+                p = self.primary_to_remove[0]
+                raise Exception("Internal error: Operation may not remove primary: %s" % repr(p))
+
+
+        # Validate that operations do not result in a contentid with a pair of segments in same preferred role
+        #
+        final = { ROLE_PRIMARY:{}, ROLE_MIRROR:{} }
+        for seg in self.final_segments():
+            subset = final[ seg.getSegmentPreferredRole() ]
+            other  = subset.get( seg.getSegmentContentId() )
+            if other is not None:
+                raise Exception("Segments sharing a content id may not have same preferred role: %s and %s" % (repr(seg), repr(other)))
+            subset[ seg.getSegmentContentId() ] = seg
+
+
+        # Validate that if we have any mirrors, that all primaries have mirrors
+        # If the only mirror is the master standby, skip this check.
+        check_mirror_sanity = len(final[ ROLE_MIRROR ]) > 0 and len([ contentId for contentId in final[ ROLE_MIRROR] if contentId != MASTER_CONTENT_ID ]) > 0
+        if check_mirror_sanity:
+            for contentId in final[ ROLE_PRIMARY ]:
+                if contentId != MASTER_CONTENT_ID and final[ ROLE_MIRROR ].get( contentId ) is None:
+                    seg = final[ ROLE_PRIMARY ][ contentId ]
+                    raise Exception("Primary must have mirror when mirroring enabled: %s" % repr(seg))
+
+
+        # Validate that the remove/add list contains only qualified mirrors.
+        # In particular, we disallow remove/add of the master, standby or a primary.
+        #
+        for seg in self.mirror_to_remove_and_add:
+            originalSeg = self.dbsegmap.get(seg.getSegmentDbId())
+
+            # filespace and other core has changed, or it's a mirror and we are recovering full
+            #     (in which case we want to call removeMirror and addMirror so we mark
+            #      the primary as full-resyncing)
+            #
+            if seg.isSegmentMaster(current_role=True) or seg.isSegmentStandby():
+
+                #
+                # Assertion here -- user should not be allowed to change master/standby info.
+                #
+                raise Exception("Internal error: Can only change core details of segments, not masters" \
+                                " (on segment %s) (seg %s vs original %s)" %
+                                (seg.getSegmentDbId(), repr(seg), repr(originalSeg)))
+
+            if not seg.isSegmentMirror(current_role=True):
+                #
+                # Assertion here -- user should not be allowed to change primary info.
+                #
+                # NOTE: this early return makes the raise below unreachable.
+                return
+                raise Exception("Internal error: Can only change core details of mirrors, not primaries" \
+                                " (on segment %s) (seg %s vs original %s)" %
+                                (seg.getSegmentDbId(), repr(seg), repr(originalSeg)))
+
+            if self.useUtilityMode:
+                raise Exception("Cannot change core details of mirrors in utility mode")
+
+
+
+    def debuglog(self, msg, seglist):
+        """
+        Write debugging details about the specified segment list.
+        """
+        logger.debug(msg % ("%s segments" % len(seglist)))
+        for seg in seglist:
+            logger.debug(msg % repr(seg))
+
+
+
+
+# minimal test framework when run from command line
+#
+if __name__ == '__main__':
+    
+    ROLE_PRIMARY = 'p'
+    ROLE_MIRROR  = 'm'
+    
+    MODE_NOT_INITIALIZED = ''
+    MODE_CHANGELOGGING = 'c'
+    MODE_SYNCHRONIZED = 's'
+    MODE_RESYNCHRONIZATION = 'r'
+
+    class GpDb:
+        def __init__(self,dbid,content,pref,mode='',curr=None,status='u',rport=0,misc=None):
+            self.dbid           = dbid
+            self.content        = content
+            self.preferred_role = pref
+            self.mode           = mode  
+            self.role           = curr or pref
+            self.status         = status
+            self.rport          = rport 
+            self.misc           = misc  
+        def getSegmentDbId(self):                  return self.dbid
+        def getSegmentContentId(self):             return self.content
+        def getSegmentPreferredRole(self):         return self.preferred_role
+        def getSegmentMode(self):                  return self.mode
+        def getSegmentRole(self):                  return self.role
+        def getSegmentStatus(self):                return self.status
+        def getSegmentReplicationPort(self):       return self.rport
+        def setSegmentMode(self,mode):             self.mode = mode
+        def setSegmentStatus(self,status):         self.status = status
+        def setSegmentReplicationPort(self,rport): self.rport = rport
+        def isSegmentPrimary(self, current_role=False):
+            role = self.role if current_role else self.preferred_role
+            return self.content >= 0 and role == ROLE_PRIMARY
+        def isSegmentMirror(self, current_role=False):
+            role = self.role if current_role else self.preferred_role
+            return self.content >= 0 and role == ROLE_MIRROR
+        def isSegmentMaster(self, current_role=False):
+            role = self.role if current_role else self.preferred_role
+            return self.content < 0 and role == ROLE_PRIMARY
+        def isSegmentStandby(self, current_role=False):
+            role = self.role if current_role else self.preferred_role
+            return self.content < 0 and role == ROLE_MIRROR
+        def __cmp__(self,other):                   return cmp(repr(self), repr(other))
+        def __repr__(s): 
+            return '(%s,%s,%s,%s,%s,%s,%s,%s)' % (s.dbid, s.content, s.preferred_role, s.mode, s.role, s.status, s.rport, s.misc)
+        def equalIgnoringStatus(self, other):
+            tmp = copy.copy(self)
+            tmp.setSegmentMode( other.getSegmentMode() )
+            tmp.setSegmentStatus( other.getSegmentStatus() )
+            tmp.setSegmentReplicationPort( other.getSegmentReplicationPort() )
+            return tmp == other
+
+    class xxx:
+        def xxx():
+            print dbsegmap
+            print goalsegmap
+            print 'db not goal', [seg for seg in dbsegmap.values()   if seg.getSegmentDbId() not in goalsegmap]
+            print 'goal not db', [seg for seg in goalsegmap.values() if seg.getSegmentDbId() not in dbsegmap]
+    
+    class HAWQArray:
+        def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
+            s.c = ComputeCatalogUpdate(s,forceMap,useUtilityMode,allowPrimary)
+            s.dump()
+        def dump(s):
+            print s.__class__.__name__, s.__class__.__doc__
+            s.c.validate()
+            print " -m", s.c.mirror_to_remove,
+            print " -p", s.c.primary_to_remove,
+            print " +p", s.c.primary_to_add,
+            print " +m", s.c.mirror_to_add,
+            print " +/-m", s.c.mirror_to_remove_and_add,
+            print " u",  s.c.segment_to_update,
+            print " n",  s.c.segment_unchanged
+        def __repr__(s):
+            return '<%s,%s>' % (s.getDbList(), s.getSegmentsAsLoadedFromDb())
+    
+    class HAWQArrayBad(HAWQArray):
+        def __init__(s, forceMap=None, useUtilityMode=False, allowPrimary=True):
+            try:
+                HAWQArray.__init__(s,forceMap,useUtilityMode,allowPrimary)
+                print " ERROR: expected exception"
+            except Exception, e:
+                print " EXPECTED: ", str(e)
+
+    class HAWQArray1(HAWQArray):
+        "expect no change"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray1()
+    
+    class HAWQArray1a(HAWQArray):
+        "expect update of mirror"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray1a()
+    
+    class HAWQArray2(HAWQArray):
+        "expect add mirror"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p','a'), GpDb(2,1,'m','a')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p')]
+    HAWQArray2()
+    
+    class HAWQArray3(HAWQArray):
+        "expect remove mirror"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+    HAWQArray3()
+    
+    class HAWQArray4(HAWQArray):
+        "expect add primary and mirror"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray4()
+
+    class HAWQArray5(HAWQArray):
+        "expect remove primary/mirror and add/primary mirror"
+        def getDbList(self,includeExpansionSegs): return [GpDb(3,2,'p'), GpDb(4,2,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m','a')]
+    HAWQArray5()
+
+    class HAWQArray6(HAWQArray):
+        "expect update via add/remove"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m',misc='x')]
+    HAWQArray6()
+    
+    class HAWQArray7(HAWQArrayBad):
+        "can't rely on remove/add for primary"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p',misc='x'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray7()
+
+    class HAWQArray8(HAWQArrayBad):
+        "can't rely on remove/add for master"
+        def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',misc="x"), GpDb(1,1,'p'), GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray8()
+
+    class HAWQArray9(HAWQArrayBad):
+        "can't rely on remove/add for master"
+        def __init__(s): HAWQArrayBad.__init__(s,[0])
+        def getDbList(self,includeExpansionSegs): return [GpDb(0,-1,'p',rport=2), GpDb(1,1,'p'), GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(0,-1,'p'), GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray9()
+
+    class HAWQArray10(HAWQArray):
+        "expect update"
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray10()
+
+    class HAWQArray11(HAWQArray):
+        "expect update via add/remove"
+        def __init__(s): HAWQArray.__init__(s,[2])
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m',rport=2)]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray11()
+    
+    class HAWQArray12(HAWQArrayBad):
+        "can't add primary"
+        def __init__(s): HAWQArrayBad.__init__(s,allowPrimary=False)
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p'), GpDb(4,2,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray12()
+    
+    class HAWQArray13(HAWQArrayBad):
+        "can't remove primary"
+        def __init__(s): HAWQArrayBad.__init__(s,allowPrimary=False)
+        def getDbList(self,includeExpansionSegs): return [GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray13()
+    
+    class HAWQArray14(HAWQArrayBad):
+        "can't have pair in same preferred role"
+        def __init__(s): HAWQArrayBad.__init__(s)
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'p')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray14()
+    
+    class HAWQArray15(HAWQArrayBad):
+        "can't have pair in same preferred role"
+        def __init__(s): HAWQArrayBad.__init__(s)
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'m'), GpDb(2,1,'m')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray15()
+    
+    class HAWQArray16(HAWQArrayBad):
+        "all primaries must have mirrors when mirroring"
+        def __init__(s): HAWQArrayBad.__init__(s)
+        def getDbList(self,includeExpansionSegs): return [GpDb(1,1,'p'), GpDb(2,1,'m'), GpDb(3,2,'p')]
+        def getSegmentsAsLoadedFromDb(self):      return [GpDb(1,1,'p'), GpDb(2,1,'m')]
+    HAWQArray16()
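
The add/remove lists above boil down to dictionary differences keyed by dbid.
A condensed sketch of that core computation (illustrative stand-in values,
plain dicts in place of the segment maps):

    # catalog ('db') state and desired ('goal') state, keyed by dbid
    dbsegmap   = {1: 'primary-1', 2: 'mirror-1'}
    goalsegmap = {1: 'primary-1', 3: 'mirror-2'}

    # in the database but absent from the goal -> remove
    to_remove = [seg for dbid, seg in dbsegmap.items()
                 if dbid not in goalsegmap]
    # in the goal but absent from the database -> add
    to_add = [seg for dbid, seg in goalsegmap.items()
              if dbid not in dbsegmap]

    print to_remove, to_add    # ['mirror-1'] ['mirror-2']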

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/__init__.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/__init__.py b/tools/bin/hawqpylib/system/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/system/configurationImplHAWQ.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/system/configurationImplHAWQ.py b/tools/bin/hawqpylib/system/configurationImplHAWQ.py
new file mode 100644
index 0000000..cf0b656
--- /dev/null
+++ b/tools/bin/hawqpylib/system/configurationImplHAWQ.py
@@ -0,0 +1,474 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This file defines the interface that can be used to fetch and update system
+configuration information.
+"""
+import os, copy
+
+from gppylib.gplog import *
+from gppylib.utils import checkNotNone
+from gppylib.system.configurationInterface import *
+from gppylib.system.ComputeCatalogUpdate import ComputeCatalogUpdate
+from hawqpylib.hawqarray import HAWQArray, HAWQDB, InvalidSegmentConfiguration
+from hawqpylib import hawqarray
+from gppylib.db import dbconn
+from gppylib.commands.gp import get_local_db_mode
+
+logger = get_default_logger()
+
+class GpConfigurationProviderUsingHAWQCatalog(GpConfigurationProvider) :
+    """
+    An implementation of GpConfigurationProvider that provides functionality to
+    fetch and update gpdb system configuration information (as stored in the
+    database).
+
+    Note that the client of this is assuming that the database data is not
+    changed by another party between the time segment data is loaded and when it
+    is updated
+    """
+
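+    # A hypothetical end-to-end use of this provider (sketch only; assumes a
+    # master listening on port 5432 and a valid goal configuration):
+    #
+    #   conf  = GpConfigurationProviderUsingHAWQCatalog().initializeProvider(5432)
+    #   array = conf.loadSystemConfig(useUtilityMode=False)
+    #   ...modify the returned HAWQArray...
+    #   conf.updateSystemConfig(array, "change description", None, False, False)
+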
+    def __init__(self):
+        self.__masterDbUrl = None
+
+
+    def initializeProvider( self, masterPort ) :
+        """
+        Initialize the provider to get information from the master db listening
+        on the given port.
+
+        Returns self.
+        """
+
+        checkNotNone("masterPort", masterPort)
+
+        dbUrl = dbconn.DbURL(port=masterPort, dbname='template1')
+
+        self.__masterDbUrl = dbUrl
+        return self
+
+
+    def loadSystemConfig( self, useUtilityMode ) :
+        """
+        Load all segment information from the configuration source.
+
+        Returns a new HAWQArray object
+        """
+
+        # ensure initializeProvider() was called
+        checkNotNone("masterDbUrl", self.__masterDbUrl) 
+
+        logger.info("Obtaining Segment details from master...")
+
+        array = HAWQArray.initFromCatalog(self.__masterDbUrl, useUtilityMode)
+
+        print "### MDD = %s" % (array.master.getDataDirectory()) 
+        if get_local_db_mode(array.master.getDataDirectory()) != 'UTILITY':
+            logger.debug("Validating configuration...")
+            if not array.is_array_valid():
+                raise InvalidSegmentConfiguration(array)
+            
+        return array
+
+
+    def sendPgElogFromMaster( self, msg, sendAlerts):
+        """
+        Send a message from the master database using select pg_elog ...
+        """
+        # ensure initializeProvider() was called
+        checkNotNone("masterDbUrl", self.__masterDbUrl)
+
+        conn = None
+        try:
+            conn = dbconn.connect(self.__masterDbUrl, utility=True)
+            dbconn.execSQL(conn, "SELECT GP_ELOG(" +
+                        self.__toSqlCharValue(msg) + "," +
+                        ("true" if sendAlerts else "false") + ")")
+        finally:
+            if conn:
+                conn.close()
+
+
+    def updateSystemConfig( self, hawqArray, textForConfigTable, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary) :
+        """
+        Update the configuration for the given segments in the underlying
+        configuration store to match the current values
+    
+        Also resets any dirty bits on saved/updated objects
+
+        @param textForConfigTable label to be used when adding to segment configuration history
+        @param dbIdToForceMirrorRemoveAdd a map of dbid -> True for mirrors for which we should force updating the mirror
+        @param useUtilityMode True if the operations we're doing are expected to run via utility mode
+        @param allowPrimary True if caller authorizes add/remove primary operations (e.g. gpexpand)
+        """
+
+        # ensure initializeProvider() was called
+        checkNotNone("masterDbUrl", self.__masterDbUrl) 
+
+        logger.debug("Validating configuration changes...")
+
+        if not hawqArray.is_array_valid():
+            logger.critical("Configuration is invalid")
+            raise InvalidSegmentConfiguration(hawqArray)
+
+        conn = dbconn.connect(self.__masterDbUrl, useUtilityMode, allowSystemTableMods='dml')
+        dbconn.execSQL(conn, "BEGIN")
+
+        # compute what needs to be updated
+        update = ComputeCatalogUpdate(hawqArray, dbIdToForceMirrorRemoveAdd, useUtilityMode, allowPrimary)
+        update.validate()
+
+        # put the mirrors in a map by content id so we can update them later
+        mirror_map = {}
+        for seg in update.mirror_to_add:
+            mirror_map[ seg.getRegistrationOrder() ] = seg
+
+        # reset dbId of new primary and mirror segments to -1
+        # before invoking the operations which will assign them new ids
+        for seg in update.primary_to_add:
+            seg.setSegmentDbId(-1)
+        for seg in update.mirror_to_add:
+            seg.setSegmentDbId(-1)
+
+        # remove mirror segments (e.g. for gpexpand rollback)
+        for seg in update.mirror_to_remove:
+            self.__updateSystemConfigRemoveMirror(conn, seg, textForConfigTable)
+
+        # remove primary segments (e.g for gpexpand rollback)
+        for seg in update.primary_to_remove:
+            self.__updateSystemConfigRemovePrimary(conn, seg, textForConfigTable)
+
+        # add new primary segments
+        for seg in update.primary_to_add:
+            self.__updateSystemConfigAddPrimary(conn, hawqArray, seg, textForConfigTable, mirror_map)
+
+        # add new mirror segments
+        for seg in update.mirror_to_add:
+            self.__updateSystemConfigAddMirror(conn, hawqArray, seg, textForConfigTable)
+
+        # remove and add mirror segments necessitated by catalog attribute update
+        for seg in update.mirror_to_remove_and_add:
+            self.__updateSystemConfigRemoveAddMirror(conn, hawqArray, seg, textForConfigTable)
+
+        # apply updates to existing segments
+        for seg in update.segment_to_update:
+            # dbsegmap is keyed by dbid (see ComputeCatalogUpdate)
+            originalSeg = update.dbsegmap.get(seg.getSegmentDbId())
+            self.__updateSystemConfigUpdateSegment(conn, hawqArray, seg, originalSeg, textForConfigTable)
+
+        # apply update to fault strategy
+        if hawqArray.getStrategyAsLoadedFromDb() != hawqArray.getFaultStrategy():
+            self.__updateSystemConfigFaultStrategy(conn, hawqArray)
+
+        # commit changes
+        logger.debug("Committing configuration table changes")
+        dbconn.execSQL(conn, "COMMIT")
+        conn.close()
+
+        hawqArray.setStrategyAsLoadedFromDb(hawqArray.getFaultStrategy())
+        hawqArray.setSegmentsAsLoadedFromDb([seg.copy() for seg in hawqArray.getDbList()])
+
+
+    def __updateSystemConfigRemoveMirror(self, conn, seg, textForConfigTable):
+        """
+        Remove a mirror segment currently in gp_segment_configuration 
+        but not present in the goal configuration and record our action 
+        in gp_configuration_history.
+        """
+        dbId   = seg.getRegistrationOrder()
+        self.__callSegmentRemoveMirror(conn, seg)
+        self.__insertConfigHistory(conn, dbId, "%s: removed mirror segment configuration" % textForConfigTable)
+
+
+    def __updateSystemConfigRemovePrimary(self, conn, seg, textForConfigTable):
+        """
+        Remove a primary segment currently in gp_segment_configuration 
+        but not present in the goal configuration and record our action 
+        in gp_configuration_history.
+        """
+        dbId = seg.getRegistrationOrder()
+        self.__callSegmentRemove(conn, seg)
+        self.__insertConfigHistory(conn, dbId, "%s: removed primary segment configuration" % textForConfigTable)
+
+
+    def __updateSystemConfigAddPrimary(self, conn, hawqArray, seg, textForConfigTable, mirror_map):
+        """
+        Add a primary segment specified in our goal configuration but
+        which is missing from the current gp_segment_configuration table
+        and record our action in gp_configuration_history.
+        """
+        # lookup the mirror (if any) so that we may correct its content id
+        mirrorseg = mirror_map.get( seg.getRegistrationOrder() )
+
+        # add the new segment
+        dbId = self.__callSegmentAdd(conn, hawqArray, seg)
+
+        # update the segment mode, status and replication port
+        self.__updateSegmentModeStatus(conn, seg)
+
+        # get the newly added segment's content id
+        # MPP-12393 et al WARNING: there is an unusual side effect going on here.  
+        # Although gp_add_segment() executed by __callSegmentAdd() above returns 
+        # the dbId of the new row in gp_segment_configuration, the following
+        # select from gp_segment_configuration can return 0 rows if the updates
+        # done by __updateSegmentModeStatus() and/or __updateSegmentReplicationPort()
+        # are not done first.  Don't change the order of these operations unless you 
+        # understand why gp_add_segment() behaves as it does.
+        sql = "select content from pg_catalog.gp_segment_configuration where dbId = %s" % self.__toSqlIntValue(seg.getRegistrationOrder())
+        logger.debug(sql)
+        sqlResult = self.__fetchSingleOutputRow(conn, sql)
+        contentId = int(sqlResult[0])
+
+        # Set the new content id for the primary as well the mirror if present.
+        seg.setSegmentContentId(contentId)
+        if mirrorseg is not None:
+            mirrorseg.setSegmentContentId(contentId)
+
+        self.__insertConfigHistory(conn, dbId, "%s: inserted primary segment configuration with contentid %s" % (textForConfigTable, contentId))
+
+
+    def __updateSystemConfigAddMirror(self, conn, hawqArray, seg, textForConfigTable):
+        """
+        Add a mirror segment specified in our goal configuration but
+        which is missing from the current gp_segment_configuration table
+        and record our action in gp_configuration_history.
+        """
+        dbId = self.__callSegmentAddMirror(conn, hawqArray, seg)
+        self.__updateSegmentModeStatus(conn, seg)
+        self.__insertConfigHistory(conn, dbId, "%s: inserted mirror segment configuration" % textForConfigTable)
+
+
+    def __updateSystemConfigRemoveAddMirror(self, conn, hawqArray, seg, textForConfigTable):
+        """
+        We've been asked to update the mirror in a manner that requires
+        it to be removed and then re-added.  Perform both tasks
+        and record our action in gp_configuration_history.
+        """
+        origDbId = seg.getRegistrationOrder()
+        self.__callSegmentRemoveMirror(conn, seg)
+
+        dbId = self.__callSegmentAddMirror(conn, hawqArray, seg)
+
+        # now update mode/status since this is not done by gp_add_segment_mirror
+        self.__updateSegmentModeStatus(conn, seg)
+        self.__insertConfigHistory(conn, seg.getRegistrationOrder(), 
+                                   "%s: inserted segment configuration for full recovery or original dbid %s" \
+                                   % (textForConfigTable, origDbId))
+
+
+    def __updateSystemConfigUpdateSegment(self, conn, hawqArray, seg, originalSeg, textForConfigTable):
+
+        # update mode and status
+        # when adding a mirror, the replication port may change as well
+        #
+        if hawqArray.getFaultStrategy() == hawqarray.FAULT_STRATEGY_NONE:
+            what = "%s: segment hostname and address"
+            self.__updateSegmentAddress(conn, seg)
+        else:
+            what = "%s: segment mode and status"
+            self.__updateSegmentModeStatus(conn, seg)
+
+        if seg.getReplicationPort() != originalSeg.getReplicationPort():
+            what = "%s: segment mode, status, and replication port"
+            self.__updateSegmentReplicationPort(conn, seg)
+
+        self.__insertConfigHistory(conn, seg.getRegistrationOrder(), what % textForConfigTable)
+
+
+    def __updateSystemConfigFaultStrategy(self, conn, hawqArray):
+        """
+        Update the fault strategy.
+        """
+        fs  = hawqArray.getFaultStrategy()
+        sql = "UPDATE gp_fault_strategy\n SET fault_strategy = " + self.__toSqlCharValue(fs) + "\n"
+        logger.debug(sql)
+        dbconn.executeUpdateOrInsert(conn, sql, 1)
+
+
+    def __callSegmentRemoveMirror(self, conn, seg):
+        """
+        Call gp_remove_segment_mirror() to remove the mirror.
+        """
+        sql = "SELECT gp_remove_segment_mirror(%s::int2)" % (self.__toSqlIntValue(seg.getContentId()))
+        logger.debug(sql)
+        result = self.__fetchSingleOutputRow(conn, sql)
+        assert result[0] # must return True
+
+
+    def __callSegmentRemove(self, conn, seg):
+        """
+        Call gp_remove_segment() to remove the primary.
+        """
+        sql = "SELECT gp_remove_segment(%s::int2)" % (self.__toSqlIntValue(seg.getRegistrationOrder()))
+        logger.debug(sql)
+        result = self.__fetchSingleOutputRow(conn, sql)
+        assert result[0]
+
+
+    def __callSegmentAdd(self, conn, hawqArray, seg):
+        """
+        Call gp_add_segment() to add the primary.
+        Return the new segment's dbid.
+        """
+        logger.debug('callSegmentAdd %s' % repr(seg))
+        filespaceMapStr = self.__toSqlFilespaceMapStr(hawqArray, seg)
+
+        sql = "SELECT gp_add_segment(%s, %s, %s, %s)" \
+            % (
+                self.__toSqlTextValue(seg.getHostName()), 
+                self.__toSqlTextValue(seg.getAddress()), 
+                self.__toSqlIntValue(seg.getPort()), 
+                self.__toSqlTextValue(filespaceMapStr)
+              )
+        logger.debug(sql)
+        sqlResult = self.__fetchSingleOutputRow(conn, sql)
+        dbId = int(sqlResult[0])
+        seg.setSegmentDbId(dbId)
+        return dbId
+
+
+    def __callSegmentAddMirror(self, conn, hawqArray, seg):
+        """
+        Call gp_add_segment_mirror() to add the mirror.
+        Return the new segment's dbid.
+        """
+        logger.debug('callSegmentAddMirror %s' % repr(seg))
+        filespaceMapStr = self.__toSqlFilespaceMapStr(hawqArray, seg)
+
+        sql = "SELECT gp_add_segment_mirror(%s::int2, %s, %s, %s, %s, %s)" \
+            % (
+                self.__toSqlIntValue(seg.getRegistrationOrder()),
+                self.__toSqlTextValue(seg.getHostName()),
+                self.__toSqlTextValue(seg.getAddress()),
+                self.__toSqlIntValue(seg.getPort()),
+                self.__toSqlIntValue(seg.getReplicationPort()),
+                self.__toSqlTextValue(filespaceMapStr)
+              )
+
+        logger.debug(sql)
+        sqlResult = self.__fetchSingleOutputRow(conn, sql)
+        dbId = int(sqlResult[0])
+        seg.setSegmentDbId(dbId)
+        return dbId
+
+
+    def __updateSegmentReplicationPort(self, conn, seg):
+        # run an update
+        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
+            "  SET\n" + \
+            "  replication_port = " + self.__toSqlIntValue(seg.getReplicationPort()) + "\n" \
+            "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
+        logger.debug(sql)
+        dbconn.executeUpdateOrInsert(conn, sql, 1)
+
+
+    def __updateSegmentModeStatus(self, conn, seg):
+        # run an update
+        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
+            "  SET\n" + \
+            "  mode = " + self.__toSqlCharValue(seg.getMode()) + ",\n" \
+            "  status = " + self.__toSqlCharValue(seg.getStatus()) + "\n" \
+            "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
+        logger.debug(sql)
+        dbconn.executeUpdateOrInsert(conn, sql, 1)
+
+
+    def __updateSegmentAddress(self, conn, seg):
+        # run an update
+        sql = "UPDATE pg_catalog.gp_segment_configuration\n" + \
+            "  SET\n" + \
+            "  hostname = " + self.__toSqlCharValue(seg.getHostName()) + ",\n" \
+            "  address = " + self.__toSqlCharValue(seg.getAddress()) + "\n" \
+            "WHERE dbid = " + self.__toSqlIntValue(seg.getRegistrationOrder())
+        logger.debug(sql)
+        dbconn.executeUpdateOrInsert(conn, sql, 1)
+
+
+    def __fetchSingleOutputRow(self, conn, sql, retry=False):
+        """
+        Execute the specified SQL command and return what we expect to be a single row.
+        Raise an exception when more or fewer than one row is seen; when more than
+        one row is seen, log up to 10 of the rows as warnings first.
+        """
+        cursor   = dbconn.execSQL(conn, sql)
+        numrows  = cursor.rowcount
+        numshown = 0
+        res      = None
+        for row in cursor:
+            if numrows != 1:
+                #
+                # if we got back more than one row 
+                # we print a few of the rows first
+                # instead of immediately raising an exception
+                #
+                numshown += 1
+                if numshown > 10:
+                    break
+                logger.warning('>>> %s' % row)
+            else:
+                assert res is None
+                res = row
+                assert res is not None
+        cursor.close()
+        if numrows != 1:
+            raise Exception("SQL returned %d rows, not 1 as expected:\n%s" % (numrows, sql))
+        return res
+
+
+    def __insertConfigHistory(self, conn, dbId, msg ):
+        # now update change history
+        sql = "INSERT INTO gp_configuration_history (time, dbid, \"desc\") VALUES(\n" \
+                    "now(),\n  " + \
+                    self.__toSqlIntValue(dbId) + ",\n  " + \
+                    self.__toSqlCharValue(msg) + "\n)"
+        logger.debug(sql)
+        dbconn.executeUpdateOrInsert(conn, sql, 1)
+
+
+    def __toSqlFilespaceMapStr(self, hawqArray, seg):
+        """
+        Return a string representation of the filespace map suitable
+        for inclusion into the call to gp_add_segment_mirror().
+        """
+        filespaceArrayString = []
+        for fs in hawqArray.getAllFilespaces():
+            path = seg.getFilespaces()[ fs.getOid() ]
+            filespaceArrayString.append("{%s,%s}" % \
+                        (self.__toSqlArrayStringValue(fs.getName()), \
+                         self.__toSqlArrayStringValue(path)))
+
+        filespaceMapStr = "{" + ",".join(filespaceArrayString) + "}"
+        return filespaceMapStr
+
+    def __toSqlIntValue(self, val):
+        if val is None:
+            return "null"
+        return str(val)
+
+    def __toSqlArrayStringValue(self, val):
+        if val is None:
+            return "null"
+        return '"' + val.replace('"','\\"').replace('\\','\\\\') + '"'
+
+    def __toSqlCharValue(self, val):
+        return self.__toSqlTextValue(val)
+
+    def __toSqlTextValue(self, val):
+        if val is None:
+            return "null"
+        return "'" + val.replace("'","''").replace('\\','\\\\') + "'"

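For quick sanity checking outside a cluster, here is a small standalone
sketch (Python 2, to match the tools) of the quoting behavior the
__toSql* helpers above are expected to provide, plus a filespace map
literal in the shape __toSqlFilespaceMapStr() builds for
gp_add_segment_mirror().  The filespace name and path are illustrative
only:

def to_sql_int(val):
    # None maps to SQL NULL; everything else to its decimal text form
    return "null" if val is None else str(val)

def to_sql_text(val):
    # escape backslashes first, then double any embedded single quotes
    if val is None:
        return "null"
    return "'" + val.replace('\\', '\\\\').replace("'", "''") + "'"

def to_sql_array_string(val):
    # element of an array literal: escape backslashes, then double quotes
    if val is None:
        return "null"
    return '"' + val.replace('\\', '\\\\').replace('"', '\\"') + '"'

assert to_sql_int(None) == "null"
assert to_sql_text("it's") == "'it''s'"
assert to_sql_array_string('a"b') == '"a\\"b"'

pairs = [("dfs_system", "/hawq/data/seg0")]
print "{" + ",".join("{%s,%s}" % (to_sql_array_string(n), to_sql_array_string(p))
                     for n, p in pairs) + "}"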

[2/2] incubator-hawq git commit: HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side

Posted by hu...@apache.org.
HAWQ-288. Support hawqfaultinjector as an internal utility for fault injection purpose - client side


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/8cdae414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/8cdae414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/8cdae414

Branch: refs/heads/master
Commit: 8cdae4145822373f2fe8de0083179bcbdece7a75
Parents: 4246999
Author: Ruilong Huo <rh...@pivotal.io>
Authored: Tue Dec 29 17:27:02 2015 +0800
Committer: Ruilong Huo <rh...@pivotal.io>
Committed: Tue Dec 29 17:29:07 2015 +0800

----------------------------------------------------------------------
 src/all_src_files.txt                           |   1 +
 src/backend/postmaster/postmaster.c             |  25 +-
 src/bin/Makefile                                |   2 +-
 src/bin/gpmirrortransition/.gitignore           |   2 +
 src/bin/gpmirrortransition/Makefile             |  49 ++
 src/bin/gpmirrortransition/gpmirrortransition.c | 353 +++++++++
 src/include/pg_config_manual.h                  |   2 +-
 tools/bin/Makefile                              |   6 +-
 tools/bin/gppylib/commands/base.py              |   8 +
 tools/bin/hawqfaultinjector                     |  27 +
 tools/bin/hawqpylib/Makefile                    |  28 +
 tools/bin/hawqpylib/hawqarray.py                | 729 +++++++++++++++++++
 tools/bin/hawqpylib/mainUtils.py                | 655 +++++++++++++++++
 tools/bin/hawqpylib/programs/__init__.py        |   0
 tools/bin/hawqpylib/programs/clsInjectFault.py  | 441 +++++++++++
 .../hawqpylib/system/ComputeCatalogUpdate.py    | 442 +++++++++++
 tools/bin/hawqpylib/system/__init__.py          |   0
 .../hawqpylib/system/configurationImplHAWQ.py   | 474 ++++++++++++
 18 files changed, 3239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/all_src_files.txt
----------------------------------------------------------------------
diff --git a/src/all_src_files.txt b/src/all_src_files.txt
index 68738d7..f7ca7ad 100644
--- a/src/all_src_files.txt
+++ b/src/all_src_files.txt
@@ -1450,6 +1450,7 @@ bin/gpfusion/common.h
 bin/gpfusion/gpbridgeapi.c
 bin/gpfusion/gpdbwritableformatter.c
 bin/gpfusion/pxf.c
+bin/gpmirrortransition/gpmirrortransition.c
 bin/gpupgrade/gpmodcatversion.c
 bin/gpupgrade/gpviewcp.c
 bin/initdb/initdb.c

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/backend/postmaster/postmaster.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ff954b8..c2fafb3 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2675,6 +2675,30 @@ readNextStringFromString(char *buf, int *offsetInOut, int length)
 	return result;
 }
 
+#ifdef FAULT_INJECTOR
+/**
+ *  Returns 0 if the string could not be read and sets *wasRead (if wasRead is non-NULL) to false;
+ *  otherwise sets *wasRead to true and returns the integer value of the field
+ */
+static int
+readIntFromString( char *buf, int *offsetInOut, int length, bool *wasRead)
+{
+	int res;
+	char *val = readNextStringFromString(buf, offsetInOut, length);
+	if (val == NULL)
+	{
+		if (wasRead)
+			*wasRead = false;
+		return 0;
+	}
+
+	if (wasRead)
+		*wasRead = true;
+	res = atoi(val);
+	pfree(val);
+	return res;
+}
+#endif
+
 static void sendPrimaryMirrorTransitionResult( const char *msg)
 {
 	StringInfoData buf;
@@ -2792,7 +2816,6 @@ static void processTransitionRequest_getFaultInjectStatus(void * buf, int *offse
 static void
 processTransitionRequest_faultInject(void * inputBuf, int *offsetPtr, int length)
 {
-#undef FAULT_INJECTOR
 #ifdef FAULT_INJECTOR
 	bool wasRead;
 	char *faultName = readNextStringFromString(inputBuf, offsetPtr, length);

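The transition message parsed above is a sequence of newline-terminated
fields consumed in order.  A rough Python model of the semantics of
readNextStringFromString()/readIntFromString() (field values below are
illustrative; note that Python's int() raises on garbage where atoi()
would quietly return 0):

def read_next_string(buf, offset):
    # return (field, new_offset), or (None, offset) when no full field remains
    end = buf.find('\n', offset)
    if end == -1:
        return None, offset
    return buf[offset:end], end + 1

def read_int(buf, offset):
    # return (value, new_offset, was_read), mirroring readIntFromString()
    val, offset = read_next_string(buf, offset)
    if val is None:
        return 0, offset, False
    return int(val), offset, True

msg = "first\nsecond\n42\n"
off = 0
a, off = read_next_string(msg, off)   # 'first'
b, off = read_next_string(msg, off)   # 'second'
n, off, ok = read_int(msg, off)       # 42, True
print a, b, n, ok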
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/Makefile b/src/bin/Makefile
index 73425b9..9d4919d 100644
--- a/src/bin/Makefile
+++ b/src/bin/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 
 DIRS = initdb ipcclean pg_ctl pg_dump pgbench \
 	psql scripts pg_config pg_controldata pg_resetxlog \
-	gpfilesystem/hdfs gpupgrade \
+	gpfilesystem/hdfs gpmirrortransition gpupgrade \
 	gpfusion gp_workfile_mgr gpcheckhdfs gpfdist
 
 all install installdirs uninstall distprep clean distclean maintainer-clean:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/.gitignore
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/.gitignore b/src/bin/gpmirrortransition/.gitignore
new file mode 100644
index 0000000..e999eb8
--- /dev/null
+++ b/src/bin/gpmirrortransition/.gitignore
@@ -0,0 +1,2 @@
+gp_primarymirror
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/Makefile
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/Makefile b/src/bin/gpmirrortransition/Makefile
new file mode 100755
index 0000000..93e1e33
--- /dev/null
+++ b/src/bin/gpmirrortransition/Makefile
@@ -0,0 +1,49 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/bin/gpmirrortransition
+#
+# Portions Copyright (c) 2009 Greenplum Inc 
+#
+# This Makefile was copied from the pg_dump makefile and modified accordingly
+#
+# $PostgreSQL: pgsql/src/bin/gpmirrortransition/Makefile,v 1.62 2006/03/05 15:58:50 momjian Exp $
+#
+#-------------------------------------------------------------------------
+
+PGFILEDESC = "gp_primarymirror - inform a segment of a change in primary/mirror status"
+subdir = src/bin/gpmirrortransition
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+# The frontend doesn't need everything that's in LIBS, some are backend only
+LIBS := $(filter-out -llapack -lblas -lf2c -lresolv, $(LIBS))
+# This program isn't interactive, so doesn't need these
+LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses -lcurl -lssl -lcrypto, $(LIBS))
+
+# the use of tempnam in pg_backup_tar.c causes a warning when using newer versions of GCC
+override CPPFLAGS := -Wno-deprecated-declarations -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+
+OBJS=gpmirrortransition.o $(WIN32RES)
+
+EXTRA_OBJS = $(top_builddir)/src/backend/libpq/ip.o $(top_builddir)/src/backend/postmaster/primary_mirror_transition_client.o $(top_builddir)/src/timezone/gptime.o
+
+all: submake-libpq submake-libpgport submake-backend gp_primarymirror
+
+gp_primarymirror: gpmirrortransition.o $(OBJS) $(EXTRA_OBJS) $(libpq_builddir)/libpq.a 
+	$(CC) $(CFLAGS) $(OBJS) $(EXTRA_OBJS) $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
+
+.PHONY: submake-backend
+submake-backend:
+	$(MAKE) -C $(top_builddir)/src/backend/libpq ip.o
+
+install: all installdirs
+	$(INSTALL_PROGRAM) gp_primarymirror$(X) '$(DESTDIR)$(bindir)'/gp_primarymirror$(X)
+
+installdirs:
+	$(MKDIR_P) '$(DESTDIR)$(bindir)'
+
+uninstall:
+	rm -f $(addprefix '$(DESTDIR)$(bindir)'/, gp_primarymirror$(X))
+
+clean distclean maintainer-clean:
+	rm -f gp_primarymirror$(X) $(OBJS) gpmirrortransition.o

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/bin/gpmirrortransition/gpmirrortransition.c
----------------------------------------------------------------------
diff --git a/src/bin/gpmirrortransition/gpmirrortransition.c b/src/bin/gpmirrortransition/gpmirrortransition.c
new file mode 100755
index 0000000..06e952a
--- /dev/null
+++ b/src/bin/gpmirrortransition/gpmirrortransition.c
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Utility to contact a segment and issue a primary/mirror mode transition
+ */
+#include "postmaster/primary_mirror_mode.h"
+#include "postmaster/primary_mirror_transition_client.h"
+#include <unistd.h>
+#include "libpq/pqcomm.h"
+#include "libpq/ip.h"
+
+#ifdef HAVE_GETOPT_H
+#include <getopt.h>
+#endif
+
+/* buffer size for message to segment */
+#define SEGMENT_MSG_BUF_SIZE     4096
+
+/**
+ * gpmirrortransition builds a message from parameters and transmits it to the given server as
+ *    a mirror transition message.
+ */
+
+
+static inline bool
+isEmpty(char *str)
+{
+	return str == NULL || str[0] == '\0';
+}
+
+static bool
+gpCheckForNeedToExitFn(void)
+{
+	return false;
+}
+
+static void
+gpMirrorErrorLogFunction(char *str)
+{
+	fprintf(stderr, "%s\n", str);
+}
+
+static void
+gpMirrorReceivedDataCallbackFunction(char *buf)
+{
+	fprintf(stderr, "%s\n", buf);
+}
+
+/**
+ * *addrList will be filled in with the address(es) of the host/port when true is returned
+ *
+ * host/port may not be NULL
+ */
+static bool
+determineTargetHost( struct addrinfo **addrList, char *host, char *port)
+{
+	struct addrinfo hint;
+	int			ret;
+
+	*addrList = NULL;
+
+	/* Initialize hint structure: any address family, stream socket */
+	MemSet(&hint, 0, sizeof(hint));
+	hint.ai_socktype = SOCK_STREAM;
+	hint.ai_family = AF_UNSPEC;
+
+	/* Use pg_getaddrinfo_all() to look up the hostname and resolve the address */
+	ret = pg_getaddrinfo_all(host, port, &hint, addrList);
+	if (ret || ! *addrList)
+	{
+		fprintf(stderr,"could not translate host name \"%s\" to address: %s\n", host, gai_strerror(ret));
+		return false;
+	}
+	return true;
+}
+
+static char*
+readFully(FILE *f, int *msgLenOut)
+{
+	int bufSize = 10;
+	char *buf = malloc(bufSize * sizeof(char));
+	int bufOffset = 0;
+
+	if ( buf == NULL )
+	{
+		fprintf(stderr, "Out of memory\n");
+		return NULL;
+	}
+
+	for ( ;; )
+	{
+		int numRead;
+
+		errno = 0;
+		numRead = fread(buf + bufOffset, sizeof(char), bufSize - bufOffset, f);
+		if ( errno != 0 )
+		{
+			if ( feof(f))
+				break;
+			fprintf( stderr, "Error reading input.  Error code %d\n", errno);
+			return NULL;
+		}
+		else if ( numRead <= 0 && feof(f))
+			break;
+
+		bufOffset += numRead;
+
+		if ( bufOffset == bufSize )
+		{
+			/* buffer is full -- double its size */
+			char *newBuf;
+			bufSize *= 2;
+			newBuf = realloc(buf, bufSize * sizeof(char));
+			if ( newBuf == NULL )
+			{
+				free(buf);
+				fprintf(stderr, "Out of memory\n");
+				return NULL;
+			}
+			buf = newBuf;
+		}
+	}
+
+	*msgLenOut = bufOffset;
+	return buf;
+}
+
+
+int
+main(int argc, char **argv)
+{
+	struct addrinfo *addrList = NULL;
+
+	char *host = NULL, *port = NULL, *inputFile = NULL;
+
+	char *mode = NULL;
+	char *status = NULL;
+	char *seg_addr = NULL;
+	char *seg_pm_port = NULL;
+	char *seg_rep_port = NULL;
+	char *peer_addr = NULL;
+	char *peer_pm_port = NULL;
+	char *peer_rep_port = NULL;
+
+	char *num_retries_str = NULL;
+	char *transition_timeout_str = NULL;
+	
+	int num_retries = 20;
+	int transition_timeout = 3600;  /* 1 hour */
+	
+	int opt;		/* getopt() returns int, not char */
+
+	char msgBuffer[SEGMENT_MSG_BUF_SIZE];
+	char *msg = NULL;
+	int msgLen = 0;
+
+	while ((opt = getopt(argc, argv, "m:s:H:P:R:h:p:r:i:n:t:")) != -1)
+	{
+		switch (opt)
+		{
+			case 'i':
+				inputFile = optarg;
+				break;
+			case 'm':
+				mode = optarg;
+				break;
+			case 's':
+				status = optarg;
+				break;
+			case 'H':
+				seg_addr = optarg;
+				break;
+			case 'P':
+				seg_pm_port = optarg;
+				break;
+			case 'R':
+				seg_rep_port = optarg;
+				break;
+			case 'h':
+				host = peer_addr = optarg;
+				break;
+			case 'p':
+				port = peer_pm_port = optarg;
+				break;
+			case 'r':
+				peer_rep_port = optarg;
+				break;
+			case 'n':
+				num_retries_str = optarg;
+				break;
+			case 't':
+				transition_timeout_str = optarg;
+				break;
+			case '?':
+				fprintf(stderr, "Unrecognized option: -%c\n", optopt);
+		}
+	}
+
+	if (num_retries_str != NULL)
+	{
+		errno = 0;
+		num_retries = (int) strtol(num_retries_str, NULL, 10);
+		if (num_retries == 0 || errno == ERANGE)
+		{
+			fprintf(stderr, "Invalid num_retries (-n) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+	}
+	
+	if (transition_timeout_str != NULL)
+	{
+		errno = 0;
+		transition_timeout = (int) strtol(transition_timeout_str, NULL, 10);
+		if (transition_timeout == 0 || errno == ERANGE)
+		{
+			fprintf(stderr, "Invalid transition_timeout (-t) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+	}
+
+	/* no segment host (-H) given: read the transition message from a file or stdin */
+	if (seg_addr == NULL)
+	{
+		if ( host == NULL)
+		{
+			fprintf(stderr, "Missing host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if ( port == NULL )
+		{
+			fprintf(stderr, "Missing port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* find the target machine */
+		if ( ! determineTargetHost(&addrList, host, port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+
+		/* load the input message into memory */
+		if ( inputFile == NULL)
+		{
+			msg = readFully(stdin, &msgLen);
+		}
+		else
+		{
+
+			FILE *f = fopen(inputFile, "r");
+			if ( f == NULL)
+			{
+				fprintf(stderr, "Unable to open file %s\n", inputFile);
+				return TRANS_ERRCODE_ERROR_READING_INPUT;
+			}
+			msg = readFully(f, &msgLen);
+			fclose(f);
+		}
+	}
+	else
+	{
+		/* build message from passed parameters */
+
+		if (mode == NULL)
+		{
+			fprintf(stderr, "Missing mode (-m) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (status == NULL)
+		{
+			fprintf(stderr, "Missing status (-s) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing segment postmaster port (-P) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (seg_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing segment replication port (-R) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_addr == NULL)
+		{
+			fprintf(stderr, "Missing peer host (-h) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_pm_port == NULL)
+		{
+			fprintf(stderr, "Missing peer postmaster port (-p) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+		if (peer_rep_port == NULL)
+		{
+			fprintf(stderr, "Missing peer replication port (-r) argument\n");
+			return TRANS_ERRCODE_ERROR_INVALID_ARGUMENT;
+		}
+
+		/* build message */
+		msgLen = snprintf(
+			msgBuffer, sizeof(msgBuffer),
+			"%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
+			mode,
+			status,
+			seg_addr,
+			seg_rep_port,
+			peer_addr,
+			peer_rep_port,
+			peer_pm_port
+			);
+
+		msg = msgBuffer;
+
+		/* find the target machine */
+		if (!determineTargetHost(&addrList, seg_addr, seg_pm_port))
+		{
+			return TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED;
+		}
+	}
+
+	 /* check for errors while building the message */
+	if ( msg == NULL )
+	{
+		return TRANS_ERRCODE_ERROR_READING_INPUT;
+	}
+
+	/* send the message */
+	PrimaryMirrorTransitionClientInfo client;
+	client.receivedDataCallbackFn = gpMirrorReceivedDataCallbackFunction;
+	client.errorLogFn = gpMirrorErrorLogFunction;
+	client.checkForNeedToExitFn = gpCheckForNeedToExitFn;
+	return sendTransitionMessage(&client, addrList, msg, msgLen, num_retries, transition_timeout);
+}

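For callers, the two input modes of gp_primarymirror line up with the
getopt string in main() above.  A sketch of driving both from Python;
the hosts, ports, and the mode/status tokens are placeholders (the
accepted tokens are defined by primary_mirror_mode.h):

import subprocess

# parameter mode: -H selects it, the tool assembles the 7-field message
# itself and connects to the segment given by -H/-P
subprocess.check_call([
    'gp_primarymirror',
    '-m', 'mirror', '-s', 'Ready',                 # mode / status
    '-H', 'seg1', '-P', '40000', '-R', '41000',    # segment host / pm / rep port
    '-h', 'seg2', '-p', '40000', '-r', '41000',    # peer host / pm / rep port
    '-n', '20', '-t', '3600'])                     # retries / timeout

# stdin mode: without -H, the caller supplies the newline-terminated message
# (mode, status, seg addr, seg rep port, peer addr, peer rep port, peer pm
# port) and the tool connects to -h/-p
msg = '\n'.join(['mirror', 'Ready', 'seg1', '41000',
                 'seg2', '41000', '40000']) + '\n'
p = subprocess.Popen(['gp_primarymirror', '-h', 'seg2', '-p', '40000'],
                     stdin=subprocess.PIPE)
p.communicate(msg)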
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/src/include/pg_config_manual.h
----------------------------------------------------------------------
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index d97a280..002d92e 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -255,7 +255,7 @@
 /*
  * Enable injecting faults.
  */
-#define FAULT_INJECTOR 0
+#define FAULT_INJECTOR 1
 
 /*
  * Enable tracing of resource consumption during sort operations;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/Makefile b/tools/bin/Makefile
index e8a9847..88b9634 100644
--- a/tools/bin/Makefile
+++ b/tools/bin/Makefile
@@ -179,6 +179,7 @@ unittest2:
 PYTHON_FILES=`grep -l --exclude=Makefile --exclude=gplogfilter --exclude=gpcheckos --exclude=gpgenfsmap.py --exclude=throttlingD.py "/bin/env python" *`\
 			 `grep -l "/bin/env python" $(SRC)/../sbin/*`\
 			 `find ./gppylib -name "*.py"`\
+			 `find ./hawqpylib -name "*.py"`\
 			 `find $(SRC)/../sbin -name "*.py"`
 
 checkcode: pylint
@@ -244,9 +245,10 @@ install: all
 	${INSTALL_SCRIPT} -d ${bindir}
 	for files in `find * -maxdepth 0 -type f | grep -x -v -E "${SKIP_INSTALL}"`; do ${INSTALL_SCRIPT} $${files} ${bindir}; done
 	${MAKE} -C gppylib $@
+	${MAKE} -C hawqpylib $@
 	${MAKE} -C ext $@
-	for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
-	for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
+	# for dirs in `find hawqpylib -type d` ; do ${INSTALL_SCRIPT} -d ${bindir}/hawqpylib/$${dirs}; done
+	# for files in `find hawqpylib -type f` ; do ${INSTALL_SCRIPT} $${files} ${bindir}/hawqpylib/; done
 	${INSTALL_SCRIPT} -d ${bindir}/lib
 	for files in `find lib -type f`; do ${INSTALL_SCRIPT} $${files} ${bindir}/lib; done
 	unset LIBPATH; ./generate-greenplum-path.sh $(prefix) > ${prefix}/greenplum_path.sh

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/gppylib/commands/base.py
----------------------------------------------------------------------
diff --git a/tools/bin/gppylib/commands/base.py b/tools/bin/gppylib/commands/base.py
index 801d85f..4ca68fa 100755
--- a/tools/bin/gppylib/commands/base.py
+++ b/tools/bin/gppylib/commands/base.py
@@ -697,11 +697,19 @@ class Command:
 
     def run(self,validateAfter=False):
         faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
+        print "### DEBUG: ENV[GP_COMMAND_FAULT_POINT] = %s" % (faultPoint if faultPoint else "None")
+        print "### DEBUG: self.name = %s" % ("SomeName" if self.name else "None")
         if not faultPoint or (self.name and not self.name.startswith(faultPoint)):
+            print "### DEBUG: EXECUTE name = %s" % ("SomeName" if self.name else "None")
+            print "### DEBUG: EXECUTE cmdStr = %s" % ("SomeCmdStr" if self.cmdStr else "None")
+            print "### DEBUG: EXECUTE context = %s" % ("SomeExecContext" if self.exec_context else "None")
+            print "### DEBUG: EXECUTE remoteHost = %s" % ("SomeRemoteHost" if self.remoteHost else "None")
             self.exec_context.execute(self)
         else:
             # simulate error
+            print "### DEBUG: CommandResult"
             self.results = CommandResult(1,'Fault Injection','Fault Injection' ,False,True)
+            print self.results
         
         if validateAfter:
             self.validate()

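The fault point check added to Command.run() keys off a name prefix.
Condensed into a standalone sketch (command names here are illustrative):

import os

def should_inject_fault(name):
    # mirrors the test in Command.run(): simulate a failed CommandResult
    # when GP_COMMAND_FAULT_POINT is set and the command name starts with
    # its value (or the command has no name at all)
    fault_point = os.getenv('GP_COMMAND_FAULT_POINT')
    if not fault_point or (name and not name.startswith(fault_point)):
        return False   # command executes normally
    return True        # results become the simulated 'Fault Injection'

os.environ['GP_COMMAND_FAULT_POINT'] = 'segment start'
print should_inject_fault('segment start seg0')   # True
print should_inject_fault('gpstop all')           # False
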
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqfaultinjector
----------------------------------------------------------------------
diff --git a/tools/bin/hawqfaultinjector b/tools/bin/hawqfaultinjector
new file mode 100755
index 0000000..466dddf
--- /dev/null
+++ b/tools/bin/hawqfaultinjector
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from hawqpylib.mainUtils import *
+
+# now the rest of the imports
+from hawqpylib.programs.clsInjectFault import *
+
+#-------------------------------------------------------------------------
+if __name__ == '__main__':
+    simple_main( HAWQInjectFaultProgram.createParser, HAWQInjectFaultProgram.createProgram)
+

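The simple_main() contract used here is roughly: build the option parser,
parse the command line, construct the program object, run it, and map
exceptions to exit codes.  A sketch of that shape (an assumption for
illustration; the real implementation lives in hawqpylib.mainUtils and
may differ):

def simple_main_sketch(createParser, createProgram, args):
    parser = createParser()
    (options, _) = parser.parse_args(args)
    program = createProgram(options)
    try:
        return program.run() or 0
    except Exception, e:
        print str(e)
        return 1
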
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/Makefile
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/Makefile b/tools/bin/hawqpylib/Makefile
new file mode 100644
index 0000000..d66f5d7
--- /dev/null
+++ b/tools/bin/hawqpylib/Makefile
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for the management utilities
+#
+#-------------------------------------------------------------------------
+
+subdir = tools/bin/hawqpylib
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+SKIP_INSTALL=.gitignore|.p4ignore|.rcfile|Makefile|test/
+
+install:
+	${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib
+	@for file in `find * -type f | grep -v -E "${SKIP_INSTALL}"`; \
+		do \
+			echo "install $${file} into ${libdir}/python/hawqpylib/$${file}" ; \
+			${INSTALL_SCRIPT} $${file} ${libdir}/python/hawqpylib/$${file}; \
+		done
+	@for dirs in `find * -type d | grep -v test` ;\
+		do \
+			${INSTALL_SCRIPT} -d ${libdir}/python/hawqpylib/$${dirs}; \
+			for file in `find $${dirs} -type f | grep -v -E "${SKIP_INSTALL}"`; do \
+				echo "install $${file} into ${libdir}/python/hawqpylib/$${file}" ; \
+				${INSTALL_SCRIPT} $${file} ${libdir}/python/hawqpylib/$${file}; \
+			done \
+		done
+	

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/hawqarray.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/hawqarray.py b/tools/bin/hawqpylib/hawqarray.py
new file mode 100755
index 0000000..eef59b9
--- /dev/null
+++ b/tools/bin/hawqpylib/hawqarray.py
@@ -0,0 +1,729 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+  hawqarray.py:
+
+    Contains the classes representing configuration information of a
+    HAWQ array:
+
+      HAWQArray - the primary interface - collection of all HAWQDBs within an array
+      HAWQDB    - configuration information for a single registration_order
+      HAWQFilesystemObj / HAWQFilespaceObj - pg_filesystem / pg_filespace entries
+"""
+
+# ============================================================================
+from datetime import date
+import copy
+import traceback
+
+from gppylib.utils import checkNotNone, checkIsInt
+from gppylib    import gplog
+from gppylib.db import dbconn
+from gppylib.gpversion import GpVersion
+from gppylib.commands.unix import *
+from hawqpylib.hawqlib import HawqXMLParser
+
+
+SYSTEM_FILESPACE = 3052        # oid of the system filespace
+
+logger = gplog.get_default_logger()
+
+DESTINATION_FILE_SPACES_DIRECTORY = "fs_directory"
+
+ROLE_MASTER  = 'm'
+ROLE_STANDBY = 's'
+ROLE_PRIMARY = 'p'
+VALID_ROLES  = [ROLE_MASTER, ROLE_STANDBY, ROLE_PRIMARY]
+
+
+STATUS_UP    = 'u'
+STATUS_DOWN  = 'd'
+VALID_STATUS = [STATUS_UP, STATUS_DOWN]
+
+
+# SegmentState values returned from gp_primarymirror.
+SEGMENT_STATE_NOT_INITIALIZED               = "NotInitialized"
+SEGMENT_STATE_INITIALIZATION                = "Initialization"
+SEGMENT_STATE_IN_CHANGE_TRACKING_TRANSITION = "InChangeTrackingTransition"
+SEGMENT_STATE_IN_RESYNCTRANSITION           = "InResyncTransition"                       
+SEGMENT_STATE_IN_SYNC_TRANSITION            = "InSyncTransition"
+SEGMENT_STATE_READY                         = "Ready"
+SEGMENT_STATE_CHANGE_TRACKING_DISABLED      = "ChangeTrackingDisabled"
+SEGMENT_STATE_FAULT                         = "Fault"
+SEGMENT_STATE_SHUTDOWN_BACKENDS             = "ShutdownBackends"
+SEGMENT_STATE_SHUTDOWN                      = "Shutdown"
+SEGMENT_STATE_IMMEDIATE_SHUTDOWN            = "ImmediateShutdown"
+
+
+MASTER_REGISTRATION_ORDER = 0
+
+class InvalidSegmentConfiguration(Exception):
+    """Exception raised when an invalid hawqarray configuration is
+    read from gp_segment_configuration or an attempt to save an 
+    invalid hawqarray configuration is made."""
+    def __init__(self, array):
+        self.array = array
+        
+    def __str__(self):
+        return "Invalid HAWQArray: %s" % self.array
+
+# ============================================================================
+# ============================================================================
+class HAWQDB:
+    """
+    HAWQDB class representing configuration information for a single
+    registration_order within a HAWQ cluster.
+    """
+
+    # --------------------------------------------------------------------
+    def __init__(self, registration_order, role, status,
+                 hostname, address, port, datadir):
+
+        self.registration_order = registration_order
+        self.role = role
+        self.status = status
+        self.hostname = hostname
+        self.address = address
+        self.port = port
+        self.datadir = datadir
+        self.catdir = datadir
+
+        # Filespace mappings for a HAWQ DB
+        self.filespaces = None
+
+        # Pending filespace creation
+        self.pending_filespace = None
+
+        # Check if the status is 'u' up, 'd' for down
+        self.valid = (status == 'u')
+        
+    # --------------------------------------------------------------------
+    def __str__(self):
+        """
+        Construct a printable string representation of a HAWQDB
+        """
+        return "%s:%s:registration_order=%s:status=%s" % (
+            self.hostname,
+            self.datadir,
+            self.registration_order,
+            self.status
+            )
+
+    #
+    # Note that this is not an ideal comparison -- it uses the string representation
+    #   for comparison
+    #
+    def __cmp__(self,other):
+        left = repr(self)
+        right = repr(other)
+        if left < right: return -1
+        elif left > right: return 1
+        else: return 0
+
+    def equalIgnoringStatus(self, other):
+        """
+        Return true if none of the "core" attributes (e.g. filespace) 
+          of two segments differ, false otherwise.
+
+        This method is used by updateSystemConfig() to know when a catalog
+        change will cause removing and re-adding a mirror segment.
+        """
+        firstStatus = self.getStatus()
+        try:
+
+            # make the elements we don't want to compare match and see if they are then equal
+            self.setStatus(other.getStatus())
+
+            return self == other
+        finally:
+            # restore status after comparison
+            self.setStatus(firstStatus)
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getDataDirPrefix(datadir):
+        retValue = ""
+        retValue = datadir[:datadir.rfind('/')]
+        return retValue
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def getFileSpaceDirsWithNewSuffix(fileSpaceDictionary, suffix, includeSystemFilespace = True):
+        """
+        Take a dictionary of filespaces and return the same dictionary with each path given the new suffix.
+        """
+        retValue = {}
+
+        for entry in fileSpaceDictionary:
+            if entry == SYSTEM_FILESPACE and includeSystemFilespace == False:
+                continue
+            newDir = HAWQDB.getDataDirPrefix(fileSpaceDictionary[entry])
+            newDir = newDir + "/" + suffix
+            retValue[entry] = newDir
+        return retValue
+
+    # --------------------------------------------------------------------
+    def copy(self):
+        """
+        Creates a copy of the segment, shallow for everything except the filespaces map
+
+        """
+        res = copy.copy(self)
+        res.filespaces = copy.copy(self.filespaces)
+        return res
+
+    # --------------------------------------------------------------------
+    # Five simple helper functions to identify what role a segment plays
+    # and whether it is up:
+    #  + QD (Query Dispatcher)
+    #     + master
+    #     + standby master
+    #  + QE (Query Executor)
+    #     + primary
+    # --------------------------------------------------------------------    
+    def isMaster(self):
+        return self.role == ROLE_MASTER
+
+    def isStandby(self):
+        return self.role == ROLE_STANDBY
+
+    def isSegment(self):
+        return self.role == ROLE_PRIMARY
+
+    def isUp(self):
+        return self.status == STATUS_UP
+
+    def isDown(self):
+        return self.status == STATUS_DOWN
+
+    # --------------------------------------------------------------------
+    # getters
+    # --------------------------------------------------------------------
+    def getRegistrationOrder(self):
+        return checkNotNone("registration_order", self.registration_order)
+
+    def getRole(self):
+        return checkNotNone("role", self.role)
+
+    def getStatus(self):
+        return checkNotNone("status", self.status)
+
+    def getPort(self):
+        """
+        Returns the listening port for the postmaster for this segment.
+
+        Note: With file replication the postmaster will not be active for
+        mirrors so nothing will be listening on this port, instead the
+        "replicationPort" is used for primary-mirror communication.
+        """
+        return checkNotNone("port", self.port)
+
+    def getHostName(self):
+        """
+        Returns the actual `hostname` for the host
+
+        Note: use getAddress() for the network address to use
+        """
+        return self.hostname
+
+    def getAddress(self):
+        """
+        Returns the network address to use to contact the segment (i.e. the NIC address).
+
+        """
+        return self.address
+
+    def getDataDirectory(self):
+        """
+        Return the primary datadirectory location for the segment.
+
+        Note: the datadirectory is just one of the filespace locations
+        associated with the segment; calling code should be careful not
+        to assume that this is the only directory location for this segment.
+
+        Todo: evaluate callers of this function to see if they should really
+        be dealing with a list of filespaces.
+        """
+        return checkNotNone("dataDirectory", self.datadir)
+
+    def getFilespaces(self):
+        """
+        Returns the filespace dictionary of oid->path pairs
+        """
+        return self.filespaces        
+
+
+    # --------------------------------------------------------------------
+    # setters
+    # --------------------------------------------------------------------
+    def setRegistrationOrder(self, registration_order):
+        checkNotNone("registration_order", registration_order)
+        checkIsInt("registration_order", registration_order)
+        self.registration_order = registration_order
+
+    def setRole(self, role):
+        checkNotNone("role", role)
+
+        if role not in VALID_ROLES:
+            raise Exception("Invalid role '%s'" % role)
+
+        self.role = role
+
+    def setStatus(self, status):
+        checkNotNone("status", status)
+
+        if status not in VALID_STATUS:
+            raise Exception("Invalid status '%s'" % status)
+
+        self.status = status
+
+    def setPort(self, port):
+        checkNotNone("port", port)
+        checkIsInt("port", port)
+        self.port = port
+
+    def setHostName(self, hostName):
+        # None is allowed -- don't check
+        self.hostname = hostName
+
+    def setAddress(self, address):
+        # None is allowed -- don't check
+        self.address = address
+
+    def setDataDirectory(self, dataDirectory):
+        checkNotNone("dataDirectory", dataDirectory)
+        self.datadir = dataDirectory
+
+    def addFilespace(self, oid, path):
+        """
+        Add a filespace path for this segment.
+        
+        Throws: 
+           Exception - if a path has already been specified for this segment.
+        """
+
+        # gpfilespace adds a special filespace with oid=None to indicate
+        # the filespace that it is currently building, since the filespace
+        # does not yet exist there is no valid value that could be used.
+        if oid == None:
+            if self.pending_filespace:
+                raise Exception("Duplicate filespace path for registration_order %d" % 
+                                self.registration_order)
+            self.pending_filespace = path
+            return
+        
+        # oids should always be integer values > 0
+        oid = int(oid)
+        assert(oid > 0)
+
+        # The more usual case just sets the filespace in the filespace 
+        # dictionary
+        if oid in self.filespaces:
+            raise Exception("Duplicate filespace path for "
+                            "registration_order %d filespace %d" % (self.registration_order, oid))
+        self.filespaces[oid] = path
+
+    def getPendingFilespace(self):
+        """
+        Returns the pending filespace location for this segment
+        (called by gpfilespace)
+        """
+        return self.pending_filespace
+
+
+class HAWQFilesystemObj:
+    """
+    List information for a filesystem, as stored in pg_filesystem
+    """
+    def __init__(self, oid, name, shared):
+        self.__oid = oid
+        self.__name = name
+        self.__shared = shared
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def isShared(self):
+        return self.__shared == True
+
+    @staticmethod
+    def getFilesystemObj(filesystemArr, fsoid):
+        # local storage
+        if fsoid == 0:
+            return None
+        # plugin storage
+        for fsys in filesystemArr:
+            if (fsys.getOid() == fsoid):
+                return fsys
+        raise Exception("Error: invalid file system oid %d" % (fsoid))
+
+class HAWQFilespaceObj:
+    """
+    List information for a filespace, as stored in pg_filespace
+    """
+    def __init__(self, oid, name, fsys):
+        self.__oid = oid
+        self.__name = name
+        self.__fsys = fsys
+
+    def getOid(self):
+        return self.__oid
+
+    def getName(self):
+        return self.__name
+
+    def getFsys(self):
+        return self.__fsys
+
+    def isSystemFilespace(self):
+        return self.__oid == SYSTEM_FILESPACE
+
+
+
+class HAWQArray:
+    """ 
+    HAWQArray is a python class that describes a HAWQ array.
+
+    A HAWQ array consists of:
+      master         - The primary QD for the array
+      standby master - The mirror QD for the array [optional]
+      segment array  - an array of segments within the cluster
+
+    Each segment is a single HAWQDB object.
+
+    It can be initialized either from a database connection, in which case
+    it discovers the configuration information by examining the catalog, or
+    via a configuration file.
+    """
+
+    # --------------------------------------------------------------------
+    def __init__(self, hawqdbs):
+        """
+        hawqdbs is the list of HAWQDB objects making up the array; it is
+          used by the configurationImpl* providers to track the state of
+          the segments in the database
+        """
+
+        self.master = None
+        self.standbyMaster = None
+        self.segments = []
+        self.numSegments = 0
+        self.version = None
+
+        self.setFilespaces([])
+
+        for hdb in hawqdbs:
+
+            # Handle master
+            if hdb.isMaster():
+                if self.master != None:
+                    logger.error("multiple master dbs defined")
+                    raise Exception("HAWQArray - multiple master dbs defined")
+                self.master = hdb
+
+            # Handle standby
+            elif hdb.isStandby():
+                if self.standbyMaster != None:
+                    logger.error("multiple standby master dbs defined")
+                    raise Exception("HAWQArray - multiple standby master dbs defined")
+                self.standbyMaster = hdb
+
+            # Handle segments
+            elif hdb.isSegment():
+                self.addSegment(hdb)
+
+            else:
+                # Not a master, standbymaster, primary, or mirror?
+                # shouldn't even be possible.
+                logger.error("FATAL - invalid dbs defined")
+                raise Exception("Error: HAWQArray() - invalid dbs defined")
+
+        # Make sure HAWQ cluster has a master
+        if self.master is None:
+            logger.error("FATAL - no master defined!")
+            raise Exception("Error: HAWQArray() - no master defined")
+
+    def __str__(self):
+        return "Master: %s\nStandby: %s\nSegments: %s" % (str(self.master),
+                                                          str(self.standbyMaster) if self.standbyMaster else 'Not Configured',
+                                                          "\n".join([str(seg) for seg in self.segments]))
+
+    def addSegment(self, hdb):
+        if hdb.isSegment():
+            self.segments.append(hdb)
+            self.numSegments += 1
+        else:
+            raise Exception("Error: adding invalid segment to HAWQArray")
+
+
+    # --------------------------------------------------------------------
+    @staticmethod
+    def initFromCatalog(dbURL, utility=False, useAllSegmentFileSpaces=False):
+        """
+        Factory method, initializes a HAWQArray from the provided database URL
+
+        Please note that useAllSegmentFileSpaces, when set to true, makes this
+        method add *all* filespaces to the segments of the hawqarray; if false,
+        only the master/standby filespaces are added.  This is *hacky* and we
+        know that it is not the right way to design methods/interfaces; we are
+        doing this so that we do not affect the behavior of existing tools like
+        upgrade, gprecoverseg etc.
+        """
+
+        conn = dbconn.connect(dbURL, utility)
+
+        # Get the version from the database:
+        version_str = None
+        for row in dbconn.execSQL(conn, "SELECT version()"):
+            version_str = row[0]
+        version = GpVersion(version_str)
+
+        # Only for HAWQ 2.0
+        if version.getVersionRelease() in ("2.0"):
+
+            hawq_site = HawqXMLParser(GPHOME)
+            master_data_directory  = hawq_site.get_value_from_name('hawq_master_directory')
+            segment_data_directory = hawq_site.get_value_from_name('hawq_segment_directory')
+
+            # strategy_rows = dbconn.execSQL(conn, "show gp_fault_action")
+            strategy_rows = []
+
+            config_rows = dbconn.execSQL(conn, '''
+                               SELECT sc.registration_order,
+                                      sc.role,
+                                      sc.status,
+                                      sc.hostname,
+                                      sc.address,
+                                      sc.port,
+                                      fs.oid,
+                                      CASE
+                                          WHEN sc.registration_order <= 0 THEN '%s'
+                                          ELSE '%s'
+                                      END AS datadir
+                               FROM pg_catalog.gp_segment_configuration sc,
+                                    pg_catalog.pg_filespace fs,
+                                    pg_catalog.pg_filespace_entry fse
+                               WHERE fse.fsefsoid = fs.oid
+                               ORDER BY sc.registration_order;''' %
+                               (master_data_directory, segment_data_directory))
+
+            # All of filesystem is shared storage
+            filesystemRows = dbconn.execSQL(conn, '''
+                SELECT oid, fsysname, true AS fsysshared
+                FROM pg_filesystem
+                ORDER BY fsysname
+            ''')
+
+            filesystemArr = [HAWQFilesystemObj(fsysRow[0], fsysRow[1], fsysRow[2]) for fsysRow in filesystemRows]
+
+            filespaceRows = dbconn.execSQL(conn, '''
+                SELECT oid, fsname, fsfsys AS fsoid
+                FROM pg_filespace
+                WHERE oid != %d
+                ORDER BY fsname;
+            ''' % (SYSTEM_FILESPACE))
+
+            filespaceArr = [HAWQFilespaceObj(fsRow[0], fsRow[1], HAWQFilesystemObj.getFilesystemObj(filesystemArr, fsRow[2])) for fsRow in filespaceRows]
+
+        else:
+            raise Exception("HAWQ version is invalid: %s" % version)
+
+        hawqdbs = []
+        print "### initFromCatalog ###"
+        hdb = None
+        for row in config_rows:
+
+            print row
+
+            # Extract fields from the row
+            (registration_order, role, status, hostname, 
+             address, port, fsoid, datadir) = row
+
+            # In GPSQL, only the master maintains the filespace information.
+            # if registration_order != MASTER_REGISTRATION_ORDER and \
+            #    fsoid != SYSTEM_FILESPACE and \
+            #    not useAllSegmentFileSpaces:
+            #    print "### initFromCatalog ... continue ###"
+            #    continue
+
+            # The query returns all the filespaces for a segment on separate
+            # rows.  If this row is the same dbid as the previous row simply
+            # add this filespace to the existing list, otherwise create a
+            # new segment.
+            # if seg and seg.getSegmentRegistrationOrder() == registration_order:
+            #     seg.addSegmentFilespace(fsoid, fslocation)
+            # else:
+            #     seg = HAWQDB(registration_order, role, status, 
+            #                 hostname, address, port, datadir)
+            #    segments.append(seg)
+
+            hdb = HAWQDB(registration_order, role, status, 
+                         hostname, address, port, datadir)
+            print "### initFromCatalog ... hdb ###"
+            print hdb
+            hawqdbs.append(hdb)
+            print "### initFromCatalog ... hawqdbs ###"
+            print hawqdbs
+        
+        conn.close()
+        
+        # origSegments = [seg.copy() for seg in segments]
+        
+        array = HAWQArray(hawqdbs)
+        array.version = version
+        array.setFilespaces(filespaceArr)
+        array.setFilesystem(filesystemArr)
+        
+        return array
+
+    # --------------------------------------------------------------------
+    def is_array_valid(self):
+        """Checks that each array is in a valid state"""
+
+        if self.master.getStatus() != STATUS_UP:
+            return False
+
+        if self.standbyMaster and self.standbyMaster.getStatus() != STATUS_UP:
+            return False
+
+        for seg in self.segments:
+            if not seg.status == STATUS_UP:
+                return False
+        return True
+
+    # --------------------------------------------------------------------
+    def setFilesystem(self, filesystemArr):
+        """
+        @param filesystemArr list of HAWQFilesystemObj objects
+        """
+        self.filesystemArr = [fsys for fsys in filesystemArr]
+
+    def getFilesystem(self):
+        """
+        @return a newly allocated list of HAWQFilesystemObj objects, sorted by filesystem name
+        """
+        return [fsys for fsys in self.filesystemArr]
+
+    def setFilespaces(self, filespaceArr):
+        """
+        @param filespaceArr list of HAWQFilespaceObj objects
+        """
+        self.filespaceArr = [fs for fs in filespaceArr]
+
+    def getFilespaces(self, includeSystemFilespace=True):
+        """
+        @return a newly allocated list of HAWQFilespaceObj objects, sorted by filespace name;
+                the system filespace is excluded when includeSystemFilespace is False
+        """
+        if includeSystemFilespace:
+            return [fs for fs in self.filespaceArr]
+        return [fs for fs in self.filespaceArr if not fs.isSystemFilespace()]
+
+    def getNonSystemFilespaces(self):
+        """
+        @return a newly allocated list of HAWQFilespaceObj objects, sorted by filespace name
+        """
+        return [fs for fs in self.filespaceArr if not fs.isSystemFilespace()]
+
+    def getAllFilespaces(self):
+        """
+        @return a newly allocated list of HAWQFilespaceObj objects, sorted by filespace name
+        """
+        return [fs for fs in self.filespaceArr]
+
+    # --------------------------------------------------------------
+    def getFileSpaceName(self, filespaceOid):
+        retValue = None
+        
+        if self.filespaceArr is not None:
+            for entry in self.filespaceArr:
+                if entry.getOid() == filespaceOid:
+                    retValue = entry.getName()
+                    break
+        return retValue
+
+    # --------------------------------------------------------------
+    def getFileSpaceOid(self, filespaceName):
+        retValue = None
+        
+        if self.filespaceArr is not None:
+            for entry in self.filespaceArr:
+                if entry.getName() == filespaceName:
+                    retValue = entry.getOid()
+                    break
+        return retValue
+
+    # --------------------------------------------------------------
+    def isFileSpaceShared(self, filespaceOid):
+        retValue = False
+        
+        if self.filespaceArr is not None:
+            for entry in self.filespaceArr:
+                if entry.getOid() == filespaceOid:
+                    retValue = entry.getFsys() is not None and entry.getFsys().isShared()
+                    break
+        return retValue
+
+
+    # --------------------------------------------------------------------
+    def getDbList(self):
+        """
+        Return a list of all HAWQDB objects that make up the array
+        """
+        dbs=[]
+
+        dbs.append(self.master)
+
+        if self.standbyMaster:
+            dbs.append(self.standbyMaster)
+
+        dbs.extend(self.getSegDbList())
+
+        return dbs
+
+    # --------------------------------------------------------------------
+    def getHostList(self, includeExpansionSegs = False):
+        """
+        Return a list of all Hosts that make up the array
+        """
+        hostList = []
+
+        hostList.append(self.master.getSegmentHostName())
+
+        if self.standbyMaster:
+            hostList.append(self.standbyMaster.getSegmentHostName())
+            
+        for db in self.getDbList():
+            hostname = db.getSegmentHostName()
+            if hostname not in hostList:
+                hostList.append(hostname)
+
+        return hostList
+
+    def getSegDbList(self):
+        """Return a list of all HAWQDB objects for all segments in the array"""
+        dbs=[]
+
+        for seg in self.segments:
+            dbs.append(seg)
+
+        return dbs
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/mainUtils.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/mainUtils.py b/tools/bin/hawqpylib/mainUtils.py
new file mode 100644
index 0000000..d0148e9
--- /dev/null
+++ b/tools/bin/hawqpylib/mainUtils.py
@@ -0,0 +1,655 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Line too long - pylint: disable=C0301
+# Invalid name  - pylint: disable=C0103
+
+"""
+mainUtils.py
+------------
+
+This file provides a rudimentary framework to support top-level option
+parsing, initialization and cleanup logic common to multiple programs.
+
+It also implements workarounds to make other modules we use like
+GpCoverage() work properly.
+
+The primary interface function is 'simple_main'.  For an example of
+how it is expected to be used, see gprecoverseg.
+
+It is anticipated that the functionality of this file will grow as we
+extend common functions of our gp utilities.  Please keep this in mind
+and try to avoid placing logic for a specific utility here.
+"""
+
+import os, sys, signal, errno, yaml
+
+gProgramName = os.path.split(sys.argv[0])[-1]
+if sys.version_info < (2, 5, 0):
+    sys.exit(
+'''Error: %s requires Python version 2.5 or greater.
+Please upgrade the Python installation on this machine.''' % gProgramName)
+
+from gppylib import gplog
+from gppylib.commands import gp, unix
+from gppylib.commands.base import ExecutionError
+from gppylib.system import configurationInterface, fileSystemInterface, fileSystemImplOs
+from gppylib.system import osInterface, osImplNative, faultProberInterface, faultProberImplGpdb
+from hawqpylib.system import configurationImplHAWQ
+from optparse import OptionGroup, OptionParser, SUPPRESS_HELP
+from gppylib.gpcoverage import GpCoverage
+from lockfile.pidlockfile import PIDLockFile, LockTimeout
+
+
+def getProgramName():
+    """
+    Return the name of the current top-level program from sys.argv[0]
+    or the programNameOverride option passed to simple_main via mainOptions.
+    """
+    global gProgramName
+    return gProgramName
+
+
+class SimpleMainLock:
+    """
+    Tools like gprecoverseg prohibit running multiple instances at the same time
+    via a simple lock file created in the MASTER_DATA_DIRECTORY.  This class takes
+    care of the work to manage this lock as appropriate based on the mainOptions
+    specified.
+
+    Note that in some cases, the utility may want to recursively invoke
+    itself (e.g. gprecoverseg -r).  To handle this, the caller may specify
+    the name of an environment variable holding the pid already acquired by
+    the parent process.
+    """
+    def __init__(self, mainOptions):
+        self.pidfilename   = mainOptions.get('pidfilename', None)       # the file we're using for locking
+        self.parentpidvar  = mainOptions.get('parentpidvar', None)      # environment variable holding parent pid
+        self.parentpid     = None                                       # parent pid which already has the lock
+        self.ppath         = None                                       # complete path to the lock file
+        self.pidlockfile   = None                                       # PIDLockFile object
+        self.pidfilepid    = None                                       # pid of the process which has the lock
+        self.locktorelease = None                                       # PIDLockFile object we should release when done
+
+        if self.parentpidvar is not None and self.parentpidvar in os.environ:
+            self.parentpid = int(os.environ[self.parentpidvar])
+
+        if self.pidfilename is not None:
+            self.ppath       = os.path.join(gp.get_masterdatadir(), self.pidfilename)
+            self.pidlockfile = PIDLockFile( self.ppath )
+
+
+    def acquire(self):
+        """
+        Attempts to acquire the lock this process needs to proceed.
+
+        Returns None on successful acquisition of the lock or 
+          the pid of the other process which already has the lock.
+        """
+        # nothing to do if the utility requires no locking
+        if self.pidlockfile is None:
+            return None
+
+        # look for a lock file
+        self.pidfilepid = self.pidlockfile.read_pid()
+        if self.pidfilepid is not None:
+
+            # we found a lock file
+            # allow the process to proceed if the locker was our parent
+            if self.pidfilepid == self.parentpid:
+                return None
+
+            # cleanup stale locks
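+            # (os.kill with signal.SIG_DFL -- typically 0 -- merely probes
+            # for the pid's existence; ESRCH means no such process, so the
+            # lock file is stale and may be broken)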
+            try:
+                os.kill(self.pidfilepid, signal.SIG_DFL)
+            except OSError, exc:
+                if exc.errno == errno.ESRCH:
+                    self.pidlockfile.break_lock()
+                    self.pidfilepid = None
+
+        # try and acquire the lock
+        try:
+            self.pidlockfile.acquire(1)
+
+        except LockTimeout:
+            self.pidfilepid = self.pidlockfile.read_pid()
+            return self.pidfilepid
+
+        # we have the lock
+        # prepare for a later call to release() and take good
+        # care of the process environment for the sake of our children
+        self.locktorelease = self.pidlockfile
+        self.pidfilepid    = self.pidlockfile.read_pid()
+        if self.parentpidvar is not None:
+            os.environ[self.parentpidvar] = str(self.pidfilepid)
+
+        return None
+
+
+    def release(self):
+        """
+        Releases the lock this process acquired.
+        """
+        if self.locktorelease is not None:
+            self.locktorelease.release()
+            self.locktorelease = None
+
+
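+# Illustrative sketch of how a utility opts in to SimpleMainLock via
+# simple_main (the pid file and environment variable names below are
+# hypothetical):
+#
+#   simple_main(create_parser, create_program,
+#               {'pidfilename':  'hawq_mytool.pid',
+#                'parentpidvar': 'HAWQ_MYTOOL_PID'})
+#
+# A recursive invocation launched by the locked parent inherits
+# HAWQ_MYTOOL_PID from the environment and is therefore allowed to proceed.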
+
+#
+# exceptions we handle specially by the simple_main framework.
+#
+
+class ProgramArgumentValidationException(Exception):
+    """
+    Throw this out to main to have the message possibly
+    printed with a help suggestion.
+    """
+    def __init__(self, msg, shouldPrintHelp=False):
+        "init"
+        Exception.__init__(self)
+        self.__shouldPrintHelp = shouldPrintHelp
+        self.__msg = msg
+
+    def shouldPrintHelp(self): 
+        "shouldPrintHelp"
+        return self.__shouldPrintHelp
+
+    def getMessage(self): 
+        "getMessage"
+        return self.__msg
+
+
+class ExceptionNoStackTraceNeeded(Exception):
+    """
+    Our code throws this exception when we encounter a condition
+    we know can arise which demands immediate termination.
+    """
+    pass
+
+
+class UserAbortedException(Exception):
+    """
+    UserAbortedException should be thrown when a user decides to stop the 
+    program (at a y/n prompt, for example).
+    """
+    pass
+
+
+def simple_main( createOptionParserFn, createCommandFn, mainOptions=None) :
+    """
+     createOptionParserFn : a function that takes no arguments and returns an OptParser
+     createCommandFn : a function that takes two arguments (the options, and the args that were
+                       not processed into options) and returns an object with "run" and "cleanup"
+                       functions.  Its "run" function must run and return an exit code; "cleanup"
+                       is called before the program exits and can be used, for example, to shut
+                       down a worker pool.
+
+     mainOptions can include: forceQuietOutput (map to bool),
+                              programNameOverride (map to string)
+                              suppressStartupLogMessage (map to bool)
+                              useHelperToolLogging (map to bool)
+                              setNonuserOnToolLogger (map to bool, defaults to false)
+                              pidfilename (string)
+                              parentpidvar (string)
+    """
+    coverage = GpCoverage()
+    coverage.start()
+    try:
+        simple_main_internal(createOptionParserFn, createCommandFn, mainOptions)
+    finally:
+        coverage.stop()
+        coverage.generate_report()
+
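+# A minimal sketch of the simple_main contract (MyProgram and
+# create_parser are hypothetical names, not part of this module):
+#
+#   class MyProgram:
+#       def __init__(self, options, args):
+#           self.options = options
+#       def run(self):
+#           return 0              # exit code
+#       def cleanup(self):
+#           pass                  # e.g. shut down a worker pool
+#
+#   def create_parser():
+#       parser = OptionParser()
+#       addStandardLoggingAndHelpOptions(parser, True)
+#       return parser
+#
+#   simple_main(create_parser, lambda options, args: MyProgram(options, args))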
+
+def simple_main_internal(createOptionParserFn, createCommandFn, mainOptions):
+    """
+    If caller specifies 'pidfilename' in mainOptions then we manage the
+    specified pid file within the MASTER_DATA_DIRECTORY before proceeding
+    to execute the specified program and we clean up the pid file when
+    we're done.
+    """
+    sml = None
+    if mainOptions is not None and 'pidfilename' in mainOptions:
+        sml      = SimpleMainLock(mainOptions)
+        otherpid = sml.acquire()
+        if otherpid is not None:
+            logger = gplog.get_default_logger()
+            logger.error("An instance of %s is already running (pid %s)" % (getProgramName(), otherpid))
+            return
+
+    # at this point we have whatever lock we require
+    try:
+        simple_main_locked(createOptionParserFn, createCommandFn, mainOptions)
+    finally:
+        if sml is not None:
+            sml.release()
+
+
+def simple_main_locked(createOptionParserFn, createCommandFn, mainOptions):
+    """
+    Not to be called externally -- use simple_main instead
+    """
+    logger = gplog.get_default_logger()
+
+    configurationInterface.registerConfigurationProvider( configurationImplHAWQ.GpConfigurationProviderUsingHAWQCatalog())
+    fileSystemInterface.registerFileSystemProvider( fileSystemImplOs.GpFileSystemProviderUsingOs())
+    osInterface.registerOsProvider( osImplNative.GpOsProviderUsingNative())
+    faultProberInterface.registerFaultProber( faultProberImplGpdb.GpFaultProberImplGpdb())
+
+    commandObject = None
+    parser = None
+
+    forceQuiet = mainOptions is not None and mainOptions.get("forceQuietOutput")
+    options = None
+
+    if mainOptions is not None and mainOptions.get("programNameOverride"):
+        global gProgramName
+        gProgramName = mainOptions.get("programNameOverride")
+    suppressStartupLogMessage = mainOptions is not None and mainOptions.get("suppressStartupLogMessage")
+
+    useHelperToolLogging = mainOptions is not None and mainOptions.get("useHelperToolLogging")
+    nonuser = True if mainOptions is not None and mainOptions.get("setNonuserOnToolLogger") else False
+
+    # NOTE: if this logic is changed then also change test_main in testUtils.py
+    try:
+        execname = getProgramName()
+        hostname = unix.getLocalHostname()
+        username = unix.getUserName()
+
+        parser = createOptionParserFn()
+        (options, args) = parser.parse_args()
+
+        if useHelperToolLogging:
+            gplog.setup_helper_tool_logging(execname, hostname, username)
+        else:
+            gplog.setup_tool_logging(execname, hostname, username,
+                                        logdir=options.ensure_value("logfileDirectory", None), nonuser=nonuser )
+
+        if forceQuiet:
+            gplog.quiet_stdout_logging()
+        else:
+            if options.ensure_value("verbose", False):
+                gplog.enable_verbose_logging()
+            if options.ensure_value("quiet", False):
+                gplog.quiet_stdout_logging()
+
+        if options.ensure_value("masterDataDirectory", None) is not None:
+            options.master_data_directory = os.path.abspath(options.masterDataDirectory)
+
+        if not suppressStartupLogMessage:
+            logger.info("Starting %s with args: %s" % (gProgramName, ' '.join(sys.argv[1:])))
+
+        commandObject = createCommandFn(options, args)
+        exitCode = commandObject.run()
+        sys.exit(exitCode)
+
+    except ProgramArgumentValidationException, e:
+        if e.shouldPrintHelp():
+            parser.print_help()
+        logger.error("%s: error: %s" %(gProgramName, e.getMessage()))
+        sys.exit(2)
+    except ExceptionNoStackTraceNeeded, e:
+        logger.error( "%s error: %s" % (gProgramName, e))
+        sys.exit(2)
+    except UserAbortedException, e:
+        logger.info("User abort requested, exiting...")
+        sys.exit(4)
+    except ExecutionError, e:
+        logger.fatal("Error occurred: %s\n Command was: '%s'\n"
+                     "rc=%d, stdout='%s', stderr='%s'" %\
+                     (e.summary,e.cmd.cmdStr, e.cmd.results.rc, e.cmd.results.stdout,
+                     e.cmd.results.stderr ))
+        sys.exit(2)
+    except Exception, e:
+        if options is None:
+            logger.exception("%s failed.  exiting...", gProgramName)
+        else:
+            if options.ensure_value("verbose", False):
+                logger.exception("%s failed.  exiting...", gProgramName)
+            else:
+                logger.fatal("%s failed. (Reason='%s') exiting..." % (gProgramName, e))
+        sys.exit(2)
+    except KeyboardInterrupt:
+        sys.exit('\nUser Interrupted')
+    finally:
+        if commandObject:
+            commandObject.cleanup()
+
+
+def addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption, includeUsageOption=False):
+    """
+    Add the standard options for help and logging
+    to the specified parser object.
+    """
+    parser.set_usage('%prog [--help] [options] ')
+    parser.remove_option('-h')
+
+    addTo = parser
+    addTo.add_option('-h', '-?', '--help', action='help',
+                      help='show this help message and exit')
+    if includeUsageOption:
+        parser.add_option('--usage', action="briefhelp")
+
+    addTo = OptionGroup(parser, "Logging Options")
+    parser.add_option_group(addTo)
+    addTo.add_option('-v', '--verbose', action='store_true', 
+                      help='debug output.')
+    addTo.add_option('-q', '--quiet', action='store_true',
+                      help='suppress status messages')
+    addTo.add_option("-l", None, dest="logfileDirectory", metavar="<directory>", type="string",
+                  help="Logfile directory")
+
+    if includeNonInteractiveOption:
+        addTo.add_option('-a', dest="interactive" , action='store_false', default=True,
+                    help="quiet mode, do not require user input for confirmations")
+
+
+def addMasterDirectoryOptionForSingleClusterProgram(addTo):
+    """
+    Add the -d master directory option to the specified parser object
+    which is intended to provide the value of the master data directory.
+
+    For programs that operate on multiple clusters at once, this function/option
+    is not appropriate.
+    """
+    addTo.add_option('-d', '--master_data_directory', type='string',
+                        dest="masterDataDirectory",
+                        metavar="<master data directory>",
+                        help="Optional. The master host data directory. If not specified, the value set "\
+                            "for $MASTER_DATA_DIRECTORY will be used.")
+    
+
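+# A minimal sketch combining the two helpers above (the argument values
+# are illustrative):
+#
+#   parser = OptionParser()
+#   addStandardLoggingAndHelpOptions(parser, includeNonInteractiveOption=True)
+#   addMasterDirectoryOptionForSingleClusterProgram(parser)
+#   (options, args) = parser.parse_args(['-v', '-d', '/data/master'])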
+
+#
+# YamlMain
+# 
+
+def get_yaml(targetclass):
+    "get_yaml"
+
+    # doc    - class's doc string
+    # pos    - where YAML starts in doc
+    # ystr   - YAML string extracted from doc
+
+    if not hasattr(targetclass, '_yaml') or targetclass._yaml is None:
+        doc = targetclass.__doc__
+        pos = doc.find('%YAML')
+        assert pos >= 0, "targetclass doc string is missing %YAML plan"
+        ystr  = doc[pos:].replace('\n    ','\n')
+        targetclass._yaml = yaml.load(ystr)
+    return targetclass._yaml
+
+
+class YamlMain:
+    "YamlMain"
+
+    def __init__(self):
+        "Parse arguments based on yaml docstring"
+        self.current       = None
+        self.plan          = None
+        self.scenario_name = None
+        self.logger        = None
+        self.logfilename   = None
+        self.errmsg        = None
+
+        self.parser = YamlOptions(self).parser
+        self.options, self.args = self.parser.parse_args()
+        self.options.quiet = self.options.q
+        self.options.verbose = self.options.v
+
+
+    #
+    # simple_main interface
+    #
+    def __call__(self, *args):
+        "Allows us to use self as the create_parser and create_program functions in the call to simple_main"
+        return self
+
+    def parse_args(self):
+        "Called by simple_main to obtain results from parser returned by create_parser"
+        return self.options, self.args
+
+    def run(self):
+        "Called by simple_main to execute the program returned by create_program"
+        self.plan = Plan(self)
+        self.scenario_name = self.plan.name
+        self.logger        = self.plan.logger
+        self.logfilename   = self.plan.logfilename
+        self.errmsg        = self.plan.errmsg
+        self.current       = []
+        self.plan.run()
+
+    def cleanup(self):
+        "Called by simple_main to cleanup after program returned by create_program finishes"
+        pass
+
+    def simple(self):
+        "Delegates setup and control to mainUtils.simple_main"
+        simple_main(self, self)
+
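+# Illustrative sketch of a YamlMain subclass (the class, option, scenario
+# and task names are hypothetical; the embedded YAML must be indented by
+# four spaces, which get_yaml strips):
+#
+#   class MyTool(YamlMain):
+#       """
+#       %YAML 1.1
+#       ---
+#       Description: does something useful
+#       Usage: 'mytool [options]'
+#       Options:
+#           Groups: [Help Options, Logging Options]
+#           Help Options:
+#               -h,-?,--help:
+#                   action: help
+#                   help: show this help message and exit
+#           Logging Options:
+#               -v: hidden debug output
+#               -q: hidden quiet mode
+#               -s,--scenario:
+#                   type: string
+#                   help: hidden scenario to run
+#       Default Scenario: Do Work
+#       Scenarios:
+#           Do Work:
+#            - 1 Main:
+#               - 1.1 Check Something
+#       Errors:
+#           check_failed: 'the check failed: %(detail)s'
+#       """
+#       def check_something(self):
+#           "Resolved from sub task name 'Check Something'"
+#           pass
+#
+#   MyTool().simple()
+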
+
+#
+# option parsing
+#
+
+class YamlOptions:
+    "YamlOptions"
+
+    def __init__(self, target):
+        """
+        Scan the class doc string of the given object, looking for the %YAML
+        containing the option specification.  Parse the YAML and setup the
+        corresponding OptionParser object.
+        """
+        # target - options object (input)
+        # gname  - option group name
+
+        self.y      = get_yaml(target.__class__)
+        self.parser = OptionParser( description=self.y['Description'], version='%prog version $Revision$')
+        self.parser.remove_option('-h')
+        self.parser.set_usage(self.y['Usage'])
+        self.opty   = self.y['Options']
+        for gname in self.opty.get('Groups', []):
+            self._register_group(gname)
+            
+
+    def _register_group(self, gname):
+        """
+        Register options for the specified option group name to the OptionParser
+        using an OptionGroup unless the group name starts with 'Help' in which
+        case we just register the options with the top level OptionParser object.
+        """
+        # gname    - option group name (input)
+        # gy       - option group YAML object
+        # grp      - option group object
+        # tgt      - where to add options (parser or option group)
+        # optkey   - comma separated list of option flags
+        # optval   - help string or dict with detailed option settings
+        # listargs - list of option flags (e.g. ['-h', '--help'])
+        # dictargs - key/value arguments to add_option
+
+        gy = self.opty.get(gname, None)
+        if gname.startswith('Help'): 
+            grp = None
+            tgt = self.parser
+        else:
+            grp = OptionGroup(self.parser, gname)
+            tgt = grp
+
+        for optkey, optval in gy.items():
+            listargs = optkey.split(',')
+            if isinstance(optval, str):
+                # short form: optval is just a help string
+                dictargs = {
+                    'action': 'store_true',
+                    'help':   optval
+                }
+            else:
+                # optval is the complete option specification
+                dictargs = optval
+
+            # hide hidden options
+            if dictargs.get('help','').startswith('hidden'):
+                dictargs['help'] = SUPPRESS_HELP
+
+            tgt.add_option(*listargs, **dictargs)
+
+        if grp is not None:
+            self.parser.add_option_group(grp)
+
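+    # The two option-specification forms handled above, as they would
+    # appear in an Options group of the %YAML doc string (the flags are
+    # illustrative):
+    #
+    #   Logging Options:
+    #       -v,--verbose: debug output                 # short form
+    #       -l:                                        # full form
+    #           dest: logfileDirectory
+    #           metavar: '<directory>'
+    #           type: string
+    #           help: logfile directory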
+
+
+#
+# plan execution
+#
+
+class Task:
+    "Task"
+
+    def __init__(self, key, name, subtasks=None):
+        self.Key      = key         # task key
+        self.Name     = name        # task name
+        self.SubTasks = subtasks    # subtasks, if any
+        self.Func     = None        # task function, set by Plan._subtask
+
+
+    def _print(self, main, prefix):
+        print '%s %s %s:' % (prefix, self.Key, self.Name)
+
+    def _debug(self, main, prefix):
+        main.logger.debug('Execution Plan:%s %s %s%s' % (prefix, self.Key, self.Name, ':' if self.SubTasks else ''))
+
+    def _run(self, main, prefix):
+        main.logger.debug(' Now Executing:%s %s %s' % (prefix, self.Key, self.Name))
+        if self.Func:
+            self.Func()
+        
+
+class Exit(Exception):
+    def __init__(self, rc, code=None, call_support=False):
+        Exception.__init__(self)
+        self.code         = code
+        self.prm          = sys._getframe(1).f_locals
+        self.rc           = rc
+        self.call_support = call_support
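+
+    # Illustrative use from a task method (the error code and the local
+    # variable are hypothetical): Plan.exit looks up the code in the
+    # %YAML Errors map and formats it with the caller's captured locals.
+    #
+    #   def check_something(self):
+    #       detail = 'catalog mismatch'
+    #       raise Exit(2, 'check_failed')   # errors['check_failed'] % {'detail': ...}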
+
+
+class Plan:
+    "Plan"
+
+    def __init__(self, main):
+        """
+        Parse the %YAML plan found in the class doc string of the given
+        object (cached by get_yaml) and build the plan's tasks for the
+        selected scenario.
+        """
+        # main - object with yaml scenarios (input)
+        # sy   - Stage yaml
+
+        self.logger      = gplog.get_default_logger()
+        self.logfilename = gplog.get_logfile()
+
+        self.main        = main
+        self.y           = get_yaml(main.__class__)
+        self.name        = main.options.scenario
+        if not self.name:
+            self.name    = self.y['Default Scenario']
+        self.scenario    = self.y['Scenarios'][self.name]
+        self.errors      = self.y['Errors']
+        self.Tasks       = [ self._task(ty) for ty in self.scenario ]
+
+
+    def _task(self, ty):
+        "Invoked by __init__ to build a top-level task from the YAML"
+
+        # ty   - Task yaml (input)
+        # tyk  - Task yaml key
+        # tyv  - Task yaml value
+        # sty  - Sub Task yaml
+        # t    - Task (returned)
+
+        for tyk, tyv in ty.items():
+            key, workers = tyk.split(None, 1)
+            subtasks = [ self._subtask(sty) for sty in tyv ]
+            t = Task(key, workers, subtasks)
+            return t
+
+    def _subtask(self, sty):
+        "Invoked by _task to build a sub task from the YAML"
+
+        # sty  - Sub Task yaml (input)
+        # st   - Sub Task (returned)
+
+        key, rest = sty.split(None, 1)
+        st = Task(key, rest)
+        fn = st.Name.lower().replace(' ','_')
+        try:
+            st.Func = getattr(self.main, fn)
+        except AttributeError, e:
+            raise Exception("Failed to lookup '%s' for sub task '%s': %s" % (fn, st.Name, str(e)))
+        return st
+
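+    # e.g. (illustrative) a sub task line '1.1 Check Something' yields
+    # key '1.1' and name 'Check Something', which resolves to the plan
+    # object's check_something method.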
+
+
+    def _dotasks(self, subtasks, prefix, action):
+        "Apply an action to each subtask recursively"
+
+        # st   - Sub Task
+
+        for st in subtasks or []:
+            self.main.current.append(st)
+            action(st, self.main, prefix)
+            self._dotasks(st.SubTasks, '  '+prefix, action)
+            self.main.current.pop()
+
+
+    def _print(self):
+        "Print in YAML form."
+
+        print '%s:' % self.name
+        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._print(m,p))
+
+
+    def run(self):
+        "Run the stages and tasks."
+
+        self.logger.debug('Execution Plan: %s' % self.name)
+        self._dotasks(self.Tasks, ' -', lambda t,m,p:t._debug(m,p))
+                
+        self.logger.debug(' Now Executing: %s' % self.name)
+        try:
+            self._dotasks(self.Tasks, ' -', lambda t,m,p:t._run(m,p))
+        except Exit, e:
+            self.exit(e.code, e.prm, e.rc, e.call_support)
+
+
+    def errmsg(self, code, prm={}):
+        "Return a formatted error message"
+        return self.errors[code] % prm
+        
+
+    def exit(self, code=None, prm={}, rc=1, call_support=False):
+        "Terminate the application"
+        if code:
+            msg = self.errmsg(code, prm)
+            self.logger.error(msg)
+        if call_support:
+            self.logger.error('Please send %s to Greenplum support.' % self.logfilename)
+        self.logger.debug('exiting with status %(rc)s' % locals())
+        sys.exit(rc)
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8cdae414/tools/bin/hawqpylib/programs/__init__.py
----------------------------------------------------------------------
diff --git a/tools/bin/hawqpylib/programs/__init__.py b/tools/bin/hawqpylib/programs/__init__.py
new file mode 100644
index 0000000..e69de29