You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2015/11/05 04:10:02 UTC
[6/8] incubator-hawq git commit: HAWQ-121. Remove legacy command line tools.
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/9932786b/tools/bin/gpfilespace
----------------------------------------------------------------------
diff --git a/tools/bin/gpfilespace b/tools/bin/gpfilespace
deleted file mode 100755
index 2076dfa..0000000
--- a/tools/bin/gpfilespace
+++ /dev/null
@@ -1,1515 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-'''
- gpfilespace [options]
-
-General Options:
- -? | --help show this help message and exit
- --version show the program's version number and exit
- -l | --logdir DIRECTORY log file directory
-
-Connection Options:
- -h | --host HOSTNAME database server host
- -p | --port PORT database server port
- -U | --username NAME database user name
- -W | --password prompt for password
-
-Execution Options:
- -c | --config FILE configuration file
- -o | --output DIRECTORY output directory for config file
-
-Move Options:
- --movefilespace FILESPACE | default move filespace to new location on distributed file system
- --location <dfslocation> new location which the filespace will be moved to
-
- --movetransfilespace FILESPACE | default move filespace used by transaction files to FILESPACE
- --movetempfilespace FILESPACE | default move filespace used by temporary files to FILESPACE
- --showtransfilespace show the filespace used by transaction files
- --showtempfilespace show the filespace used by temporary files
-
-Compatible Options:
- -f | --filesystem FSNAME filesystem name for the filespace (default value
- will be the environment variable:
- GPFILESPACE_FILESYSTEM or ask for user input
- during the interactive mode)
-'''
# ============================================================================
__version__ = '$Revision$'

# ============================================================================
import sys, os
EXECNAME = os.path.split(__file__)[-1]

# ============================================================================
# Python 2.6.2 is the expected interpreter. The script is written for
# Python 2 only, so refuse to run on anything older than 2.5 and on every
# Python 3 release. The message names both limits so that a user on
# Python 3 is not told to "upgrade".
# ============================================================================
if sys.version_info < (2, 5, 0) or sys.version_info >= (3, 0, 0):
    sys.stderr.write("""
Error: %s is supported on Python 2 versions 2.5 or greater (Python 3 is not supported)
Please install a supported Python 2 interpreter on this machine.
""" % EXECNAME)
    sys.exit(1)
-
-# ============================================================================
-import optparse # Option Parsing
-import subprocess # Calling external processes
-import traceback # exception logging
-import re
-import commands
-from time import strftime
-try:
- from gppylib import gphostcache
- from gppylib.commands.base import WorkerPool
- from gppylib.commands import unix
- from gppylib.db import dbconn
- from gppylib.db import catalog
- from pygresql import pg # Database interaction
- from gppylib.gparray import * # Greenplum Array datastructures
- from gppylib.gplog import * # Greenplum logging facility
- from gppylib.gpcoverage import GpCoverage
- from getpass import getpass
- from gppylib.parseutils import line_reader, parse_fspacename, parse_fspacesys, parse_fspacereplica, parse_gpfilespace_line, \
- canonicalize_address
- from gppylib.operations.filespace import MoveFilespace, MoveFilespaceError, MoveFileSpaceLocation, GetCurrentFilespaceEntries, FileType, CheckFilespaceConsistency, CheckSuperUser, PG_SYSTEM_FILESPACE
- from gppylib.commands.base import WorkerPool, Command
-
-except ImportError, e:
- sys.exit('Error: unable to import module: ' + str(e))
-
-
-# ============================================================================
-# Some basic exceptions:
-#
-# + StandardError
-# + GPError
-# + GPDatabaseError
-# + GPPrivilegeError
-#
-# ============================================================================
-class GPError(StandardError): pass
-class GPDatabaseError(GPError): pass
-class GPPrivilegeError(GPError): pass
-
-CLUSTERSIZE = 1 # DEBUGGING (to simulate ebay set to 98)
-DEFAULT_NUM_WORKERS = 64 # same as gpstart, used for hostcache
-
# ============================================================================
def cli_help():
    '''
    Return the help text for this utility.

    The text is read from <sys.path[0]>/../docs/cli_help/<EXECNAME>_help.
    If that file cannot be opened or read, the module docstring (__doc__)
    is returned instead.
    '''
    help_path = os.path.join(sys.path[0], '../docs/cli_help',
                             EXECNAME + '_help')
    f = None
    try:
        try:
            f = open(help_path)
            return f.read()
        except (IOError, OSError):
            # Missing or unreadable help file: fall back to the docstring.
            # Only I/O errors are caught, so KeyboardInterrupt and real bugs
            # are no longer silently swallowed.
            return __doc__
    finally:
        if f:
            f.close()
-
def usage():
    '''Print the help text produced by cli_help() to stdout.'''
    help_text = cli_help()
    print(help_text)
-
# ============================================================================
def ParseOptions():
    '''
    Parses command line options input to the script.

    Defaults come from the environment where available:
      - PGUSER, PGHOST and PGPORT for the connection options;
      - GPFILESPACE_FILESYSTEM for --filesystem;
      - the user's home directory for --logdir.

    --version and --help print their text and exit with status 0.
    Unexpected positional arguments are reported through parser.error().
    A port that is not an integer is logged and exits with status 1.

    Returns the optparse options object, with options.port converted to int.
    '''

    # Determine default values.
    # os.path.expanduser("~") uses $HOME when it is set and otherwise falls
    # back to the password database. An unset HOME therefore no longer makes
    # os.path.join() fail on None.
    home = os.path.expanduser("~")

    logdir = os.path.join(home, "gpAdminLogs")
    pguser = os.environ.get("PGUSER") or unix.getUserName()
    pghost = os.environ.get("PGHOST") or unix.getLocalHostname()
    pgport = os.environ.get("PGPORT") or 5432
    filesystem = os.environ.get("GPFILESPACE_FILESYSTEM") or None
    output = os.getcwd()

    # Setup Options Parser
    parser = optparse.OptionParser(usage=cli_help(), add_help_option=False)
    parser.add_option('--version', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-l', '--logdir', default=logdir,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-h', '--host', default=pghost,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-?', '--help', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-p', '--port', default=pgport,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-U', '--username', default=pguser,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-W', '--password', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-c', '--config', default=None,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-o', '--output', default=output,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('-v', '--verbose',
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('-a', dest='interactive',
                      action='store_false', help=optparse.SUPPRESS_HELP)
    parser.add_option('-q', '--quiet',
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('--movetransfilespace',
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('--movefilespace')
    parser.add_option('--location')
    parser.add_option('-f', '--filesystem', default=filesystem,
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('--movetempfilespace',
                      help=optparse.SUPPRESS_HELP)
    parser.add_option('--showtransfilespace', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)
    parser.add_option('--showtempfilespace', default=False,
                      action='store_true', help=optparse.SUPPRESS_HELP)

    (options, args) = parser.parse_args()

    # Print version and exit
    if options.version:
        print(EXECNAME + ' ' + __version__)
        sys.exit(0)

    # Print help and exit
    if options.help:
        usage()
        sys.exit(0)

    # We don't support any arguments not listed above
    if len(args) > 0:
        parser.error("unknown arguments %s" % repr(args))

    # Make sure that the port is an integer. This is checked here rather than
    # in parser.add_option because the default value taken from the
    # environment might fail the check, and OptionParser does not like
    # receiving bad default values.
    try:
        options.port = int(options.port)
    except (TypeError, ValueError):
        logger.error("Invalid PORT: '%s'" % options.port)
        sys.exit(1)

    return options
-
-
# ============================================================================
def runcmd(cmd):
    '''
    runcmd(cmd)

    Run `cmd` (an argument list; no shell is involved) and return its
    stdout with surrounding whitespace stripped.

    Throws:
        OSError - the command exited non-zero; the exception carries the
                  command's stderr

    Usage:
        runcmd(['ssh', hostname, cmd])
    '''
    logger.debug(" ".join(cmd))
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            close_fds=True)
    out, err = proc.communicate()
    if not proc.returncode:
        return out.strip()
    raise OSError(err)
-
# ============================================================================
def getstring(prompt, question=False, default=""):
    '''
    getstring(prompt, question, default)

    Prompt on the terminal and return the line the user typed, stripped.

    When `question` is set it is printed first, together with `default` if
    one is given. An empty reply yields `default`.

    Exits with status 1 if stdin is not a tty or reading fails for any
    other reason. Control-C and Control-D are both re-raised to the caller
    as KeyboardInterrupt.
    '''
    # There is nobody to answer if stdin is not a terminal.
    if not sys.stdin.isatty():
        logger.error('stdin is not a tty')
        logger.error('gpfilespace must be run from an interactive terminal')
        sys.exit(1)

    try:
        if question and default:
            print("%s (default=%s)" % (question, default))
        elif question:
            print(question)

        reply = raw_input(prompt).strip()
        return reply if reply != "" else default

    except (KeyboardInterrupt, EOFError):
        # Terminate the prompt line, then report both as an interrupt.
        print("")
        raise KeyboardInterrupt()

    except Exception as err:
        logger.error('failure reading from stdin')
        logger.error(repr(err))
        sys.exit(1)
-
# ============================================================================
def getbool(prompt, question, default):
    '''
    getbool(prompt, question, default)

    Keep prompting until the reply is a yes/no answer. "y", "ye" and "yes"
    give True, while "n" and "no" give False (case-insensitive). Any other
    reply prompts again.
    '''
    answers = {"y": True, "ye": True, "yes": True, "n": False, "no": False}
    while True:
        reply = getstring(prompt, question, default).lower()
        if reply in answers:
            return answers[reply]
-
# ============================================================================
def pingurl(url):
    '''
    pingurl(url)

    Check that the host named in a scheme-less "host[:port]/path" location
    answers a ping.

    Returns False without pinging when `url` is empty, starts with a
    "scheme://" prefix, is an absolute path, or contains no "/".
    Otherwise the host part (everything before the first "/", minus any
    ":port") is pinged. Returns True only if the ping succeeds.
    '''
    import socket  # for socket.gaierror raised on unresolvable addresses

    url_protocol_pattern = "^[a-z][a-z0-9]*://"
    if not url:
        return False
    if re.match(url_protocol_pattern, url, flags=re.IGNORECASE) is not None:
        return False
    if url.startswith('/'):
        return False
    if '/' not in url:
        return False

    # Take the host from the part before the first '/', so that a ':' later
    # in the path cannot become the split point.
    authority = url.split('/', 1)[0]
    address = authority.split(':', 1)[0]

    try:
        pingCmd = unix.Ping("Test host: %s" % address, address)
        pingCmd.run()
    except socket.gaierror as e:
        logger.warning("address %s is invalid or cannot be connected: %s" % (address, e))
        return False

    if pingCmd.get_results().rc == 0:
        return True
    logger.warning("Ping to host: '%s' FAILED" % address)
    logger.debug(" ping details: %s" % pingCmd)
    return False
-
# ============================================================================
# User-facing messages for the gpcheckhdfs exit codes that getdir()
# recognises. Codes 1 and 100 need formatting and are handled directly in
# _gpcheckhdfs_message().
_GPCHECKHDFS_MESSAGES = {
    101: "Failed to Open or Write a File in HDFS",
    102: "Failed to Write File to HDFS",
    103: "Failed to Flush,Check your Datanode",
    104: "Failed to Delete a File.",
    105: "DFS Directory Error",
    106: "Get Delegation Token Error",
    107: "Failed to Login to Kerberos Server",
}


def _gpcheckhdfs_message(status, command, dfs_name, dfs_url):
    '''
    Translate a non-zero status from commands.getstatusoutput() into the
    message shown to the user.

    `status` is a raw wait() status (exit code 1 arrives as 256), so the
    gpcheckhdfs exit code is extracted with os.WEXITSTATUS() before lookup.
    '''
    if not os.WIFEXITED(status):
        return "gpcheckhdfs terminated abnormally (status %d): %s" % (status, command)
    code = os.WEXITSTATUS(status)
    if code == 1:
        return "gpcheckhdfs run error,Please check: %s" % (command)
    if code == 100:
        return "Failed to Connect to %s://%s" % (dfs_name, dfs_url)
    return _GPCHECKHDFS_MESSAGES.get(
        code, "gpcheckhdfs failed with exit code %d: %s" % (code, command))


def getdir(prompt, hosts=None, primary=None, shared=False, fsysn=None, db=None):
    '''
    getdir(prompt, hosts)

    Prompt until the user enters an acceptable location, then return it.

    shared=True (distributed filesystem location):
        The value is checked by running gpcheckhdfs. Its arguments are the
        filesystem name `fsysn`, the entered location, and the server's
        enable_secure_filesystem and krb_server_keyfile settings, which are
        read through `db`. Any non-zero status prints the command and an
        explanatory message, then prompts again. On success the value is
        returned unchanged.

    shared=False (local directory):
        The value must be an absolute path. It is normalised with
        os.path.normpath() and checked with dircheck(value, primary) on at
        most the first 3 entries of `hosts`. This is only a sanity check,
        because checking every host is expensive on large clusters; a more
        exhaustive check is performed once all paths are known.

    Raises GPDatabaseError if the server settings cannot be read.
    '''
    if hosts is None:
        hosts = []

    while True:
        value = getstring(prompt)
        if len(value) == 0:
            continue

        if shared:
            # TODO: check dfs uri
            try:
                cursor = dbconn.execSQL(db, "show enable_secure_filesystem")
                db.commit()  # Should move into execSQL ?
                krbstatus = cursor.fetchall()[0][0]

                cursor = dbconn.execSQL(db, "show krb_server_keyfile")
                db.commit()
                krb_keyfile = cursor.fetchall()[0][0]
            except Exception as e:
                raise GPDatabaseError(str(e))

            dfs_name = fsysn
            dfs_url = value

            # NOTE(review): dfs_url is user input and is interpolated into a
            # shell command line without quoting.
            command = "gpcheckhdfs %s %s %s %s" % (dfs_name, dfs_url, krbstatus, krb_keyfile)
            (status, _output) = commands.getstatusoutput(command)
            if status != 0:
                # Any failure, including an unrecognised exit code, rejects
                # the location and prompts again.
                print(command)
                print(_gpcheckhdfs_message(status, command, dfs_name, dfs_url))
                continue
            return value

        if not os.path.isabs(value):
            print("Must specify a full path")
            continue
        value = os.path.normpath(value)

        # Only check up to 3 hosts (this is just a sanity check).
        try:
            for host in hosts[:3]:
                host.dircheck(value, primary)
            return value
        except OSError as e:
            print("[Error] " + str(e))
        except GPError as e:
            print("[Error] " + str(e))
-
def checkdirs(host, parents, paths):
    '''
    checkdirs(host, parents, paths)

    Run $GPHOME/bin/lib/gpdircheck on `host` over ssh.

    Each directory in `parents` is passed as a plain argument, meaning it
    must exist with reasonable permissions. Each directory in `paths` is
    preceded by "-E", meaning it must not exist yet. Every name is wrapped
    in double quotes to cope with unexpected characters such as spaces.

    Raises GPError("<host>: <error text>") if the remote command fails.
    '''
    args = ['"%s"' % parent for parent in parents]
    for path in paths:
        args.extend(["-E", '"%s"' % path])

    gpdircheck = os.path.expandvars("$GPHOME/bin/lib/gpdircheck")
    try:
        runcmd(['ssh', host, gpdircheck] + args)
    except OSError as err:
        raise GPError('%s: %s' % (host, str(err)))
-
-
# Helper function to validate reasonable names for filespaces
def isValidIdentifier(name):
    '''
    Return True if `name` is acceptable as a filespace name. Otherwise print
    an error (plus a hint for unquoted names) and return False.

    Accepted forms:
      - unquoted: a letter followed by letters, digits or underscores;
      - double-quoted: non-empty text in which every embedded double quote
        is doubled ("").

    Names starting with the reserved prefixes 'pg_' or 'gp_' are rejected.
    Unquoted names are checked case-insensitively, because PostgreSQL folds
    unquoted identifiers to lower case.
    '''
    if name == "":
        return False

    # A lone '"' is not a quoted name; quoting needs an opening and a
    # closing quote.
    quoted = len(name) >= 2 and name[0] == '"' and name[-1] == '"'
    if quoted:
        name = name[1:-1]
        # A zero-length delimited identifier ("") is not a valid name.
        if name == "":
            print("[Error] invalid identifier\n")
            return False

    prefix = name[0:3] if quoted else name[0:3].lower()
    for reserved in ('pg_', 'gp_'):
        if prefix == reserved:
            print("[Error] The name prefix '%s' is reserved for "
                  "system filespaces\n" % reserved)
            return False

    # Check for identifier names. The quoted pattern avoids nested
    # quantifiers so that invalid input cannot trigger exponential
    # backtracking.
    if quoted:
        regex = r'(?:[^"]|"")*$'
    else:
        regex = r'[a-zA-Z][a-zA-Z0-9_]*$'
    if not re.match(regex, name):
        if quoted:
            print("[Error] invalid identifier\n")
        else:
            print("[Error] invalid identifier")
            print(" [Hint] non-alphanumeric identifers should be double-"
                  "quoted\n")
        return False

    return True
-
-
-
# ============================================================================
class GPHost:
    '''
    GPHost(nic, hostname)

    Container for the segments that live on one host machine.
    '''

    # --------------------------------------------------------------------
    def __init__(self, nic, hostname):
        '''
        GPHost(nic, hostname)

        nic      - network address used to talk to the host
        hostname - name used to group segments onto this host
        segments - segments on the host, filled in via add_segment()

        Several nics may resolve to the same hostname. On a poorly
        configured system the hostname itself may not be a usable network
        address, so the first nic this host was identified by is kept and
        used for all later communication.
        '''
        self.nic, self.hostname = nic, hostname
        self.segments = []

    # --------------------------------------------------------------------
    def __str__(self):
        return self.hostname

    def __repr__(self):
        return self.hostname

    # --------------------------------------------------------------------
    def add_segment(self, segment):
        '''Record `segment` as living on this host.'''
        self.segments.append(segment)

    # --------------------------------------------------------------------
    def primaries(self):
        '''Return this host's segments for which isSegmentPrimary() is true.'''
        return [seg for seg in self.segments if seg.isSegmentPrimary()]

    # --------------------------------------------------------------------
    def mirrors(self):
        '''Return this host's segments for which isSegmentMirror() is true.'''
        return [seg for seg in self.segments if seg.isSegmentMirror()]

    # --------------------------------------------------------------------
    def dircheck(self, dir, primary=None):
        '''
        Check that `dir` can hold the filespace directories for this host.

        1. Without touching the host: for every segment, the candidate
           directory dir/<basename of the segment's data directory> must
           neither equal nor lie beneath that segment's datadir. Either
           case raises OSError.
        2. Over ssh via checkdirs(): `dir` must exist with suitable
           permissions, and the candidate directories of the primaries
           (primary == True) or of the mirrors (primary == False) must not
           exist yet. With any other `primary` value only `dir` is checked.
           checkdirs() reports failures as GPError.

        Returns True when every check passes.

        Note: all paths have been through os.path.normpath(), so none of
        them carries a trailing "/".
        '''
        for seg in self.segments:
            candidate = os.path.join(
                dir, os.path.basename(seg.getSegmentDataDirectory()))
            datadir = seg.datadir

            if candidate == datadir:
                raise OSError("%s: %s - Directory conflicts with existing datadir"
                              % (self.nic, candidate))

            if candidate.startswith(datadir + "/"):
                raise OSError("%s: %s - Subdirectory of existing datadir"
                              % (self.nic, candidate))

        # Select which segments' directories must not exist yet.
        if primary == True:
            targets = self.primaries()
        elif primary == False:
            targets = self.mirrors()
        else:
            targets = []
        segdirs = [os.path.join(dir, os.path.basename(seg.getSegmentDataDirectory()))
                   for seg in targets]

        checkdirs(self.nic, [dir], segdirs)
        return True

    # --------------------------------------------------------------------
    def validate_paths(self):
        '''
        validate_paths()

        Check the pending filespace path of every segment on this host.

        Raises GPError if two segments share a path; with an unmodified
        config file that should never happen. Otherwise checkdirs() verifies
        on the host that each path's parent directory exists and that the
        path itself does not.
        '''
        paths = [seg.getSegmentPendingFilespace() for seg in self.segments]
        if len(set(paths)) != len(paths):
            raise GPError("Duplicate paths on host: %s" % self.hostname)

        # Parent directories may repeat; check each distinct one only once.
        parents = list(set(os.path.dirname(p) for p in paths))

        # TODO: do a better job parallelizing this
        checkdirs(self.nic, parents, paths)
-
# ============================================================================
class GPConfig:
    '''
    GPConfig(host)

    A "configuration": a set of hosts that share the same segment layout.

    Hosts belong to the same configuration when the sorted parent
    directories ("locations") of their primary segments, and of their
    mirror segments, are identical. `identity` encodes that layout as a
    string. Master and standby master segments play no part in it.
    '''

    # --------------------------------------------------------------------
    def __init__(self, host):
        def locations(segments):
            # Sorted parent directories of the segments' data directories.
            return sorted(os.path.dirname(seg.getSegmentDataDirectory())
                          for seg in segments)

        self.primaries = locations(host.primaries())
        self.mirrors = locations(host.mirrors())
        self.identity = ":".join(self.primaries + ["&"] + self.mirrors)
        self.hosts = []   # filled in later via add_host()
        self.pdest = []   # filespace destinations for primaries
        self.mdest = []   # filespace destinations for mirrors

    # --------------------------------------------------------------------
    def __str__(self):
        return "{'primaries': %s, 'mirrors': %s, 'hosts': %s}" % (
            self.primaries, self.mirrors, self.hosts)

    # --------------------------------------------------------------------
    def __repr__(self):
        return "GPConfig<%s,%s>" % (self.identity, self.hosts)

    # --------------------------------------------------------------------
    def add_host(self, host):
        '''Add `host` to this configuration.'''
        self.hosts.append(host)

    # --------------------------------------------------------------------
    def set_primary(self, dir):
        '''Append `dir` to the primary filespace destinations.'''
        self.pdest.append(dir)

    # --------------------------------------------------------------------
    def set_mirror(self, dir):
        '''Append `dir` to the mirror filespace destinations.'''
        self.mdest.append(dir)
-
-
-# ============================================================================
-class GPFilespace:
- '''
- Greenplum Filespace Management Utility
- '''
-
- # --------------------------------------------------------------------
- def __init__(self, options):
- '''
- GPFilespace(options)
-
- Creates an instance of a GPFilespace with the parsed options
-
- This will:
- - setup the basic container object
- - establish a connection to the database
- - assert that the user is a database superuser
- - gather the basic information from gp_configuration
-
- It will not:
- - resolve gp_configuration.hostname to real hostnames
- - group segments to their hosts
- - identify host configuration groups
-
- These steps are omitted because they require probing the
- network, which can be expensive an large clusters. We will
- eventually need to bite the bullet and perform some of those
- tests, but if there are other errors that crop up before we
- need that it is better not to have wasted time.
-
- Do not fear, the class is still fully functional. The above
- information will be fetched the first time it is required.
-
- Usage:
- options = ParseOptions()
- X = GPFilespace(options)
- '''
-
- self.options = options
- self.fspacename = None # name for the filespace
- self.dir = options.output
- self.master = None # [] list of master segments
- self.segments = None # {} dbid => GpDB resolution
- self.hosts = None # {} hostname => GPHost resolution
- self.nics = None # {} nic => hostname resolution
- self.config = None # {} host configuration groups
- self.db = None # pygresql.pgobject
- self.dburl = None
-
- self.pool = None
- self.array = None
- self.hostcache = None
-
- self.default_fsysname = 'local'
- self.default_fsreplica = 3
-
- def Setup(self):
- '''
- GPFilespace::Setup()
-
- Handle initial setup, including spawning the threadpool.
-
- Once Setup() has been called it is imperitive that cleanup() be
- called on termination.
- '''
- self.connect()
- self.get_configuration()
- #self.resolve_hostnames()
-
- # --------------------------------------------------------------------
- def cleanup(self):
- '''
- GPFilespace::cleanup()
-
- Close all active connections, shutdown threads, etc.
- '''
- if self.pool:
- logger.debug('halting threads')
- self.pool.haltWork()
- self.pool = None
- self.close_conn()
-
- # --------------------------------------------------------------------
- def connect(self):
- '''
- GPFilespace::connect()
-
- Establishes a connection to the database as specified in the options.
- Checks that the database user is a database superuser.
-
- throws:
- GPDatabaseError - failure communicating with the database
- GPPrivilegeError - database user is not a database superuser
-
- Usage:
- X = GPFilespace(options)
- connection = X.connect()
- '''
-
- # If we are already connected simply return
- if self.db:
- return
-
- try:
- user = self.options.username
- host = self.options.host
- port = self.options.port
- db = 'template1'
- if self.options.password:
- password = getpass("Password: ")
- else:
- password = None
- dburl = dbconn.DbURL(username=user, hostname=host,
- port=port, dbname=db, password=password)
- conn = dbconn.connect(dburl)
-
- q = "SELECT usesuper FROM pg_user where usename = user"
- rows = catalog.basicSQLExec(conn, q)
- is_super = rows[0][0]
-
- except Exception, e:
- raise GPDatabaseError(str(e))
-
- if not is_super:
- error = "gpfilespace requires database superuser privileges"
- raise GPPrivilegeError(error)
-
- self.dburl = dburl
- self.db = conn
-
- # --------------------------------------------------------------------
- def close_conn(self):
- if self.db:
- logger.debug('closing database connection')
- self.db.close()
- self.db = None
-
- # --------------------------------------------------------------------
- def get_configuration(self):
- '''
- GPFilespace::get_configuration()
-
- Extracts gp_configuration from the database and uses it to populate:
- self.segments = hash of segments {dbid: GpDB()}
- self.master = list of master segments [GpDB()]
-
- If any existing configuration groups or host lists are present they
- will be obliterated, but we will keep any existing nic->hostname
- mappings.
-
- Usage:
- X = GPFilespace(options)
- X.get_configuration()
- '''
-
- # We should already have a connection, but if not establish one.
- self.connect()
-
- try:
- logger.info("getting config")
- self.array = GpArray.initFromCatalog(self.dburl)
- except Exception, e:
- raise GPDatabaseError(str(e))
-
- # These have pointers to the old GpDB so they must be destroyed
- # before we populate the new structures.
- self.master = []
- self.segments = {}
- self.hosts = None
- self.config = None
-
- # Setup a workerpool, used by the hostcache
- hosts = set(self.array.get_hostlist())
- workers = min(DEFAULT_NUM_WORKERS, len(hosts))
- self.pool = WorkerPool(numWorkers = workers)
-
- # Setup a hostcache
- self.hostcache = \
- gphostcache.GpHostCache(self.array, self.pool, withMasters=True)
-
- # Loop through configuration and add segments to datastructures
- # We save the costly hostname lookup step until later.
- for seg in self.array.getDbList():
-
- # Keep the primary master at the front of the list
- if seg.isSegmentMaster():
- self.master.insert(0, seg)
- elif seg.isSegmentStandby(): self.master.append(seg)
-
- # Segment hash includes primary, mirror, and master segments
- self.segments[seg.getSegmentDbId()] = seg
-
- logger.debug('Configuration contains %d segments' % len(self.segments))
-
-
- # --------------------------------------------------------------------
- def resolve_hostnames(self):
- '''
- resolve_hostnames()
-
- For every segment in the configuration we make sure that we have
- a real hostname available for that segment. In order to determine true
- configurations we need to resolve the NICs stored in gp_configuration
- into distinct hostnames so that we can unambiguously tell which segments
- are on the same host.
- '''
-
- # Should have already established the configuration
- if not self.segments:
- self.get_configuration()
-
- # If we haven't already looked up any hosts then create an empty
- # nic lookup dictionary, otherwise we can use existing nic->hostname
- # lookups.
- if not self.nics:
- self.nics = {}
-
- # The actual GPHost records themselves must always be recalculated.
- self.hosts = {}
-
- # This really shouldn't happen, but on the off chance that there
- # are no segments in gp_configuration this error is cleaner than
- # a division by zero.
- if not self.segments or len(self.segments) == 0:
- raise GPError("gp_configuration is empty")
-
- # Todo: Replace existing GPHost with more compatable gppylib interfaces
- # for the moment populate our local versions based on a GpHostCache
- for host in self.hostcache.get_hosts():
- for db in host.dbs:
- seg = self.segments[db.getSegmentDbId()]
- nic = seg.getSegmentAddress()
-
- if nic not in self.nics:
- hostname = host.hostname
- self.nics[nic] = hostname
- else:
- hostname = self.nics[nic]
-
- # build the host => segment list mapping
- if not hostname in self.hosts:
- self.hosts[hostname] = GPHost(nic, hostname)
- self.hosts[hostname].add_segment(seg)
-
- logger.debug('nics: ' + str(self.nics))
- logger.debug('hosts: ' + str(self.hosts))
-
- # --------------------------------------------------------------------
- def group_configurations(self):
- '''
- group_configurations()
-
- For all of the hosts in the filespaces we group them into distinct
- "configurations" of simalar shape - all have the same number of
- primaries/mirrors in the same locations.
- '''
- configs = {}
- for h in self.hosts:
- host = self.hosts[h]
-
- # Make a configuration for this host, this resolves the messy
- # distinct identification of the configuration
- c = GPConfig(host)
-
- # Lookup the identity in our existing configurations, if it
- # matches use it, otherwise enter this as a new configuration.
- if c.identity not in configs:
- configs[c.identity] = c
- config = configs[c.identity]
-
- # Finally add the host to the configuration
- config.add_host(host)
-
- return configs
-
-
- # --------------------------------------------------------------------
- def do_showfilespace(self, file_type, user, pswd):
- """
- Displays the filespace that is currently being used by
- transactions and temporary files.
-
- @param file_type: FileType.TRANSACTION_FILES or FileType.TEMPORARY_FILES
- @return: None
- """
- logger.info('Getting filespace information for %s' % FileType.lookup[file_type])
-
- if not CheckSuperUser(dbconn.DbURL(username=user, password=pswd)).run():
- raise MoveFilespaceError('gpfilespace requires database superuser privileges')
-
- gparray = GpArray.initFromCatalog(dbconn.DbURL()) if self.array is None else self.array
-
- if not CheckFilespaceConsistency(gparray, file_type).run():
- raise MoveFilespaceError('Filespace is inconsistent')
-
- filespace_entries = GetCurrentFilespaceEntries(gparray, file_type).run()
- filespace_name = gparray.getFileSpaceName(int(filespace_entries[0][0]))
- logger.info('Current Filespace for %s is %s' % (FileType.lookup[file_type], filespace_name))
- for entry in filespace_entries:
- logger.info(str(entry[1]) + ' ' + entry[2])
-
- # --------------------------------------------------------------------
    def generate_config(self):
        '''
        generate_config()

        Interactively build a filespace configuration file.

        Reads a filespace name, a filesystem name, a replica count and a
        DFS location from STDIN, performs some simple sanity checks on
        them and writes a config file of the form:

            filespace:<name>
            fsysname:<filesystem>
            fsreplica:<num>
            <hostname>:<dbid>:<path>
            ...

        The file is written under self.dir. If self.dir does not exist it
        is treated as "<directory>/<filename>" (an empty directory part
        means the current working directory). Calls sys.exit(1) if the
        target directory is missing or not writeable.

        It does NOT exhaustively check all hosts that the directories are
        valid; the design intent is to check only a small sample of hosts,
        on the assumption that the segments will have been maintained
        together and that validating can be expensive on large clusters.
        A more exhaustive check is performed when a user attempts to
        actually create the filespace using the generated config file
        (execute_config()).

        Returns None.
        '''

        # Determine the output path. The default file name is timestamped;
        # a non-existent self.dir is split into (directory, file name).
        filename = "gpfilespace_config_" + strftime("%Y%m%d_%H%M%S")
        if not os.access(self.dir, os.F_OK):
            (self.dir, filename) = os.path.split(self.dir)
            if len(self.dir) == 0:
                self.dir = os.getcwd()
        if not os.access(self.dir, os.F_OK):
            logger.error('No such file or directory: "%s"' % self.dir)
            logger.error('Unable to create config file')
            sys.exit(1)
        if not os.access(self.dir, os.W_OK):
            logger.error('Directory not writeable: "%s"' % self.dir)
            logger.error('Unable to create config file')
            sys.exit(1)
        filename = os.path.join(self.dir, filename)

        #question = "Would you like to configure a new Filespace? (Y|N)"
        #if not getbool("> ", question, "Y"):
        #    raise KeyboardInterrupt()

        # Prompt for the filespace name until it is a valid identifier.
        fspacename = ""
        question = "Enter a name for this filespace"
        while not isValidIdentifier(fspacename):
            fspacename = getstring("> ", question)

        logger.debug("Filespace Name: %s" % fspacename)

        # fsysname and fsreplica can be skipped here, this will let us read the
        # old config file easily.
        # A -f/--filesystem option naming the default filesystem, or one that
        # self.array.getFilesystem() knows about, pre-selects the filesystem
        # and replica count; any other value falls through to the prompts.
        fsysname = None
        if self.options.filesystem == self.default_fsysname:
            fsysname = self.default_fsysname
            fsreplica = self.default_fsreplica
        elif self.options.filesystem is not None:
            if self.options.filesystem in [fsys.getName() for fsys in self.array.getFilesystem()]:
                fsysname = self.options.filesystem
                fsreplica = 0
            else:
                fsysname = None

        # NOTE(review): nesting of both prompts under this `if` is inferred
        # (a valid -f option then skips both, as the comment above says) --
        # confirm against the original source.
        if fsysname == None:
            fsysname = ""
            file_system_names = []
            # there is no local file system anymore.
            # file_system_names.append(self.default_fsysname)
            file_system_names.extend([fsys.getName() for fsys in self.array.getFilesystem()])
            question = "Available filesystem name:\n"
            for name in file_system_names:
                question += "filesystem: %s\n" % name
            question += "Choose filesystem name for this filespace\n"
            # Re-prompt until the answer is one of the listed filesystems.
            while not isValidIdentifier(fsysname) or fsysname not in file_system_names:
                fsysname = getstring("> ", question)

            logger.debug("Filesystem Type: %s" % fsysname)

            # get replica if filesystem type is not local
            fsreplica = -1
            if fsysname == self.default_fsysname:
                fsreplica = self.default_fsreplica
            else:
                # Re-prompt (default answer "3") until the input parses as
                # a non-negative integer.
                question = "Enter replica num for filespace. If 0, default replica num is used"
                while fsreplica < 0:
                    repstr = getstring("> ", question, "3")
                    try:
                        fsreplica = int(repstr)
                    except (ValueError), e:
                        fsreplica = -1
                    if fsreplica < 0:
                        print "replica num must be a positive integer or 0\n"

        logger.debug("Filespace Replica Num: %d" % fsreplica)

        # Search through the existing datadirectories in order to find
        # sets of hosts that have similar directory layouts for the segments.
        #
        # We expect this to return a single configuration for most of our
        # customers, but unusual gpexpand or recovery scenarios may have
        # left some hosts with unusual configurations.
        #
        #print "\nChecking your configuration:"
        #configurations = self.group_configurations()
        #for id in configurations:
        #config = configurations[id]
        #nhosts = len(config.hosts)
        #nprimary = len(config.primaries)
        #nmirror = len(config.mirrors)
        #print "Your system has %d hosts with %d primary and %d mirror " \
        #"segments per host." % (nhosts, nprimary, nmirror)

        #shared_filespace = True
        ## For every configuration we need to setup paths for the filespace
        ## Loop through each one and prompt for paths.
        #for id in configurations:
        #config = configurations[id]
        #nhosts = len(config.hosts)
        #nprimary = len(config.primaries)
        #nmirror = len(config.mirrors)

        #logger.debug("Configuring hosts: " + str(config.hosts))
        #print "\nConfiguring hosts: " + str(config.hosts)

        #question = "Should all of the nodes be configured identically? (Y|N)"
        #if not getbool("> ", question, "Y"):
        #    logger.debug("configuring hosts as mixed configuration")
        #    raise GPError("mixed configurations not yet supported")
        #else:
        logger.debug("configuring all hosts identically")

        # Get primary segment locations, if needed
        #
        # Note: it is theoretically possible for gpexpand to create
        # some hosts with "mirror only" configurations.
        #if (nprimary > 0):
        print "\nPlease specify the DFS location for the filespace (for example: localhost:9000/fs)"
        prompt = "location> "
        # NOTE(review): `config` and `shared_filespace` are only assigned in
        # the commented-out code above, so this call raises NameError unless
        # they exist at module scope -- confirm/fix before relying on it.
        dir = getdir(prompt, config.hosts, primary=True, shared=shared_filespace, fsysn=fsysname, db=self.db)
        #for i in range(nprimary):
        #prompt = "primary location %d> " % (i+1)
        #config.set_primary(dir)
        logger.debug("%s %s" % (prompt, dir))

        # Get mirror segment locations, if needed
        #if (nmirror > 0 and not shared_filespace):
        #print "\nPlease specify %s locations for the mirror " \
        #"segments, one per line:" % nmirror
        #for i in range(nmirror):
        #prompt = "mirror location %d> " % (i+1)
        #dir = getdir(prompt, config.hosts, primary=False, shared=shared_filespace, fsysn=fsysname, db=self.db)
        #config.set_mirror(dir)
        #logger.debug("%s %s" % (prompt, dir))

        # Get a location for the master, this directory applies both to
        # the master and the standby master, like a regular datadir.
        #hostname = self.nics[self.master[0].getSegmentAddress()]
        #masterhost = self.hosts[hostname]
        ## Master/standby will store all data/file on local
        #if shared_filespace:
        ## try to figure out a dir
        #masterdir = os.path.abspath(self.array.master.getSegmentDataDirectory() + "/../" + "hdfs_" + fspacename)
        #masterdir_prefix = masterdir
        #id = 0
        #while os.path.exists(masterdir):
        #masterdir = masterdir_prefix + str(id)
        #id += 1
        #else:
        #os.mkdir(masterdir)
        ## create dir for standby
        #if self.array.standbyMaster is not None:
        #standby_host = self.array.standbyMaster.getSegmentHostName()
        #pool = WorkerPool()
        #cmdStr = "mkdir -p %s" % (masterdir)
        #cmd = Command("gpfilespace mkdir for standby", cmdStr, REMOTE, standby_host)
        #pool.addCommand(cmd)
        #pool.join()
        #items = pool.getCompletedItems()
        #for i in items:
        #if i.results.rc:
        #logger.error("%s: %s" % (i.remoteHost, i.results.stderr))
        #print "gpfilespace create standby master directory for shared storage failed"
        #return
        #else:
        #print "\nEnter a file system location for the master"
        #prompt = "master location> "
        #masterdir = getdir(prompt, [masterhost], fsysn=fsysname, db=self.db)
        #logger.debug("%s %s" % (prompt, masterdir))

        # Write out the information to a configuration file
        logger.debug("Creating Config File: %s" % filename)
        logger.info("Creating configuration file...")

        # Header lines, in the order execute_config() parses them.
        # NOTE(review): `f` is not closed if anything below raises.
        f = open(filename, 'w')
        f.write('filespace:%s\n' % fspacename)
        f.write('fsysname:%s\n' % fsysname)
        f.write('fsreplica:%d\n' % fsreplica)

        # This helper will build a dict to get correct primary datadir name.
        # key: content_id
        # value: primary seg data structure
        def buildDictForPrimarySegFromMirrorSeg(array):
            helperDict = {}
            for seg in array.segments:
                # Get GpDB object from Segment object.
                seg = seg.primaryDB
                # Need the real primary not the current primary!
                # (skip entries whose *preferred* role is mirror)
                if seg.getSegmentPreferredRole() == 'm':
                    continue
                helperDict[seg.getSegmentContentId()] = seg

            return helperDict

        # Emit one "<host>:<dbid>:<loc>/<basename of seg datadir>" line.
        def writeseg(seg, loc):
            name = os.path.split(seg.getSegmentDataDirectory())[1]
            f.write( canonicalize_address( seg.getSegmentHostName() ) )
            f.write(':')
            f.write(str(seg.dbid))
            f.write(':')
            f.write(canonicalize_address(os.path.join(loc, name))) # new prefix needed?
            f.write('\n')

        # Write out location for master nodes
        # NOTE(review): `masterdir` is only assigned in commented-out code
        # above -- NameError unless defined at module scope; confirm.
        for seg in self.master:
            writeseg(seg, masterdir)

        # Loop through all of our configurations
        # NOTE(review): `configurations` is likewise only assigned in
        # commented-out code above; confirm.
        mirrorLookupPrimaryDict = buildDictForPrimarySegFromMirrorSeg(self.array)
        for id in configurations:
            config = configurations[id]

            # For every host in the current configuration loop through the
            # hosts in that configuration
            for host in config.hosts:
                # Write out the primary segments for this host
                primaries = host.primaries()
                for i in range(len(primaries)):
                    writeseg(primaries[i], config.pdest[i])

                # Write out the mirror segments for this host. With a shared
                # filespace the preferred primary for the mirror's content id
                # is written instead of the mirror itself.
                mirrors = host.mirrors()
                for i in range(len(mirrors)):
                    if shared_filespace:
                        writeseg(mirrorLookupPrimaryDict[mirrors[i].getSegmentContentId()], config.mdest[i])
                    else:
                        writeseg(mirrors[i], config.mdest[i])

        f.close()
        logger.info("[created]")
        logger.info("""
To add this filespace to the database please run the command:
 gpfilespace --config %s
""" % filename)

        return None
-
- # --------------------------------------------------------------------
- def execute_config(self):
- '''
- execute_config()
-
- Reads a configuration file from the filesystem and attempts to create
- the specified filespace within the database.
-
- This first involves checking that we expect the create filespace
- command to succeed. This is an important step since a failure of a
- mirror to create the filespace may result in the mirror failing
- completely.
- '''
-
- # Read the configuration file passed as input (-c filename)
- try:
- print "Reading Configuration file: '%s'" % self.options.config
- config = open(self.options.config)
- except IOError, e:
- raise GPError(str(e))
-
- # First line in file is the filespace name, remaining lines are
- # specify hostname, dbid, and a path:
- #
- # filespace:name
- # [fsysname:type]
- # [fsreplica:replica]
- # hostname:dbid:path
- # ...
- fspacename = None
- fsysname = None
- fsreplica = None
- for lineno, line in line_reader(config):
-
- processed = False
- if fspacename is None:
- fspacename = parse_fspacename(self.options.config, lineno, line)
- if not isValidIdentifier(fspacename):
- raise GPError('Invalid filespace name: %s' % fspacename)
- continue
-
- # From here are the options may not be appeared in the config file,
- # process these options need to be carefully.
- #
- # If the expected option is not in the config file, do not continue
- # to process next line.
- if fsysname is None:
- fsysname = parse_fspacesys(self.options.config, lineno, line)
- if fsysname is None:
- if self.options.filesystem != self.default_fsysname:
- logger.warn("detected an old style file or an invalid file")
- logger.warn("no fsysname at line %d: default fsysname is '%s'" % (lineno, self.default_fsysname))
- fsysname = self.default_fsysname
- elif fsysname == 'local':
- raise GPError('Invalid filesystem name: %s' % fsysname)
- elif not isValidIdentifier(fsysname):
- processed = True
- raise GPError('Invalid filesystem name, should be a valid filesystem name: %s' % fsysname)
- else:
- processed = True
- if processed:
- continue
-
- if fsreplica is None:
- repstr = parse_fspacereplica(self.options.config, lineno, line)
- if repstr is None:
- if self.options.filesystem != self.default_fsysname:
- logger.warn("detected an old style file or an invalid file")
- logger.warn("no fsreplica at line %d: default fsreplica is '%s'" % (lineno, self.default_fsreplica))
- fsreplica = self.default_fsreplica
- else:
- processed = True
- fsreplica = -1
- try:
- fsreplica = int(repstr)
- if fsreplica < 0:
- raise ValueError()
- except (ValueError), e:
- raise GPError('Invalid filespace replica num: %s' %repstr)
- if processed:
- continue
-
- host, dbid, path = parse_gpfilespace_line(self.options.config, lineno, line)
- try:
- dbid = int(dbid)
- except:
- raise GPError("Invalid dbid at line %d: %s" % (lineno, line))
-
- if dbid not in self.segments:
- raise GPError("Segment not found in cluster at line %d: %s"
- % (lineno, line))
- seg = self.segments[dbid]
-
- # Assign the path for the segment, will throw GPError if the
- # segment already has a path associated with it.
- try:
- seg.addSegmentFilespace(None, path)
- except GPError, e:
- raise GPError("Duplicate path set for dbid %d at line %d: %s"
- % (dbid, lineno, line))
-
- if not fspacename:
- raise GPError("Filespace name not specified")
-
- if not fsysname:
- raise GPError("Filespace type not specified")
-
- # Loop through the segments to validate that we now have a mapping
- # for every segment!
- for dbid in self.segments:
- seg = self.segments[dbid]
- if not seg.getSegmentPendingFilespace():
- raise GPError("No path specified for dbid: %d" % seg.dbid)
-
- logger.debug('nics: ' + str(self.nics))
- logger.debug('hosts: ' + str(self.hosts))
-
-
- # Progress indicator:
- logger.info("Performing validation on paths")
- progress = 0
- done = 0.0
- each = 78.0 / len(self.hosts)
-
- # Perform exhaustive checking of paths:
- # - This may take some time on large clusters
- # - raises GPError on failure
- for h in self.hosts:
- for i in range(CLUSTERSIZE): ## DEBUG: mimic a large cluster
- if fsysname == self.default_fsysname:
- self.hosts[h].validate_paths()
-
- # progress reporting
- done += each
- while (done > progress):
- progress += 1
- sys.stdout.write(".")
- sys.stdout.flush()
- sys.stdout.write("\n\n")
- sys.stdout.flush()
-
- # Everything checks out, generate the SQL and execute it
- q = "\nCREATE FILESPACE %s ON %s \n(\n" % (fspacename, fsysname)
- dbids = self.segments.keys()
- dbids.sort()
- for dbid in dbids:
- q += " %d: '%s',\n" % (dbid, self.segments[dbid].getSegmentPendingFilespace())
-
- # We have an extra "," in the above, strip it out and add in
- # the end paren instead
- q = q[0:-2] + "\n)"
-
- if fsysname != self.default_fsysname and fsreplica is not None and fsreplica > 0:
- q += " WITH (NUMREPLICA = %d)" % (fsreplica)
-
- q += ";"
-
- logger.debug(q);
-
- logger.info("Connecting to database")
- self.connect()
- try:
- cursor = dbconn.execSQL(self.db, q)
- self.db.commit() # Should move into execSQL ?
- except Exception, e:
- raise GPDatabaseError(str(e))
-
- logger.info('Filespace "%s" successfully created' % fspacename)
- sys.stdout.flush()
-
-
-# ============================================================================
-# main()
-# ============================================================================
logger = None

if __name__ == '__main__':
    # Start coverage before option parsing; the report is produced in the
    # `finally` clause at the bottom of the main try statement.
    coverage = GpCoverage()
    coverage.start()

    opt = ParseOptions()

    if opt.verbose:
        enable_verbose_logging()
    if opt.quiet:
        quiet_stdout_logging()

    # -W/--password is a flag: replace it with the password read from the
    # terminal, or None when the flag was not given.
    if opt.password:
        opt.password = getpass('Password: ')
    else:
        opt.password = None

    logger = setup_tool_logging(EXECNAME, opt.host, opt.username, opt.logdir)
    try:
        logger.debug("Start, config=" + str(opt.config))
        logger.info("""
A tablespace requires a file system location to store its database
files. A filespace is a collection of file system locations for all components
in a Greenplum system (primary segment, mirror segment and master instances).
Once a filespace is created, it can be used by one or more tablespaces.

""")
        if opt.movefilespace:
            if opt.movefilespace == 'default':
                opt.movefilespace = "dfs_system"

            if not opt.location:
                usage()
                logger.error("Target location is not set")
                sys.exit(1)

            logger.warn('PLEASE STOP DATABASE AND BACKUP MASTER DATA DIRECTORY IN CASE OF LOSING DATA!')

            question = "I understand the risk and already backuped master data directory? (Y|N)"
            if not getbool("> ", question, "N"):
                raise KeyboardInterrupt()

            MoveFileSpaceLocation(opt.movefilespace, opt.location, opt.username, opt.password).run()

            logger.info("Move filespace successfully.")
            logger.info("IF STANDBY MASTER IS CONFIGURED, PLEASE REMOVE IT AND INITIALIZE A NEW ONE!")
        elif opt.movetransfilespace:
            # Not supported in this build. The former MoveFilespace(...) call
            # that followed sys.exit() was unreachable and has been removed.
            logger.fatal('movetransfilespace option NOT SUPPORTED YET IN GPSQL')
            sys.exit(1)
        elif opt.movetempfilespace:
            logger.fatal('movetempfilespace option NOT SUPPORTED YET IN GPSQL')
            sys.exit(1)
        else:
            gpf = GPFilespace(opt)
            if opt.showtransfilespace:
                logger.fatal('showtransfilespace option NOT SUPPORTED YET IN GPSQL')
                sys.exit(1)
            elif opt.showtempfilespace:
                logger.fatal('showtempfilespace option NOT SUPPORTED YET IN GPSQL')
                sys.exit(1)
            else:
                # Without -c, interactively generate a config file;
                # with -c, create the filespace it describes.
                try:
                    gpf.Setup()
                    if opt.config is None:
                        gpf.generate_config()
                    else:
                        gpf.execute_config()
                finally:
                    gpf.cleanup()

        logger.debug("Done")
        sys.exit(0)

    except MoveFilespaceError as e:
        logger.error(e)
        sys.exit(1)

    # An expected error
    except GPError as e:
        try:
            # Cast to unicode using UTF-8 to work around issues with the
            # logging module:
            #   logger.error("퀙퀠퀩퀘ㄲㄸㅉㅃ12ㅁavb") => UnicodeDecodeError
            err = unicode(str(e), 'UTF-8')
            logger.error(err)
            logger.debug("exit(1)")
        except BaseException as log_err:
            sys.stderr.write("[Error] %s\n" % str(log_err))
            sys.stderr.write(traceback.format_exc())
            sys.stderr.flush()
        sys.exit(1)

    # User cancelation
    except KeyboardInterrupt:
        try:
            logger.info('[Cancelled]')
        except BaseException as log_err:
            sys.stderr.write("error %s\n" % str(log_err))
        sys.exit(1)

    # Let SystemExit exceptions through. The logging failure is bound to a
    # separate name so `raise e` always re-raises the original SystemExit.
    except SystemExit as e:
        try:
            logger.debug('exit(%s)' % str(e))
        except BaseException as log_err:
            sys.stderr.write("error %s\n" % str(log_err))
        raise e

    # Catch anything else - shouldn't ever occur
    except BaseException as e:
        try:
            err = '[%s]: "%s"' % (type(e).__name__, unicode(str(e), 'UTF-8'))
            logger.error(err)
            logger.debug(traceback.format_exc())
            logger.debug("exit(1)")

            sys.stderr.write('[Error] See "%s" for details\n' % get_logfile())
        except BaseException as log_err:
            sys.stderr.write("error %s\n" % str(log_err))
        sys.exit(1)
    finally:
        coverage.stop()
        coverage.generate_report()