You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2016/08/10 08:52:04 UTC

incubator-hawq git commit: Revert "HAWQ-955. Add scriptS for feature test running in parallel."

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 4bbd4a15d -> faae87a5e


Revert "HAWQ-955. Add scriptS for feature test running in parallel."

This reverts commit 2097cee2179dafb93b06881501263fb076dd700f.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/faae87a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/faae87a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/faae87a5

Branch: refs/heads/master
Commit: faae87a5e018a204f363aca7a2c81cd16fb08a01
Parents: 4bbd4a1
Author: rlei <rl...@pivotal.io>
Authored: Wed Aug 10 16:51:31 2016 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Wed Aug 10 16:51:31 2016 +0800

----------------------------------------------------------------------
 src/test/feature/gtest-parallel               | 411 ---------------------
 src/test/feature/parallel-run-feature-test.sh |  54 ---
 src/test/feature/test_main.cpp                |  74 ++++
 3 files changed, 74 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/gtest-parallel
----------------------------------------------------------------------
diff --git a/src/test/feature/gtest-parallel b/src/test/feature/gtest-parallel
deleted file mode 100755
index 9b1f9ee..0000000
--- a/src/test/feature/gtest-parallel
+++ /dev/null
@@ -1,411 +0,0 @@
-#!/usr/bin/env python2
-# Copyright 2013 Google Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import cPickle
-import errno
-import gzip
-import multiprocessing
-import optparse
-import os
-import signal
-import subprocess
-import sys
-import tempfile
-import thread
-import threading
-import time
-import zlib
-
-# An object that catches SIGINT sent to the Python process and notices
-# if processes passed to wait() die by SIGINT (we need to look for
-# both of those cases, because pressing Ctrl+C can result in either
-# the main process or one of the subprocesses getting the signal).
-#
-# Before a SIGINT is seen, wait(p) will simply call p.wait() and
-# return the result. Once a SIGINT has been seen (in the main process
-# or a subprocess, including the one the current call is waiting for),
-# wait(p) will call p.terminate() and raise ProcessWasInterrupted.
-class SigintHandler(object):
-  class ProcessWasInterrupted(Exception): pass
-  sigint_returncodes = {-signal.SIGINT,  # Unix
-                        -1073741510,     # Windows
-                        }
-  def __init__(self):
-    self.__lock = threading.Lock()
-    self.__processes = set()
-    self.__got_sigint = False
-    signal.signal(signal.SIGINT, self.__sigint_handler)
-  def __on_sigint(self):
-    self.__got_sigint = True
-    while self.__processes:
-      try:
-        self.__processes.pop().terminate()
-      except OSError:
-        pass
-  def __sigint_handler(self, signal_num, frame):
-    with self.__lock:
-      self.__on_sigint()
-  def got_sigint(self):
-    with self.__lock:
-      return self.__got_sigint
-  def wait(self, p):
-    with self.__lock:
-      if self.__got_sigint:
-        p.terminate()
-      self.__processes.add(p)
-    code = p.wait()
-    with self.__lock:
-      self.__processes.discard(p)
-      if code in self.sigint_returncodes:
-        self.__on_sigint()
-      if self.__got_sigint:
-        raise self.ProcessWasInterrupted
-    return code
-sigint_handler = SigintHandler()
-
-# Return the width of the terminal, or None if it couldn't be
-# determined (e.g. because we're not being run interactively).
-def term_width(out):
-  if not out.isatty():
-    return None
-  try:
-    p = subprocess.Popen(["stty", "size"],
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    (out, err) = p.communicate()
-    if p.returncode != 0 or err:
-      return None
-    return int(out.split()[1])
-  except (IndexError, OSError, ValueError):
-    return None
-
-# Output transient and permanent lines of text. If several transient
-# lines are written in sequence, the new will overwrite the old. We
-# use this to ensure that lots of unimportant info (tests passing)
-# won't drown out important info (tests failing).
-class Outputter(object):
-  def __init__(self, out_file):
-    self.__out_file = out_file
-    self.__previous_line_was_transient = False
-    self.__width = term_width(out_file)  # Line width, or None if not a tty.
-  def transient_line(self, msg):
-    if self.__width is None:
-      self.__out_file.write(msg + "\n")
-    else:
-      self.__out_file.write("\r" + msg[:self.__width].ljust(self.__width))
-      self.__previous_line_was_transient = True
-  def flush_transient_output(self):
-    if self.__previous_line_was_transient:
-      self.__out_file.write("\n")
-      self.__previous_line_was_transient = False
-  def permanent_line(self, msg):
-    self.flush_transient_output()
-    self.__out_file.write(msg + "\n")
-
-stdout_lock = threading.Lock()
-
-class FilterFormat:
-  if sys.stdout.isatty():
-    # stdout needs to be unbuffered since the output is interactive.
-    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
-
-  out = Outputter(sys.stdout)
-  total_tests = 0
-  finished_tests = 0
-
-  tests = {}
-  outputs = {}
-  failures = []
-
-  def print_test_status(self, last_finished_test, time_ms):
-    self.out.transient_line("[%d/%d] %s (%d ms)"
-                            % (self.finished_tests, self.total_tests,
-                               last_finished_test, time_ms))
-
-  def handle_meta(self, job_id, args):
-    (command, arg) = args.split(' ', 1)
-    if command == "TEST":
-      (binary, test) = arg.split(' ', 1)
-      self.tests[job_id] = (binary, test.strip())
-    elif command == "EXIT":
-      (exit_code, time_ms) = [int(x) for x in arg.split(' ', 1)]
-      self.finished_tests += 1
-      (binary, test) = self.tests[job_id]
-      self.print_test_status(test, time_ms)
-      if exit_code != 0:
-        self.failures.append(self.tests[job_id])
-        with open(self.outputs[job_id]) as f:
-          for line in f.readlines():
-            self.out.permanent_line(line.rstrip())
-        self.out.permanent_line(
-          "[%d/%d] %s returned/aborted with exit code %d (%d ms)"
-          % (self.finished_tests, self.total_tests, test, exit_code, time_ms))
-    elif command == "TESTCNT":
-      self.total_tests = int(arg.split(' ', 1)[1])
-      self.out.transient_line("[0/%d] Running tests..." % self.total_tests)
-
-  def logfile(self, job_id, name):
-    self.outputs[job_id] = name
-
-  def log(self, line):
-    stdout_lock.acquire()
-    (prefix, output) = line.split(' ', 1)
-
-    assert prefix[-1] == ':'
-    self.handle_meta(int(prefix[:-1]), output)
-    stdout_lock.release()
-
-  def end(self):
-    if self.failures:
-      self.out.permanent_line("FAILED TESTS (%d/%d):"
-                              % (len(self.failures), self.total_tests))
-      for (binary, test) in self.failures:
-        self.out.permanent_line(" " + binary + ": " + test)
-    self.out.flush_transient_output()
-
-class RawFormat:
-  def log(self, line):
-    stdout_lock.acquire()
-    sys.stdout.write(line + "\n")
-    sys.stdout.flush()
-    stdout_lock.release()
-  def logfile(self, job_id, name):
-    with open(self.outputs[job_id]) as f:
-      for line in f.readlines():
-        self.log(str(job_id) + '> ' + line.rstrip())
-  def end(self):
-    pass
-
-# Record of test runtimes. Has built-in locking.
-class TestTimes(object):
-  def __init__(self, save_file):
-    "Create new object seeded with saved test times from the given file."
-    self.__times = {}  # (test binary, test name) -> runtime in ms
-
-    # Protects calls to record_test_time(); other calls are not
-    # expected to be made concurrently.
-    self.__lock = threading.Lock()
-
-    try:
-      with gzip.GzipFile(save_file, "rb") as f:
-        times = cPickle.load(f)
-    except (EOFError, IOError, cPickle.UnpicklingError, zlib.error):
-      # File doesn't exist, isn't readable, is malformed---whatever.
-      # Just ignore it.
-      return
-
-    # Discard saved times if the format isn't right.
-    if type(times) is not dict:
-      return
-    for ((test_binary, test_name), runtime) in times.items():
-      if (type(test_binary) is not str or type(test_name) is not str
-          or type(runtime) not in {int, long, type(None)}):
-        return
-
-    self.__times = times
-
-  def get_test_time(self, binary, testname):
-    """Return the last duration for the given test as an integer number of
-    milliseconds, or None if the test failed or if there's no record for it."""
-    return self.__times.get((binary, testname), None)
-
-  def record_test_time(self, binary, testname, runtime_ms):
-    """Record that the given test ran in the specified number of
-    milliseconds. If the test failed, runtime_ms should be None."""
-    with self.__lock:
-      self.__times[(binary, testname)] = runtime_ms
-
-  def write_to_file(self, save_file):
-    "Write all the times to file."
-    try:
-      with open(save_file, "wb") as f:
-        with gzip.GzipFile("", "wb", 9, f) as gzf:
-          cPickle.dump(self.__times, gzf, cPickle.HIGHEST_PROTOCOL)
-    except IOError:
-      pass  # ignore errors---saving the times isn't that important
-
-# Remove additional arguments (anything after --).
-additional_args = []
-
-for i in range(len(sys.argv)):
-  if sys.argv[i] == '--':
-    additional_args = sys.argv[i+1:]
-    sys.argv = sys.argv[:i]
-    break
-
-parser = optparse.OptionParser(
-    usage = 'usage: %prog [options] binary [binary ...] -- [additional args]')
-
-parser.add_option('-d', '--output_dir', type='string',
-                  default=os.path.join(tempfile.gettempdir(), "gtest-parallel"),
-                  help='output directory for test logs')
-parser.add_option('-r', '--repeat', type='int', default=1,
-                  help='repeat tests')
-parser.add_option('--failed', action='store_true', default=False,
-                  help='run only failed and new tests')
-parser.add_option('-w', '--workers', type='int',
-                  default=multiprocessing.cpu_count(),
-                  help='number of workers to spawn')
-parser.add_option('--gtest_color', type='string', default='yes',
-                  help='color output')
-parser.add_option('--gtest_filter', type='string', default='',
-                  help='test filter')
-parser.add_option('--gtest_also_run_disabled_tests', action='store_true',
-                  default=False, help='run disabled tests too')
-parser.add_option('--format', type='string', default='filter',
-                  help='output format (raw,filter)')
-parser.add_option('--print_test_times', action='store_true', default=False,
-                  help='When done, list the run time of each test')
-
-(options, binaries) = parser.parse_args()
-
-if binaries == []:
-  parser.print_usage()
-  sys.exit(1)
-
-logger = RawFormat()
-if options.format == 'raw':
-  pass
-elif options.format == 'filter':
-  logger = FilterFormat()
-else:
-  sys.exit("Unknown output format: " + options.format)
-
-# Find tests.
-save_file = os.path.join(os.path.expanduser("~"), ".gtest-parallel-times")
-times = TestTimes(save_file)
-tests = []
-for test_binary in binaries:
-  command = [test_binary]
-  if options.gtest_also_run_disabled_tests:
-    command += ['--gtest_also_run_disabled_tests']
-
-  list_command = list(command)
-  if options.gtest_filter != '':
-    list_command += ['--gtest_filter=' + options.gtest_filter]
-
-  try:
-    test_list = subprocess.Popen(list_command + ['--gtest_list_tests'],
-                                 stdout=subprocess.PIPE).communicate()[0]
-  except OSError as e:
-    sys.exit("%s: %s" % (test_binary, str(e)))
-
-  command += additional_args
-
-  test_group = ''
-  for line in test_list.split('\n'):
-    if not line.strip():
-      continue
-    if line[0] != " ":
-      # Remove comments for typed tests and strip whitespace.
-      test_group = line.split('#')[0].strip()
-      continue
-    # Remove comments for parameterized tests and strip whitespace.
-    line = line.split('#')[0].strip()
-    if not line:
-      continue
-
-    test = test_group + line
-    if not options.gtest_also_run_disabled_tests and 'DISABLED_' in test:
-      continue
-    tests.append((times.get_test_time(test_binary, test),
-                  test_binary, test, command))
-
-if options.failed:
-  # The first element of each entry is the runtime of the most recent
-  # run if it was successful, or None if the test is new or the most
-  # recent run failed.
-  tests = [x for x in tests if x[0] is None]
-
-# Sort tests by falling runtime (with None, which is what we get for
-# new and failing tests, being considered larger than any real
-# runtime).
-tests.sort(reverse=True, key=lambda x: ((1 if x[0] is None else 0), x))
-
-# Repeat tests (-r flag).
-tests *= options.repeat
-test_lock = threading.Lock()
-job_id = 0
-logger.log(str(-1) + ': TESTCNT ' + ' ' + str(len(tests)))
-
-exit_code = 0
-
-# Create directory for test log output.
-try:
-  os.makedirs(options.output_dir)
-except OSError as e:
-  # Ignore errors if this directory already exists.
-  if e.errno != errno.EEXIST or not os.path.isdir(options.output_dir):
-    raise e
-# Remove files from old test runs.
-for logfile in os.listdir(options.output_dir):
-  os.remove(os.path.join(options.output_dir, logfile))
-
-# Run the specified job. Return the elapsed time in milliseconds if
-# the job succeeds, or None if the job fails. (This ensures that
-# failing tests will run first the next time.)
-def run_job((command, job_id, test)):
-  begin = time.time()
-
-  with tempfile.NamedTemporaryFile(dir=options.output_dir, delete=False) as log:
-    sub = subprocess.Popen(command + ['--gtest_filter=' + test] +
-                             ['--gtest_color=' + options.gtest_color],
-                           stdout=log.file,
-                           stderr=log.file)
-    try:
-      code = sigint_handler.wait(sub)
-    except sigint_handler.ProcessWasInterrupted:
-      thread.exit()
-    runtime_ms = int(1000 * (time.time() - begin))
-    logger.logfile(job_id, log.name)
-
-  logger.log("%s: EXIT %s %d" % (job_id, code, runtime_ms))
-  if code == 0:
-    return runtime_ms
-  global exit_code
-  exit_code = code
-  return None
-
-def worker():
-  global job_id
-  while True:
-    job = None
-    test_lock.acquire()
-    if job_id < len(tests):
-      (_, test_binary, test, command) = tests[job_id]
-      logger.log(str(job_id) + ': TEST ' + test_binary + ' ' + test)
-      job = (command, job_id, test)
-    job_id += 1
-    test_lock.release()
-    if job is None:
-      return
-    times.record_test_time(test_binary, test, run_job(job))
-
-def start_daemon(func):
-  t = threading.Thread(target=func)
-  t.daemon = True
-  t.start()
-  return t
-
-workers = [start_daemon(worker) for i in range(options.workers)]
-
-[t.join() for t in workers]
-logger.end()
-times.write_to_file(save_file)
-if options.print_test_times:
-  ts = sorted((times.get_test_time(test_binary, test), test_binary, test)
-              for (_, test_binary, test, _) in tests
-              if times.get_test_time(test_binary, test) is not None)
-  for (time_ms, test_binary, test) in ts:
-    print "%8s %s" % ("%dms" % time_ms, test)
-sys.exit(-signal.SIGINT if sigint_handler.got_sigint() else exit_code)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/parallel-run-feature-test.sh
----------------------------------------------------------------------
diff --git a/src/test/feature/parallel-run-feature-test.sh b/src/test/feature/parallel-run-feature-test.sh
deleted file mode 100755
index 903773c..0000000
--- a/src/test/feature/parallel-run-feature-test.sh
+++ /dev/null
@@ -1,54 +0,0 @@
-#! /bin/bash
-
-if [ x$GPHOME == 'x' ]; then
-  echo "Please source greenplum_path.sh before running feature tests."
-  exit 0
-fi
-
-PSQL=${GPHOME}/bin/psql
-HAWQ_DB=${PGDATABASE:-"postgres"}
-HAWQ_HOST=${PGHOST:-"localhost"}
-HAWQ_PORT=${PGPORT:-"5432"}
-HAWQ_USER=${PGUSER:-}
-HAWQ_PASSWORD=${PGPASSWORD:-}
-
-run_sql() {
-  suffix=""
-  if [ x$HAWQ_USER != 'x' ]; then
-    suffix=$suffix:" -U $HAWQ_USER"
-    if [ x$HAWQ_PASSWORD != 'x' ]; then
-      suffix=$suffix:" -W $HAWQ_PASSWORD"
-    fi
-  fi
-  $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT -c "$1" $suffix > /dev/null 2>&1
-  if [ $? -ne 0 ]; then
-    echo "$1 failed."
-    exit 1
-  fi
-}
-
-init_hawq_test() {
-  TEST_DB_NAME="hawq_feature_test_db"
-  
-  $PSQL -d $HAWQ_DB -h $HAWQ_HOST -p $HAWQ_PORT \
-        -c "create database $TEST_DB_NAME;" > /dev/null 2>&1
-  run_sql "alter database $TEST_DB_NAME set lc_messages to 'C';"
-  run_sql "alter database $TEST_DB_NAME set lc_monetary to 'C';"
-  run_sql "alter database $TEST_DB_NAME set lc_numeric to 'C';"
-  run_sql "alter database $TEST_DB_NAME set lc_time to 'C';"
-  run_sql "alter database $TEST_DB_NAME set timezone_abbreviations to 'Default';"
-  run_sql "alter database $TEST_DB_NAME set timezone to 'PST8PDT';"
-  run_sql "alter database $TEST_DB_NAME set datestyle to 'postgres,MDY';"
-  PGDATABASE=$TEST_DB_NAME
-}
-
-run_feature_test() {
-  if [ $# -lt 2 ]; then
-    echo "Usage: parallel-run-feature-test.sh workers [number of workers] binary [binary ...] -- [additional args]" 
-    exit 0
-  fi
-  init_hawq_test
-  $(dirname $0)/gtest-parallel --worker=$1 ${@:2}
-}
-
-run_feature_test $1 ${@:2}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/faae87a5/src/test/feature/test_main.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/test_main.cpp b/src/test/feature/test_main.cpp
index 443e2db..41142ec 100644
--- a/src/test/feature/test_main.cpp
+++ b/src/test/feature/test_main.cpp
@@ -1,6 +1,80 @@
+#include <sys/types.h>
+#include <pwd.h>
 #include "gtest/gtest.h"
+#include "psql.h"
+#include "sql_util.h"
+
+using std::string;
+
+class TestPrepare
+{
+  private:
+    const string testDbName = "hawq_feature_test";
+    std::unique_ptr<hawq::test::PSQL> conn;
+	void init_hawq_test();
+  public:
+    TestPrepare();
+    ~TestPrepare();
+};
+
+#define PSQL_RUN_AND_ASSERT() \
+  conn->runSQLCommand(cmd); \
+  ASSERT_EQ(0, conn->getLastStatus()) << conn->getLastResult();
+
+void TestPrepare::init_hawq_test()
+{
+  string user = HAWQ_USER;
+  if(user.empty()) {
+    struct passwd *pw;
+    uid_t uid = geteuid();
+    pw = getpwuid(uid);
+    user.assign(pw->pw_name);
+  }
+
+  conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, HAWQ_PASSWORD));
+
+  // Create the test db and set some default guc values so that test outputs
+  // could be consistent. We do not drop the database in advance since keeping the
+  // previous environment could probably help reproducing and resolving some failing
+  // test issues, so you need to drop the database yourself when necessary, before
+  // running the tests.
+  string cmd;
+  cmd  = "create database " + testDbName;
+  // Do not check return value since probably the db has existed.
+  conn->runSQLCommand(cmd);
+  cmd  = "alter database " + testDbName + " set lc_messages to 'C'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set lc_monetary to 'C'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set lc_numeric to 'C'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set lc_time to 'C'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set timezone_abbreviations to 'Default'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set timezone to 'PST8PDT'";
+  PSQL_RUN_AND_ASSERT();
+  cmd  = "alter database " + testDbName + " set datestyle to 'postgres,MDY'";
+  PSQL_RUN_AND_ASSERT();
+}
+
+TestPrepare::TestPrepare()
+{
+  init_hawq_test();
+
+  // The test will use the newly created database.
+  setenv("PGDATABASE", testDbName.c_str(), 1);
+}
+
+TestPrepare::~TestPrepare()
+{
+}
 
 int main(int argc, char** argv) {
+
+  TestPrepare tp;
+
   ::testing::InitGoogleTest(&argc, argv);
+
   return RUN_ALL_TESTS();
 }