You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/08/10 08:52:04 UTC
incubator-hawq git commit: Revert "HAWQ-955. Add scriptS for feature
test running in parallel."
Repository: incubator-hawq
Updated Branches:
refs/heads/master 4bbd4a15d -> faae87a5e
Revert "HAWQ-955. Add scriptS for feature test running in parallel."
This reverts commit 2097cee2179dafb93b06881501263fb076dd700f.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/faae87a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/faae87a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/faae87a5
Branch: refs/heads/master
Commit: faae87a5e018a204f363aca7a2c81cd16fb08a01
Parents: 4bbd4a1
Author: rlei <rl...@pivotal.io>
Authored: Wed Aug 10 16:51:31 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Wed Aug 10 16:51:31 2016 +0800
----------------------------------------------------------------------
src/test/feature/gtest-parallel | 411 ---------------------
src/test/feature/parallel-run-feature-test.sh | 54 ---
src/test/feature/test_main.cpp | 74 ++++
3 files changed, 74 insertions(+), 465 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/gtest-parallel
----------------------------------------------------------------------
diff --git a/src/test/feature/gtest-parallel b/src/test/feature/gtest-parallel
deleted file mode 100755
index 9b1f9ee..0000000
--- a/src/test/feature/gtest-parallel
+++ /dev/null
@@ -1,411 +0,0 @@
-#!/usr/bin/env python2
-# Copyright 2013 Google Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import cPickle
-import errno
-import gzip
-import multiprocessing
-import optparse
-import os
-import signal
-import subprocess
-import sys
-import tempfile
-import thread
-import threading
-import time
-import zlib
-
-# An object that catches SIGINT sent to the Python process and notices
-# if processes passed to wait() die by SIGINT (we need to look for
-# both of those cases, because pressing Ctrl+C can result in either
-# the main process or one of the subprocesses getting the signal).
-#
-# Before a SIGINT is seen, wait(p) will simply call p.wait() and
-# return the result. Once a SIGINT has been seen (in the main process
-# or a subprocess, including the one the current call is waiting for),
-# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
-class SigintHandler(object):
- class ProcessWasInterrupted(Exception): pass
- sigint_returncodes = {-signal.SIGINT, # Unix
- -1073741510, # Windows
- }
- def __init__(self):
- self.__lock = threading.Lock()
- self.__processes = set()
- self.__got_sigint = False
- signal.signal(signal.SIGINT, self.__sigint_handler)
- def __on_sigint(self):
- self.__got_sigint = True
- while self.__processes:
- try:
- self.__processes.pop().terminate()
- except OSError:
- pass
- def __sigint_handler(self, signal_num, frame):
- with self.__lock:
- self.__on_sigint()
- def got_sigint(self):
- with self.__lock:
- return self.__got_sigint
- def wait(self, p):
- with self.__lock:
- if self.__got_sigint:
- p.terminate()
- self.__processes.add(p)
- code = p.wait()
- with self.__lock:
- self.__processes.discard(p)
- if code in self.sigint_returncodes:
- self.__on_sigint()
- if self.__got_sigint:
- raise self.ProcessWasInterrupted
- return code
-sigint_handler = SigintHandler()
-
-# Return the width of the terminal, or None if it couldn't be
-# determined (e.g. because we're not being run interactively).
-def term_width(out):
- if not out.isatty():
- return None
- try:
- p = subprocess.Popen(["stty", "size"],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- (out, err) = p.communicate()
- if p.returncode != 0 or err:
- return None
- return int(out.split()[1])
- except (IndexError, OSError, ValueError):
- return None
-
-# Output transient and permanent lines of text. If several transient
-# lines are written in sequence, the new will overwrite the old. We
-# use this to ensure that lots of unimportant info (tests passing)
-# won't drown out important info (tests failing).
-class Outputter(object):
- def __init__(self, out_file):
- self.__out_file = out_file
- self.__previous_line_was_transient = False
- self.__width = term_width(out_file) # Line width, or None if not a tty.
- def transient_line(self, msg):
- if self.__width is None:
- self.__out_file.write(msg + "\n")
- else:
- self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
- self.__previous_line_was_transient = True
- def flush_transient_output(self):
- if self.__previous_line_was_transient:
- self.__out_file.write("\n")
- self.__previous_line_was_transient = False
- def permanent_line(self, msg):
- self.flush_transient_output()
- self.__out_file.write(msg + "\n")
-
-stdout_lock = threading.Lock()
-
-class FilterFormat:
- if sys.stdout.isatty():
- # stdout needs to be unbuffered since the output is interactive.
- sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
-
- out = Outputter(sys.stdout)
- total_tests = 0
- finished_tests = 0
-
- tests = {}
- outputs = {}
- failures = []
-
- def print_test_status(self, last_finished_test, time_ms):
- self.out.transient_line("[%d/%d] %s (%d ms)"
- % (self.finished_tests, self.total_tests,
- last_finished_test, time_ms))
-
- def handle_meta(self, job_id, args):
- (command, arg) = args.split(' ', 1)
- if command == "TEST":
- (binary, test) = arg.split(' ', 1)
- self.tests[job_id] = (binary, test.strip())
- elif command == "EXIT":
- (exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)]
- self.finished_tests += 1
- (binary, test) = self.tests[job_id]
- self.print_test_status(test, time_ms)
- if exit_code != 0:
- self.failures.append(self.tests[job_id])
- with open(self.outputs[job_id]) as f:
- for line in f.readlines():
- self.out.permanent_line(line.rstrip())
- self.out.permanent_line(
- "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
- % (self.finished_tests, self.total_tests, test, exit_code, time_ms))
- elif command == "TESTCNT":
- self.total_tests = int(arg.split(' ', 1)[1])
- self.out.transient_line("[0/%d] Running tests..." % self.total_tests)
-
- def logfile(self, job_id, name):
- self.outputs[job_id] = name
-
- def log(self, line):
- stdout_lock.acquire()
- (prefix, output) = line.split(' ', 1)
-
- assert prefix[-1] == ':'
- self.handle_meta(int(prefix[:-1]), output)
- stdout_lock.release()
-
- def end(self):
- if self.failures:
- self.out.permanent_line("FAILED TESTS (%d/%d):"
- % (len(self.failures), self.total_tests))
- for (binary, test) in self.failures:
- self.out.permanent_line(" " + binary + ": " + test)
- self.out.flush_transient_output()
-
-class RawFormat:
- def log(self, line):
- stdout_lock.acquire()
- sys.stdout.write(line + "\n")
- sys.stdout.flush()
- stdout_lock.release()
- def logfile(self, job_id, name):
- with open(self.outputs[job_id]) as f:
- for line in f.readlines():
- self.log(str(job_id) + '> ' + line.rstrip())
- def end(self):
- pass
-
-# Record of test runtimes. Has built-in locking.
-class TestTimes(object):
- def __init__(self, save_file):
- "Create new object seeded with saved test times from the given file."
- self.__times = {} # (test binary, test name) -> runtime in ms
-
- # Protects calls to record_test_time(); other calls are not
- # expected to be made concurrently.
- self.__lock = threading.Lock()
-
- try:
- with gzip.GzipFile(save_file, "rb") as f:
- times = cPickle.load(f)
- except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
- # File doesn't exist, isn't readable, is malformed---whatever.
- # Just ignore it.
- return
-
- # Discard saved times if the format isn't right.
- if type(times) is not dict:
- return
- for ((test_binary, test_name), runtime) in times.items():
- if (type(test_binary) is not str or type(test_name) is not str
- or type(runtime) not in {int, long, type(None)}):
- return
-
- self.__times = times
-
- def get_test_time(self, binary, testname):
- """Return the last duration for the given test as an integer number of
- milliseconds, or None if the test failed or if there's no record for it."""
- return self.__times.get((binary, testname), None)
-
- def record_test_time(self, binary, testname, runtime_ms):
- """Record that the given test ran in the specified number of
- milliseconds. If the test failed, runtime_ms should be None."""
- with self.__lock:
- self.__times[(binary, testname)] = runtime_ms
-
- def write_to_file(self, save_file):
- "Write all the times to file."
- try:
- with open(save_file, "wb") as f:
- with gzip.GzipFile("", "wb", 9, f) as gzf:
- cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
- except IOError:
- pass # ignore errors---saving the times isn't that important
-
-# Remove additional arguments (anything after --).
-additional_args = []
-
-for i in range(len(sys.argv)):
- if sys.argv[i] == '--':
- additional_args = sys.argv[i+1:]
- sys.argv = sys.argv[:i]
- break
-
-parser = optparse.OptionParser(
- usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
-
-parser.add_option('-d', '--output_dir', type='string',
- default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
- help='output directory for test logs')
-parser.add_option('-r', '--repeat', type='int', default=1,
- help='repeat tests')
-parser.add_option('--failed', action='store_true', default=False,
- help='run only failed and new tests')
-parser.add_option('-w', '--workers', type='int',
- default=multiprocessing.cpu_count(),
- help='number of workers to spawn')
-parser.add_option('--gtest_color', type='string', default='yes',
- help='color output')
-parser.add_option('--gtest_filter', type='string', default='',
- help='test filter')
-parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
- default=False, help='run disabled tests too')
-parser.add_option('--format', type='string', default='filter',
- help='output format (raw,filter)')
-parser.add_option('--print_test_times', action='store_true', default=False,
- help='When done, list the run time of each test')
-
-(options, binaries) = parser.parse_args()
-
-if binaries == []:
- parser.print_usage()
- sys.exit(1)
-
-logger = RawFormat()
-if options.format == 'raw':
- pass
-elif options.format == 'filter':
- logger = FilterFormat()
-else:
- sys.exit("Unknown output format: " + options.format)
-
-# Find tests.
-save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
-times = TestTimes(save_file)
-tests = []
-for test_binary in binaries:
- command = [test_binary]
- if options.gtest_also_run_disabled_tests:
- command += ['--gtest_also_run_disabled_tests']
-
- list_command = list(command)
- if options.gtest_filter != '':
- list_command += ['--gtest_filter=' + options.gtest_filter]
-
- try:
- test_list = subprocess.Popen(list_command + ['--gtest_list_tests'],
- stdout=subprocess.PIPE).communicate()[0]
- except OSError as e:
- sys.exit("%s: %s" % (test_binary, str(e)))
-
- command += additional_args
-
- test_group = ''
- for line in test_list.split('\n'):
- if not line.strip():
- continue
- if line[0] != " ":
- # Remove comments for typed tests and strip whitespace.
- test_group = line.split('#')[0].strip()
- continue
- # Remove comments for parameterized tests and strip whitespace.
- line = line.split('#')[0].strip()
- if not line:
- continue
-
- test = test_group + line
- if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test:
- continue
- tests.append((times.get_test_time(test_binary, test),
- test_binary, test, command))
-
-if options.failed:
- # The first element of each entry is the runtime of the most recent
- # run if it was successful, or None if the test is new or the most
- # recent run failed.
- tests = [x for x in tests if x[0] is None]
-
-# Sort tests by falling runtime (with None, which is what we get for
-# new and failing tests, being considered larger than any real
-# runtime).
-tests.sort(reverse=True, key=lambda x: ((1 if x[0] is None else 0), x))
-
-# Repeat tests (-r flag).
-tests *= options.repeat
-test_lock = threading.Lock()
-job_id = 0
-logger.log(str(-1) + ': TESTCNT ' + ' ' + str(len(tests)))
-
-exit_code = 0
-
-# Create directory for test log output.
-try:
- os.makedirs(options.output_dir)
-except OSError as e:
- # Ignore errors if this directory already exists.
- if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
- raise e
-# Remove files from old test runs.
-for logfile in os.listdir(options.output_dir):
- os.remove(os.path.join(options.output_dir, logfile))
-
-# Run the specified job. Return the elapsed time in milliseconds if
-# the job succeeds, or None if the job fails. (This ensures that
-# failing tests will run first the next time.)
-def run_job((command, job_id, test)):
- begin = time.time()
-
- with tempfile.NamedTemporaryFile(dir=options.output_dir, delete=False) as log:
- sub = subprocess.Popen(command + ['--gtest_filter=' + test] +
- ['--gtest_color=' + options.gtest_color],
- stdout=log.file,
- stderr=log.file)
- try:
- code = sigint_handler.wait(sub)
- except sigint_handler.ProcessWasInterrupted:
- thread.exit()
- runtime_ms = int(1000 * (time.time() - begin))
- logger.logfile(job_id, log.name)
-
- logger.log("%s: EXIT %s %d" % (job_id, code, runtime_ms))
- if code == 0:
- return runtime_ms
- global exit_code
- exit_code = code
- return None
-
-def worker():
- global job_id
- while True:
- job = None
- test_lock.acquire()
- if job_id < len(tests):
- (_, test_binary, test, command) = tests[job_id]
- logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test)
- job = (command, job_id, test)
- job_id += 1
- test_lock.release()
- if job is None:
- return
- times.record_test_time(test_binary, test, run_job(job))
-
-def start_daemon(func):
- t = threading.Thread(target=func)
- t.daemon = True
- t.start()
- return t
-
-workers = [start_daemon(worker) for i in range(options.workers)]
-
-[t.join() for t in workers]
-logger.end()
-times.write_to_file(save_file)
-if options.print_test_times:
- ts = sorted((times.get_test_time(test_binary, test), test_binary, test)
- for (_, test_binary, test, _) in tests
- if times.get_test_time(test_binary, test) is not None)
- for (time_ms, test_binary, test) in ts:
- print "%8s %s" % ("%dms" % time_ms, test)
-sys.exit(-signal.SIGINT if sigint_handler.got_sigint() else exit_code)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/parallel-run-feature-test.sh
----------------------------------------------------------------------
diff --git a/src/test/feature/parallel-run-feature-test.sh b/src/test/feature/parallel-run-feature-test.sh
deleted file mode 100755
index 903773c..0000000
--- a/src/test/feature/parallel-run-feature-test.sh
+++ /dev/null
@@ -1,54 +0,0 @@
-#! /bin/bash
-
-if [ x$GPHOME == 'x' ]; then
- echo "Please source greenplum_path.sh before running feature tests."
- exit 0
-fi
-
-PSQL=${GPHOME}/bin/psql
-HAWQ_DB=${PGDATABASE:-"postgres"}
-HAWQ_HOST=${PGHOST:-"localhost"}
-HAWQ_PORT=${PGPORT:-"5432"}
-HAWQ_USER=${PGUSER:-}
-HAWQ_PASSWORD=${PGPASSWORD:-}
-
-run_sql() {
- suffix=""
- if [ x$HAWQ_USER != 'x' ]; then
- suffix=$suffix:" -U $HAWQ_USER"
- if [ x$HAWQ_PASSWORD != 'x' ]; then
- suffix=$suffix:" -W $HAWQ_PASSWORD"
- fi
- fi
- $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT -c "$1" $suffix > /dev/null 2>&1
- if [ $? -ne 0 ]; then
- echo "$1 failed."
- exit 1
- fi
-}
-
-init_hawq_test() {
- TEST_DB_NAME="hawq_feature_test_db"
-
- $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT \
- -c "create database $TEST_DB_NAME;" > /dev/null 2>&1
- run_sql "alter database $TEST_DB_NAME set lc_messages to 'C';"
- run_sql "alter database $TEST_DB_NAME set lc_monetary to 'C';"
- run_sql "alter database $TEST_DB_NAME set lc_numeric to 'C';"
- run_sql "alter database $TEST_DB_NAME set lc_time to 'C';"
- run_sql "alter database $TEST_DB_NAME set timezone_abbreviations to 'Default';"
- run_sql "alter database $TEST_DB_NAME set timezone to 'PST8PDT';"
- run_sql "alter database $TEST_DB_NAME set datestyle to 'postgres,MDY';"
- PGDATABASE=$TEST_DB_NAME
-}
-
-run_feature_test() {
- if [ $# -lt 2 ]; then
- echo "Usage: parallel-run-feature-test.sh workers [number of workers] binary [binary ...] -- [additional args]"
- exit 0
- fi
- init_hawq_test
- $(dirname $0)/gtest-parallel --worker=$1 ${@:2}
-}
-
-run_feature_test $1 ${@:2}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/test_main.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/test_main.cpp b/src/test/feature/test_main.cpp
index 443e2db..41142ec 100644
--- a/src/test/feature/test_main.cpp
+++ b/src/test/feature/test_main.cpp
@@ -1,6 +1,80 @@
+#include <sys/types.h>
+#include <pwd.h>
#include "gtest/gtest.h"
+#include "psql.h"
+#include "sql_util.h"
+
+using std::string;
+
+class TestPrepare  // One-time setup object constructed in main(): prepares the database used by the feature tests.
+{
+ private:
+ const string testDbName = "hawq_feature_test";  // Name of the database that is created and exported as PGDATABASE.
+ std::unique_ptr<hawq::test::PSQL> conn;  // Connection used to run the setup SQL; assigned in init_hawq_test().
+ void init_hawq_test();  // Creates testDbName and sets its locale/timezone/datestyle defaults.
+ public:
+ TestPrepare();  // Runs init_hawq_test(), then sets PGDATABASE to testDbName.
+ ~TestPrepare();  // Empty body: no explicit cleanup is performed.
+};
+
+#define PSQL_RUN_AND_ASSERT() \
+ conn->runSQLCommand(cmd); \
+ ASSERT_EQ(0, conn->getLastStatus()) << conn->getLastResult();
+
+void TestPrepare::init_hawq_test()
+{
+ // Connects as HAWQ_USER (or the current OS user), creates testDbName and sets its default GUC values.
+ string user = HAWQ_USER;
+ if(user.empty()) {
+ // Fall back to the effective uid's login name; getpwuid() returns NULL when no entry is found.
+ const struct passwd *pw = getpwuid(geteuid());
+ ASSERT_TRUE(pw != nullptr) << "no passwd entry for the effective uid";
+ user.assign(pw->pw_name);
+ }
+
+ conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, HAWQ_PASSWORD));
+ // Create the test database and set some default GUC values so that test
+ // output is consistent across runs. The database is deliberately not dropped
+ // first: keeping the previous environment can help reproduce and diagnose
+ // failing tests. Drop it manually before running the tests when a clean
+ // state is needed.
+ string cmd;
+ cmd = "create database " + testDbName;
+ // Do not check the status here: the database may already exist.
+ conn->runSQLCommand(cmd);
+ cmd = "alter database " + testDbName + " set lc_messages to 'C'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set lc_monetary to 'C'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set lc_numeric to 'C'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set lc_time to 'C'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set timezone_abbreviations to 'Default'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set timezone to 'PST8PDT'";
+ PSQL_RUN_AND_ASSERT();
+ cmd = "alter database " + testDbName + " set datestyle to 'postgres,MDY'";
+ PSQL_RUN_AND_ASSERT();
+}
+
+TestPrepare::TestPrepare()  // Prepares the test database, then exports its name through the environment.
+{
+ init_hawq_test();
+
+ // Set PGDATABASE (overwriting any existing value) so the tests use the test database.
+ setenv("PGDATABASE", testDbName.c_str(), 1);
+}
+
+TestPrepare::~TestPrepare()  // Empty body: performs no explicit cleanup.
+{
+}
int main(int argc, char** argv) {
+
+ TestPrepare tp;  // Database setup runs here, before InitGoogleTest() parses the command line. NOTE(review): presumably this also runs for --gtest_list_tests; confirm that is intended.
+
::testing::InitGoogleTest(&argc, argv);
+
return RUN_ALL_TESTS();
}