Posted to issues@hawq.apache.org by kdunn926 <gi...@git.apache.org> on 2016/09/26 21:11:46 UTC

[GitHub] incubator-hawq pull request #940: HAWQ 1078. Implement hawqsync-falcon DR ut...

GitHub user kdunn926 opened a pull request:

    https://github.com/apache/incubator-hawq/pull/940

    HAWQ 1078. Implement hawqsync-falcon DR utility.

    This is the initial commit of a Python utility to orchestrate DR synchronization for HAWQ, based on Falcon HDFS replication and a cold backup of the active HAWQ master's MASTER_DATA_DIRECTORY.
    
    A code review would be greatly appreciated when someone has cycles. Active testing is currently underway in a production deployment.
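
    For reference, a hypothetical dry-run invocation might look like the following (the option names come from the parseargs() function in the diff below; host names and paths are placeholders):

        $ hawqsync-falcon --dryRun \
              --sourceNamenode nn-source.example.com --targetNamenode nn-target.example.com \
              --sourceHawqMaster hawq-source.example.com --targetHawqMaster hawq-target.example.com \
              --falconUri http://falcon-source.example.com:15000 \
              --pathToSync /hawq_default --jobName drSync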

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kdunn926/incubator-hawq dr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-hawq/pull/940.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #940
    
----
commit 1ca0c75b8310b7aaad5a016d8d59c03bab865b8f
Author: Kyle Dunn <kd...@gmail.com>
Date:   2016-09-26T21:09:13Z

    Initial commit

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq pull request #940: HAWQ 1078. Implement hawqsync-falcon DR ut...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/940#discussion_r83184402
  
    --- Diff: tools/bin/hawqsync-falcon ---
    @@ -0,0 +1,1331 @@
    +#!/usr/bin/env python
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +import os
    +import sys
    +from optparse import OptionParser
    +from subprocess import Popen, PIPE
    +from hashlib import md5
    +from json import loads
    +from time import strftime, sleep, time
    +from collections import defaultdict
    +# TODO - make use of these common HAWQ libs instead of print
    +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
    +#from gppylib.commands.unix import getLocalHostname, getUserName
    +try:
    +    from xml.etree import cElementTree as ElementTree
    +except ImportError, e:
    +    from xml.etree import ElementTree
    +
    +def parseargs():
    +    parser = OptionParser(usage="HAWQ sync options.")
    +    parser.add_option('-v', '--verbose', action='store_true',
    +                      default=False)
    +    parser.add_option("-a", "--prompt", action="store_false",
    +                      dest="prompt", default=True,
    +                      help="Execute without prompt.")
    +    parser.add_option("-l", "--logdir", dest="logDir",
    +                      help="Sets the directory for log files")
    +    parser.add_option('-d', '--dryRun', action='store_true',
    +                      default=False,
    +                      dest='testMode', help="Execute in test mode")
    +    parser.add_option('-u', '--user', dest='userName', default="gpadmin",
    +                      help="The user to own Falcon ACLs and run job as")
    +    parser.add_option('--maxMaps', dest='distcpMaxMaps',
    +                      default="10",
    +                      help="The maximum number of map jobs to allow")
    +    parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
    +                      default="100",
    +                      help="The maximum allowable bandwidth for each map job, in MB/s")
    +    parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name of the source")
    +    parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the source HAWQ master")
    +    parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the target HAWQ master")
    +    parser.add_option('-f', '--falconUri', dest='falconUri',
    +                      default="http://localhost:15000",
    +                      help="The URI to use for issuing Falcon REST calls")
    +    parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the target HDFS namenode")
    +    parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
    +                      default="target",
    +                      help="The Falcon cluster entity name of the target")
    +    parser.add_option('-e', '--executionEntity',
    +                      dest='executionClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name specifying where to execute the job")
    +    parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
    +                      default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                      help="The HDFS location of the underlying Oozie workflow to use for sync job")
    +    parser.add_option('-p', '--pathToSync', dest='pathToSync',
    +                      default="/tmp/syncTest",
    +                      help="The root directory to be synchronized")
    +    parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
    +                      help="The Falcon job entity name to be executed")
    +
    +    (options, args) = parser.parse_args()
    +    return (options, args)
    +
    +def extractFilenameAndSize(line, hdfsPort):
    +    """Utility function to extract filename and file
    +    size from a line of output from `hdfs dfs -ls -R`
    +
    +    """
    +
    +    tokens = line.split()
    +    return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
    +
    +def flattenFilelist(data, hdfsPort):
    +    """Utility function to convert a list of output
    +    lines from `hdfs dfs -ls -R` into a single, sorted, 
    +    delimited string to be used as a synchronization
    +    fingerprint
    +
    +    """
    +
    +    # Ensure record contains expected number of fields
    +    isValid = lambda r: len(r.strip().split()) == 8
    +
    +    # Subset the records to only filename and size fields
    +    filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
    +
    +    # Reverse sort the list by filename column
    +    sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
    +
    +    # Flatten a single line into a delimited string
    +    mergeLines = lambda l: "-".join(l)
    +
    +    # Perform the flatten for every line and join lines into a string
    +    return "\n".join(map(mergeLines, sortedFilenameAndSize))
    +
    +def computeMd5(data):
    +    """Utility function to compute MD5 checksum
    +
    +    """
    +    hasher = md5()
    +    hasher.update(data)
    +
    +    return hasher.hexdigest()
    +
    +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False):
    +    """Utility function to compute an MD5 
    +    hash from the output of a recursive HDFS 
    +    directory listing
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsPort = hdfsUri.split(":")[-1]
    +
    +    hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
    +    #print hdfsCommand
    +
    +    filelist = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (filelist, stderr) = hdfsProcess.communicate()
    +
    +        retVal = hdfsProcess.returncode
    +
    +        if retVal != 0:
    +            return retVal, stderr
    +
    +    # Sample output to follow
    +    else:
    +        filelist = """
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
    +        drwx------   - gpadmin gpadmin          0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
    +        """;
    +
    +        retVal = 0
    +
    +    data = flattenFilelist(filelist, hdfsPort)
    +
    +    # sample yields: 342f414e7519f8c6a9eacce94777ba08
    +    return retVal, computeMd5(data)
    +
    +def etree_to_dict(t):
    +    """Utility function to turn an XML 
    +    element tree into a dictionary
    +
    +    """
    +
    +    d = {t.tag: {} if t.attrib else None}
    +    children = list(t)
    +    if children:
    +        dd = defaultdict(list)
    +        for dc in map(etree_to_dict, children):
    +            for k, v in dc.iteritems():
    +                dd[k].append(v)
    +
    +        d[t.tag] = dict((k, v[0] if len(v) == 1 else v) for k, v in dd.iteritems())
    +    if t.attrib:
    +        d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
    +    if t.text:
    +        text = t.text.strip()
    +        if children or t.attrib:
    +            if text:
    +              d[t.tag]['#text'] = text
    +        else:
    +            d[t.tag] = text
    +    return d
    +
    +def xmlToDict(data):
    +    """Wrapper function to convert 
    +    XML string into a Python dictionary
    +
    +    """
    +
    +    e = ElementTree.XML(data)
    +    return etree_to_dict(e)
    +
    +def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest",
    +                    user="gpadmin", onlyRuntime=False, isTesting=False,
    +                    doDebug=False):
    +    """Get the current status of an existing Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        entity (str): the Falcon process entity name to get status for
    +        user (str): the username used for authorization
    +        onlyRuntime (bool): only query for process runtime (e.g. post-completion)
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +        doDebug (bool): debugging mode for additional verbosity
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile
    +        message (str): message can contain process end time or status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags
    +
    +    endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    if doDebug:
    +        print curlCommand
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +    if doDebug:
    +        print "stdout:", stdout, "stderr:", stderr
    +
    +    try:
    +        result = loads(stdout)
    +        if 'instances' not in result:
    +            print "No instance was started, try deleting the job and re-running"
    +            return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
    +        if onlyRuntime:
    +            # Parse the start/end times in JSON result from cURL
    +            resultPayload = result['instances'][0]
    +            return resultPayload['startTime'], resultPayload['endTime']
    +        else:
    +            # Parse the logfile/status in JSON result from cURL
    +            resultPayload = result['instances'][0] #['actions'][0]
    +            return resultPayload['logFile'], resultPayload['status']
    +    except KeyError as e:
    +        print "KeyError in  getFalconStatus()", str(e), "\n", stdout
    +        return -1, str(e)
    +    except ValueError as e:
    +        print "ValueError in  getFalconStatus()", str(e), "\n", stdout
    +        print "Is Falcon running at : {} ?".format(falconUri)
    +        return -1, str(e)
    +
    +    # Example output follows:
    +    else:
    +        stdout = """{
    +            "status":"SUCCEEDED",
    +            "message":"default/STATUS\n",
    +            "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
    +            "instances":[{
    +                "instance":"2016-08-17T13:22Z",
    +                "status":"SUSPENDED",
    +                "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
    +                "cluster":"secondaryIDP",
    +                "startTime":"2016-08-17T12:25:23-07:00",
    +                "details":"",
    +                "actions":[{
    +                    "action":"user-action",
    +                    "status":"RUNNING"
    +                }, {
    +                    "action":"failed-post-processing",
    +                    "status":"RUNNING",
    +                    "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
    +                }]
    +            }]
    +        }""".replace("\n", "");
    +
    +        return 0, loads(stdout)['instances'][0]['actions'][0]
    +
    +def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest",
    +                     userName="gpadmin", isTesting=False):
    +    """Schedule an existing Falcon process/job entity for execution
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobName (str): the Falcon process entity name to get status for
    +        userName (str): the username used for authorization
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
    +
    +    endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except KeyError:
    +            print "Parse error in getFalconSchedule()", stdout
    +            return -1, stdout
    +        except:
    +            print "Parse error in getFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>default/drSyncTest(process) scheduled successfully</message>
    +            <requestId>default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +# Falcon process entity template used to create/update job attributes
    +drSyncTemplate="""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +<process name="{name}" xmlns="uri:falcon:process:0.1">
    +    <tags>_falcon_mirroring_type=HDFS</tags>
    +    <clusters>
    +        <cluster name="{executionClusterEntityName}">
    +            <validity start="{startTime}" end="{endTime}"/>
    +        </cluster>
    +    </clusters>
    +    <parallel>1</parallel>
    +    <order>LAST_ONLY</order>
    +    <frequency>days(7)</frequency>
    +    <timezone>GMT{gmtOffset}</timezone>
    +    <properties>
    +        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
    +        <property name="distcpMaxMaps" value="{distcpMaxMaps}"/>
    +        <property name="distcpMapBandwidth" value="{distcpMaxMBpsPerMap}"/>
    +        <property name="drSourceDir" value="{pathToSync}"/>
    +        <property name="drTargetDir" value="{pathToSync}"/>
    +        <property name="drTargetClusterFS" value="{targetHdfsUri}"/>
    +        <property name="drSourceClusterFS" value="{sourceHdfsUri}"/>
    +        <!-- This can be a list of emails for notifications -->
    +        <property name="drNotificationReceivers" value="NA"/>
    +        <property name="targetCluster" value="{targetClusterEntityName}"/>
    +        <property name="sourceCluster" value="{sourceClusterEntityName}"/>
    +        <property name="queueName" value="default"/>
    +        <property name="jobPriority" value="HIGH"/>
    +    </properties>
    +    <workflow name="drSyncTest-WF" engine="oozie" path="{workflowFilename}" lib=""/>
    +    <retry policy="periodic" delay="minutes(1)" attempts="3"/>
    +    <ACL owner="{userName}" group="users" permission="0755"/>
    +</process>"""
    +
    +def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None,
    +                   isTesting=False):
    +    """Submit/create a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/submit/process?user.name=falcon
    +
    +    endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    # TODO long term would be to encapsulate Falcon functions in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split() + [payload], stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except ElementTree.ParseError:
    +            print "Parse error in getFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>falcon/default/Submit successful (process) drSyncTest</message>
    +            <requestId>falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None,
    +                 isTesting=False):
    +    """Update, schedule, and monitor a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
    +
    +    endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
    +                                                                       u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")[:3] + ":00"
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    print "Scheduling for", oneMinuteLater
    +    print "Ending on", oneYearLater
    +
    +    # TODO encapsulate everything in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting AWS S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    # TODO output for debug level
    +    #from pprint import pprint
    +    #pprint (payload)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note, needed to separate out the payload as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        # Curl process completed successfully
    +        if retVal == 0:
    +
    +            try:
    +                # Parse the XML result from cURL into a dictionary
    +                result = xmlToDict(stdout)['result']
    +                stderr = result['message']
    +
    +                # Falcon REST update operation successful
    +                if "SUCCEEDED" in result['status']:
    +                    print "Falcon process update was successful"
    +
    +                    # We should doFalconSchedule() here
    +                    status, message = doFalconSchedule(falconUri=falconUri,
    +                                                       jobName=jobParameters['jobName'],
    +                                                       isTesting=False)
    +
    +                    # If we succeeded in scheduling
    +                    if "SUCCEEDED" in status:
    +                        print "Falcon process scheduling was successful"
    +
    +                        # Reset retVal to catch error between scheduled and running states
    +                        retVal = -1
    +                        sleep(5)
    +
    +                        message, status = getFalconStatus(falconUri=falconUri,
    +                                                          entity=jobParameters['jobName'])
    +
    +                        # Continuously poll for hdfs-mirroring status
    +                        while "RUNNING" in status:
    +                            message, status = getFalconStatus(falconUri=falconUri,
    +                                                              entity=jobParameters['jobName'])
    +                            print status
    +
    +                            # flag RUNNING state reached using retVal
    +                            retVal = 0
    +                            sleep(10)
    +
    +                        if status == "KILLED":
    +                            return -1, message
    +
    +                        # Poll one last time for runtimes
    +                        start, finish = getFalconStatus(falconUri=falconUri,
    +                                                        entity=jobParameters['jobName'],
    +                                                        onlyRuntime=True)
    +
    +                        return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
    +
    +                    # Scheduling failed
    +                    else:
    +                        return -1, message
    +
    +                # Falcon REST update operation NOT successful
    +                else:
    +                    print "Falcon REST operation not successful"
    +                    return result['status'], stderr
    +
    +            except KeyError:
    +                print "Are you using the correct Falcon server URI?", falconUri
    +                return -1, stdout
    +
    +        # Curl process did not complete successfully
    +        else:
    +            print "Curl command failed"
    +            return retVal, stderr
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +            <result>
    +                <status>SUCCEEDED</status>
    +                <message>falcon/update/default/Updated successfully</message>
    +                <requestId>falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446</requestId>
    +            </result>
    +        """.replace("\n", "");
    +
    +        # Parse the XML result from cURL into a dictionary
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def hdfsFingerprintsMatch(reference, comparison):
    +    """Helper function to compare two fingerprints / md5 hashes
    +
    +    Args:
    +        reference (str): the reference MD5 checksum string
    +        comparison (str): the comparison MD5 checksum string
    +    
    +    Returns:
    +        isEqual (bool): True if the fingerprints match, False otherwise
    +
    +    """
    +
    +    return reference == comparison
    +
    +
    +def stopHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a quick stop of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStopCommand = "hawq stop master -a -M fast"
    +
    +    if masterHost is not None:
    +        hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                    c=hawqStopCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStopProcess = Popen(hawqStopCommand,
    +                                stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStopProcess.communicate()
    +
    +        return hawqStopProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +def startHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a start of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStartCommand = "hawq start master -a"
    +
    +    if masterHost is not None:
    +        hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                     c=hawqStartCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStartProcess = Popen(hawqStartCommand,
    +                                 stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStartProcess.communicate()
    +
    +        return hawqStartProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +
    +def copyToHdfs(source, dest, isTesting=False):
    +    """Utility function to copy a source file
    +    to the destination HDFS directory/file
    +
    +    Args:
    +        source (str): the source file on the local FS
    +        dest (str): the target HDFS directory and filename
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
    +                                                                    d=dest)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as the gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def checkHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to query HDFS for 
    +    safemode enabled or disabled
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +    
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        try:
    +            offOrOn = True if "ON" in stdout.split()[-1] else False
    +        except IndexError as e:
    +            return -1, str(e)
    +
    +        return hdfsProcess.returncode, offOrOn
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def enableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to enable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def disableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to disable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
    +    """Utility function to force an HDFS checkpoint
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +
    +def createTarball(masterDataBase="/data/hawq/",
    +                  targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
    +    """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
    +        targetTarball (str): the target directory and filename of the tarball
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # tar -cpf /tmp/test.tar --exclude=csv -C /data/hawq master
    +
    +    theTime = strftime("%Y-%m-%d-%H%M")
    +
    +    tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime),
    +                                                                   c=masterDataBase)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = tarProcess.communicate()
    +
    +        except OSError as e:
    +            return -1, str(e), -1
    +
    +        if tarProcess.returncode == 0:
    +
    +            md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
    +
    +            try:
    +                md5Process = Popen(md5Command.split(),
    +                                   stdout=PIPE, stderr=PIPE)
    +
    +                (stdout2, stderr2) = md5Process.communicate()
    +
    +                checksum = stdout2.split()[0].strip()
    +
    +                if md5Process.returncode != 0:
    +                    return -1, "md5 checksum creation failed : " + stderr2, -1
    +                else:
    +                    return 0, targetTarball.format(t=theTime), checksum
    +
    +            except OSError as e:
    +                return -1, str(e), -1
    +
    +        else:
    +            print "Tarball creation failed : " + stderr
    +            return -1, stderr, -1
    +
    +    else:
    +        return 0, "TEST BRANCH", -1
    +
    +def cleanupTarball(filename, isTesting=False):
    +    """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        filename (str): the target directory and filename of the tarball to clean up
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation
    +    # rm -f /tmp/test.tar
    +
    +    rmCommand = "rm -f {f}".format(f=filename)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = rmProcess.communicate()
    +
    +            retVal = rmProcess.returncode
    +
    +            return retVal, stderr
    +
    +        except OSError as e:
    +            return -1, str(e)
    +    else:
    +        return 0, "TEST BRANCH"
    +
    +if __name__ == '__main__':
    +    options, args = parseargs()
    +
    +    #if options.verbose:
    +    #    enable_verbose_logging()
    +
    +    # TODO - switch prints to this once using gppylibs
    +    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
    +
    +
    +    # ### HAWQ Extract every non-system table (source)
    +    # Note: the assumption is this has been done in
    +    # advance of executing this tool.
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        print ""
    +        print "Please confirm you've performed hawq_extract() on all critical data tables"
    +        print "and saved this information outside of the cluster (e.g. version control)"
    +        print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)"
    +        print ""
    +        print "This is critical for data recovery if a sync operation partially completes!"
    +        print ""
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Falcon cluster entities
    +    # Note: the assumption is both source and target
    +    # cluster entities have already been created in Falcon
    +    # TODO add a confirmation step, later a REST call to check
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print ""
    +        print "Please confirm you've created both source and target Falcon cluster entities:"
    +        print ""
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Stop HAWQ
    +    #
    +    # TODO?: switch to Ambari REST, followed by pkill -5 <<some hawq filter>>
    +
    +    # Source
    +    earlier = int(time())
    +    print "Stopping source HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Stopping target HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
    +                              isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +
    +    if retVal == 0:
    +        print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +    else:
    +        print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Create HAWQ Master Data Directory archive
    +    print "Creating MDD tarball" if options.verbose else None;
    +    retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
    +                                                     isTesting=options.testMode)
    +    print retVal, filenameOrStderr, md5sum if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY"
    +        print "Error message was : " + filenameOrStderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Start HAWQ
    +    print "Starting source HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(isTesting=options.testMode)
    +    later = int(time())
    +
    +    if retVal != 0:
    +        print "Failed to start source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +
    +    # TODO add a CLI flag to force source into read-write
    +    if checkHdfsSafemode()[1] == True:
    +        print "Source cluster HDFS is read-only, cannot proceed"
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        sys.exit(1)
    +
    +    # ### Copy MDD archive to HDFS
    +    print "Copying MDD tarball to HDFS" if options.verbose else None;
    +    retVal, stderr = copyToHdfs(source=filenameOrStderr,
    +                                dest=options.pathToSync,
    +                                isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to copy MDD tarball to HDFS"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Cleanup MDD archive from /tmp
    +    print "Cleaning up MDD tarball on local FS" if options.verbose else None;
    +    retVal, stderr = cleanupTarball(filenameOrStderr,
    +                                    isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to clean up MDD tarball"
    +        print "Error message was " + stderr
    +        print ""
    +        print "You will need to manually remove the following file"
    +        print filenameOrStderr
    +        print ""
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    """
    +    # ### Force HDFS checkpoint and enable safemode on source
    +    print "Enabling HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on source cluster" if options.verbose else None;
    +    retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    """;
    +
    +    # ### Leave safemode on target HDFS
    +    print "Disabling HDFS safemode on target" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
    +                                         isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Note, the entity names refer to Falcon
    +    # entities that have been created prior
    +    # to execution of this tool
    +    """
    +    jobParameters = dict(userName="gpadmin",
    +                         distcpMaxMaps="100",
    +                         distcpMaxMBpsPerMap="1000",
    +                         sourceClusterEntityName="sourceCluster",
    +                         sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
    +                         targetClusterEntityName="targetCluster",
    +                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
    +                         executionClusterEntityName="sourceCluster",
    +                         workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                         pathToSync="/tmp/syncTest",
    +                         jobName="drSync")
    +    """;
    +
    +    jobParameters = dict(userName=options.userName,
    +                         distcpMaxMaps=options.distcpMaxMaps,
    +                         distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
    +                         sourceClusterEntityName=options.sourceClusterEntityName,
    +                         sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
    +                         targetClusterEntityName=options.targetClusterEntityName,
    +                         targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
    +                         executionClusterEntityName=options.executionClusterEntityName,
    +                         workflowFilename=options.workflowFilename,
    +                         pathToSync=options.pathToSync,
    +                         jobName=options.jobName)
    +
    +    print jobParameters if options.verbose else None;
    +
    +    # ### Update and Schedule Job - monitor until completion
    +    print "Falcon Soar" if options.verbose else None;
    +    retVal, stderr = doFalconSoar(falconUri=options.falconUri,
    +                                  jobParameters=jobParameters,
    +                                  isTesting=options.testMode)
    +    falconOutput = stderr
    +
    +    if retVal != 0:
    +        print "Falcon replication job failed"
    +        print "Error message was " + stderr
    +        print "Source cluster will be left in safemode for remediation"
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Leave safemode on source HDFS
    +    print "Disable HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Force HDFS checkpoint and enable safemode on target
    +    print "Enabling HDFS safemode on target cluster" if options.verbose else None
    +    retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on target cluster" if options.verbose else None
    +    retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### HDFS Fingerprint comparison
    +
    +    print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None
    +
    +    retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
    +                                             hdfsDir=jobParameters['pathToSync'],
    +                                             isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on source cluster"
    +        print "Error message was " + md5OrStderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
    +                                              hdfsDir=jobParameters['pathToSync'],
    +                                              isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on target cluster"
    +        print "Error message was " + md5OrStderr2
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
    +
    +    if not isValidSync:
    +        print "Source and target cluster HDFS fingerprints do not match."
    +        print "Source checksum : " + md5OrStderr
    +        print "Target checksum : " + md5OrStderr2
    +        print "This is bad, please check Falcon sync logs : " + falconOutput
    +        sys.exit(1)
    +    else:
    +        print "Source and target HDFS fingerprints match."
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Starting target HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
    +                               isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    --- End diff --
    
    @kdunn926 , if I understand correctly, at this point all the files have been copied to the new HAWQ master data directory, but the HAWQ catalog information has not been updated. Could we leverage hawq register to register the files into HAWQ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq issue #940: HAWQ 1078. Implement hawqsync-falcon DR utility.

Posted by kdunn-pivotal <gi...@git.apache.org>.
Github user kdunn-pivotal commented on the issue:

    https://github.com/apache/incubator-hawq/pull/940
  
    Here is the step-by-step process - it may have some gaps but this is likely 90% of the steps:
    
    # HAWQSYNC initial setup runbook:
    
    1. Ensure network connectivity between source and DR sites 
    
    | Port  | Function | Servers                                                                     |
    |-------|----------|-----------------------------------------------------------------------------|
    | 11000 | Oozie    | From Falcon server in each env to Oozie server in other env                |
    | 15000 | Falcon   | From HAWQ master to Falcon server in other env                             |
    | 50010 | Datanode | From Falcon server & datanodes in each env to datanodes in other env       |
    | 50070 | Namenode | From Falcon server to namenodes (primary and standby) in other env         |
    | 8020  | Namenode | From datanodes to namenodes (primary and standby) in other env             |
    | 8050  | YARN RM  | From Falcon server in each env to YARN ResourceManager server in other env |
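    
    A quick way to pre-check these ports before installing anything is a simple `nc` probe in each direction (a minimal sketch; `dr-namenode`, `dr-falcon` and `dr-oozie` are placeholders for your own DR-site hosts):
    
    ```
    # Run from the source-site Falcon server / HAWQ master, then repeat from the DR side
    $ for hostport in dr-namenode:50070 dr-namenode:8020 dr-falcon:15000 dr-oozie:11000 ; do
          nc -z -w 5 ${hostport%:*} ${hostport#*:} && echo "OK   $hostport" || echo "FAIL $hostport"
      done
    ```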
    
    2. Install Falcon and Oozie on source and DR HAWQ clusters
    
    3. Make prerequisite directories on both clusters (source, DR):
    
    ```
    $ sudo su falcon -l -c 'hdfs dfs -mkdir /tmp/{staging,working}'
    $ sudo su falcon -l -c 'hdfs dfs -chmod 777 /tmp/staging'
    $ sudo su hdfs -l -c 'hdfs dfs -mkdir -p /apps/data-mirroring/workflows/lib'
    $ sudo su hdfs -l -c 'hdfs dfs -chmod -R 777 /apps/data-mirroring'
    $ sudo su hdfs -l -c 'hdfs dfs -mkdir /user/falcon && hdfs dfs -chown falcon:falcon /user/falcon'
    $ sudo su hdfs -l -c 'hdfs dfs -mkdir /user/gpadmin && hdfs dfs -chown gpadmin:gpadmin /user/gpadmin/'
    ```
    
    4. Setup cluster entities for source and DR clusters:
    
    ```
    gpadmin@source $ curl -H "Content-Type:text/xml" -X POST http://<FALCON_HOST>:15000/api/entities/submit/cluster?user.name=falcon -d '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <cluster name="primaryIDP" description="" colo="SOURCE" xmlns="uri:falcon:cluster:0.1">
        <interfaces>
            <interface type="readonly" endpoint="hftp://sandbox.hortonworks.com:50070" version="2.2.0"/>
            <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.2.0"/>
            <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.2.0"/>
            <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.0.0"/>
            <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.1.6"/>
        </interfaces>
        <locations>
            <location name="staging" path="/tmp/staging"/>
            <location name="temp" path="/tmp"/>
            <location name="working" path="/tmp/working"/>
        </locations>
        <ACL owner="hdfs" group="users" permission="0755"/>
    </cluster>'
    ```
    
    ```
    gpadmin@dr $ curl -H "Content-Type:text/xml" -X POST http://<FALCON_HOST>:15000/api/entities/submit/cluster?user.name=falcon -d '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <cluster name="drIDP" description="" colo="DR" xmlns="uri:falcon:cluster:0.1">
        <interfaces>
            <interface type="readonly" endpoint="hftp://sandbox2.hortonworks.com:50070" version="2.2.0"/>
            <interface type="write" endpoint="hdfs://sandbox2.hortonworks.com:8020" version="2.2.0"/>
            <interface type="execute" endpoint="sandbox2.hortonworks.com:8050" version="2.2.0"/>
            <interface type="workflow" endpoint="http://sandbox2.hortonworks.com:11000/oozie/" version="4.0.0"/>
            <interface type="messaging" endpoint="tcp://sandbox2.hortonworks.com:61616?daemon=true" version="5.1.6"/>
        </interfaces>
        <locations>
            <location name="staging" path="/tmp/staging"/>
            <location name="temp" path="/tmp"/>
            <location name="working" path="/tmp/working"/>
        </locations>
        <ACL owner="hdfs" group="users" permission="0755"/>
    </cluster>'
    ```
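    
    Before moving on, it is worth confirming that both entities registered successfully. Listing the cluster entities via the Falcon REST API should show `primaryIDP` on the source and `drIDP` on the DR side (a quick check; the exact response format varies by Falcon version):
    
    ```
    gpadmin@{source,dr} $ curl -X GET "http://<FALCON_HOST>:15000/api/entities/list/cluster?user.name=falcon"
    ```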
    
    5. Stage distcp-based replication workflow on both source, DR HDFS
    ```
    gpadmin@{source,dr} $ hdfs dfs -put - /apps/data-mirroring/workflows/hdfs-replication-workflow-v2.xml <<EOF
    <!--
           Licensed to the Apache Software Foundation (ASF) under one
      or more contributor license agreements.  See the NOTICE file
      distributed with this work for additional information
      regarding copyright ownership.  The ASF licenses this file
      to you under the Apache License, Version 2.0 (the
      "License"); you may not use this file except in compliance
      with the License.  You may obtain a copy of the License at
    
          http://www.apache.org/licenses/LICENSE-2.0
    
      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
      -->
    <workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-fs-workflow'>
        <start to='dr-replication'/>
        <!-- Replication action -->
        <action name="dr-replication">
            <distcp xmlns="uri:oozie:distcp-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.job.priority</name>
                        <value>${jobPriority}</value>
                    </property>
                    <property>
                        <name>mapred.job.queue.name</name>
                        <value>${queueName}</value>
                    </property>
                </configuration>
                <arg>-update</arg>
                <arg>-delete</arg>
                <arg>-m</arg>
                <arg>${distcpMaxMaps}</arg>
                <arg>-bandwidth</arg>
                <arg>${distcpMapBandwidth}</arg>
                <arg>-strategy</arg>
                <arg>dynamic</arg>
                <arg>${drSourceClusterFS}${drSourceDir}</arg>
                <arg>${drTargetClusterFS}${drTargetDir}</arg>
            </distcp>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>
                Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
            </message>
        </kill>
        <end name="end"/>
    </workflow-app>
    EOF
    ```
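    
    A quick sanity check that the workflow landed where the sync job expects it:
    
    ```
    gpadmin@{source,dr} $ hdfs dfs -ls /apps/data-mirroring/workflows/
    ```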
    
    # Sync operation runbook:
    1. Run hawqsync-extract to capture known-good HDFS file sizes (protects against HDFS / catalog inconsistency if failure during sync)
    
    2. Run ETL batch
    
    3. Run hawqsync-falcon, which performs the following steps:
    (source safemode during sync is only allowable if using a remote Falcon to "pull", meaning the distcp job executes on the DR site)
      1. Stop both HAWQ masters (source and target)
      2. Archive source MASTER_DATA_DIRECTORY (MDD) tarball to HDFS
      3. Restart source HAWQ master
      4. Enable HDFS safe mode and force source checkpoint
      5. Disable remote HDFS safe mode
      6. Execute Apache Falcon-based distcp sync process
      7. Disable source HDFS safe mode
      8. Enable HDFS safe mode and force remote checkpoint
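    
    To make step 3 concrete, here is an illustrative invocation, assuming the tool is on the PATH (all hostnames are placeholders; the flags map to the options defined in the tool, and per the note above the job executes on the DR cluster so the sync "pulls" while the source is in safemode). Adding `-d` first performs a dry run:
    
    ```
    gpadmin@source $ hawqsync-falcon -v \
        -s source-namenode.example.com -t dr-namenode.example.com \
        -m source-hawqmaster.example.com -M dr-hawqmaster.example.com \
        -S primaryIDP -T drIDP -e drIDP \
        -f http://dr-falcon.example.com:15000 \
        -w /apps/data-mirroring/workflows/hdfs-replication-workflow-v2.xml \
        -p /hawq_default -j drSync --maxMaps 10 --mapBandwidth 100
    ```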
    
    # DR event runbook:
    1. Copy new catalog to local filesystem
    ```
    [gpadmin@dr-hawqmaster ~]$ hdfs dfs -copyToLocal /hawq_default/hawqMdd-2016-10-11-1028.tar .
    ```
    
    2. Archive previous catalog
    ```
    # cd /data/hawq/
    root@dr-hawqmaster:/data/hawq # mv master master.save-11oct2016
    ```
    
    3. Unpack new catalog
    ```
    root@dr-hawqmaster:/data/hawq # tar xpf ~gpadmin/hawqMdd-2016-10-11-1028.tar -C `pwd`
    ```
    
    4. Restart Master with new catalog in place
    ```
    [gpadmin@dr-hawqmaster ~]$ hawq start master -a
    ```
    
    5. Update Standby Master Identity
    ```
    [gpadmin@dr-hawqmaster ~]$ export PGOPTIONS="-c gp_session_role=UTILITY -c allow_system_table_mods=dml"
    [gpadmin@dr-hawqmaster ~]$ psql template1 <<SQL
    
    UPDATE gp_segment_configuration SET (hostname, address) = ('clppn1prhdbmn02.infosolco.net', '10.228.45.12') WHERE role = 's' ;
    SQL
    ```
    
    6. Start cluster segments
    ```
    [gpadmin@dr-hawqmaster ~]$ hawq start allsegments -a
    ```
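    
    Finally, a quick way to confirm the DR master is healthy and that the standby update from step 5 took effect (a sketch; `hawq state` summarizes cluster status):
    
    ```
    [gpadmin@dr-hawqmaster ~]$ hawq state
    [gpadmin@dr-hawqmaster ~]$ psql template1 -c "SELECT * FROM gp_segment_configuration ;"
    ```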


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq issue #940: HAWQ 1078. Implement hawqsync-falcon DR utility.

Posted by kdunn-pivotal <gi...@git.apache.org>.
Github user kdunn-pivotal commented on the issue:

    https://github.com/apache/incubator-hawq/pull/940
  
    @vvineet How can we get this prioritized for the next release? Also, anyone that can put eyes on it for a code review would be helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq pull request #940: HAWQ 1078. Implement hawqsync-falcon DR ut...

Posted by kdunn-pivotal <gi...@git.apache.org>.
Github user kdunn-pivotal commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/940#discussion_r83278114
  
    --- Diff: tools/bin/hawqsync-falcon ---
    @@ -0,0 +1,1331 @@
    +#!/usr/bin/env python
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +import os
    +import sys
    +from optparse import OptionParser
    +from subprocess import Popen, PIPE
    +from hashlib import md5
    +from json import loads
    +from time import strftime, sleep, time
    +from collections import defaultdict
    +# TODO - make use of these common HAWQ libs instead of print
    +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
    +#from gppylib.commands.unix import getLocalHostname, getUserName
    +try:
    +    from xml.etree import cElementTree as ElementTree
    +except ImportError, e:
    +    from xml.etree import ElementTree
    +
    +def parseargs():
    +    parser = OptionParser(usage="HAWQ sync options.")
    +    parser.add_option('-v', '--verbose', action='store_true',
    +                      default=False)
    +    parser.add_option("-a", "--prompt", action="store_false",
    +                      dest="prompt", default=True,
    +                      help="Execute without prompt.")
    +    parser.add_option("-l", "--logdir", dest="logDir",
    +                      help="Sets the directory for log files")
    +    parser.add_option('-d', '--dryRun', action='store_true',
    +                      default=False,
    +                      dest='testMode', help="Execute in test mode")
    +    parser.add_option('-u', '--user', dest='userName', default="gpadmin",
    +                      help="The user to own Falcon ACLs and run job as")
    +    parser.add_option('--maxMaps', dest='distcpMaxMaps',
    +                      default="10",
    +                      help="The maximum number of map jobs to allow")
    +    parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
    +                      default="100",
    +                      help="The maximum allowable bandwidth for each map job, in MB/s")
    +    parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name of the source")
    +    parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the source HAWQ master")
    +    parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the target HAWQ master")
    +    parser.add_option('-f', '--falconUri', dest='falconUri',
    +                      default="http://localhost:15000",
    +                      help="The URI to use for issuing Falcon REST calls")
    +    parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the target HDFS namenode")
    +    parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
    +                      default="target",
    +                      help="The Falcon cluster entity name of the target")
    +    parser.add_option('-e', '--executionEntity',
    +                      dest='executionClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name specifying where to execute the job")
    +    parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
    +                      default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                      help="The HDFS location of the underlying Oozie workflow to use for sync job")
    +    parser.add_option('-p', '--pathToSync', dest='pathToSync',
    +                      default="/tmp/syncTest",
    +                      help="The root directory to be synchronized")
    +    parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
    +                      help="The Falcon job entity name to be executed")
    +
    +    (options, args) = parser.parse_args()
    +    return (options, args)
    +
    +def extractFilenameAndSize(line, hdfsPort):
    +    """Utility function to extract filename and file
    +    size from a line of output from `hdfs dfs -ls -R`
    +
    +    """
    +
    +    tokens = line.split()
    +    return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
    +
    +def flattenFilelist(data, hdfsPort):
    +    """Utility function to convert a list of output
    +    lines from `hdfs dfs -ls -R` into a single, sorted, 
    +    delimited string to be used as a synchronization
    +    fingerprint
    +
    +    """
    +
    +    # Ensure record contains expected number of fields
    +    isValid = lambda r: len(r.strip().split()) == 8
    +
    +    # Subset the records to only filename and size fields
    +    filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
    +
    +    # Reverse sort the list by filename column
    +    sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
    +
    +    # Flatten a single line into a delimited string
    +    mergeLines = lambda l: "-".join(l)
    +
    +    # Perform the flatten for every line and join lines into a string
    +    return "\n".join(map(mergeLines, sortedFilenameAndSize))
    +
    +def computeMd5(data):
    +    """Utility function to compute MD5 checksum
    +
    +    """
    +    hasher = md5()
    +    hasher.update(data)
    +
    +    return hasher.hexdigest()
    +
    +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False):
    +    """Utility function to compute an MD5 
    +    hash from the output of a recursive HDFS 
    +    directory listing
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsPort = hdfsUri.split(":")[-1]
    +
    +    hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
    +    #print hdfsCommand
    +
    +    filelist = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (filelist, stderr) = hdfsProcess.communicate()
    +
    +        retVal = hdfsProcess.returncode
    +
    +        if retVal != 0:
    +            return retVal, stderr
    +
    +    # Sample output to follow
    +    else:
    +        filelist = """
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
    +        drwx------   - gpadmin gpadmin          0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
    +        """;
    +
    +        retVal = 0
    +
    +    data = flattenFilelist(filelist, hdfsPort)
    +
    +    # sample yields: 342f414e7519f8c6a9eacce94777ba08
    +    return retVal, computeMd5(data)
    +
    +def etree_to_dict(t):
    +    """Utility function to turn an XML 
    +    element tree into a dictionary
    +
    +    """
    +
    +    d = {t.tag: {} if t.attrib else None}
    +    children = list(t)
    +    if children:
    +        dd = defaultdict(list)
    +        for dc in map(etree_to_dict, children):
    +            for k, v in dc.iteritems():
    +                dd[k].append(v)
    +
    +        d[t.tag] = dict((k, v[0]) if len(v) == 1 else dict((k, v)) for k, v in dd.iteritems())
    +    if t.attrib:
    +        d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
    +    if t.text:
    +        text = t.text.strip()
    +        if children or t.attrib:
    +            if text:
    +              d[t.tag]['#text'] = text
    +        else:
    +            d[t.tag] = text
    +    return d
    +
    +def xmlToDict(data):
    +    """Wrapper function to convert 
    +    XML string into a Python dictionary
    +
    +    """
    +
    +    e = ElementTree.XML(data)
    +    return etree_to_dict(e)
    +
    +def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest",
    +                    user="gpadmin", onlyRuntime=False, isTesting=False,
    +                    doDebug=False):
    +    """Get the current status of an existing Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        entity (str): the Falcon process entity name to get status for
    +        user (str): the username used for authorization
    +        onlyRuntime (bool): only query for process runtime (e.g. post-completion)
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +        doDebug (bool): debugging mode for additional verbosity
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile
    +        message (str): message can contain process end time or status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags
    +
    +    endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    if doDebug:
    +        print curlCommand
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        if doDebug:
    +            print "stdout:", stdout, "stderr:", stderr
    +
    +        try:
    +            result = loads(stdout)
    +            if 'instances' not in result:
    +                print "No instance was started, try deleting the job and re-running"
    +                return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
    +            if onlyRuntime:
    +                # Parse the start/end times in JSON result from cURL
    +                resultPayload = result['instances'][0]
    +                return resultPayload['startTime'], resultPayload['endTime']
    +            else:
    +                # Parse the logfile/status in JSON result from cURL
    +                resultPayload = result['instances'][0] #['actions'][0]
    +                return resultPayload['logFile'], resultPayload['status']
    +        except KeyError as e:
    +            print "KeyError in getFalconStatus()", str(e), "\n", stdout
    +            return -1, str(e)
    +        except ValueError as e:
    +            print "ValueError in getFalconStatus()", str(e), "\n", stdout
    +            print "Is Falcon running at : {} ?".format(falconUri)
    +            return -1, str(e)
    +
    +    # Example output follows:
    +    else:
    +        stdout = """{
    +            "status":"SUCCEEDED",
    +            "message":"default/STATUS\n",
    +            "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
    +            "instances":[{
    +                "instance":"2016-08-17T13:22Z",
    +                "status":"SUSPENDED",
    +                "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
    +                "cluster":"secondaryIDP",
    +                "startTime":"2016-08-17T12:25:23-07:00",
    +                "details":"",
    +                "actions":[{
    +                    "action":"user-action",
    +                    "status":"RUNNING"
    +                }, {
    +                    "action":"failed-post-processing",
    +                    "status":"RUNNING",
    +                    "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
    +                }]
    +            }]
    +        }""".replace("\n", "");
    +
    +        return 0, loads(stdout)['instances'][0]['actions'][0]
    +
    +def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest",
    +                     userName="gpadmin", isTesting=False):
    +    """Schedule an existing Falcon process/job entity for execution
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobName (str): the Falcon process entity name to get status for
    +        userName (str): the username used for authorization
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
    +
    +    endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except KeyError:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +        except:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>default/drSyncTest(process) scheduled successfully</message>
    +            <requestId>default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +# Falcon process entity template used to create/update job attributes
    +drSyncTemplate="""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +<process name="{name}" xmlns="uri:falcon:process:0.1">
    +    <tags>_falcon_mirroring_type=HDFS</tags>
    +    <clusters>
    +        <cluster name="{executionClusterEntityName}">
    +            <validity start="{startTime}" end="{endTime}"/>
    +        </cluster>
    +    </clusters>
    +    <parallel>1</parallel>
    +    <order>LAST_ONLY</order>
    +    <frequency>days(7)</frequency>
    +    <timezone>GMT{gmtOffset}</timezone>
    +    <properties>
    +        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
    +        <property name="distcpMaxMaps" value="{distcpMaxMaps}"/>
    +        <property name="distcpMapBandwidth" value="{distcpMaxMBpsPerMap}"/>
    +        <property name="drSourceDir" value="{pathToSync}"/>
    +        <property name="drTargetDir" value="{pathToSync}"/>
    +        <property name="drTargetClusterFS" value="{targetHdfsUri}"/>
    +        <property name="drSourceClusterFS" value="{sourceHdfsUri}"/>
    +        <!-- This can be a list of emails for notifications -->
    +        <property name="drNotificationReceivers" value="NA"/>
    +        <property name="targetCluster" value="{targetClusterEntityName}"/>
    +        <property name="sourceCluster" value="{sourceClusterEntityName}"/>
    +        <property name="queueName" value="default"/>
    +        <property name="jobPriority" value="HIGH"/>
    +    </properties>
    +    <workflow name="drSyncTest-WF" engine="oozie" path="{workflowFilename}" lib=""/>
    +    <retry policy="periodic" delay="minutes(1)" attempts="3"/>
    +    <ACL owner="{userName}" group="users" permission="0755"/>
    +</process>"""
    +
    +def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None,
    +                   isTesting=False):
    +    """Submit/create a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/submit/process?user.name=falcon
    +
    +    endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    # TODO long term would be to encapsulate Falcon functions in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note: pass the XML payload as a single argument, it cannot be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except ElementTree.ParseError:
    +            print "Parse error in doFalconSubmit()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>falcon/default/Submit successful (process) drSyncTest</message>
    +            <requestId>falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None,
    +                 isTesting=False):
    +    """Update, schedule, and monitor a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
    +
    +    endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
    +                                                                       u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")[:3] + ":00"
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    print "Scheduling for", oneMinuteLater
    +    print "Ending on", oneYearLater
    +
    +    # TODO encapsulate everything in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting AWS S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    # TODO output for debug level
    +    #from pprint import pprint
    +    #pprint (payload)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note, need to separate out the payload as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        # Curl process completed successfully
    +        if retVal == 0:
    +
    +            try:
    +                # Parse the XML result from cURL into a dictionary
    +                result = xmlToDict(stdout)['result']
    +                stderr = result['message']
    +
    +                # Falcon REST update operation successful
    +                if "SUCCEEDED" in result['status']:
    +                    print "Falcon process update was successful"
    +
    +                    # We should doFalconSchedule() here
    +                    status, message = doFalconSchedule(falconUri=falconUri,
    +                                                       jobName=jobParameters['jobName'],
    +                                                       isTesting=False)
    +
    +                    # If we succeeded in scheduling
    +                    if "SUCCEEDED" in status:
    +                        print "Falcon process scheduling was successful"
    +
    +                        # Reset retVal to catch error between scheduled and running states
    +                        retVal = -1
    +                        sleep(5)
    +
    +                        message, status = getFalconStatus(falconUri=falconUri,
    +                                                          entity=jobParameters['jobName'])
    +
    +                        # Continuously poll for hdfs-mirroring status
    +                        while "RUNNING" in status:
    +                            message, status = getFalconStatus(falconUri=falconUri,
    +                                                              entity=jobParameters['jobName'])
    +                            print status
    +
    +                            # flag RUNNING state reached using retVal
    +                            retVal = 0
    +                            sleep(10)
    +
    +                        if status == "KILLED":
    +                            return -1, message
    +
    +                        # Poll one last time for runtimes
    +                        start, finish = getFalconStatus(falconUri=falconUri,
    +                                                        entity=jobParameters['jobName'],
    +                                                        onlyRuntime=True)
    +
    +                        return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
    +
    +                    # Scheduling failed
    +                    else:
    +                        return -1, message
    +
    +                # Falcon REST update operation NOT successful
    +                else:
    +                    print "Falcon REST operation not successful"
    +                    return result['status'], stderr
    +
    +            except KeyError:
    +                print "Are you using the correct Falcon server URI?", falconUri
    +                return -1, stdout
    +
    +        # Curl process did not complete successfully
    +        else:
    +            print "Curl command failed"
    +            return retVal, stderr
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +            <result>
    +                <status>SUCCEEDED</status>
    +                <message>falcon/update/default/Updated successfully</message>
    +                <requestId>falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446</requestId>
    +            </result>
    +        """.replace("\n", "");
    +
    +        # Parse the XML result from cURL into a dictionary
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def hdfsFingerprintsMatch(reference, comparison):
    +    """Helper function to compare two fingerprints / md5 hashes
    +
    +    Args:
    +        reference (str): the reference MD5 checksum string
    +        comparison (str): the comparison MD5 checksum string
    +    
    +    Returns:
    +        isEqual (bool): True if the fingerprints match, False otherwise
    +
    +    """
    +
    +    return reference == comparison
    +
    +
    +def stopHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a quick stop of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStopCommand = "hawq stop master -a -M fast"
    +
    +    if masterHost is not None:
    +        hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                    c=hawqStopCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStopProcess = Popen(hawqStopCommand,
    +                                stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStopProcess.communicate()
    +
    +        return hawqStopProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +def startHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a start of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStartCommand = "hawq start master -a"
    +
    +    if masterHost is not None:
    +        hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                     c=hawqStartCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStartProcess = Popen(hawqStartCommand,
    +                                 stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStartProcess.communicate()
    +
    +        return hawqStartProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +
    +def copyToHdfs(source, dest, isTesting=False):
    +    """Utility function to copy a source file
    +    to the destination HDFS directory/file
    +
    +    Args:
    +        source (str): the source file on the local FS
    +        dest (str): the target HDFS directory and filename
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
    +                                                                    d=dest)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def checkHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to query HDFS for 
    +    safemode enabled or disabled
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +    
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        try:
    +            offOrOn = True if "ON" in stdout.split()[-1] else False
    +        except IndexError as e:
    +            return -1, str(e)
    +
    +        return hdfsProcess.returncode, offOrOn
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def enableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to enable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def disableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to disable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
    +    """Utility function to force an HDFS checkpoint
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +
    +def createTarball(masterDataBase="/data/hawq/",
    +                  targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
    +    """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
    +        targetTarball (str): the target directory and filename of the tarball
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise
    +        message (str): the tarball path on success, otherwise an error string
    +        checksum (str): the MD5 checksum of the tarball, or -1 on failure
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # tar cpf /tmp/test.tar.bz2 --exclude=csv -C /data/hawq master
    +
    +    theTime = strftime("%Y-%m-%d-%H%M")
    +
    +    tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime),
    +                                                                   c=masterDataBase)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = tarProcess.communicate()
    +
    +        except OSError as e:
    +            return -1, str(e), -1
    +
    +        if tarProcess.returncode == 0:
    +
    +            md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
    +
    +            try:
    +                md5Process = Popen(md5Command.split(),
    +                                   stdout=PIPE, stderr=PIPE)
    +
    +                (stdout2, stderr2) = md5Process.communicate()
    +
    +                checksum = stdout2.split()[0].strip()
    +
    +                if md5Process.returncode != 0:
    +                    return -1, "md5 checksum creation failed : " + stderr2, -1
    +                else:
    +                    return 0, targetTarball.format(t=theTime), checksum
    +
    +            except OSError as e:
    +                return -1, str(e), -1
    +
    +        else:
    +            print "Tarball creation failed : " + stderr
    +            return -1, stderr, -1
    +
    +    else:
    +        return 0, "TEST BRANCH", -1
    +
    +def cleanupTarball(filename, isTesting=False):
    +    """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        filename (str): the target directory and filename of the tarball to clean up
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # rm -f /tmp/test.tar
    +
    +    rmCommand = "rm -f {f}".format(f=filename)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = rmProcess.communicate()
    +
    +            retVal = rmProcess.returncode
    +
    +            return retVal, stderr
    +
    +        except OSError as e:
    +            return -1, str(e)
    +    else:
    +        return 0, "TEST BRANCH"
    +
    +if __name__ == '__main__':
    +    options, args = parseargs()
    +
    +    #if options.verbose:
    +    #    enable_verbose_logging()
    +
    +    # TODO - switch prints to this once using gppylibs
    +    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
    +
    +
    +    # ### HAWQ Extract every non-system table (source)
    +    # Note: the assumption is this has been done in
    +    # advance of executing this tool.
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        print ""
    +        print "Please confirm you've performed hawq_extract() on all critical data tables"
    +        print "and saved this information outside of the cluster (e.g. version control)"
    +        print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)"
    +        print ""
    +        print "This is critical for data recovery if a sync operation partially completes!"
    +        print ""
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Falcon cluster entities
    +    # Note: the assumption is both source and target
    +    # cluster entities have already been created in Falcon
    +    # TODO add a confirmation step, later a REST call to check
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print ""
    +        print "Please confirm you've created both source and target Falcon cluster entities:"
    +        print ""
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Stop HAWQ
    +    #
    +    # TODO?: switch to Ambari REST, followed by pkill -5 <<some hawq filter>>
    +
    +    # Source
    +    earlier = int(time())
    +    print "Stopping source HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Stopping target HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
    +                              isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        # The target could not be stopped, so the sync cannot proceed either way
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Create HAWQ Master Data Directory archive
    +    print "Creating MDD tarball" if options.verbose else None;
    +    retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
    +                                                     isTesting=options.testMode)
    +    print retVal, filenameOrStderr, md5sum if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY"
    +        print "Error message was : " + filenameOrStderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        # The MDD archive could not be created, so the sync cannot proceed either way
    +        print "Exiting."
    +        sys.exit(1)
    +
    +    # ### Start HAWQ
    +    print "Starting source HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(isTesting=options.testMode)
    +    later = int(time())
    +
    +    if retVal != 0:
    +        print "Failed to start source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +
    +    # TODO add a CLI flag to force source into read-write
    +    if checkHdfsSafemode()[1] == True:
    +        print "Source cluster HDFS is read-only, cannot proceed"
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        sys.exit(1)
    +
    +    # ### Copy MDD archive to HDFS
    +    print "Copying MDD tarball to HDFS" if options.verbose else None;
    +    retVal, stderr = copyToHdfs(source=filenameOrStderr,
    +                                dest=options.pathToSync,
    +                                isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to copy MDD tarball to HDFS"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Cleanup MDD archive from /tmp
    +    print "Cleaning up MDD tarball on local FS" if options.verbose else None;
    +    retVal, stderr = cleanupTarball(filenameOrStderr,
    +                                    isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to clean up MDD tarball"
    +        print "Error message was " + stderr
    +        print ""
    +        print "You will need to manually remove the following file"
    +        print filenameOrStderr
    +        print ""
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    """
    +    # ### Force HDFS checkpoint and enable safemode on source
    +    print "Enabling HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on source cluster" if options.verbose else None;
    +    retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    """;
    +
    +    # ### Leave safemode on target HDFS
    +    print "Disabling HDFS safemode on target" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
    +                                         isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Note, the entity names refer to Falcon
    +    # entities that have been created prior
    +    # to execution of this tool
    +    """
    +    jobParameters = dict(userName="gpadmin",
    +                         distcpMaxMaps="100",
    +                         distcpMaxMBpsPerMap="1000",
    +                         sourceClusterEntityName="sourceCluster",
    +                         sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
    +                         targetClusterEntityName="targetCluster",
    +                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
    +                         executionClusterEntityName="sourceCluster",
    +                         workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                         pathToSync="/tmp/syncTest",
    +                         jobName="drSync")
    +    """;
    +
    +    jobParameters = dict(userName=options.userName,
    +                         distcpMaxMaps=options.distcpMaxMaps,
    +                         distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
    +                         sourceClusterEntityName=options.sourceClusterEntityName,
    +                         sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
    +                         targetClusterEntityName=options.targetClusterEntityName,
    +                         targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
    +                         executionClusterEntityName=options.executionClusterEntityName,
    +                         workflowFilename=options.workflowFilename,
    +                         pathToSync=options.pathToSync,
    +                         jobName=options.jobName)
    +
    +    print jobParameters if options.verbose else None;
    +
    +    # ### Update and Schedule Job - monitor until completion
    +    print "Falcon Soar" if options.verbose else None;
    +    retVal, stderr = doFalconSoar(falconUri=options.falconUri,
    +                                  jobParameters=jobParameters,
    +                                  isTesting=options.testMode)
    +    falconOutput = stderr
    +
    +    if retVal != 0:
    +        print "Falcon replication job failed"
    +        print "Error message was " + stderr
    +        print "Source cluster will be left in safemode for remediation"
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Leave safemode on source HDFS
    +    print "Disable HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Force HDFS checkpoint and enable safemode on target
    +    print "Enabling HDFS safemode on target cluster" if options.verbose else None
    +    retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on target cluster" if options.verbose else None
    +    retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### HDFS Fingerprint comparison
    +
    +    print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None
    +
    +    retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
    +                                             hdfsDir=jobParameters['pathToSync'],
    +                                             isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on source cluster"
    +        print "Error message was " + md5OrStderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
    +                                              hdfsDir=jobParameters['pathToSync'],
    +                                              isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on target cluster"
    +        print "Error message was " + md5OrStderr2
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
    +
    +    if not isValidSync:
    +        print "Source and target cluster HDFS fingerprints do not match."
    +        print "Source checksum : " + md5OrStderr
    +        print "Target checksum : " + md5OrStderr2
    +        print "This is bad, please check Falcon sync logs : " + falconOutput
    +        sys.exit(1)
    +    else:
    +        print "Source and target HDFS fingerprints match."
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Starting target HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
    +                               isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to start target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    else:
    +        print "HAWQ sync completed successfully!"
    +        print """
    +        ## Manual runbook during DR event
    +        1. Copy MDD archive from HDFS to target master (CLI)
    +        2. Restore archive in /data/hawq/ (CLI)
    --- End diff --
    
    Yes, the MDD archive is the `MASTER_DATA_DIRECTORY` from the source HAWQ cluster.
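
    For illustration, a minimal sketch of what runbook steps 1 and 2 could look like
    on the target master during a DR event follows. The archive name, the HDFS
    staging path, and the /data/hawq/ base directory are assumptions taken from the
    tool's defaults (not a tested procedure), and the runOrDie helper is only for brevity:

        # Hypothetical example only - adjust archive name, paths, and user to the deployment
        from subprocess import Popen, PIPE

        def runOrDie(command):
            """Run a shell command and raise on a non-zero exit code."""
            process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)
            stdout, stderr = process.communicate()
            if process.returncode != 0:
                raise RuntimeError(stderr)
            return stdout

        # 1. Copy the MDD archive from the synced HDFS path to the target master's local FS
        runOrDie("hdfs dfs -copyToLocal /tmp/syncTest/hawqMdd-2016-09-26-2100.tar /tmp/")

        # 2. Restore the archive into /data/hawq/, recreating the "master" directory
        #    that createTarball captured on the source
        runOrDie("tar -xpf /tmp/hawqMdd-2016-09-26-2100.tar -C /data/hawq/")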



[GitHub] incubator-hawq issue #940: HAWQ 1078. Implement hawqsync-falcon DR utility.

Posted by vVineet <gi...@git.apache.org>.
Github user vVineet commented on the issue:

    https://github.com/apache/incubator-hawq/pull/940
  
    @kdunn-pivotal : I propose a discussion including @ictmalili as this ties with HAWQ Register feature. I'd love to see the contribution make it in HAWQ.



[GitHub] incubator-hawq pull request #940: HAWQ 1078. Implement hawqsync-falcon DR ut...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/940#discussion_r83184770
  
    --- Diff: tools/bin/hawqsync-falcon ---
    @@ -0,0 +1,1331 @@
    +#!/usr/bin/env python
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +import os
    +import sys
    +from optparse import OptionParser
    +from subprocess import Popen, PIPE
    +from hashlib import md5
    +from json import loads
    +from time import strftime, sleep, time
    +from collections import defaultdict
    +# TODO - make use of these common HAWQ libs instead of print
    +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
    +#from gppylib.commands.unix import getLocalHostname, getUserName
    +try:
    +    from xml.etree import cElementTree as ElementTree
    +except ImportError, e:
    +    from xml.etree import ElementTree
    +
    +def parseargs():
    +    parser = OptionParser(usage="HAWQ sync options.")
    +    parser.add_option('-v', '--verbose', action='store_true',
    +                      default=False)
    +    parser.add_option("-a", "--prompt", action="store_false",
    +                      dest="prompt", default=True,
    +                      help="Execute without prompt.")
    +    parser.add_option("-l", "--logdir", dest="logDir",
    +                      help="Sets the directory for log files")
    +    parser.add_option('-d', '--dryRun', action='store_true',
    +                      default=False,
    +                      dest='testMode', help="Execute in test mode")
    +    parser.add_option('-u', '--user', dest='userName', default="gpadmin",
    +                      help="The user to own Falcon ACLs and run job as")
    +    parser.add_option('--maxMaps', dest='distcpMaxMaps',
    +                      default="10",
    +                      help="The maximum number of map jobs to allow")
    +    parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
    +                      default="100",
    +                      help="The maximum allowable bandwidth for each map job, in MB/s")
    +    parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name of the source")
    +    parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the source HAWQ master")
    +    parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the target HAWQ master")
    +    parser.add_option('-f', '--falconUri', dest='falconUri',
    +                      default="http://localhost:15000",
    +                      help="The URI to use for issuing Falcon REST calls")
    +    parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the target HDFS namenode")
    +    parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
    +                      default="target",
    +                      help="The Falcon cluster entity name of the target")
    +    parser.add_option('-e', '--executionEntity',
    +                      dest='executionClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name specifying where to execute the job")
    +    parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
    +                      default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                      help="The HDFS location of the underlying Oozie workflow to use for sync job")
    +    parser.add_option('-p', '--pathToSync', dest='pathToSync',
    +                      default="/tmp/syncTest",
    +                      help="The root directory to be synchronized")
    +    parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
    +                      help="The Falcon job entity name to be executed")
    +
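    +    # Example invocation (hostnames below are placeholders):
    +    #   hawqsync-falcon -s nn1.source.example.com -t nn1.target.example.com \
    +    #       -M hawqmaster.target.example.com -f http://falcon.source.example.com:15000 \
    +    #       -p /hawq_default -j drSync
    +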
    +    (options, args) = parser.parse_args()
    +    return (options, args)
    +
    +def extractFilenameAndSize(line, hdfsPort):
    +    """Utility function to extract filename and file
    +    size from a line of output from `hdfs dfs -ls -R`
    +
    +    """
    +
    +    tokens = line.split()
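    +    # tokens[4] is the size column of `hdfs dfs -ls -R` output; tokens[-1] is the
    +    # full path URI, and splitting on ":<port>" strips the "hdfs://host:port"
    +    # prefix so only the path is kept for the fingerprint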
    +    return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
    +
    +def flattenFilelist(data, hdfsPort):
    +    """Utility function to convert a list of output
    +    lines from `hdfs dfs -ls -R` into a single, sorted, 
    +    delimited string to be used as a synchronization
    +    fingerprint
    +
    +    """
    +
    +    # Ensure record contains expected number of fields
    +    isValid = lambda r: len(r.strip().split()) == 8
    +
    +    # Subset the records to only filename and size fields
    +    filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
    +
    +    # Reverse sort the list by filename column
    +    sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
    +
    +    # Flatten a single line into a delimited string
    +    mergeLines = lambda l: "-".join(l)
    +
    +    # Perform the flatten for every line and join lines into a string
    +    return "\n".join(map(mergeLines, sortedFilenameAndSize))
    +
    +def computeMd5(data):
    +    """Utility function to compute MD5 checksum
    +
    +    """
    +    hasher = md5()
    +    hasher.update(data)
    +
    +    return hasher.hexdigest()
    +
    +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False):
    +    """Utility function to compute an MD5 
    +    hash from the output of a recursive HDFS 
    +    directory listing
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsPort = hdfsUri.split(":")[-1]
    +
    +    hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
    +    #print hdfsCommand
    +
    +    filelist = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (filelist, stderr) = hdfsProcess.communicate()
    +
    +        retVal = hdfsProcess.returncode
    +
    +        if retVal != 0:
    +            return retVal, stderr
    +
    +    # Sample output to follow
    +    else:
    +        filelist = """
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
    +        drwx------   - gpadmin gpadmin          0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
    +        """;
    +
    +        retVal = 0
    +
    +    data = flattenFilelist(filelist, hdfsPort)
    +
    +    # sample yields: 342f414e7519f8c6a9eacce94777ba08
    +    return retVal, computeMd5(data)
    +
    +def etree_to_dict(t):
    +    """Utility function to turn an XML 
    +    element tree into a dictionary
    +
    +    """
    +
    +    d = {t.tag: {} if t.attrib else None}
    +    children = list(t)
    +    if children:
    +        dd = defaultdict(list)
    +        for dc in map(etree_to_dict, children):
    +            for k, v in dc.iteritems():
    +                dd[k].append(v)
    +
    +        d[t.tag] = dict((k, v[0] if len(v) == 1 else v) for k, v in dd.iteritems())
    +    if t.attrib:
    +        d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
    +    if t.text:
    +        text = t.text.strip()
    +        if children or t.attrib:
    +            if text:
    +              d[t.tag]['#text'] = text
    +        else:
    +            d[t.tag] = text
    +    return d
    +
    +def xmlToDict(data):
    +    """Wrapper function to convert 
    +    XML string into a Python dictionary
    +
    +    """
    +
    +    e = ElementTree.XML(data)
    +    return etree_to_dict(e)
    +
    +def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest",
    +                    user="gpadmin", onlyRuntime=False, isTesting=False,
    +                    doDebug=False):
    +    """Get the current status of an existing Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        entity (str): the Falcon process entity name to get status for
    +        user (str): the username used for authorization
    +        onlyRuntime (bool): only query for process runtime (e.g. post-completion)
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +        doDebug (bool): debugging mode for additional verbosity
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile
    +        message (str): message can contain process end time or status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags
    +
    +    endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    if doDebug:
    +        print curlCommand
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        if doDebug:
    +            print "stdout:", stdout, "stderr:", stderr
    +
    +        try:
    +            result = loads(stdout)
    +            if 'instances' not in result:
    +                print "No instance was started, try deleting the job and re-running"
    +                return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
    +            if onlyRuntime:
    +                # Parse the start/end times in JSON result from cURL
    +                resultPayload = result['instances'][0]
    +                return resultPayload['startTime'], resultPayload['endTime']
    +            else:
    +                # Parse the logfile/status in JSON result from cURL
    +                resultPayload = result['instances'][0] #['actions'][0]
    +                return resultPayload['logFile'], resultPayload['status']
    +        except KeyError as e:
    +            print "KeyError in getFalconStatus()", str(e), "\n", stdout
    +            return -1, str(e)
    +        except ValueError as e:
    +            print "ValueError in getFalconStatus()", str(e), "\n", stdout
    +            print "Is Falcon running at : {} ?".format(falconUri)
    +            return -1, str(e)
    +
    +    # Example output follows:
    +    else:
    +        stdout = """{
    +            "status":"SUCCEEDED",
    +            "message":"default/STATUS\n",
    +            "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
    +            "instances":[{
    +                "instance":"2016-08-17T13:22Z",
    +                "status":"SUSPENDED",
    +                "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
    +                "cluster":"secondaryIDP",
    +                "startTime":"2016-08-17T12:25:23-07:00",
    +                "details":"",
    +                "actions":[{
    +                    "action":"user-action",
    +                    "status":"RUNNING"
    +                }, {
    +                    "action":"failed-post-processing",
    +                    "status":"RUNNING",
    +                    "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
    +                }]
    +            }]
    +        }""".replace("\n", "");
    +
    +        return 0, loads(stdout)['instances'][0]['actions'][0]
    +
    +def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest",
    +                     userName="gpadmin", isTesting=False):
    +    """Schedule an existing Falcon process/job entity for execution
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobName (str): the Falcon process entity name to get status for
    +        userName (str): the username used for authorization
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
    +
    +    endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except KeyError:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +        except:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>default/drSyncTest(process) scheduled successfully</message>
    +            <requestId>default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +# Falcon process entity template used to create/update job attributes
    +drSyncTemplate="""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +<process name="{name}" xmlns="uri:falcon:process:0.1">
    +    <tags>_falcon_mirroring_type=HDFS</tags>
    +    <clusters>
    +        <cluster name="{executionClusterEntityName}">
    +            <validity start="{startTime}" end="{endTime}"/>
    +        </cluster>
    +    </clusters>
    +    <parallel>1</parallel>
    +    <order>LAST_ONLY</order>
    +    <frequency>days(7)</frequency>
    +    <timezone>GMT{gmtOffset}</timezone>
    +    <properties>
    +        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
    +        <property name="distcpMaxMaps" value="{distcpMaxMaps}"/>
    +        <property name="distcpMapBandwidth" value="{distcpMaxMBpsPerMap}"/>
    +        <property name="drSourceDir" value="{pathToSync}"/>
    +        <property name="drTargetDir" value="{pathToSync}"/>
    +        <property name="drTargetClusterFS" value="{targetHdfsUri}"/>
    +        <property name="drSourceClusterFS" value="{sourceHdfsUri}"/>
    +        <!-- This can be a list of emails for notifications -->
    +        <property name="drNotificationReceivers" value="NA"/>
    +        <property name="targetCluster" value="{targetClusterEntityName}"/>
    +        <property name="sourceCluster" value="{sourceClusterEntityName}"/>
    +        <property name="queueName" value="default"/>
    +        <property name="jobPriority" value="HIGH"/>
    +    </properties>
    +    <workflow name="drSyncTest-WF" engine="oozie" path="{workflowFilename}" lib=""/>
    +    <retry policy="periodic" delay="minutes(1)" attempts="3"/>
    +    <ACL owner="{userName}" group="users" permission="0755"/>
    +</process>"""
    +
    +def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None,
    +                   isTesting=False):
    +    """Submit/create a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/submit/process?user.name=falcon
    +
    +    endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    # TODO long term would be to encapsulate Falcon functions in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {0} -d ".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note: the XML payload is passed as a single argument since it cannot be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except ElementTree.ParseError:
    +            print "Parse error in doFalconSubmit()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>falcon/default/Submit successful (process) drSyncTest</message>
    +            <requestId>falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None,
    +                 isTesting=False):
    +    """Update, schedule, and monitor a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
    +
    +    endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
    +                                                                       u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
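    +    # strftime("%z") yields an offset like "-0700"; keep the sign and hours ("-07")
    +    # and append ":00" to form the "GMT-07:00" style value used in the process
    +    # template's <timezone> element (note: sub-hour offsets are truncated)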
    +    gmtOffset = strftime("%z")[:3] + ":00"
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    print "Scheduling for", oneMinuteLater
    +    print "Ending on", oneYearLater
    +
    +    # TODO encapsulate everything in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting AWS S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    # TODO output for debug level
    +    #from pprint import pprint
    +    #pprint (payload)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note, needed to seperate out the payload as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        # Curl process completed successfully
    +        if retVal == 0:
    +
    +            try:
    +                # Parse the XML result from cURL into a dictionary
    +                result = xmlToDict(stdout)['result']
    +                stderr = result['message']
    +
    +                # Falcon REST update operation successful
    +                if "SUCCEEDED" in result['status']:
    +                    print "Falcon process update was successful"
    +
    +                    # We should doFalconSchedule() here
    +                    status, message = doFalconSchedule(falconUri=falconUri,
    +                                                       jobName=jobParameters['jobName'],
    +                                                       isTesting=False)
    +
    +                    # If we succeeded in scheduling
    +                    if "SUCCEEDED" in status:
    +                        print "Falcon process scheduling was successful"
    +
    +                        # Reset retVal to catch error between scheduled and running states
    +                        retVal = -1
    +                        sleep(5)
    +
    +                        message, status = getFalconStatus(falconUri=falconUri,
    +                                                          entity=jobParameters['jobName'])
    +
    +                        # Continuously poll for hdfs-mirroring status
    +                        while "RUNNING" in status:
    +                            message, status = getFalconStatus(falconUri=falconUri,
    +                                                              entity=jobParameters['jobName'])
    +                            print status
    +
    +                            # flag RUNNING state reached using retVal
    +                            retVal = 0
    +                            sleep(10)
    +
    +                        if status == "KILLED":
    +                            return -1, message
    +
    +                        # Poll one last time for runtimes
    +                        start, finish = getFalconStatus(falconUri=falconUri,
    +                                                        entity=jobParameters['jobName'],
    +                                                        onlyRuntime=True)
    +
    +                        return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
    +
    +                    # Scheduling failed
    +                    else:
    +                        return -1, message
    +
    +                # Falcon REST update operation NOT successful
    +                else:
    +                    print "Falcon REST operation not successful"
    +                    return result['status'], stderr
    +
    +            except KeyError:
    +                print "Are you using the correct Falcon server URI?", falconUri
    +                return -1, stdout
    +
    +        # Curl process did not complete successfully
    +        else:
    +            print "Curl command failed"
    +            return retVal, stderr
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +            <result>
    +                <status>SUCCEEDED</status>
    +                <message>falcon/update/default/Updated successfully</message>
    +                <requestId>falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446</requestId>
    +            </result>
    +        """.replace("\n", "");
    +
    +        # Parse the XML result from cURL into a dictionary
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def hdfsFingerprintsMatch(reference, comparison):
    +    """Helper function to compare two fingerprints / md5 hashes
    +
    +    Args:
    +        reference (str): the reference MD5 checksum string
    +        comparison (str): the comparison MD5 checksum string
    +    
    +    Returns:
    +        isEqual (bool): True if the fingerprints match, False otherwise
    +
    +    """
    +
    +    return reference == comparison
    +
    +
    +def stopHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a quick stop of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStopCommand = "hawq stop master -a -M fast"
    +
    +    if masterHost is not None:
    +        hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                    c=hawqStopCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStopProcess = Popen(hawqStopCommand,
    +                                stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStopProcess.communicate()
    +
    +        return hawqStopProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +def startHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a start of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStartCommand = "hawq start master -a"
    +
    +    if masterHost is not None:
    +        hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                     c=hawqStartCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStartProcess = Popen(hawqStartCommand,
    +                                 stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStartProcess.communicate()
    +
    +        return hawqStartProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +
    +def copyToHdfs(source, dest, isTesting=False):
    +    """Utility function to copy a source file
    +    to the destination HDFS directory/file
    +
    +    Args:
    +        source (str): the source file on the local FS
    +        dest (str): the target HDFS directory and filename
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
    +                                                                    d=dest)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def checkHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to query HDFS for 
    +    safemode enabled or disabled
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (bool or str): True if HDFS safemode is ON, False if OFF; a status or error string otherwise
    +
    +    """
    +    
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        try:
    +            offOrOn = True if "ON" in stdout.split()[-1] else False
    +        except IndexError as e:
    +            return -1, str(e)
    +
    +        return hdfsProcess.returncode, offOrOn
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def enableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to enable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def disableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to disable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
    +    """Utility function to force an HDFS checkpoint
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +
    +def createTarball(masterDataBase="/data/hawq/",
    +                  targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
    +    """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
    +        targetTarball (str): the target directory and filename of the tarball
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # tar cpf /tmp/test.tar --exclude=csv -C /data/hawq master
    +
    +    theTime = strftime("%Y-%m-%d-%H%M")
    +
    +    tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime),
    +                                                                   c=masterDataBase)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = tarProcess.communicate()
    +
    +        except OSError as e:
    +            return -1, str(e), -1
    +
    +        if tarProcess.returncode == 0:
    +
    +            md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
    +
    +            try:
    +                md5Process = Popen(md5Command.split(),
    +                                   stdout=PIPE, stderr=PIPE)
    +
    +                (stdout2, stderr2) = md5Process.communicate()
    +
    +                checksum = stdout2.split()[0].strip()
    +
    +                if md5Process.returncode != 0:
    +                    return -1, "md5 checksum creation failed : " + stderr2, -1
    +                else:
    +                    return 0, targetTarball.format(t=theTime), checksum
    +
    +            except OSError as e:
    +                return -1, str(e), -1
    +
    +        else:
    +            print "Tarball creation failed : " + stderr
    +            return -1, stderr, -1
    +
    +    else:
    +        return 0, "TEST BRANCH", -1
    +
    +def cleanupTarball(filename, isTesting=False):
    +    """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        filename (str): the target directory and filename of the tarball to clean up
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation
    +    # rm -f /tmp/test.tar
    +
    +    rmCommand = "rm -f {f}".format(f=filename)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = rmProcess.communicate()
    +
    +            retVal = rmProcess.returncode
    +
    +            return retVal, stderr
    +
    +        except OSError as e:
    +            return -1, str(e)
    +    else:
    +        return 0, "TEST BRANCH"
    +
    +if __name__ == '__main__':
    +    options, args = parseargs()
    +
    +    #if options.verbose:
    +    #    enable_verbose_logging()
    +
    +    # TODO - switch prints to this once using gppylibs
    +    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
    +
    +
    +    # ### HAWQ Extract every non-system table (source)
    +    # Note: the assumption is this has been done in
    +    # advance of executing this tool.
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        print ""
    +        print "Please confirm you've performed hawq_extract() on all critical data tables"
    +        print "and saved this information outside of the cluster (e.g. version control)"
    +        print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)"
    +        print ""
    +        print "This is critical for data recovery if a sync operation partially completes!"
    +        print ""
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Falcon cluster entities
    +    # Note: the assumption is both source and target
    +    # cluster entities have already been created in Falcon
    +    # TODO add a confirmation step, later a REST call to check
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print ""
    +        print "Please confirm you've created both source and target Falcon cluster entities:"
    +        print ""
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Stop HAWQ
    +    #
    +    # TODO?: switch to Ambari REST, followed by pkill -5 <<some hawq filter>>
    +
    +    # Source
    +    earlier = int(time())
    +    print "Stopping source HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Stopping target HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
    +                              isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Create HAWQ Master Data Directory archive
    +    print "Creating MDD tarball" if options.verbose else None;
    +    retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
    +                                                     isTesting=options.testMode)
    +    print retVal, filenameOrStderr, md5sum if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY"
    +        print "Error message was : " + filenameOrStderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting."
    +        sys.exit(1)
    +
    +    # ### Start HAWQ
    +    print "Starting source HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(isTesting=options.testMode)
    +    later = int(time())
    +
    +    if retVal != 0:
    +        print "Failed to start source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +
    +    # TODO add a CLI flag to force source into read-write
    +    if checkHdfsSafemode()[1] == True:
    +        print "Source cluster HDFS is read-only, cannot proceed"
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        sys.exit(1)
    +
    +    # ### Copy MDD archive to HDFS
    +    print "Copying MDD tarball to HDFS" if options.verbose else None;
    +    retVal, stderr = copyToHdfs(source=filenameOrStderr,
    +                                dest=options.pathToSync,
    +                                isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to copy MDD tarball to HDFS"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Cleanup MDD archive from /tmp
    +    print "Cleaning up MDD tarball on local FS" if options.verbose else None;
    +    retVal, stderr = cleanupTarball(filenameOrStderr,
    +                                    isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to clean up MDD tarball"
    +        print "Error message was " + stderr
    +        print ""
    +        print "You will need to manually remove the following file"
    +        print filenameOrStderr
    +        print ""
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    """
    +    # ### Force HDFS checkpoint and enable safemode on source
    +    print "Enabling HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on source cluster" if options.verbose else None;
    +    retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    """;
    +
    +    # ### Leave safemode on target HDFS
    +    print "Disabling HDFS safemode on target" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
    +                                         isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Note, the entity names refer to Falcon
    +    # entities that have been created prior
    +    # to execution of this tool
    +    """
    +    jobParameters = dict(userName="gpadmin",
    +                         distcpMaxMaps="100",
    +                         distcpMaxMBpsPerMap="1000",
    +                         sourceClusterEntityName="sourceCluster",
    +                         sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
    +                         targetClusterEntityName="targetCluster",
    +                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
    +                         executionClusterEntityName="sourceCluster",
    +                         workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                         pathToSync="/tmp/syncTest",
    +                         jobName="drSync")
    +    """;
    +
    +    jobParameters = dict(userName=options.userName,
    +                         distcpMaxMaps=options.distcpMaxMaps,
    +                         distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
    +                         sourceClusterEntityName=options.sourceClusterEntityName,
    +                         sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
    +                         targetClusterEntityName=options.targetClusterEntityName,
    +                         targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
    +                         executionClusterEntityName=options.executionClusterEntityName,
    +                         workflowFilename=options.workflowFilename,
    +                         pathToSync=options.pathToSync,
    +                         jobName=options.jobName)
    +
    +    print jobParameters if options.verbose else None;
    +
    +    # ### Update and Schedule Job - monitor until completion
    +    print "Falcon Soar" if options.verbose else None;
    +    retVal, stderr = doFalconSoar(falconUri=options.falconUri,
    +                                  jobParameters=jobParameters,
    +                                  isTesting=options.testMode)
    +    falconOutput = stderr
    +
    +    if retVal != 0:
    +        print "Falcon replication job failed"
    +        print "Error message was " + stderr
    +        print "Source cluster will be left in safemode for remediation"
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Leave safemode on source HDFS
    +    print "Disable HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Force HDFS checkpoint and enable safemode on target
    +    print "Enabling HDFS safemode on target cluster" if options.verbose else None
    +    retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on target cluster" if options.verbose else None
    +    retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### HDFS Fingerprint comparison
    +
    +    print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None
    +
    +    retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
    +                                             hdfsDir=jobParameters['pathToSync'],
    +                                             isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on source cluster"
    +        print "Error message was " + md5OrStderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
    +                                              hdfsDir=jobParameters['pathToSync'],
    +                                              isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on target cluster"
    +        print "Error message was " + md5OrStderr2
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
    +
    +    if not isValidSync:
    +        print "Source and target cluster HDFS fingerprints do not match."
    +        print "Source checksum : " + md5OrStderr
    +        print "Target checksum : " + md5OrStderr2
    +        print "This is bad, please check Falcon sync logs : " + falconOutput
    +        sys.exit(1)
    +    else:
    +        print "Source and target HDFS fingerprints match."
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Starting target HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
    +                               isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to start target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    else:
    +        print "HAWQ sync completed successfully!"
    +        print """
    +        ## Manual runbook during DR event
    +        1. Copy MDD archive from HDFS to target master (CLI)
    +        2. Restore archive in /data/hawq/ (CLI)
    --- End diff --
    
    Which specific directory does this map to?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq issue #940: HAWQ 1078. Implement hawqsync-falcon DR utility.

Posted by kdunn-pivotal <gi...@git.apache.org>.
Github user kdunn-pivotal commented on the issue:

    https://github.com/apache/incubator-hawq/pull/940
  
    # HAWQSYNC partial-sync recovery runbook:
    
    1. Copy "last known good state" tarball from `hdfs://hawq_default/hawqExtract-*.tar.bz2`
    
    2. Re-run `hawqsync-extract` to establish "current state".
    
    3. Perform diffs for every table file to determine tables with inconsistencies.
    
    4. For each inconsistent table:
       a. Re-register `faultyTable` using the "last known good" YAML:
          `hawq register -f faultyTable.yaml faultyTable`

       b. Store the valid records in a temporary table:
          `CREATE TABLE newTemp AS SELECT * FROM faultyTable`

       c. Truncate the faulty table, to allow the catalog and HDFS file sizes to be consistent again:
          `TRUNCATE faultyTable`

       d. Re-populate the table with valid records:
          `INSERT INTO faultyTable SELECT * FROM newTemp`

       e. Purge the temporary table:
          `DROP TABLE newTemp`
    
    This process, overall, ensures our catalog EOF marker and _actual_ HDFS file size are properly aligned for every table. This is especially important when ETL needs to resume on tables that may have previously had "inconsistent bytes" appended, as would be the case for a partial sync.
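
    As a minimal sketch, steps 4a-4e could be scripted per table roughly as follows (the database
    name, table name, and YAML path are placeholders; `hawq register` and `psql` are assumed to be
    on the PATH of the user running it):

        from subprocess import check_call

        def repair_table(database, table, yaml_path):
            # a. Re-register the table using the "last known good" YAML
            check_call(["hawq", "register", "-f", yaml_path, table])

            # b-e. Preserve the valid records, truncate, re-populate, then drop the temp table
            for sql in ["CREATE TABLE newTemp AS SELECT * FROM {t}".format(t=table),
                        "TRUNCATE {t}".format(t=table),
                        "INSERT INTO {t} SELECT * FROM newTemp".format(t=table),
                        "DROP TABLE newTemp"]:
                check_call(["psql", "-d", database, "-c", sql])

        # e.g. repair_table("gpadmin", "faultyTable", "faultyTable.yaml")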


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-hawq pull request #940: HAWQ 1078. Implement hawqsync-falcon DR ut...

Posted by ictmalili <gi...@git.apache.org>.
Github user ictmalili commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/940#discussion_r83185244
  
    --- Diff: tools/bin/hawqsync-falcon ---
    @@ -0,0 +1,1331 @@
    +#!/usr/bin/env python
    +# Licensed to the Apache Software Foundation (ASF) under one
    +# or more contributor license agreements.  See the NOTICE file
    +# distributed with this work for additional information
    +# regarding copyright ownership.  The ASF licenses this file
    +# to you under the Apache License, Version 2.0 (the
    +# "License"); you may not use this file except in compliance
    +# with the License.  You may obtain a copy of the License at
    +#
    +#   http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing,
    +# software distributed under the License is distributed on an
    +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +# KIND, either express or implied.  See the License for the
    +# specific language governing permissions and limitations
    +# under the License.
    +
    +import os
    +import sys
    +from optparse import OptionParser
    +from subprocess import Popen, PIPE
    +from hashlib import md5
    +from json import loads
    +from time import strftime, sleep, time
    +from collections import defaultdict
    +# TODO - make use of these common HAWQ libs instead of print
    +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
    +#from gppylib.commands.unix import getLocalHostname, getUserName
    +try:
    +    from xml.etree import cElementTree as ElementTree
    +except ImportError, e:
    +    from xml.etree import ElementTree
    +
    +def parseargs():
    +    parser = OptionParser(usage="HAWQ sync options.")
    +    parser.add_option('-v', '--verbose', action='store_true',
    +                      default=False)
    +    parser.add_option("-a", "--prompt", action="store_false",
    +                      dest="prompt", default=True,
    +                      help="Execute without prompt.")
    +    parser.add_option("-l", "--logdir", dest="logDir",
    +                      help="Sets the directory for log files")
    +    parser.add_option('-d', '--dryRun', action='store_true',
    +                      default=False,
    +                      dest='testMode', help="Execute in test mode")
    +    parser.add_option('-u', '--user', dest='userName', default="gpadmin",
    +                      help="The user to own Falcon ACLs and run job as")
    +    parser.add_option('--maxMaps', dest='distcpMaxMaps',
    +                      default="10",
    +                      help="The maximum number of map jobs to allow")
    +    parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
    +                      default="100",
    +                      help="The maximum allowable bandwidth for each map job, in MB/s")
    +    parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name of the source")
    +    parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the source HAWQ master")
    +    parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
    +                      default="",
    +                      help="The IP or FQDN of the target HAWQ master")
    +    parser.add_option('-f', '--falconUri', dest='falconUri',
    +                      default="http://localhost:15000",
    +                      help="The URI to use for issuing Falcon REST calls")
    +    parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
    +                      default="",
    +                      help="The IP or FQDN of the source HDFS namenode")
    +    parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
    +                      default="target",
    +                      help="The Falcon cluster entity name of the target")
    +    parser.add_option('-e', '--executionEntity',
    +                      dest='executionClusterEntityName',
    +                      default="source",
    +                      help="The Falcon cluster entity name specifying where to execute the job")
    +    parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
    +                      default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                      help="The HDFS location of the underlying Oozie workflow to use for sync job")
    +    parser.add_option('-p', '--pathToSync', dest='pathToSync',
    +                      default="/tmp/syncTest",
    +                      help="The root directory to be syncronized")
    +    parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
    +                      help="The Falcon job entity name to be executed")
    +
    +    (options, args) = parser.parse_args()
    +    return (options, args)
    +
    +def extractFilenameAndSize(line, hdfsPort):
    +    """Utility function to extract filename and file
    +    size from a line of output from `hdfs dfs -ls -R`
    +
    +    """
    +
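    +    # e.g. a listing line such as
    +    # "drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385"
    +    # yields ("/hawq_default/16385", "0") when hdfsPort is "8020"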
    +    tokens = line.split()
    +    return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
    +
    +def flattenFilelist(data, hdfsPort):
    +    """Utility function to convert a list of output
    +    lines from `hdfs dfs -ls -R` into a single, sorted, 
    +    delimited string to be used as a synchronization
    +    fingerprint
    +
    +    """
    +
    +    # Ensure record contains expected number of fields
    +    isValid = lambda r: len(r.strip().split()) == 8
    +
    +    # Subset the records to only filename and size fields
    +    filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
    +
    +    # Reverse sort the list by filename column
    +    sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
    +
    +    # Flatten a single line into a delimited string
    +    mergeLines = lambda l: "-".join(l)
    +
    +    # Perform the flatten for every line and join lines into a string
    +    return "\n".join(map(mergeLines, sortedFilenameAndSize))
    +
    +def computeMd5(data):
    +    """Utility function to compute MD5 checksum
    +
    +    """
    +    hasher = md5()
    +    hasher.update(data)
    +
    +    return hasher.hexdigest()
    +
    +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False):
    +    """Utility function to compute an MD5 
    +    hash from the output of a recursive HDFS 
    +    directory listing
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsPort = hdfsUri.split(":")[-1]
    +
    +    hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
    +    #print hdfsCommand
    +
    +    filelist = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (filelist, stderr) = hdfsProcess.communicate()
    +
    +        retVal = hdfsProcess.returncode
    +
    +        if retVal != 0:
    +            return retVal, stderr
    +
    +    # Sample output to follow
    +    else:
    +        filelist = """
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
    +        drwx------   - gpadmin gpadmin          0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
    +        drwx------   - gpadmin gpadmin          0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
    +        """;
    +
    +        retVal = 0
    +
    +    data = flattenFilelist(filelist, hdfsPort)
    +
    +    # sample yields: 342f414e7519f8c6a9eacce94777ba08
    +    return retVal, computeMd5(data)
    +
    +def etree_to_dict(t):
    +    """Utility function to turn an XML 
    +    element tree into a dictionary
    +
    +    """
    +
    +    d = {t.tag: {} if t.attrib else None}
    +    children = list(t)
    +    if children:
    +        dd = defaultdict(list)
    +        for dc in map(etree_to_dict, children):
    +            for k, v in dc.iteritems():
    +                dd[k].append(v)
    +
    +        d[t.tag] = dict((k, v[0]) if len(v) == 1 else dict((k, v)) for k, v in dd.iteritems())
    +    if t.attrib:
    +        d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
    +    if t.text:
    +        text = t.text.strip()
    +        if children or t.attrib:
    +            if text:
    +              d[t.tag]['#text'] = text
    +        else:
    +            d[t.tag] = text
    +    return d
    +
    +def xmlToDict(data):
    +    """Wrapper function to convert 
    +    XML string into a Python dictionary
    +
    +    """
    +
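    +    # e.g. xmlToDict('<result><status>SUCCEEDED</status></result>')
    +    # returns {'result': {'status': 'SUCCEEDED'}}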
    +    e = ElementTree.XML(data)
    +    return etree_to_dict(e)
    +
    +def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest",
    +                    user="gpadmin", onlyRuntime=False, isTesting=False,
    +                    doDebug=False):
    +    """Get the current status of an existing Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        entity (str): the Falcon process entity name to get status for
    +        user (str): the username used for authorization
    +        onlyRuntime (bool): only query for process runtime (e.g. post-completion)
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +        doDebug (bool): debugging mode for additional verbosity
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile
    +        message (str): message can contain process end time or status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    # GET http://localhost:15000/api/instance/status/process/drSyncTest?user.name=gpadmin&fields=status
    +
    +    endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    if doDebug:
    +        print curlCommand
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        if doDebug:
    +            print "stdout:", stdout, "stderr:", stderr
    +
    +        try:
    +            result = loads(stdout)
    +            if 'instances' not in result:
    +                print "No instance was started, try deleting the job and re-running"
    +                return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
    +            if onlyRuntime:
    +                # Parse the start/end times in JSON result from cURL
    +                resultPayload = result['instances'][0]
    +                return resultPayload['startTime'], resultPayload['endTime']
    +            else:
    +                # Parse the logfile/status in JSON result from cURL
    +                resultPayload = result['instances'][0] #['actions'][0]
    +                return resultPayload['logFile'], resultPayload['status']
    +        except KeyError as e:
    +            print "KeyError in getFalconStatus()", str(e), "\n", stdout
    +            return -1, str(e)
    +        except ValueError as e:
    +            print "ValueError in getFalconStatus()", str(e), "\n", stdout
    +            print "Is Falcon running at : {} ?".format(falconUri)
    +            return -1, str(e)
    +
    +    # Example output follows:
    +    else:
    +        stdout = """{
    +            "status":"SUCCEEDED",
    +            "message":"default/STATUS\n",
    +            "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
    +            "instances":[{
    +                "instance":"2016-08-17T13:22Z",
    +                "status":"SUSPENDED",
    +                "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
    +                "cluster":"secondaryIDP",
    +                "startTime":"2016-08-17T12:25:23-07:00",
    +                "details":"",
    +                "actions":[{
    +                    "action":"user-action",
    +                    "status":"RUNNING"
    +                }, {
    +                    "action":"failed-post-processing",
    +                    "status":"RUNNING",
    +                    "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
    +                }]
    +            }]
    +        }""".replace("\n", "");
    +
    +        return 0, loads(stdout)['instances'][0]['actions'][0]
    +
    +def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest",
    +                     userName="gpadmin", isTesting=False):
    +    """Schedule an existing Falcon process/job entity for execution
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobName (str): the Falcon process entity name to get status for
    +        userName (str): the username used for authorization
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
    +
    +    endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except KeyError:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +        except:
    +            print "Parse error in doFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>default/drSyncTest(process) scheduled successfully</message>
    +            <requestId>default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +# Falcon process entity template used to create/update job attributes
    +drSyncTemplate="""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +<process name="{name}" xmlns="uri:falcon:process:0.1">
    +    <tags>_falcon_mirroring_type=HDFS</tags>
    +    <clusters>
    +        <cluster name="{executionClusterEntityName}">
    +            <validity start="{startTime}" end="{endTime}"/>
    +        </cluster>
    +    </clusters>
    +    <parallel>1</parallel>
    +    <order>LAST_ONLY</order>
    +    <frequency>days(7)</frequency>
    +    <timezone>GMT{gmtOffset}</timezone>
    +    <properties>
    +        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
    +        <property name="distcpMaxMaps" value="{distcpMaxMaps}"/>
    +        <property name="distcpMapBandwidth" value="{distcpMaxMBpsPerMap}"/>
    +        <property name="drSourceDir" value="{pathToSync}"/>
    +        <property name="drTargetDir" value="{pathToSync}"/>
    +        <property name="drTargetClusterFS" value="{targetHdfsUri}"/>
    +        <property name="drSourceClusterFS" value="{sourceHdfsUri}"/>
    +        <!-- This can be a list of emails for notifications -->
    +        <property name="drNotificationReceivers" value="NA"/>
    +        <property name="targetCluster" value="{targetClusterEntityName}"/>
    +        <property name="sourceCluster" value="{sourceClusterEntityName}"/>
    +        <property name="queueName" value="default"/>
    +        <property name="jobPriority" value="HIGH"/>
    +    </properties>
    +    <workflow name="drSyncTest-WF" engine="oozie" path="{workflowFilename}" lib=""/>
    +    <retry policy="periodic" delay="minutes(1)" attempts="3"/>
    +    <ACL owner="{userName}" group="users" permission="0755"/>
    +</process>"""
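    +# The placeholders above ({name}, {pathToSync}, the cluster entity names, the HDFS URIs, etc.)
    +# are filled in via str.format() by doFalconSubmit() and doFalconSoar() below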
    +
    +def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None,
    +                   isTesting=False):
    +    """Submit/create a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/submit/process?user.name=falcon
    +
    +    endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
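    +    # The process validity window opens one minute from now and closes one year from now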
    +
    +    # TODO long term would be to encapsulate Falcon functions in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note: the payload is appended separately as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        try:
    +            # Parse the XML result from cURL into a dictionary
    +            resultPayload = xmlToDict(stdout)['result']
    +            return resultPayload['status'], resultPayload['message']
    +        except ElementTree.ParseError:
    +            print "Parse error in getFalconSchedule()", stdout
    +            return -1, stdout
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +        <result>
    +            <status>SUCCEEDED</status>
    +            <message>falcon/default/Submit successful (process) drSyncTest</message>
    +            <requestId>falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa</requestId>
    +        </result>
    +        """.replace("\n", "");
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None,
    +                 isTesting=False):
    +    """Update, schedule, and monitor a Falcon process/job entity
    +
    +    Args:
    +        falconUri (str): the URI for the Falcon server (e.g. http://host:port)
    +        jobParameters (dict): a dictionary containing process entity configuration 
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise 
    +        message (str): message status string (e.g. SUCCEEDED)
    +
    +    """
    +
    +    retVal = -1
    +
    +    if jobParameters is None:
    +        return retVal, "You must provide a job parameters dictionary"
    +
    +    # Example REST call:
    +    #  "Content-Type:text/xml"
    +    #  POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
    +
    +    endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
    +                                                                       u=jobParameters['userName'])
    +
    +    thisMinute = int(strftime("%M"))
    +    thisYear = int(strftime("%Y"))
    +    gmtOffset = strftime("%z")[:3] + ":00"
    +
    +    oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
    +    oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
    +
    +    print "Scheduling for", oneMinuteLater
    +    print "Ending on", oneYearLater
    +
    +    # TODO encapsulate everything in a class structure
    +    # -- (i.e. full Python Falcon abstraction/API)
    +    # -- Targeting AWS S3 or Azure Blob will require a different template
    +    payload = drSyncTemplate.format(startTime=oneMinuteLater,
    +                                    endTime=oneYearLater,
    +                                    gmtOffset=gmtOffset,
    +                                    distcpMaxMaps=jobParameters['distcpMaxMaps'],
    +                                    distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
    +                                    sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
    +                                    sourceHdfsUri=jobParameters['sourceHdfsUri'],
    +                                    targetClusterEntityName=jobParameters['targetClusterEntityName'],
    +                                    targetHdfsUri=jobParameters['targetHdfsUri'],
    +                                    executionClusterEntityName=jobParameters['executionClusterEntityName'],
    +                                    workflowFilename=jobParameters['workflowFilename'],
    +                                    pathToSync=jobParameters['pathToSync'],
    +                                    name=jobParameters['jobName'],
    +                                    userName=jobParameters['userName'])
    +
    +    # TODO output for debug level
    +    #from pprint import pprint
    +    #pprint (payload)
    +
    +    curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Note, needed to separate out the payload as it can't be split on spaces
    +        curlProcess = Popen(curlCommand.split() + [payload],
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = curlProcess.communicate()
    +
    +        retVal = curlProcess.returncode
    +
    +        # Curl process completed successfully
    +        if retVal == 0:
    +
    +            try:
    +                # Parse the XML result from cURL into a dictionary
    +                result = xmlToDict(stdout)['result']
    +                stderr = result['message']
    +
    +                # Falcon REST update operation successful
    +                if "SUCCEEDED" in result['status']:
    +                    print "Falcon process update was successful"
    +
    +                    # We should doFalconSchedule() here
    +                    status, message = doFalconSchedule(falconUri=falconUri,
    +                                                       jobName=jobParameters['jobName'],
    +                                                       isTesting=False)
    +
    +                    # If we succeeded in scheduling
    +                    if "SUCCEEDED" in status:
    +                        print "Falcon process scheduling was successful"
    +
    +                        # Reset retVal to catch error between scheduled and running states
    +                        retVal = -1
    +                        sleep(5)
    +
    +                        message, status = getFalconStatus(falconUri=falconUri,
    +                                                          entity=jobParameters['jobName'])
    +
    +                        # Continuously poll for hdfs-mirroring status
    +                        while "RUNNING" in status:
    +                            message, status = getFalconStatus(falconUri=falconUri,
    +                                                              entity=jobParameters['jobName'])
    +                            print status
    +
    +                            # flag RUNNING state reached using retVal
    +                            retVal = 0
    +                            sleep(10)
    +
    +                        if status == "KILLED":
    +                            return -1, message
    +
    +                        # Poll one last time for runtimes
    +                        start, finish = getFalconStatus(falconUri=falconUri,
    +                                                        entity=jobParameters['jobName'],
    +                                                        onlyRuntime=True)
    +
    +                        return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
    +
    +                    # Scheduling failed
    +                    else:
    +                        return -1, message
    +
    +                # Falcon REST update operation NOT successful
    +                else:
    +                    print "Falcon REST operation not successful"
    +                    return result['status'], stderr
    +
    +            except KeyError:
    +                print "Are you using the correct Falcon server URI?", falconUri
    +                return -1, stdout
    +
    +        # Curl process did not complete successfully
    +        else:
    +            print "Curl command failed"
    +            return retVal, stderr
    +
    +    # Example output follows:
    +    else:
    +        stdout = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    +            <result>
    +                <status>SUCCEEDED</status>
    +                <message>falcon/update/default/Updated successfully</message>
    +                <requestId>falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446</requestId>
    +            </result>
    +        """.replace("\n", "");
    +
    +        # Parse the XML result from cURL into a dictionary
    +
    +        return 0, xmlToDict(stdout)['result']['status']
    +
    +def hdfsFingerprintsMatch(reference, comparison):
    +    """Helper function to compare two fingerprints / md5 hashes
    +
    +    Args:
    +        reference (str): the reference MD5 checksum string
    +        comparison (str): the comparison MD5 checksum string
    +    
    +    Returns:
    +        isEqual (bool): True if the fingerprints match, False otherwise
    +
    +    """
    +
    +    return reference == comparison
    +
    +
    +def stopHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a quick stop of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStopCommand = "hawq stop master -a -M fast"
    +
    +    if masterHost is not None:
    +        hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                    c=hawqStopCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStopProcess = Popen(hawqStopCommand,
    +                                stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStopProcess.communicate()
    +
    +        return hawqStopProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +def startHawq(masterHost=None, isTesting=False):
    +    """Optionally connect to a remote HAWQ master
    +    and do a start of the HAWQ master process
    +
    +    Args:
    +        masterHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains a stderr string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hawqStartCommand = "hawq start master -a"
    +
    +    if masterHost is not None:
    +        hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
    +                                                     c=hawqStartCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        hawqStartProcess = Popen(hawqStartCommand,
    +                                 stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hawqStartProcess.communicate()
    +
    +        return hawqStartProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TEST MODE"
    +
    +
    +def copyToHdfs(source, dest, isTesting=False):
    +    """Utility function to copy a source file
    +    to the destination HDFS directory/file
    +
    +    Args:
    +        source (str): the source file on the local FS
    +        dest (str): the target HDFS directory and filename
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
    +                                                                    d=dest)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as gpadmin user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'gpadmin'
    +        hdfsProcess = Popen(hdfsCommand.split(), env=env,
    +                            stdout=PIPE, stderr=PIPE)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def checkHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to query HDFS for 
    +    safemode enabled or disabled
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +    
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        try:
    +            offOrOn = True if "ON" in stdout.split()[-1] else False
    +        except IndexError as e:
    +            return -1, str(e)
    +
    +        return hdfsProcess.returncode, offOrOn
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def enableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to enable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def disableHdfsSafemode(namenodeHost=None, isTesting=False):
    +    """Utility function to disable safemode
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
    +    """Utility function to force an HDFS checkpoint
    +
    +    Args:
    +        namenodeHost (str): the remote host to SSH to first, otherwise localhost
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int) or message (str): Zero for success, negative one otherwise
    +        message (str): message contains status string
    +
    +    """
    +
    +    retVal = -1
    +
    +    hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
    +
    +    if namenodeHost is not None:
    +        hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
    +                                                                          c=hdfsCommand)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +        # Force HDFS commands to run as hdfs user
    +        env = os.environ.copy()
    +        env['HADOOP_USER_NAME'] = 'hdfs'
    +        hdfsProcess = Popen(hdfsCommand, env=env,
    +                            stdout=PIPE, stderr=PIPE, shell=True)
    +
    +        (stdout, stderr) = hdfsProcess.communicate()
    +
    +        return hdfsProcess.returncode, stderr
    +
    +    else:
    +        return 0, "TESTING"
    +
    +
    +def createTarball(masterDataBase="/data/hawq/",
    +                  targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
    +    """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
    +        targetTarball (str): the target directory and filename of the tarball
    +        isTesting (bool): NOOP mode bypassing actual REST calls for testing
    +    
    +    Returns:
    +        retVal (int): Zero for success, negative one otherwise
    +        filenameOrStderr (str): the tarball filename on success, otherwise a stderr/status string
    +        checksum (str): MD5 checksum of the tarball, or negative one on failure
    +
    +    """
    +
    +    retVal = -1
    +    checksum = None
    +
    +    # Example invocation (note: excluding most of pg_log contents)
    +    # tar -cpf /tmp/hawqMdd-<timestamp>.tar --exclude=csv -C /data/hawq master
    +
    +    theTime = strftime("%Y-%m-%d-%H%M")
    +
    +    tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime),
    +                                                                   c=masterDataBase)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = tarProcess.communicate()
    +
    +        except OSError as e:
    +            return -1, str(e), -1
    +
    +        if tarProcess.returncode == 0:
    +
    +            md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
    +
    +            try:
    +                md5Process = Popen(md5Command.split(),
    +                                   stdout=PIPE, stderr=PIPE)
    +
    +                (stdout2, stderr2) = md5Process.communicate()
    +
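    +                # md5sum prints "<digest>  <path>"; keep only the digest field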
    +                checksum = stdout2.split()[0].strip()
    +
    +                if md5Process.returncode != 0:
    +                    return -1, "md5 checksum creation failed : " + stderr2, -1
    +                else:
    +                    return 0, targetTarball.format(t=theTime), checksum
    +
    +            except OSError as e:
    +                return -1, str(e), -1
    +
    +        else:
    +            print "Tarball creation failed : " + stderr
    +            return -1, stderr, -1
    +
    +    else:
    +        return 0, "TEST BRANCH", -1
    +
    +def cleanupTarball(filename, isTesting=False):
    +    """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY
    +
    +    Args:
    +        filename (str): the target directory and filename of the tarball to clean up
    +        isTesting (bool): NOOP mode that bypasses the actual rm call for testing
    +
    +    Returns:
    +        retVal (int): zero for success, non-zero otherwise
    +        message (str): stderr output from the command, or a status string in test mode
    +
    +    """
    +
    +    retVal = -1
    +
    +    # Example invocation:
    +    # rm -f /tmp/test.tar
    +
    +    rmCommand = "rm -f {f}".format(f=filename)
    +
    +    stdout = None
    +    stderr = None
    +    if not isTesting:
    +
    +        try:
    +            rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
    +
    +            (stdout, stderr) = rmProcess.communicate()
    +
    +            retVal = rmProcess.returncode
    +
    +            return retVal, stderr
    +
    +        except OSError as e:
    +            return -1, str(e)
    +    else:
    +        return 0, "TEST BRANCH"
    +
    +if __name__ == '__main__':
    +    options, args = parseargs()
    +
    +    #if options.verbose:
    +    #    enable_verbose_logging()
    +
    +    # TODO - switch prints to this once using gppylibs
    +    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
    +
    +
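    +    # High-level flow of a sync run:
    +    #   1. Stop HAWQ on the source and target masters
    +    #   2. Archive the source MASTER_DATA_DIRECTORY, then restart source HAWQ
    +    #   3. Copy the archive into the HDFS path being replicated
    +    #   4. Run the Falcon/distcp replication job and wait for it to finish
    +    #   5. Checkpoint the target HDFS and verify source/target fingerprints match
    +    #   6. Start HAWQ on the target master
    +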
    +    # ### HAWQ Extract every non-system table (source)
    +    # Note: the assumption is this has been done in
    +    # advance of executing this tool.
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        print ""
    +        print "Please confirm you've performed hawq_extract() on all critical data tables"
    +        print "and saved this information outside of the cluster (e.g. version control)"
    +        print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)"
    +        print ""
    +        print "This is critical for data recovery if a sync operation partially completes!"
    +        print ""
    +        print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Falcon cluster entities
    +    # Note: the assumption is both source and target
    +    # cluster entities have already been created in Falcon
    +    # TODO add a confirmation step, later a REST call to check
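    +    # (cluster entities are typically submitted ahead of time with something like:
    +    #   falcon entity -type cluster -submit -file <clusterEntity>.xml )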
    +    if options.prompt:
    +        # TODO - switch to this once using gppylibs
    +        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
    +
    +        # TODO - switch to gppylib-based logging
    +        print ""
    +        print "Please confirm you've created both source and target Falcon cluster entities:"
    +        print ""
    +        answer = raw_input("y or n: ")
    +        if "y" not in answer and "Y" not in answer:
    +            print "Exiting."
    +            sys.exit(1)
    +
    +    # ### Stop HAWQ
    +    #
    +    # TODO?: switch to Ambari REST, followed by pkill -5 <<some hawq filter>>
    +
    +    # Source
    +    earlier = int(time())
    +    print "Stopping source HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Stopping target HAWQ" if options.verbose else None;
    +    retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
    +                              isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to stop target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Create HAWQ Master Data Directory archive
    +    print "Creating MDD tarball" if options.verbose else None;
    +    retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
    +                                                     isTesting=options.testMode)
    +    print retVal, filenameOrStderr, md5sum if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY"
    +        print "Error message was : " + filenameOrStderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Restarting source HAWQ" if options.verbose else None;
    +        retVal, stderr = startHawq(isTesting=options.testMode)
    +        later = int(time())
    +        if retVal == 0:
    +            print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +        else:
    +            print "Source HAWQ failed to restart after pre-sync failure."
    +        print "Exiting."
    +        sys.exit(1)
    +
    +    # ### Start HAWQ
    +    print "Starting source HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(isTesting=options.testMode)
    +    later = int(time())
    +
    +    if retVal != 0:
    +        print "Failed to start source HAWQ master"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
    +
    +    # TODO add a CLI flag to force source into read-write
    +    if checkHdfsSafemode()[1] == True:
    +        print "Source cluster HDFS is read-only, cannot proceed"
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        sys.exit(1)
    +
    +    # ### Copy MDD archive to HDFS
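    +    # The tarball is written into the same HDFS path Falcon replicates, so the
    +    # master catalog travels to the target cluster along with the table data.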
    +    print "Copying MDD tarball to HDFS" if options.verbose else None;
    +    retVal, stderr = copyToHdfs(source=filenameOrStderr,
    +                                dest=options.pathToSync,
    +                                isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to copy MDD tarball to HDFS"
    +        print "Error message was " + stderr
    +        print "Cleaning up MDD tarball on local FS"
    +        print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Cleanup MDD archive from /tmp
    +    print "Cleaning up MDD tarball on local FS" if options.verbose else None;
    +    retVal, stderr = cleanupTarball(filenameOrStderr,
    +                                    isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to clean up MDD tarball"
    +        print "Error message was " + stderr
    +        print ""
    +        print "You will need to manually remove the following file"
    +        print filenameOrStderr
    +        print ""
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    """
    +    # ### Force HDFS checkpoint and enable safemode on source
    +    print "Enabling HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on source cluster" if options.verbose else None;
    +    retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    """;
    +
    +    # ### Leave safemode on target HDFS
    +    print "Disabling HDFS safemode on target" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
    +                                         isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # Note, the entity names refer to Falcon
    +    # entities that have been created prior
    +    # to execution of this tool
    +    """
    +    jobParameters = dict(userName="gpadmin",
    +                         distcpMaxMaps="100",
    +                         distcpMaxMBpsPerMap="1000",
    +                         sourceClusterEntityName="sourceCluster",
    +                         sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
    +                         targetClusterEntityName="targetCluster",
    +                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
    +                         executionClusterEntityName="sourceCluster",
    +                         workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
    +                         pathToSync="/tmp/syncTest",
    +                         jobName="drSync")
    +    """;
    +
    +    jobParameters = dict(userName=options.userName,
    +                         distcpMaxMaps=options.distcpMaxMaps,
    +                         distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
    +                         sourceClusterEntityName=options.sourceClusterEntityName,
    +                         sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
    +                         targetClusterEntityName=options.targetClusterEntityName,
    +                         targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
    +                         executionClusterEntityName=options.executionClusterEntityName,
    +                         workflowFilename=options.workflowFilename,
    +                         pathToSync=options.pathToSync,
    +                         jobName=options.jobName)
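    +    # These parameters are substituted into the Falcon HDFS replication job that
    +    # doFalconSoar() updates, schedules, and monitors below.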
    +
    +    print jobParameters if options.verbose else None;
    +
    +    # ### Update and Schedule Job - monitor until completion
    +    print "Falcon Soar" if options.verbose else None;
    +    retVal, stderr = doFalconSoar(falconUri=options.falconUri,
    +                                  jobParameters=jobParameters,
    +                                  isTesting=options.testMode)
    +    falconOutput = stderr
    +
    +    if retVal != 0:
    +        print "Falcon replication job failed"
    +        print "Error message was " + stderr
    +        print "Source cluster will be left in safemode for remediation"
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Leave safemode on source HDFS
    +    print "Disable HDFS safemode on source cluster" if options.verbose else None;
    +    retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to leave HDFS safemode on source cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### Force HDFS checkpoint and enable safemode on target
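    +    # Entering safemode makes the target namespace read-only, and -saveNamespace then
    +    # persists a consistent fsimage, protecting the freshly replicated metadata.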
    +    print "Enabling HDFS safemode on target cluster" if options.verbose else None
    +    retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to enable HDFS safemode on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    print "Forcing HDFS checkpoint on target cluster" if options.verbose else None
    +    retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to force HDFS checkpoint on target cluster"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    # ### HDFS Fingerprint comparison
    +
    +    print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None
    +
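    +    # getHdfsFingerprint() computes a digest over the synced HDFS path on each
    +    # cluster; the sync is only treated as valid if the two digests match.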
    +    retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
    +                                             hdfsDir=jobParameters['pathToSync'],
    +                                             isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on source cluster"
    +        print "Error message was " + md5OrStderr
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
    +                                              hdfsDir=jobParameters['pathToSync'],
    +                                              isTesting=options.testMode)
    +
    +    if retVal != 0:
    +        print "Failed to generate HDFS fingerprint on target cluster"
    +        print "Error message was " + md5OrStderr2
    +        print "Exiting"
    +        sys.exit(1)
    +
    +    isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
    +
    +    if not isValidSync:
    +        print "Source and target cluster HDFS fingerprints do not match."
    +        print "Source checksum : " + md5OrStderr
    +        print "Target checksum : " + md5OrStderr2
    +        print "This is bad, please check Falcon sync logs : " + falconOutput
    +        sys.exit(1)
    +    else:
    +        print "Source and target HDFS fingerprints match."
    +
    +    # Target
    +    # TODO - either use Ambari REST call or use SSH
    +    print "Starting target HAWQ" if options.verbose else None;
    +    retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
    +                               isTesting=options.testMode)
    +    print retVal, stderr if options.verbose else None;
    +
    +    if retVal != 0:
    +        print "Failed to start target HAWQ master"
    +        print "Error message was " + stderr
    +        print "Exiting"
    +        sys.exit(1)
    +    else:
    +        print "HAWQ sync completed successfully!"
    +        print """
    +        ## Manual runbook during DR event
    +        1. Copy MDD archive from HDFS to target master (CLI)
    +        2. Restore archive in /data/hawq/ (CLI)
    --- End diff --
    
    Is this the HAWQ master's MASTER_DATA_DIRECTORY?

