You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/10/31 03:09:37 UTC

[2/2] incubator-hawq git commit: HAWQ-955. Add runnable scripts for feature test running in parallel.

HAWQ-955. Add runnable scripts for feature test running in parallel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/3c2f5d2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/3c2f5d2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/3c2f5d2f

Branch: refs/heads/master
Commit: 3c2f5d2fb5c449b552e26f09d6e5806639a55ac4
Parents: d3f4826
Author: xunzhang <xu...@gmail.com>
Authored: Mon Sep 12 13:36:12 2016 +0800
Committer: amyrazz44 <ab...@pivotal.io>
Committed: Mon Oct 31 10:57:49 2016 +0800

----------------------------------------------------------------------
 src/test/feature/README.md                    |   4 +-
 src/test/feature/gtest-parallel               | 411 +++++++++++++++++++++
 src/test/feature/parallel-run-feature-test.sh |  54 +++
 src/test/feature/test_main.cpp                |  74 ----
 4 files changed, 468 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3c2f5d2f/src/test/feature/README.md
----------------------------------------------------------------------
diff --git a/src/test/feature/README.md b/src/test/feature/README.md
index 2e27f97..737dd2b 100644
--- a/src/test/feature/README.md
+++ b/src/test/feature/README.md
@@ -15,7 +15,7 @@ Before building the code of feature tests part, just make sure your compiler sup
 1. Make sure HAWQ is running correctly. If not, `init` or `start` HAWQ at first. Note please don't set locale related arguments for hawq init.
 2. Load environment configuration by running `source $INSTALL_PREFIX/greenplum_path.sh`.
 3. Load hdfs configuration. For example, `export HADOOP_HOME=/Users/wuhong/hadoop-2.7.2 && export PATH=${PATH}:${HADOOP_HOME}/bin`. Since some test cases need `hdfs` and `hadoop` command, just ensure these commands work before running. Otherwise you will get failure.
-4. Run `./feature-test`, you could use `--gtest_filter` option to filter test cases(both positive and negative patterns are supported). Please see more options by running `./feature-test --help`.
+4. Run the cases with `./parallel-run-feature-test.sh 8 ./feature-test` (in this case, 8 threads run in parallel). You can use the `--gtest_filter` option to filter test cases (both positive and negative patterns are supported). See more options by running `./feature-test --help`.
 
 # Development
 When contributing to HAWQ, we suggest that developers submit feature tests related to their feature development. To write a feature test, you need to write a cpp file inside the corresponding folder. There are two recommended ways to write this cpp file:
@@ -32,6 +32,8 @@ If some cases failed in your environment, check it out with the generated `.diff
 
 There are some cases expected to be fail in specific environment which need to be fixed later on. Don't worry about that.
 
+To run feature tests in parallel, make sure your python version is equal to or greater than 2.7.
+
 # Reference
 [HAWQ-832](https://issues.apache.org/jira/browse/HAWQ-832)
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3c2f5d2f/src/test/feature/gtest-parallel
----------------------------------------------------------------------
diff --git a/src/test/feature/gtest-parallel b/src/test/feature/gtest-parallel
new file mode 100755
index 0000000..9b1f9ee
--- /dev/null
+++ b/src/test/feature/gtest-parallel
@@ -0,0 +1,411 @@
+#!/usr/bin/env python2
+# Copyright 2013 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import cPickle
+import errno
+import gzip
+import multiprocessing
+import optparse
+import os
+import signal
+import subprocess
+import sys
+import tempfile
+import thread
+import threading
+import time
+import zlib
+
+# An object that catches SIGINT sent to the Python process and notices
+# if processes passed to wait() die by SIGINT (we need to look for
+# both of those cases, because pressing Ctrl+C can result in either
+# the main process or one of the subprocesses getting the signal).
+#
+# Before a SIGINT is seen, wait(p) will simply call p.wait() and
+# return the result. Once a SIGINT has been seen (in the main process
+# or a subprocess, including the one the current call is waiting for),
+# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
+class SigintHandler(object):
+  # Raised by wait() once a SIGINT has been observed.
+  class ProcessWasInterrupted(Exception): pass
+  # Return codes that wait() treats as "the child died by SIGINT".
+  sigint_returncodes = {-signal.SIGINT,  # Unix
+                        -1073741510,     # Windows
+                        }
+  def __init__(self):
+    # Guards __processes and __got_sigint.
+    self.__lock = threading.Lock()
+    # Processes that some wait() call is currently blocked on.
+    self.__processes = set()
+    self.__got_sigint = False
+    # Install this object as the SIGINT handler of the Python process.
+    signal.signal(signal.SIGINT, self.__sigint_handler)
+  # Remember that a SIGINT was seen and terminate every tracked
+  # process. Both callers in this class hold __lock while calling this.
+  def __on_sigint(self):
+    self.__got_sigint = True
+    while self.__processes:
+      try:
+        self.__processes.pop().terminate()
+      except OSError:
+        # Ignored so that the remaining processes are still terminated
+        # (presumably the process had already exited).
+        pass
+  # Signal handler registered in __init__.
+  def __sigint_handler(self, signal_num, frame):
+    with self.__lock:
+      self.__on_sigint()
+  # Return True once a SIGINT has been seen.
+  def got_sigint(self):
+    with self.__lock:
+      return self.__got_sigint
+  # Wait for process p and return its exit code, or raise
+  # ProcessWasInterrupted if a SIGINT has been seen by the time p exits.
+  def wait(self, p):
+    with self.__lock:
+      # A SIGINT seen before this call: stop p right away. p.wait()
+      # below still runs so the process is collected.
+      if self.__got_sigint:
+        p.terminate()
+      self.__processes.add(p)
+    # The lock is not held while blocking on the child.
+    code = p.wait()
+    with self.__lock:
+      self.__processes.discard(p)
+      # A child that died by SIGINT counts as an interrupt too, so the
+      # other tracked processes get terminated as well.
+      if code in self.sigint_returncodes:
+        self.__on_sigint()
+      if self.__got_sigint:
+        raise self.ProcessWasInterrupted
+    return code
+sigint_handler = SigintHandler()
+
+# Return the width of the terminal, or None if it couldn't be
+# determined (e.g. because we're not being run interactively).
+def term_width(out):
+  if not out.isatty():
+    return None
+  try:
+    p = subprocess.Popen(["stty", "size"],
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (out, err) = p.communicate()
+    if p.returncode != 0 or err:
+      return None
+    return int(out.split()[1])
+  except (IndexError, OSError, ValueError):
+    return None
+
+# Output transient and permanent lines of text. If several transient
+# lines are written in sequence, the new will overwrite the old. We
+# use this to ensure that lots of unimportant info (tests passing)
+# won't drown out important info (tests failing).
+class Outputter(object):
+  def __init__(self, out_file):
+    self.__out_file = out_file
+    self.__previous_line_was_transient = False
+    self.__width = term_width(out_file)  # Line width, or None if not a tty.
+  def transient_line(self, msg):
+    if self.__width is None:
+      self.__out_file.write(msg + "\n")
+    else:
+      self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
+      self.__previous_line_was_transient = True
+  def flush_transient_output(self):
+    if self.__previous_line_was_transient:
+      self.__out_file.write("\n")
+      self.__previous_line_was_transient = False
+  def permanent_line(self, msg):
+    self.flush_transient_output()
+    self.__out_file.write(msg + "\n")
+
+stdout_lock = threading.Lock()
+
+class FilterFormat:
+  if sys.stdout.isatty():
+    # stdout needs to be unbuffered since the output is interactive.
+    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+
+  out = Outputter(sys.stdout)
+  total_tests = 0
+  finished_tests = 0
+
+  tests = {}
+  outputs = {}
+  failures = []
+
+  def print_test_status(self, last_finished_test, time_ms):
+    self.out.transient_line("[%d/%d] %s (%d ms)"
+                            % (self.finished_tests, self.total_tests,
+                               last_finished_test, time_ms))
+
+  def handle_meta(self, job_id, args):
+    (command, arg) = args.split(' ', 1)
+    if command == "TEST":
+      (binary, test) = arg.split(' ', 1)
+      self.tests[job_id] = (binary, test.strip())
+    elif command == "EXIT":
+      (exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)]
+      self.finished_tests += 1
+      (binary, test) = self.tests[job_id]
+      self.print_test_status(test, time_ms)
+      if exit_code != 0:
+        self.failures.append(self.tests[job_id])
+        with open(self.outputs[job_id]) as f:
+          for line in f.readlines():
+            self.out.permanent_line(line.rstrip())
+        self.out.permanent_line(
+          "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
+          % (self.finished_tests, self.total_tests, test, exit_code, time_ms))
+    elif command == "TESTCNT":
+      self.total_tests = int(arg.split(' ', 1)[1])
+      self.out.transient_line("[0/%d] Running tests..." % self.total_tests)
+
+  def logfile(self, job_id, name):
+    self.outputs[job_id] = name
+
+  def log(self, line):
+    stdout_lock.acquire()
+    (prefix, output) = line.split(' ', 1)
+
+    assert prefix[-1] == ':'
+    self.handle_meta(int(prefix[:-1]), output)
+    stdout_lock.release()
+
+  def end(self):
+    if self.failures:
+      self.out.permanent_line("FAILED TESTS (%d/%d):"
+                              % (len(self.failures), self.total_tests))
+      for (binary, test) in self.failures:
+        self.out.permanent_line(" " + binary + ": " + test)
+    self.out.flush_transient_output()
+
+class RawFormat:
+  def log(self, line):
+    stdout_lock.acquire()
+    sys.stdout.write(line + "\n")
+    sys.stdout.flush()
+    stdout_lock.release()
+  def logfile(self, job_id, name):
+    with open(self.outputs[job_id]) as f:
+      for line in f.readlines():
+        self.log(str(job_id) + '> ' + line.rstrip())
+  def end(self):
+    pass
+
+# Record of test runtimes. Has built-in locking.
+class TestTimes(object):
+  def __init__(self, save_file):
+    "Create new object seeded with saved test times from the given file."
+    self.__times = {}  # (test binary, test name) -> runtime in ms
+
+    # Protects calls to record_test_time(); other calls are not
+    # expected to be made concurrently.
+    self.__lock = threading.Lock()
+
+    try:
+      with gzip.GzipFile(save_file, "rb") as f:
+        times = cPickle.load(f)
+    except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
+      # File doesn't exist, isn't readable, is malformed---whatever.
+      # Just ignore it.
+      return
+
+    # Discard saved times if the format isn't right.
+    if type(times) is not dict:
+      return
+    for ((test_binary, test_name), runtime) in times.items():
+      if (type(test_binary) is not str or type(test_name) is not str
+          or type(runtime) not in {int, long, type(None)}):
+        return
+
+    self.__times = times
+
+  def get_test_time(self, binary, testname):
+    """Return the last duration for the given test as an integer number of
+    milliseconds, or None if the test failed or if there's no record for it."""
+    return self.__times.get((binary, testname), None)
+
+  def record_test_time(self, binary, testname, runtime_ms):
+    """Record that the given test ran in the specified number of
+    milliseconds. If the test failed, runtime_ms should be None."""
+    with self.__lock:
+      self.__times[(binary, testname)] = runtime_ms
+
+  def write_to_file(self, save_file):
+    "Write all the times to file."
+    try:
+      with open(save_file, "wb") as f:
+        with gzip.GzipFile("", "wb", 9, f) as gzf:
+          cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
+    except IOError:
+      pass  # ignore errors---saving the times isn't that important
+
+# Remove additional arguments (anything after --).
+additional_args = []
+
+for i in range(len(sys.argv)):
+  if sys.argv[i] == '--':
+    additional_args = sys.argv[i+1:]
+    sys.argv = sys.argv[:i]
+    break
+
+parser = optparse.OptionParser(
+    usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
+
+parser.add_option('-d', '--output_dir', type='string',
+                  default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
+                  help='output directory for test logs')
+parser.add_option('-r', '--repeat', type='int', default=1,
+                  help='repeat tests')
+parser.add_option('--failed', action='store_true', default=False,
+                  help='run only failed and new tests')
+parser.add_option('-w', '--workers', type='int',
+                  default=multiprocessing.cpu_count(),
+                  help='number of workers to spawn')
+parser.add_option('--gtest_color', type='string', default='yes',
+                  help='color output')
+parser.add_option('--gtest_filter', type='string', default='',
+                  help='test filter')
+parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
+                  default=False, help='run disabled tests too')
+parser.add_option('--format', type='string', default='filter',
+                  help='output format (raw,filter)')
+parser.add_option('--print_test_times', action='store_true', default=False,
+                  help='When done, list the run time of each test')
+
+(options, binaries) = parser.parse_args()
+
+# At least one test binary is required.
+if binaries == []:
+  parser.print_usage()
+  sys.exit(1)
+
+# Pick the output formatter named by --format; raw is already in place
+# from the initial assignment.
+logger = RawFormat()
+if options.format == 'raw':
+  pass
+elif options.format == 'filter':
+  logger = FilterFormat()
+else:
+  sys.exit("Unknown output format: " + options.format)
+
+# Find tests.
+# Each entry appended to `tests` is
+# (last recorded runtime or None, test binary, test name, command).
+save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
+times = TestTimes(save_file)
+tests = []
+for test_binary in binaries:
+  command = [test_binary]
+  if options.gtest_also_run_disabled_tests:
+    command += ['--gtest_also_run_disabled_tests']
+
+  # The user's filter is applied only to the listing; each job later
+  # gets its own --gtest_filter naming exactly one test.
+  list_command = list(command)
+  if options.gtest_filter != '':
+    list_command += ['--gtest_filter=' + options.gtest_filter]
+
+  try:
+    test_list = subprocess.Popen(list_command + ['--gtest_list_tests'],
+                                 stdout=subprocess.PIPE).communicate()[0]
+  except OSError as e:
+    sys.exit("%s: %s" % (test_binary, str(e)))
+
+  # Arguments given after "--" are passed to every test run, but were
+  # not passed to the listing above.
+  command += additional_args
+
+  # Lines that do not start with a blank name a test group; lines that
+  # do name a test inside the most recent group.
+  test_group = ''
+  for line in test_list.split('\n'):
+    if not line.strip():
+      continue
+    if line[0] != " ":
+      # Remove comments for typed tests and strip whitespace.
+      test_group = line.split('#')[0].strip()
+      continue
+    # Remove comments for parameterized tests and strip whitespace.
+    line = line.split('#')[0].strip()
+    if not line:
+      continue
+
+    test = test_group + line
+    if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test:
+      continue
+    tests.append((times.get_test_time(test_binary, test),
+                  test_binary, test, command))
+
+if options.failed:
+  # The first element of each entry is the runtime of the most recent
+  # run if it was successful, or None if the test is new or the most
+  # recent run failed.
+  tests = [x for x in tests if x[0] is None]
+
+# Sort tests by falling runtime (with None, which is what we get for
+# new and failing tests, being considered larger than any real
+# runtime).
+tests.sort(reverse=True, key=lambda x: ((1 if x[0] is None else 0), x))
+
+# Repeat tests (-r flag).
+tests *= options.repeat
+# test_lock guards job_id, the index into `tests` of the next job that
+# a worker will pick up.
+test_lock = threading.Lock()
+job_id = 0
+# Announce the number of jobs to the formatter, using job id -1.
+logger.log(str(-1) + ': TESTCNT ' + ' ' + str(len(tests)))
+
+# Exit code of the script; overwritten with the exit code of a failed job.
+exit_code = 0
+
+# Create directory for test log output.
+try:
+  os.makedirs(options.output_dir)
+except OSError as e:
+  # Ignore errors if this directory already exists.
+  if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
+    raise e
+# Remove files from old test runs.
+# NOTE(review): this removes every entry found in output_dir, not only
+# files this script created.
+for logfile in os.listdir(options.output_dir):
+  os.remove(os.path.join(options.output_dir, logfile))
+
+# Run the specified job. Return the elapsed time in milliseconds if
+# the job succeeds, or None if the job fails. (This ensures that
+# failing tests will run first the next time.)
+def run_job((command, job_id, test)):
+  begin = time.time()
+
+  with tempfile.NamedTemporaryFile(dir=options.output_dir, delete=False) as log:
+    sub = subprocess.Popen(command + ['--gtest_filter=' + test] +
+                             ['--gtest_color=' + options.gtest_color],
+                           stdout=log.file,
+                           stderr=log.file)
+    try:
+      code = sigint_handler.wait(sub)
+    except sigint_handler.ProcessWasInterrupted:
+      thread.exit()
+    runtime_ms = int(1000 * (time.time() - begin))
+    logger.logfile(job_id, log.name)
+
+  logger.log("%s: EXIT %s %d" % (job_id, code, runtime_ms))
+  if code == 0:
+    return runtime_ms
+  global exit_code
+  exit_code = code
+  return None
+
+def worker():
+  global job_id
+  while True:
+    job = None
+    test_lock.acquire()
+    if job_id < len(tests):
+      (_, test_binary, test, command) = tests[job_id]
+      logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test)
+      job = (command, job_id, test)
+    job_id += 1
+    test_lock.release()
+    if job is None:
+      return
+    times.record_test_time(test_binary, test, run_job(job))
+
+def start_daemon(func):
+  t = threading.Thread(target=func)
+  t.daemon = True
+  t.start()
+  return t
+
+# Start the requested number of worker threads and wait for all of
+# them to finish.
+workers = [start_daemon(worker) for i in range(options.workers)]
+
+[t.join() for t in workers]
+logger.end()
+# Save the recorded runtimes to the file they were loaded from.
+times.write_to_file(save_file)
+if options.print_test_times:
+  # List tests that have a recorded runtime, shortest first.
+  ts = sorted((times.get_test_time(test_binary, test), test_binary, test)
+              for (_, test_binary, test, _) in tests
+              if times.get_test_time(test_binary, test) is not None)
+  for (time_ms, test_binary, test) in ts:
+    print "%8s %s" % ("%dms" % time_ms, test)
+# An interrupted run reports -SIGINT; otherwise report exit_code, which
+# is 0 unless a job failed.
+sys.exit(-signal.SIGINT if sigint_handler.got_sigint() else exit_code)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3c2f5d2f/src/test/feature/parallel-run-feature-test.sh
----------------------------------------------------------------------
diff --git a/src/test/feature/parallel-run-feature-test.sh b/src/test/feature/parallel-run-feature-test.sh
new file mode 100755
index 0000000..80fc9e9
--- /dev/null
+++ b/src/test/feature/parallel-run-feature-test.sh
@@ -0,0 +1,54 @@
+#! /bin/bash
+
+# GPHOME is set by greenplum_path.sh and is needed to locate psql.
+# Without it nothing can run, so report failure with a non-zero status.
+if [ -z "${GPHOME:-}" ]; then
+  echo "Please source greenplum_path.sh before running feature tests."
+  exit 1
+fi
+
+PSQL=${GPHOME}/bin/psql
+HAWQ_DB=${PGDATABASE:-"postgres"}
+HAWQ_HOST=${PGHOST:-"localhost"}
+HAWQ_PORT=${PGPORT:-"5432"}
+HAWQ_USER=${PGUSER:-}
+HAWQ_PASSWORD=${PGPASSWORD:-}
+
+run_sql() {
+  suffix=""
+  if [ x$HAWQ_USER != 'x' ]; then
+    suffix=$suffix:" -U $HAWQ_USER"
+    if [ x$HAWQ_PASSWORD != 'x' ]; then
+      suffix=$suffix:" -W $HAWQ_PASSWORD"
+    fi
+  fi
+  $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT -c "$1" $suffix > /dev/null 2>&1
+  if [ $? -ne 0 ]; then
+    echo "$1 failed."
+    exit 1
+  fi
+}
+
+init_hawq_test() {
+  TEST_DB_NAME="hawq_feature_test_db"
+  
+  $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT \
+        -c "create database $TEST_DB_NAME;" > /dev/null 2>&1
+  run_sql "alter database $TEST_DB_NAME set lc_messages to 'C';"
+  run_sql "alter database $TEST_DB_NAME set lc_monetary to 'C';"
+  run_sql "alter database $TEST_DB_NAME set lc_numeric to 'C';"
+  run_sql "alter database $TEST_DB_NAME set lc_time to 'C';"
+  run_sql "alter database $TEST_DB_NAME set timezone_abbreviations to 'Default';"
+  run_sql "alter database $TEST_DB_NAME set timezone to 'PST8PDT';"
+  run_sql "alter database $TEST_DB_NAME set datestyle to 'postgres,MDY';"
+  export PGDATABASE=$TEST_DB_NAME
+}
+
+run_feature_test() {
+  if [ $# -lt 2 ]; then
+    echo "Usage: parallel-run-feature-test.sh workers [number of workers] binary [binary ...] -- [additional args]" 
+    exit 0
+  fi
+  init_hawq_test
+  python $(dirname $0)/gtest-parallel --worker=$1 ${@:2}
+}
+
+run_feature_test $1 ${@:2}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/3c2f5d2f/src/test/feature/test_main.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/test_main.cpp b/src/test/feature/test_main.cpp
old mode 100644
new mode 100755
index 41142ec..443e2db
--- a/src/test/feature/test_main.cpp
+++ b/src/test/feature/test_main.cpp
@@ -1,80 +1,6 @@
-#include <sys/types.h>
-#include <pwd.h>
 #include "gtest/gtest.h"
-#include "psql.h"
-#include "sql_util.h"
-
-using std::string;
-
-class TestPrepare
-{
-  private:
-    const string testDbName = "hawq_feature_test";
-    std::unique_ptr<hawq::test::PSQL> conn;
-	void init_hawq_test();
-  public:
-    TestPrepare();
-    ~TestPrepare();
-};
-
-#define PSQL_RUN_AND_ASSERT() \
-  conn->runSQLCommand(cmd); \
-  ASSERT_EQ(0, conn->getLastStatus()) << conn->getLastResult();
-
-void TestPrepare::init_hawq_test()
-{
-  string user = HAWQ_USER;
-  if(user.empty()) {
-    struct passwd *pw;
-    uid_t uid = geteuid();
-    pw = getpwuid(uid);
-    user.assign(pw->pw_name);
-  }
-
-  conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, HAWQ_PASSWORD));
-
-  // Create the test db and set some default guc values so that test outputs
-  // could be consistent. We do not drop the database in advance since keeping the
-  // previous environment could probably help reproducing and resolving some failing
-  // test issues, so you need to drop the database yourself when necessary, before
-  // running the tests.
-  string cmd;
-  cmd  = "create database " + testDbName;
-  // Do not check return value since probably the db has existed.
-  conn->runSQLCommand(cmd);
-  cmd  = "alter database " + testDbName + " set lc_messages to 'C'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set lc_monetary to 'C'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set lc_numeric to 'C'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set lc_time to 'C'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set timezone_abbreviations to 'Default'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set timezone to 'PST8PDT'";
-  PSQL_RUN_AND_ASSERT();
-  cmd  = "alter database " + testDbName + " set datestyle to 'postgres,MDY'";
-  PSQL_RUN_AND_ASSERT();
-}
-
-TestPrepare::TestPrepare()
-{
-  init_hawq_test();
-
-  // The test will use the newly created database.
-  setenv("PGDATABASE", testDbName.c_str(), 1);
-}
-
-TestPrepare::~TestPrepare()
-{
-}
 
 int main(int argc, char** argv) {
-
-  TestPrepare tp;
-
   ::testing::InitGoogleTest(&argc, argv);
-
   return RUN_ALL_TESTS();
 }