You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2019/03/18 07:49:44 UTC

[hawq] branch huor_cloud created (now cc3b64c)

This is an automated email from the ASF dual-hosted git repository.

huor pushed a change to branch huor_cloud
in repository https://gitbox.apache.org/repos/asf/hawq.git.


      at cc3b64c  Add test for cloud environment support

This branch includes the following new commits:

     new cc3b64c  Add test for cloud environment support

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hawq] 01/01: Add test for cloud environment support

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huor pushed a commit to branch huor_cloud
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit cc3b64c7283305f58086adb9675da564bd918527
Author: Ruilong Huo <hu...@apache.org>
AuthorDate: Mon Mar 18 15:43:11 2019 +0800

    Add test for cloud environment support
---
 src/test/feature/cloudtest/sql/normal/all.sql      | 101 +++
 .../feature/cloudtest/sql/normal/before_normal.sql |  41 ++
 .../cloudtest/sql/parallel/before_parallel.sql     |  99 +++
 .../feature/cloudtest/sql/parallel/parallel.sql    |  73 +++
 src/test/feature/cloudtest/test_cloud.cpp          | 271 ++++++++
 src/test/feature/cloudtest/test_cloud.h            | 143 +++++
 src/test/feature/cloudtest/test_cloud.xml          |  23 +
 src/test/feature/lib/parse_out.cpp                 | 167 +++++
 src/test/feature/lib/parse_out.h                   |  85 +++
 src/test/feature/lib/psql.cpp                      | 203 +++++-
 src/test/feature/lib/psql.h                        |  45 +-
 src/test/feature/lib/sql_util.cpp                  | 673 +++++++++++++++++---
 src/test/feature/lib/sql_util.h                    | 301 ++++++++-
 src/test/feature/lib/sql_util_parallel.cpp         | 705 +++++++++++++++++++++
 src/test/feature/lib/sql_util_parallel.h           | 133 ++++
 src/test/feature/lib/sqlfile-parsebase.cpp         |  89 +++
 src/test/feature/lib/sqlfile-parsebase.h           |  60 ++
 17 files changed, 3115 insertions(+), 97 deletions(-)

diff --git a/src/test/feature/cloudtest/sql/normal/all.sql b/src/test/feature/cloudtest/sql/normal/all.sql
new file mode 100644
index 0000000..ed31c43
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/normal/all.sql
@@ -0,0 +1,101 @@
+create table a(i int);
+
+create language plpythonu;
+
+CREATE OR REPLACE FUNCTION f4() RETURNS TEXT AS $$ plpy.execute("select * from a order by i") $$ LANGUAGE plpythonu VOLATILE;
+
+select * from f4();
+
+CREATE OR REPLACE FUNCTION normalize_si(text) RETURNS text AS $$ BEGIN RETURN substring($1, 9, 2) || substring($1, 7, 2) || substring($1, 5, 2) || substring($1, 1, 4); END; $$LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION si_lt(text, text) RETURNS boolean AS $$ BEGIN RETURN normalize_si($1) < normalize_si($2); END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR <# ( PROCEDURE=si_lt,LEFTARG=text, RIGHTARG=text);
+
+CREATE OR REPLACE FUNCTION si_same(text, text) RETURNS int AS $$ BEGIN IF normalize_si($1) < normalize_si($2) THEN RETURN -1; END IF; END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR CLASS sva_special_ops FOR TYPE text USING btree AS OPERATOR 1 <#, FUNCTION 1 si_same(text, text);
+
+CREATE RESOURCE QUEUE myqueue WITH (PARENT='pg_root', ACTIVE_STATEMENTS=20, MEMORY_LIMIT_CLUSTER=50%, CORE_LIMIT_CLUSTER=50%);   
+
+CREATE TABLESPACE mytblspace FILESPACE dfs_system;    
+
+CREATE TABLE foo(i int) TABLESPACE mytblspace;
+
+insert into foo(i) values(1234);
+
+COPY a TO '/tmp/a.txt';
+
+COPY a FROM '/tmp/a.txt';
+
+COPY a TO STDOUT WITH DELIMITER '|';
+
+CREATE EXTERNAL TABLE ext_t ( N_NATIONKEY INTEGER ,N_NAME CHAR(25), N_REGIONKEY  INTEGER ,N_COMMENT    VARCHAR(152))location ('gpfdist://localhost:7070/nation_error50.tbl')FORMAT 'text' (delimiter '|')SEGMENT REJECT LIMIT 51;
+
+--select * from ext_t order by N_NATIONKEY;   
+
+CREATE WRITABLE EXTERNAL TABLE ext_t2 (i int) LOCATION ('gpfdist://localhost:7070/ranger2.out') FORMAT 'TEXT' ( DELIMITER '|' NULL ' ');
+
+--insert into ext_t2(i) values(234);
+
+create schema sa;
+
+create temp table ta(i int);
+
+create view av as select * from a order by i;
+
+create table aa as select * from a order by i;
+
+create table sa.t(a int, b int);
+
+CREATE SEQUENCE myseq START 1;
+
+insert into a values(1);
+
+insert into a values(1);
+
+insert into a VALUES (nextval('myseq'));
+
+select * from pg_database, a order by oid, i limit 1;
+
+select generate_series(1,3);
+
+select * from av;
+
+SELECT setval('myseq', 1);
+
+SELECT * INTO aaa FROM a WHERE i > 0 order by i;
+
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+explain select * from a;
+
+CREATE FUNCTION scube_accum(numeric, numeric) RETURNS numeric AS 'select $1 + $2 * $2 * $2' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
+
+CREATE AGGREGATE scube(numeric) ( SFUNC = scube_accum, STYPE = numeric, INITCOND = 0 );
+
+ALTER AGGREGATE scube(numeric) RENAME TO scube2;   
+
+CREATE TYPE mytype AS (f1 int, f2 int);
+
+CREATE FUNCTION getfoo() RETURNS SETOF mytype AS $$ SELECT i, i FROM a order by i $$ LANGUAGE SQL;
+
+select getfoo();
+
+begin; DECLARE mycursor CURSOR FOR SELECT * FROM a order by i; FETCH FORWARD 2 FROM mycursor; commit;
+
+BEGIN; INSERT INTO a VALUES (1); SAVEPOINT my_savepoint; INSERT INTO a VALUES (1); RELEASE SAVEPOINT my_savepoint; COMMIT;
+
+\d
+
+analyze a;
+
+analyze;
+
+vacuum aa;
+
+vacuum analyze;
+
+truncate aa;
+
+alter table a rename column i to j;
\ No newline at end of file
diff --git a/src/test/feature/cloudtest/sql/normal/before_normal.sql b/src/test/feature/cloudtest/sql/normal/before_normal.sql
new file mode 100644
index 0000000..f234f82
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/normal/before_normal.sql
@@ -0,0 +1,41 @@
+drop function f4();
+
+drop language plpythonu CASCADE;
+
+drop OPERATOR CLASS sva_special_ops USING btree;
+
+drop OPERATOR <# (text,text) CASCADE;
+
+drop FUNCTION si_same(text, text);
+
+drop FUNCTION si_lt(text, text);
+
+drop FUNCTION normalize_si(text);
+
+DROP RESOURCE QUEUE myqueue;
+
+drop table foo;
+
+drop tablespace mytblspace;
+
+drop EXTERNAL TABLE ext_t;
+
+drop EXTERNAL TABLE ext_t2;
+
+DROP AGGREGATE scube2(numeric);
+
+DROP FUNCTION scube_accum(numeric, numeric);
+
+drop type mytype cascade;
+
+drop SEQUENCE myseq;
+
+drop view av;
+
+drop table aaa;
+
+drop table aa;
+
+drop table a;
+
+drop schema sa CASCADE;
\ No newline at end of file
diff --git a/src/test/feature/cloudtest/sql/parallel/before_parallel.sql b/src/test/feature/cloudtest/sql/parallel/before_parallel.sql
new file mode 100644
index 0000000..f81c1e5
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/parallel/before_parallel.sql
@@ -0,0 +1,99 @@
+-- Drop object ,clean env
+drop OPERATOR CLASS sva_special_ops USING btree;
+
+drop OPERATOR <# (text,text) CASCADE;
+
+drop FUNCTION si_same(text, text);
+
+drop FUNCTION si_lt(text, text);
+
+drop FUNCTION normalize_si(text);
+
+DROP RESOURCE QUEUE myqueue;
+
+drop table foo;
+
+drop tablespace mytblspace;
+
+drop EXTERNAL TABLE ext_t;
+
+drop EXTERNAL TABLE ext_t2;
+
+drop function f4();
+
+DROP AGGREGATE scube2(numeric);
+
+DROP FUNCTION scube_accum(numeric, numeric);
+
+drop type mytype cascade;
+
+drop language plpythonu;
+
+truncate aa;
+
+drop SEQUENCE myseq;
+
+drop view av;
+
+--drop table aaa;
+
+drop table aa;
+
+drop table a;
+
+drop schema sa CASCADE;
+
+-- Create object
+
+create table a(i int);
+
+CREATE OR REPLACE FUNCTION si_same(text, text) RETURNS int AS $$ BEGIN IF normalize_si($1) < normalize_si($2) THEN RETURN -1; END IF; END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR CLASS sva_special_ops FOR TYPE text USING btree AS OPERATOR 1 <#, FUNCTION 1 si_same(text, text);
+
+CREATE RESOURCE QUEUE myqueue WITH (PARENT='pg_root', ACTIVE_STATEMENTS=20, MEMORY_LIMIT_CLUSTER=50%, CORE_LIMIT_CLUSTER=50%);   
+
+CREATE TABLESPACE mytblspace FILESPACE dfs_system;    
+
+create language plpythonu;
+
+CREATE TABLE foo(i int) TABLESPACE mytblspace;
+
+CREATE EXTERNAL TABLE ext_t ( N_NATIONKEY INTEGER ,N_NAME CHAR(25), N_REGIONKEY  INTEGER ,N_COMMENT    VARCHAR(152))location ('gpfdist://localhost:7070/nation_error50.tbl')FORMAT 'text' (delimiter '|')SEGMENT REJECT LIMIT 51;
+
+CREATE WRITABLE EXTERNAL TABLE ext_t2 (i int) LOCATION ('gpfdist://localhost:7070/ranger2.out') FORMAT 'TEXT' ( DELIMITER '|' NULL ' ');
+
+CREATE OR REPLACE FUNCTION f4() RETURNS TEXT AS $$ plpy.execute("select * from a order by i") $$ LANGUAGE plpythonu VOLATILE;
+
+create schema sa;
+
+create temp table ta(i int);
+
+create view av as select * from a order by i;
+
+create table aa as select * from a order by i;
+
+create table sa.t(a int, b int);
+
+CREATE SEQUENCE myseq START 1;
+
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+CREATE FUNCTION scube_accum(numeric, numeric) RETURNS numeric AS 'select $1 + $2 * $2 * $2' LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;
+
+CREATE AGGREGATE scube(numeric) ( SFUNC = scube_accum, STYPE = numeric, INITCOND = 0 );
+
+ALTER AGGREGATE scube(numeric) RENAME TO scube2;   
+
+CREATE TYPE mytype AS (f1 int, f2 int);
+
+CREATE FUNCTION getfoo() RETURNS SETOF mytype AS $$ SELECT i, i FROM a order by i $$ LANGUAGE SQL;
+
+--alter table a rename column i to j;
+
+CREATE OR REPLACE FUNCTION normalize_si(text) RETURNS text AS $$ BEGIN RETURN substring($1, 9, 2) || substring($1, 7, 2) || substring($1, 5, 2) || substring($1, 1, 4); END; $$LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION si_lt(text, text) RETURNS boolean AS $$ BEGIN RETURN normalize_si($1) < normalize_si($2); END; $$ LANGUAGE 'plpgsql' IMMUTABLE;
+
+CREATE OPERATOR <# ( PROCEDURE=si_lt,LEFTARG=text, RIGHTARG=text);
+
diff --git a/src/test/feature/cloudtest/sql/parallel/parallel.sql b/src/test/feature/cloudtest/sql/parallel/parallel.sql
new file mode 100644
index 0000000..de12e14
--- /dev/null
+++ b/src/test/feature/cloudtest/sql/parallel/parallel.sql
@@ -0,0 +1,73 @@
+--set session role=usertest21;
+insert into foo(i) values(1234);
+
+--set session role=usertest24;
+--COPY (select * from a) to '/tmp/a.txt';
+--COPY a FROM '/tmp/a.txt';
+
+--set session role=usertest25;
+COPY a TO STDOUT WITH DELIMITER '|';
+
+--set session role=usertest27;
+--select * from ext_t order by N_NATIONKEY;   
+
+
+--set session role=usertest29;
+--insert into ext_t2(i) values(234);
+
+--set session role=usertest38;
+insert into a values(1);
+
+--set session role=usertest39;
+insert into a values(1);
+
+--set session role=usertest4;
+select * from f4();
+
+--set session role=usertest40;
+insert into a VALUES (nextval('myseq'));
+
+--set session role=usertest41;
+select * from pg_database, a order by oid, i limit 1;
+
+--set session role=usertest42;
+select generate_series(1,3);
+
+--set session role=usertest43;
+select * from av;
+
+--set session role=usertest44;
+SELECT setval('myseq', 1);
+
+--set session role=usertest45;
+--SELECT * INTO aaa FROM a WHERE i > 0 order by i;
+
+--set session role=usertest46;
+PREPARE fooplan (int) AS INSERT INTO a VALUES($1);EXECUTE fooplan(1);DEALLOCATE fooplan;
+
+--set session role=usertest47;
+explain select * from a;
+
+--set session role=usertest55;
+select getfoo();
+
+--set session role=usertest57;
+begin; DECLARE mycursor CURSOR FOR SELECT * FROM a order by i; FETCH FORWARD 2 FROM mycursor; commit;
+
+--set session role=usertest58;
+BEGIN; INSERT INTO a VALUES (1); SAVEPOINT my_savepoint; INSERT INTO a VALUES (1); RELEASE SAVEPOINT my_savepoint; COMMIT;
+
+--set session role=usertest59;
+\d
+
+--set session role=usertest60;
+analyze a;
+
+--set session role=usertest61;
+--analyze;
+
+--set session role=usertest62;
+vacuum aa;
+
+--set session role=usertest63;
+--vacuum analyze;
diff --git a/src/test/feature/cloudtest/test_cloud.cpp b/src/test/feature/cloudtest/test_cloud.cpp
new file mode 100644
index 0000000..9730238
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.cpp
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <fstream>
+
+#include "lib/command.h"
+#include "lib/sql_util_parallel.h"
+#include "lib/xml_parser.h"
+#include "test_cloud.h"
+
+#define CREATE_ROLE "CREATE ROLE"
+#define ALTER_ROLE "ALTER ROLE"
+#define DROP_ROLE "DROP ROLE"
+
+using hawq::test::Command;
+
+// Role-management scenario: creates role "lava", grants it login, superuser
+// and createrole, then uses it to create and alter "user1" and "user2".
+// The last phase creates/drops "user2" through the second configured host and
+// checks the effect when logging in through the first host.
+// Statement outcomes are judged by checkOut() against the expected psql
+// command tag (CREATE ROLE / ALTER ROLE / DROP ROLE).
+TEST_F(TestCloud, Permissions) {
+  ASSERT_TRUE(testLogin(getUserName(), HAWQ_PASSWORD))
+      << "Connect Error: Can't login as " + getUserName();
+  // Clean up roles from earlier runs; the output of these drops is ignored.
+  execSql("drop role if exists lava;");
+  execSql("drop role if exists user1;");
+  execSql("drop role if exists user2;");
+  execSql(getHost2(), "drop role if exists user2;");
+
+  // Phase 1: a freshly created role must not be able to log in.
+  std::string result;
+  result = execSql("create role lava password \'lava00000000\';");
+  const std::string lava_pwd("lava00000000");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "create role lava failure: " + result;
+  EXPECT_FALSE(testLogin("lava", lava_pwd))
+      << "Error: lava can login before grant login permission";
+
+  // Phase 2: granting login makes the role usable.
+  result = execSql("alter role lava login;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava login permission failure: " + result;
+  ASSERT_TRUE(testLogin("lava", lava_pwd))
+      << "Error: lava can't login after grant login permission";
+
+  // Phase 3: with superuser and createrole, lava can create user1.
+  result = execSql("alter role lava Superuser;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava Superuser permision failure: " + result;
+  result = execSql("alter role lava Createrole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "grant lava Createrole permision failure: " + result;
+  result = execSql("lava", lava_pwd,
+                   "create role user1 login password \'user100000000\';");
+  ASSERT_TRUE((checkOut(result, CREATE_ROLE)))
+      << "lava create user1 failure: " + result;
+  ASSERT_TRUE(testLogin("user1", "user100000000")) << "login as user1 failure";
+
+  // Phase 4: after a password reset only the new password may work.
+  result = execSql("lava", lava_pwd,
+                   "alter role user1 password \'newPwd00000000\';");
+  const std::string user_newPwd = "newPwd00000000";
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava reset user1's password failure";
+  EXPECT_FALSE(testLogin("user1", "user100000000"))
+      << "user1 can login after reset password";
+  EXPECT_TRUE(testLogin("user1", user_newPwd))
+      << "user1 user new password can't login after reset password";
+
+  // Phase 5: user1 can create roles only while it holds createrole.
+  result = execSql("user1", user_newPwd,
+                   "create role user2 password \'user200000000\'");
+  EXPECT_FALSE(checkOut(result, CREATE_ROLE))
+      << "user1 can create user2 before get creatrole permissions: " + result;
+  result = execSql("lava", lava_pwd, "alter role user1 createrole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava grant user1 createrole permissions failure: " + result;
+  result = execSql("user1", user_newPwd,
+                   "create role user2 password \'user200000000\'");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "user1 can't create user2 after get creatrole permissions: " + result;
+
+  // Phase 6: once createrole is revoked, user1 must not be able to drop user2.
+  result = execSql("lava", lava_pwd, "alter role user1 nocreaterole;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava can't revoke createrole from user1:" << result;
+  result = execSql("user1", user_newPwd, "drop role user2;");
+  ASSERT_FALSE(checkOut(result, DROP_ROLE))
+      << "user can drop user2 after revoked createrole permissions: " + result;
+
+  result = execSql("lava", lava_pwd, "drop role user2;");
+  ASSERT_TRUE(checkOut(result, DROP_ROLE)) << "lava can't drop user2";
+
+  // Phase 7: user2 is created and dropped through the second host
+  // (getHost2()), while the login grant and the login checks go through the
+  // first host.
+  result = execSql(getHost2(), "create role user2 password \'user200000000\';");
+  ASSERT_TRUE(checkOut(result, CREATE_ROLE))
+      << "lavaIntern can't create user2 in cluster2: " + result;
+
+  result = execSql("lava", lava_pwd, "alter role user2 login;");
+  ASSERT_TRUE(checkOut(result, ALTER_ROLE))
+      << "lava can't grant login permission in cluster1 to user2: " + result;
+
+  ASSERT_TRUE(testLogin("user2", "user200000000"))
+      << "login as user2 in cluster1 failure";
+  result = execSql(getHost2(), "drop role user2;");
+  ASSERT_TRUE(checkOut(result, DROP_ROLE))
+      << "drop user2 failure in cluster2: " + result;
+  // The drop above ran on the second host, so the message names cluster2.
+  ASSERT_FALSE(testLogin("user2", "user200000000"))
+      << "user2 can login in cluster1 after drop user2 in cluster2";
+}
+
+// Single-session run: executes the cleanup script, then all.sql, and scans
+// the captured all.out for error markers.
+TEST_F(TestCloud, Normal) {
+  const std::string outDir = getTestRootPath() + "/cloudtest/out/normal";
+  Command::getCommandStatus("mkdir -p " + outDir);
+  configConnection();
+  // The cleanup output is captured but not scanned for error patterns.
+  execSQLfile("cloudtest/sql/normal/before_normal.sql",
+              "cloudtest/out/normal/before_normal.out");
+  execSQLfileandCheck("cloudtest/sql/normal/all.sql",
+                      "cloudtest/out/normal/all.out");
+}
+
+#define PROCESS_NUM 10
+#define ITERATION_NUM 1
+// Runs parallel.sql with PROCESS_NUM processes for ITERATION_NUM rounds, then
+// checks each per-process output file
+// (round<i>/parallel_process_<j>.out) for error markers.
+TEST_F(TestCloud, Parallel) {
+  Command::getCommandStatus("mkdir -p " + getTestRootPath() +
+                            "/cloudtest/out/parallel");
+  // Remove round directories left by earlier runs before producing new ones.
+  Command::getCommandStatus("rm -rf " + getTestRootPath() +
+                            "/cloudtest/out/parallel/round*");
+  configConnection();
+  execSQLfile("cloudtest/sql/parallel/before_parallel.sql",
+              "cloudtest/out/parallel/before_parallel.out");
+  for (int i = 0; i < ITERATION_NUM; i++) {
+    const std::string outpath =
+        "cloudtest/out/parallel/round" + std::to_string(i) + "/";
+    Command::getCommandStatus("mkdir -p " + getTestRootPath() + "/" + outpath);
+    runParallelSQLFile("cloudtest/sql/parallel/parallel.sql",
+                       outpath + "parallel.out", PROCESS_NUM);
+  }
+
+  // All rounds have finished at this point; verify every output file.
+  for (int i = 0; i < ITERATION_NUM; ++i) {
+    for (int j = 0; j < PROCESS_NUM; ++j) {
+      const std::string outputFile =
+          "cloudtest/out/parallel/round" + std::to_string(i) + "/" +
+          "parallel_process_" + std::to_string(j) + ".out";
+      const bool result = hawq::test::SQLUtility::checkPatternInFile(
+          outputFile, ": ERROR:|FATAL|PANIC");
+      EXPECT_TRUE(result) << outputFile << " find : ERROR:|FATAL|PANIC";
+    }
+  }
+}
+
+// Records the test root path and loads the default settings file
+// cloudtest/test_cloud.xml; the result of that load is not checked here.
+TestCloud::TestCloud()
+    : testRootPath(hawq::test::SQLUtility::getTestRootPath()), usexml(false) {
+  const std::string defaultConfig = "cloudtest/test_cloud.xml";
+  setConfigXml(defaultConfig);
+}
+
+// Loads connection settings (hosts, port, database, user) from an XML file.
+// @xmlfile path of the config file, relative to testRootPath
+// @return true when the file was parsed and the settings were stored;
+//         false when the name is empty, the file cannot be opened, or
+//         parsing/lookup throws
+bool TestCloud::setConfigXml(const std::string &xmlfile) {
+  if (xmlfile.empty()) {
+    std::cout << "xml file should not be empty when you call setConfigXml"
+              << '\n';
+    return false;
+  }
+
+  const std::string xmlabsfile = testRootPath + "/" + xmlfile;
+  if (!std::ifstream(xmlabsfile)) {
+    std::cout << xmlabsfile << " doesn't exist" << '\n';
+    return false;
+  }
+
+  try {
+    // The unique_ptr keeps ownership until scope exit so the parser object is
+    // destroyed on success and on exception alike. Do not call release()
+    // here: it gives up ownership without deleting and leaks the object.
+    std::unique_ptr<hawq::test::XmlConfig> xmlconf(
+        new hawq::test::XmlConfig(xmlabsfile));
+    xmlconf->parse();
+    hostName = xmlconf->getString("cloud_host");
+    cluster2host = xmlconf->getString("cloud_host2");
+    port = xmlconf->getString("cloud_port", "5432");
+    databaseName = xmlconf->getString("cloud_database", "postgres");
+    userName = xmlconf->getString("cloud_user");
+    usexml = true;
+    return true;
+  } catch (...) {
+    std::cout << "Parse Xml file " << xmlabsfile << " error" << '\n';
+    return false;
+  }
+}
+
+// Rebuilds the PSQL helper held in `conn` from the stored database, host,
+// port and user settings plus HAWQ_PASSWORD.
+void TestCloud::configConnection() {
+  std::unique_ptr<hawq::test::PSQL> fresh(new hawq::test::PSQL(
+      databaseName, hostName, port, userName, HAWQ_PASSWORD));
+  conn.reset(fresh.release());
+}
+
+// Returns a copy of the test root path stored at construction.
+std::string TestCloud::getTestRootPath() const {
+  return testRootPath;
+}
+
+// Runs a SQL file through `conn`, directing output to outFile, and expects a
+// zero status from the run. Both paths are relative to testRootPath.
+void TestCloud::execSQLfile(const std::string &sqlFile,
+                            const std::string &outFile) {
+  const std::string root = testRootPath + "/";
+  std::string sqlFileAbsPath = root + sqlFile;
+  std::string outFileAbsPath = root + outFile;
+
+  conn->setOutputFile(outFileAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(sqlFileAbsPath, "").getLastStatus());
+  // Undo the output redirection set above.
+  conn->resetOutput();
+}
+
+// Runs the SQL file via execSQLfile(), then passes outFile to
+// SQLUtility::checkPatternInFile with the error pattern and expects true.
+void TestCloud::execSQLfileandCheck(const std::string &sqlFile,
+                                    const std::string &outFile) {
+  using hawq::test::SQLUtility;
+
+  execSQLfile(sqlFile, outFile);
+
+  const bool result =
+      SQLUtility::checkPatternInFile(outFile, ": ERROR:|FATAL|PANIC");
+  EXPECT_TRUE(result) << outFile << " find ERROR|FATAL|PANIC";
+}
+
+// Attempts a psql login that immediately quits ("\q") with PGPASSWORD set to
+// pwd. Returns true only when the command's captured output is empty.
+bool TestCloud::testLogin(const std::string &host, const std::string &portStr,
+                          const std::string &database, const std::string &user,
+                          const std::string &pwd) const {
+  std::string shellCmd = "export PGPASSWORD=\'" + pwd + "\' && ";
+  shellCmd += "psql -h " + host + " -p " + portStr + " -d " + database +
+              " -U " + user + " --c '\\q'";
+  return Command::getCommandOutput(shellCmd).empty();
+}
+
+// Login check for an explicit user against the stored host, port and database.
+bool TestCloud::testLogin(const std::string &user,
+                          const std::string &pwd) const {
+  const bool loggedIn =
+      this->testLogin(this->hostName, this->port, this->databaseName, user, pwd);
+  return loggedIn;
+}
+
+// Login check for the stored user against the stored host, port and database.
+bool TestCloud::testLogin(const std::string &pwd) const {
+  const bool loggedIn = this->testLogin(this->hostName, this->port,
+                                        this->databaseName, this->userName, pwd);
+  return loggedIn;
+}
+
+// Runs sql on the given host as the stored user with HAWQ_PASSWORD, using the
+// stored port and database.
+std::string TestCloud::execSql(const std::string &host,
+                               const std::string &sql) {
+  std::string output = this->execSql(host, this->port, this->databaseName,
+                                     this->userName, HAWQ_PASSWORD, sql);
+  return output;
+}
+
+// Runs sql as the stored user with HAWQ_PASSWORD on the stored host.
+std::string TestCloud::execSql(const std::string &sql) {
+  std::string output = this->execSql(this->userName, HAWQ_PASSWORD, sql);
+  return output;
+}
+
+// Runs sql as the given user/password on the stored host, port and database.
+std::string TestCloud::execSql(const std::string &user, const std::string &pwd,
+                               const std::string &sql) {
+  std::string output = this->execSql(this->hostName, this->port,
+                                     this->databaseName, user, pwd, sql);
+  return output;
+}
+
+// Pipes sql into psql through the shell (echo "<sql>" | psql ...) with
+// PGPASSWORD exported, and returns whatever Command::getCommandOutput yields.
+// The sql text is embedded inside double quotes in the shell command.
+std::string TestCloud::execSql(const std::string &host,
+                               const std::string &portStr,
+                               const std::string &database,
+                               const std::string &user, const std::string &pwd,
+                               const std::string &sql) {
+  std::string shellCmd = "export PGPASSWORD=\'" + pwd + "\' && ";
+  shellCmd += "echo \"" + sql + "\" | psql -h " + host + " -p " + portStr +
+              " -d " + database + " -U " + user;
+  return Command::getCommandOutput(shellCmd);
+}
+
+bool TestCloud::checkOut(const std::string &out, const std::string &pattern) {
+  return (out.find("ERROR:") != std::string::npos)
+             ? false
+             : (out.find(pattern) != std::string::npos);
+}
+
+// Forwards to SQLUtility::runParallelSQLFile on a SQLUtility built from this
+// fixture's database, host, port and user settings.
+void TestCloud::runParallelSQLFile(const std::string sqlFile,
+                                   const std::string outputFile, int processnum,
+                                   const std::string ansFile) {
+  hawq::test::SQLUtility runner(databaseName, hostName, port, userName);
+  runner.runParallelSQLFile(sqlFile, outputFile, processnum, ansFile);
+}
diff --git a/src/test/feature/cloudtest/test_cloud.h b/src/test/feature/cloudtest/test_cloud.h
new file mode 100644
index 0000000..0e05ea3
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TEST_CLOUD_H
+#define TEST_CLOUD_H
+
+#include <string>
+
+#include "gtest/gtest.h"
+#include "lib/psql.h"
+
+// gtest fixture for the cloud tests. It holds connection settings (hosts,
+// port, database, user) loaded from an XML file and offers helpers that run
+// SQL through psql and inspect the output.
+class TestCloud : public ::testing::Test {
+ public:
+  // Loads the default settings file cloudtest/test_cloud.xml.
+  TestCloud();
+  ~TestCloud() {}
+
+  // (Re)creates the PSQL helper `conn` from the members databaseName,
+  // hostName, port and userName, together with HAWQ_PASSWORD.
+  void configConnection();
+
+  // Reads hosts, port, database and user from an xml file.
+  // @xmlfile path of the xml file relative to the test root path
+  //          (the implementation prepends testRootPath)
+  // @return true when the file was parsed and the values were stored
+  bool setConfigXml(const std::string &xmlfile);
+
+  // @return test root path
+  std::string getTestRootPath() const;
+
+  // Tests login using the members hostName, port and databaseName.
+  // @user user name
+  // @pwd  login password
+  // @return login status
+  bool testLogin(const std::string &user, const std::string &pwd) const;
+
+  // Tests login using the members hostName, port, databaseName and userName.
+  // @pwd login password
+  // @return login status
+  bool testLogin(const std::string &pwd) const;
+
+  // Tests login by running psql with a quit command.
+  // @host host name
+  // @portStr port to connect to
+  // @database database name to connect to
+  // @user user name
+  // @pwd login password
+  // @return true when the psql command produced no output
+  bool testLogin(const std::string &host, const std::string &portStr,
+                 const std::string &database, const std::string &user,
+                 const std::string &pwd) const;
+
+  // Executes a sql file, then checks the out file for the pattern
+  // ": ERROR:|FATAL|PANIC".
+  // @sqlfile sql file relative path
+  // @outfile out file relative path
+  void execSQLfileandCheck(const std::string &sqlfile,
+                           const std::string &outfile);
+
+  // Executes a sql file through `conn`; call configConnection() first.
+  // @sqlfile sql file path relative to the test root path
+  // @outfile out file path relative to the test root path
+  void execSQLfile(const std::string &sqlfile, const std::string &outfile);
+
+  // Executes sql on the given host; port, database and user come from the
+  // members and the password is HAWQ_PASSWORD.
+  // @host host name
+  // @sql sql query
+  // @return output of the sql
+  std::string execSql(const std::string &host, const std::string &sql);
+
+  // Executes sql as the member userName with HAWQ_PASSWORD.
+  // @sql sql query
+  // @return output of the sql
+  std::string execSql(const std::string &sql);
+
+  // Executes a sql query as the given user on the member host/port/database.
+  // @user who executes the sql
+  // @pwd user's password
+  // @sql sql query
+  // @return output of the executed sql
+  std::string execSql(const std::string &user, const std::string &pwd,
+                      const std::string &sql);
+
+  // Executes a sql query by piping it into psql.
+  // @host host name
+  // @portStr port
+  // @database database name
+  // @user who executes the sql
+  // @pwd user's password
+  // @sql sql query
+  // @return output of the executed sql
+  std::string execSql(const std::string &host, const std::string &portStr,
+                      const std::string &database, const std::string &user,
+                      const std::string &pwd, const std::string &sql);
+  // Checks sql output against an expected substring.
+  // @out output of the sql
+  // @pattern substring the output must contain
+  // @return false when out contains "ERROR:"; otherwise whether out contains
+  //         pattern (an empty pattern always matches)
+  bool checkOut(const std::string &out, const std::string &pattern = "");
+
+  // Runs a sql file in parallel via hawq::test::SQLUtility.
+  // @sqlFile sql file to run
+  // @outputFile out file of the run
+  // @processnum number of processes
+  // @ansFile expected out file to compare against the out file
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          const std::string ansFile = "");
+
+  // @return the second host (config key cloud_host2)
+  std::string getHost2() const { return this->cluster2host; }
+
+  // @return port
+  std::string getPort() const { return this->port; }
+
+  // @return user name
+  std::string getUserName() const { return this->userName; }
+
+ private:
+  // PSQL helper created by configConnection().
+  std::unique_ptr<hawq::test::PSQL> conn;
+  // Root directory that relative sql/out/xml paths are resolved against.
+  std::string testRootPath;
+
+  // Connection settings filled in by setConfigXml().
+  std::string hostName;
+  std::string cluster2host;
+  std::string port;
+  std::string databaseName;
+  std::string userName;
+  // Set to true once setConfigXml() has parsed a file successfully.
+  bool usexml;
+};
+
+#endif
diff --git a/src/test/feature/cloudtest/test_cloud.xml b/src/test/feature/cloudtest/test_cloud.xml
new file mode 100644
index 0000000..e9ebcaf
--- /dev/null
+++ b/src/test/feature/cloudtest/test_cloud.xml
@@ -0,0 +1,23 @@
+<configuration>
+       <property>
+                <name>cloud_host</name>
+                <value>HOST1</value>
+        </property>
+        <property>
+                <name>cloud_host2</name>
+                <value>HOST2</value>
+        </property>
+        <property>
+                <name>cloud_port</name>
+                <value>5432</value>
+        </property>
+
+        <property>
+                <name>cloud_database</name>
+                <value>postgres</value>
+        </property>
+        <property>
+                <name>cloud_user</name>
+                <value>oushu</value>
+        </property>
+</configuration>
diff --git a/src/test/feature/lib/parse_out.cpp b/src/test/feature/lib/parse_out.cpp
new file mode 100644
index 0000000..5a2299b
--- /dev/null
+++ b/src/test/feature/lib/parse_out.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "parse_out.h"
+
+namespace hawq {
+namespace test {
+using namespace std;
+
+const std::string Sqlout_parser::ignore_start_tag = "--ignore_start";
+const std::string Sqlout_parser::ignore_end_tag = "--ignore_end";
+const std::string Sqlout_parser::oneline_sql_start_tag = "--oneline_sql_start";
+const std::string Sqlout_parser::oneline_sql_end_tag = "--oneline_sql_end";
+const std::string Sqlout_parser::sql_start_tag = "--sql_start";
+const std::string Sqlout_parser::sql_end_tag = "--sql_end";
+const std::string Sqlout_parser::out_start_tag = " --out_start";
+
+// Parses `input_sqloutfile` and fills `output_sqloutinfo` with one
+// (sql text, time) pair for every parsed entry that is not flagged as
+// ignored. Returns false, leaving `output_sqloutinfo` untouched, when the
+// underlying parse fails.
+bool Sqlout_parser::parse_sql_time(
+    const string &input_sqloutfile,
+    vector<pair<string, double>> &output_sqloutinfo) {
+  vector<SqlOut> parsed;
+  if (!__parse_sql_time(input_sqloutfile, parsed)) return false;
+
+  vector<pair<string, double>> kept;
+  for (SqlOut &entry : parsed) {
+    if (entry.is_ignore) continue;
+    kept.emplace_back(std::move(entry.sql), entry.time);
+  }
+  // Publish the result only once the whole input has been processed.
+  output_sqloutinfo.swap(kept);
+  return true;
+}
+
+// Loads `outfile` through __trans_file_to_vector and parses the resulting
+// strings into `ouf`. Returns false as soon as either step fails.
+bool Sqlout_parser::__parse_sql_time(const string &outfile,
+                                     vector<SqlOut> &ouf) {
+  vector<string> lines;
+  if (!__trans_file_to_vector(outfile, lines)) return false;
+  if (!__parse_sql_time_imple(lines, ouf)) return false;
+  return true;
+}
+
+// Walks every entry of `inf`, splitting it into regions outside and inside
+// --ignore_start / --ignore_end pairs, and hands each region to
+// __parse_ignore (ignore flag false outside a pair, true inside one).
+// Returns false when a region fails to parse or when an --ignore_start has
+// no matching --ignore_end; returns true once every entry was consumed.
+bool Sqlout_parser::__parse_sql_time_imple(const vector<string> &inf,
+                                           vector<SqlOut> &ouf) {
+  // Nothing to parse. Without this guard the loop below would read inf[0]
+  // of an empty vector.
+  if (inf.empty()) return true;
+
+  size_t module_start_index = 0, module_end_index = 0;
+  for (;;) {
+    if (__is_tag(inf[module_end_index], ignore_start_tag)) {
+      // Flush the non-ignored region that precedes the start tag.
+      if (!__parse_ignore(inf, ouf, module_start_index, module_end_index,
+                          false))
+        return false;
+      module_start_index = module_end_index + 1;
+      // Advance to the matching end tag; running off the input is an error.
+      while (!__is_tag(inf[module_end_index], ignore_end_tag)) {
+        if (++module_end_index == inf.size())
+          return __not_found_tag_error(module_start_index, ignore_end_tag);
+      }
+      if (!__parse_ignore(inf, ouf, module_start_index, module_end_index,
+                          true))
+        return false;
+    } else if (++module_end_index == inf.size()) {
+      // Reached the end of the input: parse the trailing non-ignored region.
+      return __parse_ignore(inf, ouf, module_start_index, module_end_index,
+                            false);
+    }
+  }
+}
+
+// Parses the region [start_index, end_index) of `inf` by calling
+// __parse_sqlout until `start_index` has reached `end_index`. Every entry
+// produced is tagged with `ignore`. Returns false on the first failure.
+bool Sqlout_parser::__parse_ignore(const vector<string> &inf,
+                                   vector<SqlOut> &ouf, size_t &start_index,
+                                   const size_t end_index, bool ignore) {
+  for (; start_index != end_index;) {
+    const bool parsed = __parse_sqlout(inf, ouf, start_index, end_index, ignore);
+    if (!parsed) return false;
+  }
+  return true;
+}
+
+// Scans [start_index, end_index) for statement start tags. A line matching
+// --oneline_sql_start or --sql_start is followed by a call to
+// __parse_sqlout_imple with the corresponding end tag; any other line is
+// skipped. Returns false as soon as one statement fails to parse.
+bool Sqlout_parser::__parse_sqlout(const vector<string> &inf,
+                                   vector<SqlOut> &ouf, size_t &start_index,
+                                   const size_t end_index, bool ignore) {
+  while (start_index != end_index) {
+    const string &line = inf[start_index];
+    ++start_index;  // the statement body starts on the line after the tag
+
+    const string *end_tag = nullptr;
+    if (__is_tag(line, oneline_sql_start_tag)) {
+      end_tag = &oneline_sql_end_tag;
+    } else if (__is_tag(line, sql_start_tag)) {
+      end_tag = &sql_end_tag;
+    }
+    if (end_tag == nullptr) continue;
+
+    if (!__parse_sqlout_imple(inf, ouf, start_index, end_index, *end_tag,
+                              ignore))
+      return false;
+  }
+  return true;
+}
+
+// Parses one statement whose start tag has just been consumed.
+//
+// The statement text runs from inf[start_index] up to the " --out_start"
+// marker: for the one-line form the marker must be on that first line, for
+// the multi-line form the following lines are joined with '\n' until the
+// marker is found. The lines after the marker are then scanned for a time
+// value (see __parse_time) until a line accepted by __sub_equal for
+// `end_tag` is met, at which point one SqlOut is appended to `ouf`.
+//
+// On success `start_index` is left on the line holding `end_tag`. Returns
+// the result of __not_found_tag_error when the marker or the end tag is
+// missing inside [start_index, end_index).
+bool Sqlout_parser::__parse_sqlout_imple(const vector<string> &inf,
+                                         vector<SqlOut> &ouf,
+                                         size_t &start_index,
+                                         const size_t end_index,
+                                         const string &end_tag, bool ignore) {
+  // The start tag was the last line of the region, so there is no statement
+  // line to read. Without this guard inf[start_index] is read at (or past)
+  // the region bound and the "++start_index" loops below step over
+  // end_index and never terminate normally.
+  if (start_index == end_index)
+    return __not_found_tag_error(start_index, out_start_tag);
+
+  string sql;
+  const size_t sql_start_index = start_index;
+  size_t out_pos = inf[start_index].find(out_start_tag);
+  if (end_tag == oneline_sql_end_tag) {
+    if (out_pos == string::npos)
+      return __not_found_tag_error(start_index, out_start_tag);
+    sql = inf[start_index].substr(0, out_pos);
+  } else if (out_pos != string::npos) {
+    sql = inf[start_index].substr(0, out_pos);  // sql is empty or one line
+  } else {
+    sql = inf[start_index];
+    for (++start_index; start_index != end_index; ++start_index) {
+      const string &line = inf[start_index];
+      out_pos = line.find(out_start_tag);
+      if (out_pos != string::npos) {
+        sql.append("\n").append(line.substr(0, out_pos));
+        break;
+      }
+      sql.append("\n").append(line);
+    }
+    if (start_index == end_index)
+      return __not_found_tag_error(start_index, out_start_tag);
+  }
+
+  double outtime = -1.0;  // stays -1.0 when no time line is seen
+  for (++start_index; start_index != end_index; ++start_index) {
+    const string &line = inf[start_index];
+    __parse_time(line, outtime);
+    if (__sub_equal(line, end_tag)) {
+      ouf.emplace_back(ignore, outtime, std::move(sql));
+      return true;
+    }
+  }
+  return __not_found_tag_error(sql_start_index, end_tag);
+}
+
+// When __sub_equal accepts `line` against time_tag ("Time: ") at offset 0,
+// converts the text between the tag and the next space (or the end of the
+// line) with stod, stores it in `outtime` and returns true. Otherwise
+// returns false and leaves `outtime` unchanged.
+// NOTE(review): stod throws when that text is not a number - confirm such
+// lines cannot occur in the parsed output.
+bool Sqlout_parser::__parse_time(const string &line, double &outtime) {
+  if (!__sub_equal(line, 0, time_tag)) return false;
+
+  const size_t value_begin = time_tag.size();
+  const size_t value_end = line.find(' ', value_begin);
+  outtime = stod(line.substr(value_begin, value_end - value_begin));
+  return true;
+}
+
+}  // namespace test
+}  // namespace hawq
\ No newline at end of file
diff --git a/src/test/feature/lib/parse_out.h b/src/test/feature/lib/parse_out.h
new file mode 100644
index 0000000..ef681a1
--- /dev/null
+++ b/src/test/feature/lib/parse_out.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_PARSE_OUT_H_
+#define HAWQ_SRC_TEST_FEATURE_PARSE_OUT_H_
+
+#include "lib/sqlfile-parsebase.h"
+
+namespace hawq {
+namespace test {
+
+// One parsed statement together with the time value found in its output.
+struct SqlOut {
+  SqlOut(bool ignored, double elapsed, std::string &&text)
+      : is_ignore(ignored), time(elapsed), sql(std::move(text)) {}
+
+  // Ignore flag the statement was parsed with.
+  bool is_ignore;
+  // Time value handed over by the parser.
+  double time;
+  // Statement text.
+  std::string sql;
+};
+
+// Extracts (sql text, time) pairs from an output file that is annotated
+// with the tag lines declared at the bottom of this class, and offers
+// helpers that wrap a statement in those tags.
+class Sqlout_parser : public SqlFileParseBase {
+ public:
+  Sqlout_parser() : SqlFileParseBase("parse_out") {}
+
+  // Parses input_sqloutfile and stores one (sql, time) pair per non-ignored
+  // statement in output_sqloutinfo. Returns false when parsing fails.
+  bool parse_sql_time(
+      const std::string &input_sqloutfile,
+      std::vector<std::pair<std::string, double>> &output_sqloutinfo);
+
+  // Returns oneSql wrapped as: sql_start_tag, newline, oneSql immediately
+  // followed by out_start_tag, newline, sql_end_tag.
+  static inline std::string reportSql(const std::string &oneSql) {
+    return sql_start_tag + "\n" + oneSql + out_start_tag + "\n" + sql_end_tag;
+  }
+
+  // Same as reportSql, additionally enclosed in an ignore_start_tag /
+  // ignore_end_tag pair. Takes the statement by reference to avoid a copy.
+  static inline std::string reportIgnoreSql(const std::string &oneSql) {
+    return ignore_start_tag + "\n" + reportSql(oneSql) + "\n" + ignore_end_tag;
+  }
+
+ private:
+  // Loads outfile and parses it into ouf.
+  bool __parse_sql_time(const std::string &outfile, std::vector<SqlOut> &ouf);
+
+  // Splits inf into ignored / non-ignored regions and parses each of them.
+  bool __parse_sql_time_imple(const std::vector<std::string> &inf,
+                              std::vector<SqlOut> &ouf);
+
+  // Parses the region [start_index, end_index); start_index is advanced.
+  bool __parse_ignore(const std::vector<std::string> &inf,
+                      std::vector<SqlOut> &ouf, size_t &start_index,
+                      const size_t end_index, bool ignore);
+
+  // Looks for statement start tags inside [start_index, end_index).
+  bool __parse_sqlout(const std::vector<std::string> &inf,
+                      std::vector<SqlOut> &ouf, size_t &start_index,
+                      const size_t end_index, bool ignore);
+
+  // Parses a single statement that is terminated by end_tag.
+  bool __parse_sqlout_imple(const std::vector<std::string> &inf,
+                            std::vector<SqlOut> &ouf, size_t &start_index,
+                            const size_t end_index, const std::string &end_tag,
+                            bool ignore);
+
+  // Reads the value following time_tag from line into outtime.
+  bool __parse_time(const std::string &line, double &outtime);
+
+ private:
+  // Prefix of the output line that carries the time value.
+  const std::string time_tag = "Time: ";
+
+ private:
+  // Tag texts; the values are defined in parse_out.cpp.
+  static const std::string ignore_start_tag;
+  static const std::string ignore_end_tag;
+  static const std::string oneline_sql_start_tag;
+  static const std::string oneline_sql_end_tag;
+  static const std::string sql_start_tag;
+  static const std::string sql_end_tag;
+  static const std::string out_start_tag;
+};
+}  // namespace test
+}  // namespace hawq
+#endif
\ No newline at end of file
diff --git a/src/test/feature/lib/psql.cpp b/src/test/feature/lib/psql.cpp
index 0e71d56..386864e 100644
--- a/src/test/feature/lib/psql.cpp
+++ b/src/test/feature/lib/psql.cpp
@@ -17,8 +17,14 @@
  */
 
 #include <unistd.h>
+#include <stdio.h>
+#include <string.h>
 #include "psql.h"
+
+//
+#include <iostream>
 #include "command.h"
+#include "../../../interfaces/libpq/libpq-int.h"
 
 using std::string;
 
@@ -49,6 +55,11 @@ const std::vector<string>& PSQLQueryResult::getFields() const {
   return this->_fields;
 }
 
+// Returns the per-column sizes stored in _fieldsizes.
+const std::vector<int>& PSQLQueryResult::getFieldSizes() const {
+  const std::vector<int>& sizes = _fieldsizes;
+  return sizes;
+}
+
+
 const std::vector<string>& PSQLQueryResult::getRow(int ri) const {
   return this->getRows()[ri];
 }
@@ -82,11 +93,18 @@ void PSQLQueryResult::savePGResult(const PGresult* res) {
   int nfields = PQnfields(res);
   for (int i = 0; i < nfields; i++) {
     this->_fields.push_back(PQfname(res, i));
+    this->_fieldsizes.push_back(PQfsize(res,i));
   }
   for (int i = 0; i < PQntuples(res); i++) {
     std::vector<string> row;
     for (int j = 0; j < nfields; j++) {
       row.push_back(PQgetvalue(res, i, j));
+      int size = row[j].size();
+      int size2 = _fieldsizes[j];
+      if (size > size2)
+      {
+         _fieldsizes[j] = size;
+      }
     }
     this->_rows.push_back(row);
   }
@@ -96,6 +114,7 @@ void PSQLQueryResult::reset() {
   this->_errmsg.clear();
   this->_rows.clear();
   this->_fields.clear();
+  this->_fieldsizes.clear();
 }
 
 PSQL& PSQL::runSQLCommand(const string& sql_cmd) {
@@ -112,6 +131,21 @@ PSQL& PSQL::runSQLFile(const string& sql_file, bool printTupleOnly) {
   return *this;
 }
 
+// Runs sql_file through the psql command line built by
+// _getPSQLFileCommand (with the extra psqlOptions) and records the command
+// status in _last_status. Returns *this so calls can be chained.
+PSQL& PSQL::runSQLFile(const string& sql_file, const string &psqlOptions, bool printTupleOnly) {
+  const string command =
+      this->_getPSQLFileCommand(sql_file, psqlOptions, printTupleOnly);
+  this->_last_status = hawq::test::Command::getCommandStatus(command);
+  return *this;
+}
+
+// Opens a connection with the current connection string to check that the
+// server is reachable. Returns true when the connection succeeds; on
+// failure stores the libpq error text in _result and returns false.
+bool PSQL::testConnection() {
+  PGconn* conn = PQconnectdb(this->getConnectionString().c_str());
+  const bool connected = (PQstatus(conn) == CONNECTION_OK);
+  if (!connected) {
+    this->_result.setErrorMessage(PQerrorMessage(conn));
+  }
+  // The connection object has to be released on both paths; it was leaked
+  // before. PQfinish accepts a NULL pointer.
+  PQfinish(conn);
+  return connected;
+}
 const PSQLQueryResult& PSQL::getQueryResult(const string& sql) {
   PGconn* conn = NULL;
   PGresult* res = NULL;
@@ -142,11 +176,19 @@ done:
   return this->_result;
 }
 
+// Returns a copy of the configured host.
+string PSQL::getHost() {
+  string host(this->_host);
+  return host;
+}
 PSQL& PSQL::setHost(const string& host) {
   this->_host = host;
   return *this;
 }
 
+// Stores db as the database name; returns *this for chaining.
+PSQL& PSQL::setDatabase(const std::string& db) {
+  _dbname.assign(db);
+  return *this;
+}
+
 PSQL& PSQL::setPort(const string& port) {
   this->_port = port;
   return *this;
@@ -220,12 +262,23 @@ const string PSQL::_getPSQLFileCommand(const string& file, bool printTupleOnly)
   }
 }
 
+// Builds the psql command line that runs `file`: the base command, then
+// `psqlOptions`, then "-a -f <file>" (plus "-A -t" when printTupleOnly is
+// set).
+const string PSQL::_getPSQLFileCommand(const string& file, const string& psqlOptions, bool printTupleOnly) const {
+  string command = this->_getPSQLBaseCommand();
+  // Always separate the pieces with a blank: the options were appended
+  // directly to the base command and "-f" was glued to the file name in the
+  // tuple-only branch. Extra blanks are harmless on a command line.
+  command.append(" ").append(psqlOptions);
+  command.append(printTupleOnly ? " -a -A -t -f " : " -a -f ");
+  command.append(file);
+  return command;
+}
+
 bool PSQL::checkDiff(const string& expect_file,
                      const string& result_file,
                      bool save_diff,
                      const string& global_init_file,
                      const string& init_file) {
-  string diff_file = result_file + ".diff";
+  string diff_file = getDifFilePath(result_file);
   string command;
   command.append("gpdiff.pl -du ")
       .append(PSQL_BASIC_DIFF_OPTS);
@@ -256,5 +309,153 @@ bool PSQL::checkDiff(const string& expect_file,
   }
 }
 
+// Returns the path of the diff file that belongs to result_file, i.e.
+// result_file with ".diff" appended.
+std::string PSQL::getDifFilePath(const std::string& result_file) {
+  std::string diff_path(result_file);
+  diff_path.append(".diff");
+  return diff_path;
+}
+
+
+
+// Remembers the connection string; no connection is opened here.
+LibPQSQL::LibPQSQL(const std::string &connstring)
+    : libpqconn(NULL), connstr(connstring) {}
+
+// Closes the connection, if one is open, through disconnectDb().
+LibPQSQL::~LibPQSQL() {
+  this->disconnectDb();
+}
+
+// Opens a connection using the stored connection string.
+// Returns true on success. On failure the libpq error text is stored in
+// _result, the connection object is released and false is returned.
+bool LibPQSQL::connectDb()
+{
+    // Release a connection left over from an earlier call; overwriting the
+    // pointer would leak it.
+    disconnectDb();
+
+    libpqconn = PQconnectdb(this->connstr.c_str());
+    if (PQstatus(libpqconn) == CONNECTION_OK)
+        return true;
+
+    this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+    // PQfinish accepts NULL, so no separate check is needed.
+    PQfinish(libpqconn);
+    libpqconn = NULL;
+    return false;
+}
+
+
+// Runs `sql` on the open connection and stores the outcome in _result.
+//
+// When `outputfile` is not NULL, the command status tag is written to it
+// for every result that is neither a tuple result nor a "SET".
+// Returns false when there is no connection, when libpq delivers no result
+// object, or when the result status is not one of the handled ones. Query
+// errors reported by the server are stored as the error message of _result
+// and still return true.
+bool LibPQSQL::execQuery(const string& sql,FILE* outputfile)
+{
+    if (!libpqconn)
+        return false;
+
+    PGresult* res = PQexec(libpqconn, sql.c_str());
+    if (res == NULL)
+    {
+        // No result object at all: report the connection error instead of
+        // dereferencing the NULL pointer.
+        this->_result.reset();
+        this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+        return false;
+    }
+
+    // Use the public accessors rather than the fields of the libpq-internal
+    // result struct.
+    const ExecStatusType result_status = PQresultStatus(res);
+    const char* cmd_status = PQcmdStatus(res);
+    if (result_status != PGRES_TUPLES_OK && strcmp(cmd_status, "SET") != 0 &&
+        outputfile != NULL)
+    {
+        fprintf(outputfile, "%s\n", cmd_status);
+    }
+
+    bool status = true;
+    switch (result_status)
+    {
+        case PGRES_TUPLES_OK:
+        case PGRES_COMMAND_OK:
+        {
+            this->_result.reset();
+            this->_result.savePGResult(res);
+            break;
+        }
+        case PGRES_BAD_RESPONSE:
+        case PGRES_NONFATAL_ERROR:
+        case PGRES_FATAL_ERROR:
+        {
+            this->_result.reset();
+            this->_result.setErrorMessage(PQresultErrorMessage(res));
+            break;
+        }
+        default:
+        {
+            this->_result.setErrorMessage(PQerrorMessage(libpqconn));
+            status = false;
+            break;
+        }
+    }
+    PQclear(res);
+    return status;
+}
+
+// Returns the number of rows held by the last stored result. The count is
+// read through the reference returned by getRows(); the rows are no longer
+// copied just to be counted.
+int32_t LibPQSQL::getRownum()
+{
+    return static_cast<int32_t>(this->_result.getRows().size());
+}
+
+
+// Writes the stored result to `outputfile` as a text table: a header line
+// with the field names, a line of dashes, one line per row and a trailing
+// row-count line. When the stored result is an error, only the error
+// message is written. Nothing is written when `outputfile` is NULL or the
+// result has no fields.
+void LibPQSQL::printQueryResult(FILE* outputfile)
+{
+    if (outputfile == NULL)
+        return;
+    if (this->_result.isError()) {
+        fprintf(outputfile, "%s", this->_result.getErrorMessage().c_str());
+        return;
+    }
+
+    // Bind to the stored vectors instead of copying them.
+    const std::vector<std::string>& fields = this->_result.getFields();
+    if (fields.empty())
+        return;
+    const std::vector<std::vector<std::string> >& rowdata = this->_result.getRows();
+    const std::vector<int>& fieldsize = this->_result.getFieldSizes();
+
+    // Header. The column width is passed through '*' so the format strings
+    // stay literals; the first column has no leading '|'.
+    int totalsize = 0;
+    for (size_t col = 0; col < fields.size(); ++col)
+    {
+        totalsize += fieldsize[col] + 3;
+        fprintf(outputfile, col == 0 ? " %-*s " : "| %-*s ",
+                fieldsize[col], fields[col].c_str());
+    }
+    fprintf(outputfile, "\n");
+
+    // Separator. The counter has the type of totalsize; the 16-bit counter
+    // used before wrapped around and never reached larger values.
+    for (int dash = 0; dash < totalsize; ++dash)
+    {
+        fprintf(outputfile, "%s", "-");
+    }
+    fprintf(outputfile, "\n");
+
+    // Rows. size_t counters so that more than 65535 rows terminate as well.
+    for (size_t row = 0; row < rowdata.size(); ++row)
+    {
+        for (size_t col = 0; col < rowdata[row].size(); ++col)
+        {
+            fprintf(outputfile, col == 0 ? " %-*s " : "| %-*s ",
+                    fieldsize[col], rowdata[row][col].c_str());
+        }
+        fprintf(outputfile, "\n");
+    }
+
+    if (rowdata.empty())
+        fprintf(outputfile, "(0 row)\n");
+    else
+        fprintf(outputfile, "(%lu rows)\n",
+                static_cast<unsigned long>(rowdata.size()));
+}
+// Closes the connection when one is open. Returns true if a connection was
+// closed and false if there was none.
+bool LibPQSQL::disconnectDb()
+{
+    if (libpqconn == NULL)
+        return false;
+
+    PQfinish(libpqconn);
+    libpqconn = NULL;
+    return true;
+}
+
 } // namespace test
 } // namespace hawq
diff --git a/src/test/feature/lib/psql.h b/src/test/feature/lib/psql.h
index 215a43b..3e3ed23 100644
--- a/src/test/feature/lib/psql.h
+++ b/src/test/feature/lib/psql.h
@@ -21,7 +21,7 @@
 
 #include <string>
 #include <vector>
-
+#include <unistd.h>
 #include "command.h"
 #include "libpq-fe.h"
 
@@ -39,6 +39,7 @@ class PSQLQueryResult {
 
   const std::vector<std::vector<std::string> >& getRows() const;
   const std::vector<std::string>& getFields() const;
+  const std::vector<int>& getFieldSizes() const;
 
   const std::vector<std::string>& getRow(int ri) const;
   const std::string& getData(int ri, int ci) const;
@@ -53,10 +54,10 @@ class PSQLQueryResult {
  private:
   PSQLQueryResult(const PSQLQueryResult&);
   const PSQLQueryResult& operator=(const PSQLQueryResult&);
-
   std::string _errmsg;
   std::vector<std::vector<std::string> > _rows;
   std::vector<std::string> _fields;
+  std::vector<int> _fieldsizes;
 };
 
 class PSQL {
@@ -65,22 +66,27 @@ class PSQL {
        const std::string& port = "5432", const std::string& user = "gpadmin",
        const std::string& password = "")
       : _dbname(db), _host(host), _port(port),
-      _user(user), _password(password) {}
+      _user(user), _password(password) { _last_status = 0; }
   virtual ~PSQL(){};
 
   PSQL& runSQLCommand(const std::string& sql_cmd);
   PSQL& runSQLFile(const std::string& sql_file, bool printTupleOnly = false);
+  PSQL& runSQLFile(const std::string& sql_file,
+                   const std::string& psqlOptions, bool printTupleOnly = false);
   const PSQLQueryResult& getQueryResult(const std::string& sql);
 
   PSQL& setHost(const std::string& host);
+  PSQL& setDatabase(const std::string& db);
   PSQL& setPort(const std::string& port);
   PSQL& setUser(const std::string& username);
   PSQL& setPassword(const std::string& password);
   PSQL& setOutputFile(const std::string& out);
   std::string getConnectionString() const;
+  bool testConnection();
 
   int getLastStatus() const;
   const std::string& getLastResult() const;
+  std::string getHost();
 
   static bool checkDiff(const std::string& expect_file,
                         const std::string& result_file,
@@ -88,6 +94,8 @@ class PSQL {
                         const std::string& global_init_file = "",
                         const std::string& init_file = "");
 
+  static std::string getDifFilePath(const std::string& result_file);
+
   void resetOutput();
 
  private:
@@ -98,6 +106,12 @@ class PSQL {
   const std::string _getPSQLQueryCommand(const std::string& query) const;
   const std::string _getPSQLFileCommand(const std::string& file, bool printTupleOnly = false) const;
 
+  // Generate psql execute command
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param printTupleOnly whether only print or not
+  const std::string _getPSQLFileCommand(const std::string& file, const std::string& psqlOptions, bool printTupleOnly = false) const;
+
   std::string _dbname;
   std::string _host;
   std::string _port;
@@ -110,6 +124,31 @@ class PSQL {
   std::string _last_result;
 };
 
+
+// Thin wrapper around one libpq connection: connects with a connection
+// string, runs statements and keeps the last result for printing.
+class LibPQSQL {
+ public:
+  LibPQSQL(const std::string &connstring);
+  ~LibPQSQL();
+
+  // Connect to the database using the connection string given at
+  // construction. Returns true on success.
+  bool connectDb();
+
+  // Disconnect from the database. Returns true if a connection was closed.
+  bool disconnectDb();
+
+  // Run sql and store the result in _result. When outputfile is given, the
+  // command status of non-query statements is written to it.
+  bool execQuery(const std::string &sql, FILE* outputfile = NULL);
+
+  // Write the stored result to outputfile.
+  void printQueryResult(FILE* outputfile);
+
+  // Number of rows in the stored result.
+  int32_t getRownum();
+
+ private:
+  // The object owns libpqconn and releases it in the destructor, so it must
+  // not be copied. Declared but not defined, like PSQLQueryResult above.
+  LibPQSQL(const LibPQSQL&);
+  const LibPQSQL& operator=(const LibPQSQL&);
+
+  PGconn* libpqconn;
+  std::string connstr;
+  PSQLQueryResult _result;
+};
 } // namespace test
 } // namespace hawq
 
diff --git a/src/test/feature/lib/sql_util.cpp b/src/test/feature/lib/sql_util.cpp
index 07071f4..1dbef13 100644
--- a/src/test/feature/lib/sql_util.cpp
+++ b/src/test/feature/lib/sql_util.cpp
@@ -21,8 +21,11 @@
 #include <unistd.h>
 
 #include <fstream>
+#include <iomanip>
+#include <regex>
 #include <vector>
 
+#include "hawq_config.h"
 #include "sql_util.h"
 #include "string_util.h"
 
@@ -41,83 +44,153 @@ namespace test {
 SQLUtility::SQLUtility(SQLUtilityMode mode)
     : testRootPath(getTestRootPath()),
       test_info(::testing::UnitTest::GetInstance()->current_test_info()) {
-  auto getConnection = [&] () {
+  auto getConnection = [&]() {
     string user = HAWQ_USER;
-    if(user.empty()) {
+    if (user.empty()) {
       struct passwd *pw;
       uid_t uid = geteuid();
       pw = getpwuid(uid);
       user.assign(pw->pw_name);
     }
-    conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user, HAWQ_PASSWORD));
+    conn.reset(new hawq::test::PSQL(HAWQ_DB, HAWQ_HOST, HAWQ_PORT, user,
+                                    HAWQ_PASSWORD));
   };
   getConnection();
 
-  if (mode == MODE_SCHEMA) {
+  // All schema based modes share one branch. A separate
+  // "else if (mode == MODE_MADLIB)" branch could never be reached because
+  // MODE_MADLIB is already matched here.
+  if (mode == MODE_SCHEMA || mode == MODE_SCHEMA_NODROP ||
+      mode == MODE_MADLIB) {
     schemaName = string(test_info->test_case_name()) + "_" + test_info->name();
     databaseName = HAWQ_DB;
-    exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
-    exec("CREATE SCHEMA " + schemaName);
-    sql_util_mode = MODE_SCHEMA;
+    sql_util_mode = mode;
+    if (mode == MODE_SCHEMA || mode == MODE_MADLIB) {
+      exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
+      exec("CREATE SCHEMA " + schemaName);
+    } else {
+      execIgnore("CREATE SCHEMA " + schemaName);
+    }
   } else {
     schemaName = HAWQ_DEFAULT_SCHEMA;
-    databaseName = "db_" + string(test_info->test_case_name()) + "_" + test_info->name();
-    std::transform(databaseName.begin(), databaseName.end(), databaseName.begin(), ::tolower);
+    databaseName =
+        "db_" + string(test_info->test_case_name()) + "_" + test_info->name();
+    std::transform(databaseName.begin(), databaseName.end(),
+                   databaseName.begin(), ::tolower);
     exec("DROP DATABASE IF EXISTS " + databaseName);
     exec("CREATE DATABASE " + databaseName);
     sql_util_mode = MODE_DATABASE;
   }
 }
 
+// Builds a utility bound to an explicitly given database, host, port and
+// user. It uses the default schema and MODE_SCHEMA_NODROP, and this
+// constructor itself creates or drops nothing.
+SQLUtility::SQLUtility(const std::string &db, const std::string &host,
+                       const std::string &port, const std::string &user)
+    : testRootPath(getTestRootPath()),
+      test_info(::testing::UnitTest::GetInstance()->current_test_info()) {
+  sql_util_mode = MODE_SCHEMA_NODROP;
+  databaseName = db;
+  schemaName = HAWQ_DEFAULT_SCHEMA;
+  conn.reset(new hawq::test::PSQL(db, host, port, user, HAWQ_PASSWORD));
+}
+
 SQLUtility::~SQLUtility() {
   if (!test_info->result()->Failed()) {
-
-	//--------------------------------------------------------------------------
-	// This is a temporary work around to sleep a short time window in order to
-	// wait for the quit of query dispatcher processes. Because each query
-	// dispatcher has one resource heart-beat thread to be joined before the
-	// exit, in worst case, that thread will sleep 100ms and consequently check
-	// the switch variable to complete the exiting logic. This may causes the
-	// error reporting that the database is still accessed by other users, when
-	// user drops database once finished using database.
-	//
-	// When we have that exit logic improved, we can remove this logic.
-	//--------------------------------------------------------------------------
+    //--------------------------------------------------------------------------
+    // This is a temporary work around to sleep a short time window in order to
+    // wait for the quit of query dispatcher processes. Because each query
+    // dispatcher has one resource heart-beat thread to be joined before the
+    // exit, in worst case, that thread will sleep 100ms and consequently check
+    // the switch variable to complete the exiting logic. This may causes the
+    // error reporting that the database is still accessed by other users, when
+    // user drops database once finished using database.
+    //
+    // When we have that exit logic improved, we can remove this logic.
+    //--------------------------------------------------------------------------
 
     usleep(200000);
-    if (schemaName != HAWQ_DEFAULT_SCHEMA) {
+    if (schemaName != HAWQ_DEFAULT_SCHEMA &&
+        sql_util_mode != MODE_SCHEMA_NODROP) {
       exec("DROP SCHEMA " + schemaName + " CASCADE");
     }
 
-    if (sql_util_mode ==  MODE_DATABASE) {
+    if (sql_util_mode == MODE_DATABASE) {
       exec("DROP DATABASE " + databaseName);
     }
   }
 }
 
-std::string SQLUtility::getDbName() {
-    return databaseName;
+// Returns the current local time formatted as
+// "<year>-<month>-<day> <hour>:<minute>:<second>"; the numbers are not
+// zero padded.
+std::string SQLUtility::getStrCurTime() {
+  const time_t now = time(NULL);
+  const struct tm *parts = localtime(&now);
+
+  std::string stamp = std::to_string(parts->tm_year + 1900);
+  stamp += "-";
+  stamp += std::to_string(parts->tm_mon + 1);
+  stamp += "-";
+  stamp += std::to_string(parts->tm_mday);
+  stamp += " ";
+  stamp += std::to_string(parts->tm_hour);
+  stamp += ":";
+  stamp += std::to_string(parts->tm_min);
+  stamp += ":";
+  stamp += std::to_string(parts->tm_sec);
+  return stamp;
 }
 
-std::string SQLUtility::getSchemaName() {
-    return schemaName;
+// Writes the current time, left aligned in an 18 character field and
+// followed by " => ", to std::cout and returns std::cout so the caller can
+// append the actual message.
+std::ostream &SQLUtility::time_log() {
+  std::ostream &out = std::cout;
+  out.flags(std::ios::left);
+  out << std::setw(18) << SQLUtility::getStrCurTime() << " => ";
+  return out;
 }
 
+std::string SQLUtility::getHost() { return conn->getHost(); }
+
+std::string SQLUtility::getDbName() { return databaseName; }
+
+std::string SQLUtility::getSchemaName() { return schemaName; }
+
+// Makes `name` the schema used by this utility.
+//
+// With isSwitch set, only the stored schema name changes. Otherwise the
+// schema is also (re)created: dropped and created again, or - in
+// MODE_SCHEMA_NODROP - created with errors ignored. The default schema is
+// rejected in that case, because recreating it would drop it with CASCADE.
+void SQLUtility::setSchemaName(const std::string &name, bool isSwitch) {
+  if (!isSwitch) {
+    assert(name != HAWQ_DEFAULT_SCHEMA);
+    // The assert vanishes when NDEBUG is defined, so check again at run
+    // time, and do it before schemaName is overwritten.
+    if (name == HAWQ_DEFAULT_SCHEMA) {
+      ADD_FAILURE() << "refusing to recreate the default schema " << name;
+      return;
+    }
+  }
+
+  schemaName = name;
+  if (isSwitch) return;
+
+  if (sql_util_mode != MODE_SCHEMA_NODROP) {
+    exec("DROP SCHEMA IF EXISTS " + schemaName + " CASCADE");
+    exec("CREATE SCHEMA " + schemaName);
+  } else {
+    execIgnore("CREATE SCHEMA " + schemaName);
+  }
+}
+
+std::string SQLUtility::getVersion() {
+  return getQueryResult("select version();");
+}
+
+// Set Host of connection value
+void SQLUtility::setHost(const std::string &host) { conn->setHost(host); }
+
+void SQLUtility::setDatabase(const std::string &db) { conn->setDatabase(db); }
+
 void SQLUtility::exec(const string &sql) {
   EXPECT_EQ(0, (conn->runSQLCommand(sql)).getLastStatus())
       << conn->getLastResult();
 }
 
+void SQLUtility::execIgnore(const string &sql) { conn->runSQLCommand(sql); }
+
 string SQLUtility::execute(const string &sql, bool check) {
   conn->runSQLCommand("SET SEARCH_PATH=" + schemaName + ";" + sql);
   EXPECT_NE(conn.get(), nullptr);
   if (check) {
-    EXPECT_EQ(0, conn->getLastStatus()) << conn->getLastResult();
+    EXPECT_EQ(0, conn->getLastStatus()) << sql << " " << conn->getLastResult();
     return "";
   }
   return conn.get()->getLastResult();
 }
 
+// Runs `sql` with the search path set to the current schema.
+// Returns true when the command status is 0, false otherwise or when there
+// is no connection object.
+bool SQLUtility::executeSql(const std::string &sql) {
+  // Check the connection before it is used; checking it after the call
+  // could never report anything because the call had already dereferenced
+  // it.
+  EXPECT_NE(conn.get(), nullptr);
+  if (!conn) return false;
+
+  conn->runSQLCommand("SET SEARCH_PATH=" + schemaName + ";" + sql);
+  return conn->getLastStatus() == 0;
+}
+
 void SQLUtility::executeExpectErrorMsgStartWith(const std::string &sql,
                                                 const std::string &errmsg) {
   std::string errout = execute(sql, false);
@@ -147,13 +220,51 @@ void SQLUtility::query(const string &sql, const string &expectStr) {
   EXPECT_EQ(expectStr, resultStr);
 }
 
-void SQLUtility::execSQLFile(const string &sqlFile,
-                             const string &ansFile,
-                             const string &initFile,
-                             bool usingDefaultSchema,
+// Compares the output file with the expected answer file.
+//
+// `initFile`, when given, is relative to the test root; it must exist and
+// split into a non-empty base name (the split result is stored in `fp`).
+// The comparison also uses <testRoot>/lib/global_init_file. When the files
+// match, the generated `newSqlFile` (if any) is deleted and true is
+// returned. Every failure is reported as a non-fatal test failure and
+// yields false.
+bool SQLUtility::checkAnsFile(const string &ansFileAbsPath,
+                              const string &outFileAbsPath,
+                              const string &initFile, const string &newSqlFile,
+                              FilePath &fp) {
+  // Resolve and validate the optional init file.
+  string initFileAbsPath;
+  if (!initFile.empty()) {
+    initFileAbsPath = testRootPath + "/" + initFile;
+    if (!std::ifstream(initFileAbsPath)) {
+      EXPECT_TRUE(false) << initFileAbsPath << " doesn't exist";
+      return false;
+    }
+    fp = splitFilePath(initFileAbsPath);
+    // double check to avoid empty fileBaseName
+    if (fp.fileBaseName.empty()) {
+      EXPECT_TRUE(false) << initFileAbsPath << " is invalid";
+      return false;
+    }
+  }
+
+  const string globalInitFileAbsPath = testRootPath + "/lib/global_init_file";
+
+  bool is_sql_ans_diff =
+      conn->checkDiff(ansFileAbsPath, outFileAbsPath, true,
+                      globalInitFileAbsPath, initFileAbsPath);
+  if (is_sql_ans_diff) {
+    // Report the mismatch together with the path of the diff file.
+    EXPECT_FALSE(is_sql_ans_diff) << conn->getDifFilePath(outFileAbsPath);
+    return false;
+  }
+
+  // No diff: the generated sql file is not needed any more.
+  if (newSqlFile.empty()) return true;
+  if (remove(newSqlFile.c_str())) {
+    EXPECT_TRUE(false) << "Error deleting file " << newSqlFile;
+    return false;
+  }
+  return true;
+}
+
+void SQLUtility::execSQLFile(const string &sqlFile, const string &ansFile,
+                             const string &initFile, bool usingDefaultSchema,
                              bool printTupleOnly) {
   FilePath fp;
-
   // do precheck for sqlFile & ansFile
   if (hawq::test::startsWith(sqlFile, "/") ||
       hawq::test::startsWith(ansFile, "/"))
@@ -177,42 +288,74 @@ void SQLUtility::execSQLFile(const string &sqlFile,
   conn->resetOutput();
 
   // initFile if any
-  string initFileAbsPath;
-  if (!initFile.empty()) {
-    initFileAbsPath = testRootPath + "/" + initFile;
-    if (!std::ifstream(initFileAbsPath))
-      ASSERT_TRUE(false) << initFileAbsPath << " doesn't exist";
-    fp = splitFilePath(initFileAbsPath);
-    // double check to avoid empty fileBaseName
-    if (fp.fileBaseName.empty())
-      ASSERT_TRUE(false) << initFileAbsPath << " is invalid";
-  } else {
-    initFileAbsPath = "";
+  checkAnsFile(ansFileAbsPath, outFileAbsPath, initFile, newSqlFile, fp);
+}
+
+void SQLUtility::execSQLFileandCheck(const string &sqlFile,
+                                     const string &outFile,
+                                     const std::string &pattern,
+                                     const std::string &sqlOptions) {
+  // do precheck for sqlFile
+  if (hawq::test::startsWith(sqlFile, "/")) {
+    ASSERT_TRUE(false)
+        << "For sqlFile, relative path to feature test root dir is needed";
   }
+  string sqlFileAbsPath = testRootPath + "/" + sqlFile;
+  FilePath fp = splitFilePath(sqlFileAbsPath);
+  string outFileAbsPath = testRootPath + "/" + outFile;
 
-  string globalInitFileAbsPath;
-  globalInitFileAbsPath = testRootPath + "/lib/global_init_file";
+  if (fp.fileBaseName.empty()) ASSERT_TRUE(false) << sqlFile << " is invalid";
 
-  bool is_sql_ans_diff = conn->checkDiff(ansFileAbsPath, outFileAbsPath, true, globalInitFileAbsPath, initFileAbsPath);
-  EXPECT_FALSE(is_sql_ans_diff);
-  if (is_sql_ans_diff == false) {
-    // no diff, continue to delete the generated sql file
-    if (remove(newSqlFile.c_str()))
-      ASSERT_TRUE(false) << "Error deleting file " << newSqlFile;
-  } else {
-    EXPECT_FALSE(true);
+  // generate new sql file with set search_path added at the beginning
+  const string newSqlFile = generateSQLFile(sqlFile, false);
+  conn->setOutputFile(outFileAbsPath);
+
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, sqlOptions).getLastStatus());
+  conn->resetOutput();
+  bool result = this->checkPatternInFile(outFile, pattern);
+  ASSERT_TRUE(result) << outFile << " find " << pattern;
+}
+
+// Returns true when no line of outFile (relative to the test root) matches
+// the case-insensitive regex `pattern`; false on a match or an absolute path.
+// NOTE(review): an unreadable outFile also yields true -- confirm intended.
+bool SQLUtility::checkPatternInFile(const std::string &outFile,
+                                    const std::string &pattern) {
+  if (hawq::test::startsWith(outFile, "/")) {
+    std::cout << "ERROR : outfile format is error ";
+    return false;
+  }
+  // An empty regex matches every line, so treat it as "nothing to look for".
+  if (pattern.empty()) return true;
+  // Compile once; the pattern does not change between lines.
+  const std::regex re(pattern, std::regex_constants::icase);
+  std::ifstream fin(getTestRootPath() + "/" + outFile);
+  std::string line;
+  while (getline(fin, line)) {
+    if (std::regex_search(line, re)) return false;
+  }
+  return true;
+}
+
+bool SQLUtility::execAbsSQLFile(const string &sqlFile) {
+  // double check to avoid empty fileBaseName
+  FilePath fp = splitFilePath(sqlFile);
+  if (fp.fileBaseName.empty()) return false;
+  // outFile is written to /tmp as <sqlFile base name>.out
+  string outFileAbsPath = "/tmp/" + fp.fileBaseName + ".out";
+
+  // run sql file and store its result in output file
+  conn->setOutputFile(outFileAbsPath);
+  return conn->runSQLFile(sqlFile).getLastStatus() == 0 ? true : false;
 }
 
 bool SQLUtility::execSQLFile(const string &sqlFile) {
   // do precheck for sqlFile
-  if (hawq::test::startsWith(sqlFile, "/"))
-    return false;
+  if (hawq::test::startsWith(sqlFile, "/")) return false;
 
   // double check to avoid empty fileBaseName
   FilePath fp = splitFilePath(sqlFile);
-  if (fp.fileBaseName.empty())
-    return false;
+  if (fp.fileBaseName.empty()) return false;
   // outFile is located in the same folder with ansFile
   string outFileAbsPath = "/tmp/" + fp.fileBaseName + ".out";
 
@@ -224,9 +367,290 @@ bool SQLUtility::execSQLFile(const string &sqlFile) {
   return conn->runSQLFile(newSqlFile).getLastStatus() == 0 ? true : false;
 }
 
-const string SQLUtility::generateSQLFile(const string &sqlFile, bool usingDefaultSchema) {
+int32_t SQLUtility::getErrorNumber(const string &outputFile) {
+  // absolute paths are used as is, relative ones resolve under test root
+  string outputAbspath;
+  if (hawq::test::startsWith(outputFile, "/"))
+    outputAbspath = outputFile;
+  else
+    outputAbspath = testRootPath + "/" + outputFile;
+  string cmdstr = "grep -i ERROR " + outputAbspath + " | wc -l";
+  string output = Command::getCommandOutput(cmdstr);
+  try {
+    return std::stoi(output);
+  } catch (...) {
+    return -1;
+  }
+}
+
+bool SQLUtility::__beforeExecSpecificSQLFile(const string &sqlFile,
+                                             const string &outputFile,
+                                             const string &sqlFilePrefix,
+                                             string &newSqlFile,
+                                             string &outputAbsPath) {
+  // do precheck for sqlFile
+  if (hawq::test::startsWith(sqlFile, "/")) return false;
+
+  // double check to avoid empty fileBaseName
+  FilePath fp = splitFilePath(sqlFile);
+  if (fp.fileBaseName.empty()) return false;
+
+  // generate new sql file with set search_path added at the beginning
+  newSqlFile = generateSQLFile(sqlFile, false, sqlFilePrefix);
+
+  // outputFile is given relative to the test root dir
+  outputAbsPath = testRootPath + "/" + outputFile;
+
+  return true;
+}
+
+bool SQLUtility::execSpecificSQLFile(const string &sqlFile,
+                                     const string &outputFile,
+                                     const string &psqlOptions,
+                                     const string &sqlFilePrefix) {
+  string newSqlFile, outputAbsPath;
+  if (!__beforeExecSpecificSQLFile(sqlFile, outputFile, sqlFilePrefix,
+                                   newSqlFile, outputAbsPath))
+    return false;
+
+  conn->setOutputFile(outputAbsPath);
+
+  bool result = conn->runSQLFile(newSqlFile, psqlOptions).getLastStatus() == 0
+                    ? true
+                    : false;
+  conn->resetOutput();
+  return result;
+}
+
+bool SQLUtility::__beforeBoolExecAppendSQLFile(const string &sqlFile,
+                                               const string &outputFile,
+                                               const string &sqlFilePrefix,
+                                               const string &appendString,
+                                               string &newSqlFile) {
+  // do precheck for sqlFile
+  if (hawq::test::startsWith(sqlFile, "/")) return false;
+
+  // double check to avoid empty fileBaseName
+  FilePath fp = splitFilePath(sqlFile);
+  if (fp.fileBaseName.empty()) return false;
+
+  // generate new sql file with set search_path added at the beginning
+  newSqlFile = generateSQLFile(sqlFile, false, sqlFilePrefix, appendString);
+
+  // outputFile is given relative to the test root dir
+  std::string outputAbsPath = testRootPath + "/" + outputFile;
+  conn->setOutputFile(outputAbsPath);
+  return true;
+}
+
+bool SQLUtility::execAppendSQLFile(const string &sqlFile,
+                                   const string &outputFile,
+                                   const string &psqlOptions,
+                                   const string &sqlFilePrefix,
+                                   const string &appendString) {
+  std::string newSqlFile;
+  if (!__beforeBoolExecAppendSQLFile(sqlFile, outputFile, sqlFilePrefix,
+                                     appendString, newSqlFile))
+    return false;
+
+  bool result = conn->runSQLFile(newSqlFile, psqlOptions).getLastStatus() == 0
+                    ? true
+                    : false;
+  conn->resetOutput();
+  return result;
+}
+
+void SQLUtility::__beforeRunSqlfile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &sqlFilePrefix, const string &appendString,
+    bool usingDefaultSchema, FilePath &fp, std::string &ansFileAbsPath,
+    std::string &newSqlFile, std::string &outputAbsPath) {
+  // do precheck for sqlFile & ansFile
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/"))
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  ansFileAbsPath = testRootPath + "/" + ansFile;
+  if (!std::ifstream(ansFileAbsPath))
+    ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+  fp = splitFilePath(ansFileAbsPath);
+  // double check to avoid empty fileBaseName
+  if (fp.fileBaseName.empty())
+    ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
+
+  // generate new sql file with set search_path added at the beginning
+  newSqlFile =
+      generateSQLFile(sqlFile, usingDefaultSchema, sqlFilePrefix, appendString);
+
+  // outputFile is given relative to the test root dir
+  outputAbsPath = testRootPath + "/" + outputFile;
+}
+
+bool SQLUtility::__afterRunSqlfile(const std::string &initFile,
+                                   bool sortoutfile, FilePath &fp,
+                                   std::string &ansFileAbsPath,
+                                   std::string &outputAbsPath,
+                                   std::string &newSqlFile) {
+  if (sortoutfile == false)
+    return checkAnsFile(ansFileAbsPath, outputAbsPath, initFile, newSqlFile,
+                        fp);
+  else {
+    hawq::test::Command comm;
+    std::string command;
+    fp = splitFilePath(outputAbsPath);
+    if (fp.fileBaseName.empty()) {
+      EXPECT_TRUE(false) << outputAbsPath << " is invalid";
+      return false;
+    }
+    std::string sortoutFileAbsPath =
+        fp.path + "/" + fp.fileBaseName + "_sort.out";
+    command = "sort " + outputAbsPath + " > " + sortoutFileAbsPath;
+    int result = comm.getCommandStatus(command);
+    EXPECT_TRUE(result != -1);
+    return checkAnsFile(ansFileAbsPath, sortoutFileAbsPath, initFile, "", fp);
+  }
+}
+
+void SQLUtility::execAppendSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &psqlOptions, const string &sqlFilePrefix,
+    const string &appendString, const string &initFile, bool usingDefaultSchema,
+    bool printTupleOnly, bool sortoutfile) {
+  FilePath fp;
+  std::string ansFileAbsPath, newSqlFile, outputAbsPath;
+  __beforeRunSqlfile(sqlFile, outputFile, ansFile, sqlFilePrefix, appendString,
+                     usingDefaultSchema, fp, ansFileAbsPath, newSqlFile,
+                     outputAbsPath);
+  conn->setOutputFile(outputAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, psqlOptions, printTupleOnly)
+                   .getLastStatus());
+  conn->resetOutput();
+  __afterRunSqlfile(initFile, sortoutfile, fp, ansFileAbsPath, outputAbsPath,
+                    newSqlFile);
+}
+
+// Run sqlFile concurrently in processnum forked children; child i writes to
+// <split(outputFile, '.')[0]>_process_<i>.out under the test root dir.
+void SQLUtility::runParallelSQLFile(const string sqlFile,
+                                    const string outputFile, int processnum,
+                                    const string ansFile,
+                                    bool usingDefaultSchema) {
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/"))
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  // generate new sql file with set search_path added at the beginning
+  const string newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema);
+  std::vector<pid_t> childprocess;
+  for (int i = 0; i < processnum; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) ASSERT_TRUE(false) << "Fork child process error";
+    if (childpid == 0) {
+      auto filename = hawq::test::split(outputFile, '.');
+      conn->setOutputFile(testRootPath + "/" + filename[0] + "_process_" +
+                          std::to_string(i) + ".out");
+      const bool ok = conn->runSQLFile(newSqlFile).getLastStatus() == 0;
+      EXPECT_TRUE(ok);
+      // gtest failures die with the child; report them via the exit status
+      exit(ok ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (pid_t child : childprocess) {
+    int status = 0;
+    while (waitpid(child, &status, 0) != child) continue;  // retry as before
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "parallel SQL child " << child << " failed";
+  }
+}
+
+// As the overload without options, but child i runs with options[i].
+void SQLUtility::runParallelSQLFile(const string sqlFile,
+                                    const string outputFile, int processnum,
+                                    std::vector<std::string> &options,
+                                    const string ansFile,
+                                    bool usingDefaultSchema) {
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/"))
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  if (processnum > 0 && options.size() < static_cast<size_t>(processnum))
+    ASSERT_TRUE(false) << "options needs one entry per process";
+  const string newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema);
+  std::vector<pid_t> childprocess;
+  for (int i = 0; i < processnum; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) ASSERT_TRUE(false) << "Fork child process error";
+    if (childpid == 0) {
+      auto filename = hawq::test::split(outputFile, '.');
+      conn->setOutputFile(testRootPath + "/" + filename[0] + "_process_" +
+                          std::to_string(i) + ".out");
+      const bool ok =
+          conn->runSQLFile(newSqlFile, options[i]).getLastStatus() == 0;
+      EXPECT_TRUE(ok);
+      // gtest failures die with the child; report them via the exit status
+      exit(ok ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (pid_t child : childprocess) {
+    int status = 0;
+    while (waitpid(child, &status, 0) != child) continue;  // retry as before
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "parallel SQL child " << child << " failed";
+  }
+}
+
+void SQLUtility::__beforeExecSpecificSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &sqlFilePrefix, bool usingDefaultSchema, FilePath &fp,
+    string &ansFileAbsPath, string &newSqlFile, string &outputAbsPath) {
+  // do precheck for sqlFile & ansFile
+  if (hawq::test::startsWith(sqlFile, "/") ||
+      hawq::test::startsWith(ansFile, "/"))
+    ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                          "test root dir is needed";
+  ansFileAbsPath = testRootPath + "/" + ansFile;
+  if (!std::ifstream(ansFileAbsPath))
+    ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+  fp = splitFilePath(ansFileAbsPath);
+  // double check to avoid empty fileBaseName
+  if (fp.fileBaseName.empty())
+    ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
+
+  // generate new sql file with set search_path added at the beginning
+  newSqlFile = generateSQLFile(sqlFile, usingDefaultSchema, sqlFilePrefix);
+
+  // outputFile is given relative to the test root dir
+  outputAbsPath = testRootPath + "/" + outputFile;
+}
+
+void SQLUtility::execSpecificSQLFile(
+    const string &sqlFile, const string &outputFile, const string &ansFile,
+    const string &psqlOptions, const string &sqlFilePrefix,
+    const string &initFile, bool usingDefaultSchema, bool printTupleOnly) {
+  FilePath fp;
+  std::string ansFileAbsPath, newSqlFile, outputAbsPath;
+
+  __beforeExecSpecificSQLFile(sqlFile, outputFile, ansFile, sqlFilePrefix,
+                              usingDefaultSchema, fp, ansFileAbsPath,
+                              newSqlFile, outputAbsPath);
+
+  conn->setOutputFile(outputAbsPath);
+  EXPECT_EQ(0, conn->runSQLFile(newSqlFile, psqlOptions, printTupleOnly)
+                   .getLastStatus());
+  conn->resetOutput();
+
+  checkAnsFile(ansFileAbsPath, outputAbsPath, initFile, newSqlFile, fp);
+}
+
+const string SQLUtility::generateSQLFile(const string &sqlFile,
+                                         bool usingDefaultSchema,
+                                         const string &sqlFileSurfix,
+                                         const string &appendString) {
   const string originSqlFile = testRootPath + "/" + sqlFile;
-  const string newSqlFile = "/tmp/" + string(test_info->test_case_name()) + "_" + test_info->name() + ".sql";
+  const string newSqlFile = "/tmp/" + string(test_info->test_case_name()) +
+                            "_" + test_info->name() + sqlFileSurfix + ".sql";
   std::fstream in;
   in.open(originSqlFile, std::ios::in);
   if (!in.is_open()) {
@@ -238,13 +662,18 @@ const string SQLUtility::generateSQLFile(const string &sqlFile, bool usingDefaul
     EXPECT_TRUE(false) << "Error opening file " << newSqlFile;
   }
   out << "-- start_ignore" << std::endl;
-  if (!usingDefaultSchema) {
-      out << "SET SEARCH_PATH=" + schemaName + ";" << std::endl;
+  if (sql_util_mode == MODE_MADLIB) {
+    out << "SET SEARCH_PATH=" << schemaName << ",madlib;" << std::endl;
+  } else if (!usingDefaultSchema) {
+    out << "SET SEARCH_PATH=" + schemaName + ";" << std::endl;
   }
-  if (sql_util_mode ==  MODE_DATABASE) {
+  if (sql_util_mode == MODE_DATABASE) {
     out << "\\c " << databaseName << std::endl;
   }
   out << "-- end_ignore" << std::endl;
+
+  if (appendString.size() > 0) out << appendString << "\n";
+
   string line;
   while (getline(in, line)) {
     out << line << std::endl;
@@ -260,9 +689,60 @@ const hawq::test::PSQLQueryResult &SQLUtility::executeQuery(const string &sql) {
   return result;
 }
 
+// Value of the hawq_dfs_url GUC with a trailing '/' ensured.
+std::string SQLUtility::getHawqDfsURL() {
+  std::string url = this->getGUCValue("hawq_dfs_url");
+  // "" (failed lookup) is returned as is: url[size() - 1] would be out of range
+  if (!url.empty() && url.back() != '/') url += "/";
+  return url;
+}
+
+bool SQLUtility::isDarwin() {
+  std::string output = Command::getCommandOutput("uname | grep Darwin | wc -l");
+  int num = atoi(output.c_str());
+  return num != 0;
+}
+
+std::string SQLUtility::getHawqMagmaURL() {
+  hawq::test::HawqConfig hc;
+  std::vector<std::string> segs;
+  hc.getSlaves(segs);
+
+  if (segs.empty()) {
+    return "";
+  } else {
+    // use magma service on first segment host specified in $GPHOME/etc/slaves
+    return segs[0] + ":" + this->getGUCValue("hawq_magma_port_segment");
+  }
+}
+
+std::string SQLUtility::getHawqDfsHost() {
+  string url = this->getGUCValue("hawq_dfs_url");
+  std::size_t pos = url.find("/");
+  std::string result = url.substr(0, pos);
+  return result;
+}
+
+// Path part of hawq_dfs_url, from its first '/' on, with a trailing '/'.
+std::string SQLUtility::getHdfsPath() {
+  string url = this->getGUCValue("hawq_dfs_url");
+  std::size_t found = url.find("/");
+  // no '/' at all: substr(npos) would throw std::out_of_range; yield "/"
+  std::string result = found == std::string::npos ? "" : url.substr(found);
+  if (result.empty() || result.back() != '/') result += "/";
+  return result;
+}
+
+std::string SQLUtility::getHdfsNamenode() {
+  string url = this->getGUCValue("hawq_dfs_url");
+  std::size_t found = url.find("/");
+  std::string result = url.substr(0, found + 1);
+  return result;
+}
+
 const hawq::test::PSQL *SQLUtility::getPSQL() const { return conn.get(); }
 
-string SQLUtility::getTestRootPath() const {
+string SQLUtility::getTestRootPath() {
   string result;
 #ifdef __linux__
   char pathbuf[PATH_MAX];
@@ -294,19 +774,22 @@ std::string SQLUtility::getGUCValue(const std::string &guc) {
   string sql = "show " + guc;
   const hawq::test::PSQLQueryResult &result = executeQuery(sql);
   EXPECT_EQ(result.rowCount(), 1);
-  std::vector<std::string> row = result.getRows()[0];
-  return row[0];
+  if (result.rowCount() == 1) {
+    std::vector<std::string> row = result.getRows()[0];
+    return row[0];
+  }
+  return "";
 }
 
-std::string SQLUtility::getQueryResult(const std::string &query) {
+std::string SQLUtility::getQueryResult(const std::string &query, bool check) {
   const hawq::test::PSQLQueryResult &result = executeQuery(query);
-  EXPECT_LE(result.rowCount(), 1);
   std::string value;
-  if (result.rowCount() == 1)
-  {
+  if (check) EXPECT_LE(result.rowCount(), 1);
+  if (result.rowCount() == 1) {
     value = result.getRows()[0][0];
+  } else {
+    value = "";
   }
-
   return value;
 }
 
@@ -321,7 +804,7 @@ std::string SQLUtility::getQueryResultSetString(const std::string &query) {
   return resultStr;
 }
 
-FilePath SQLUtility::splitFilePath(const string &filePath) const {
+FilePath SQLUtility::splitFilePath(const string &filePath) {
   FilePath fp;
   size_t found1 = filePath.find_last_of("/");
   size_t found2 = filePath.find_last_of(".");
@@ -331,5 +814,51 @@ FilePath SQLUtility::splitFilePath(const string &filePath) const {
   return fp;
 }
 
-} // namespace test
-} // namespace hawq
+// Run every sqlfiles[i] in its own forked child and diff against ansfiles[i].
+void SQLUtility::runParallelSQLFile_sqllist(
+    std::vector<std::string> &sqlfiles, std::vector<std::string> &ansfiles) {
+  const int len = sqlfiles.size();
+  FilePath fp;
+  std::string sqlFileAbsPath, ansFileAbsPath, outFileAbsPath;
+  if (sqlfiles.size() != ansfiles.size())
+    ASSERT_TRUE(false)
+        << "The number of sqlfile is not equal to the number of ansfile";
+  for (int i = 0; i < len; i++) {
+    if (hawq::test::startsWith(sqlfiles[i], "/") ||
+        hawq::test::startsWith(ansfiles[i], "/"))
+      ASSERT_TRUE(false) << "For sqlFile and ansFile, relative path to feature "
+                            "test root dir is needed";
+    ansFileAbsPath = testRootPath + "/" + ansfiles[i];
+    if (!std::ifstream(ansFileAbsPath))
+      ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
+  }
+  std::vector<pid_t> childprocess;
+  for (int i = 0; i < len; i++) {
+    pid_t childpid = fork();
+    if (childpid == -1) ASSERT_TRUE(false) << "Fork child process error";
+    if (childpid == 0) {
+      sqlFileAbsPath = testRootPath + "/" + sqlfiles[i];
+      ansFileAbsPath = testRootPath + "/" + ansfiles[i];
+      fp = splitFilePath(ansFileAbsPath);
+      outFileAbsPath = fp.path + "/" + fp.fileBaseName + ".out";
+      conn->setOutputFile(outFileAbsPath);
+      const bool ran = conn->runSQLFile(sqlFileAbsPath).getLastStatus() == 0;
+      EXPECT_TRUE(ran);
+      conn->resetOutput();
+      const bool same =
+          checkAnsFile(ansFileAbsPath, outFileAbsPath, "", "", fp);
+      // gtest failures die with the child; report them via the exit status
+      exit(ran && same ? 0 : 1);
+    }
+    childprocess.push_back(childpid);
+  }
+  for (pid_t child : childprocess) {
+    int status = 0;
+    while (waitpid(child, &status, 0) != child) continue;  // retry as before
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0)
+        << "parallel SQL child " << child << " failed";
+  }
+}
+
+}  // namespace test
+}  // namespace hawq
diff --git a/src/test/feature/lib/sql_util.h b/src/test/feature/lib/sql_util.h
index 3bdce10..748ad31 100644
--- a/src/test/feature/lib/sql_util.h
+++ b/src/test/feature/lib/sql_util.h
@@ -28,14 +28,25 @@
 #define HAWQ_DB (getenv("PGDATABASE") ? getenv("PGDATABASE") : "postgres")
 #define HAWQ_HOST (getenv("PGHOST") ? getenv("PGHOST") : "localhost")
 #define HAWQ_PORT (getenv("PGPORT") ? getenv("PGPORT") : "5432")
-#define HAWQ_USER (getenv("PGUSER") ? getenv("PGUSER") : "")
+#define HAWQ_USER (getenv("PGUSER") ? getenv("PGUSER") : (getenv("USER") ? getenv("USER") : ""))  // never NULL, even when USER is unset
 #define HAWQ_PASSWORD (getenv("PGPASSWORD") ? getenv("PGPASSWORD") : "")
 #define HAWQ_DEFAULT_SCHEMA ("public")
 #define RANGER_HOST (getenv("RANGERHOST") ? getenv("RANGERHOST") : "localhost")
+#define KUBENET_MASTER \
+  (getenv("KUBENET_MASTER") ? getenv("KUBENET_MASTER") : "localhost")
+#define HIVE_HOST (getenv("HIVEHOST") ? getenv("HIVEHOST") : "localhost")
+#define HIVE_PORT (getenv("HIVEPORT") ? getenv("HIVEPORT") : "9083")
+#define TIME_LOG() hawq::test::SQLUtility::time_log()
+#define EXPECT_TRUE_RET_IF_FALSE(expression, errorMessage) \
+  {                                                        \
+    bool ret;                                              \
+    EXPECT_TRUE(ret = (expression)) << errorMessage;       \
+    if (!ret) return false;                                \
+  }
 
 namespace hawq {
 namespace test {
- 
+
 struct FilePath {
   std::string path;
   std::string fileBaseName;
@@ -43,15 +54,44 @@ struct FilePath {
 };
 
 enum SQLUtilityMode {
-    MODE_SCHEMA,
-    MODE_DATABASE,
-    MODE_MAX_NUM
+  MODE_SCHEMA,
+  MODE_DATABASE,
+  MODE_MAX_NUM,
+  MODE_MADLIB,
+  MODE_SCHEMA_NODROP
 };
 
+// Declares START_TIMEB/END_TIMEB, runs CODES between two ftime() samples and
+// stores the elapsed wall-clock milliseconds in UINT32_COST_TIME. timeb.time
+// counts seconds since the Epoch, so no timezone term is needed.
+#define COST_TIME(START_TIMEB, END_TIMEB, UINT32_COST_TIME, CODES)           \
+  timeb START_TIMEB, END_TIMEB;                                              \
+  ftime(&START_TIMEB);                                                       \
+  CODES                                                                      \
+  ftime(&END_TIMEB);                                                         \
+  UINT32_COST_TIME = (uint32_t)((END_TIMEB.time - START_TIMEB.time) * 1000 + \
+                                (END_TIMEB.millitm - START_TIMEB.millitm));
+
 class SQLUtility {
  public:
   SQLUtility(SQLUtilityMode mode = MODE_SCHEMA);
+
+  SQLUtility(const std::string &db, const std::string &host = "localhost",
+             const std::string &port = "5432",
+             const std::string &user = "gpadmin");
+
   ~SQLUtility();
+  // Get the local system time
+  // @return string of local system time
+  static std::string getStrCurTime();
+
+  // print log with current local time
+  // @return std::cout
+  static std::ostream &time_log();
+
+  // Get the test Host
+  // @return string of the test Host
+  std::string getHost();
 
   // Get the test database name
   // @return string of the test database name
@@ -60,13 +100,31 @@ class SQLUtility {
   // Get the test schema name
   // @return string of the test schema name
   std::string getSchemaName();
-  
+
+  // Get the test database version
+  // @return string of test database version
+  std::string getVersion();
+
+  // Set a new test SchemaName
+  void setSchemaName(const std::string &name, bool isSwitch = false);
+
+  // test the connection is ok or not
+  bool testConnection() { return conn->testConnection(); }
+
+  // return test info of gtest
+  const ::testing::TestInfo *const getTestInfo() { return this->test_info; }
+
   // Execute sql command
   // @param sql The given sql command
   // @param check true(default) if expected correctly executing, false otherwise
   // @return error or notice message if check is false, else return empty
   std::string execute(const std::string &sql, bool check = true);
 
+  // Execute sql command
+  // @param sql The given sql command
+  // @return true if executed sql successfully, else return false
+  bool executeSql(const std::string &sql);
+
   // Execute sql command and ignore the behavior of its running status
   // @param sql The given sql command
   // @return void
@@ -87,34 +145,167 @@ class SQLUtility {
   // Execute sql file and diff with ans file
   // @param sqlFile The given sqlFile which is relative path to test root dir
   // @param ansFile The given ansFile which is relative path to test root dir
-  // @param initFile The given initFile (used by gpdiff.pl) which is relative path to test root dir
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
   // @return void
   void execSQLFile(const std::string &sqlFile, const std::string &ansFile,
-		  const std::string &initFile = "", bool usingDefaultSchema = false,
-		  bool printTupleOnly = false);
+                   const std::string &initFile = "",
+                   bool usingDefaultSchema = false,
+                   bool printTupleOnly = false);
 
   // Execute sql file and check its return status
   // @param sqlFile The given sqlFile which is relative path to test root dir
   // @return true if the sql file is executed successfully, false otherwise
   bool execSQLFile(const std::string &sqlFile);
 
+  // Execute sql file and check its return status
+  // @param sqlFile The given sqlFile which is absolute path
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execAbsSQLFile(const std::string &sqlFile);
+
+  // Execute sql file and check its output file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outFile The given outFile which is relative path to test root dir
+  // @param pattern case-insensitive regex that must not match any output line
+  // @return void; a gtest assertion fails when a line matches pattern
+  void execSQLFileandCheck(const std::string &sqlFile,
+                           const std::string &outFile,
+                           const std::string &pattern = "",
+                           const std::string &sqloptions = "");
+
+  // Execute sql file and diff with ans file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test root
+  // dir
+  // @param ansFile The given ansFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
+  // @return void
+  void execSpecificSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &psqlOptions,
+      const std::string &sqlFilePrefix, const std::string &initFile = "",
+      bool usingDefaultSchema = false, bool printTupleOnly = false);
+
+  bool execSpecificSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &ansFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix,
+                           const std::string &initFile = "",
+                           bool usingDefaultSchema = false,
+                           bool printTupleOnly = false);
+
+  // Execute sql file and check its return status
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test root
+  // dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execSpecificSQLFile(const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix);
+
+  bool execSpecificSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                           const std::string &outputFile,
+                           const std::string &psqlOptions,
+                           const std::string &sqlFilePrefix);
+
+  // Execute sql file with append string in the head and check its return status
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test root
+  // dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param appendString The append string such as \timing  explain analyze
+  // @return true if the sql file is executed successfully, false otherwise
+  bool execAppendSQLFile(const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString);
+
+  bool execAppendSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString);
+  // Execute sql file and diff with ans file
+  // @param sqlFile The given sqlFile which is relative path to test root dir
+  // @param outputFile The given output file which is relative path to test root
+  // dir
+  // @param ansFile The given ansFile which is relative path to test root dir
+  // @param psqlOptions The psql options such as -v TABLENAME="test"
+  // @param appendString The append string such as \timing  explain analyze
+  // @param initFile The given initFile (used by gpdiff.pl) which is relative
+  // path to test root dir
+  // @param sortoutfile if true, run the output file through sort and diff the
+  // sorted copy against ansFile (for results without a guaranteed row order)
+  // @return void
+  void execAppendSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &psqlOptions,
+      const std::string &sqlFilePrefix, const std::string &appendString,
+      const std::string &initFile = "", bool usingDefaultSchema = false,
+      bool printTupleOnly = false, bool sortoutfile = false);
+
+  bool execAppendSQLFile(uint32_t &costTime, const std::string &sqlFile,
+                         const std::string &outputFile,
+                         const std::string &ansFile,
+                         const std::string &psqlOptions,
+                         const std::string &sqlFilePrefix,
+                         const std::string &appendString,
+                         const std::string &initFile = "",
+                         bool usingDefaultSchema = false,
+                         bool printTupleOnly = false, bool sortoutfile = false);
+
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          const std::string ansFile = "",
+                          bool usingDefaultSchema = false);
+
+  void runParallelSQLFile(const std::string sqlFile,
+                          const std::string outputFile, int processnum,
+                          std::vector<std::string> &options,
+                          const std::string ansFile = "",
+                          bool usingDefaultSchema = false);
+
+  // Count output lines containing ERROR (case-insensitive); -1 if unparsable
+  // @param outputFile path or shell glob such as path/*.out
+  int32_t getErrorNumber(const std::string &outputFile);
+
+  // check error in the file
+  // @param outFile The given outFile which is relative path to test root dir
+  // @param pattern case-insensitive regex searched for in every output line
+  // @return true if no line matches pattern, false otherwise
+  static bool checkPatternInFile(const std::string &outFile,
+                                 const std::string &pattern);
+
   // Get PSQL connection: do not suggest to use
   // @return PSQL raw pointer
   const hawq::test::PSQL *getPSQL() const;
 
   // Get test root dir abs path
   // @return path string
-  std::string getTestRootPath() const;
+  static std::string getTestRootPath();
 
   // Set GUC value
   void setGUCValue(const std::string &guc, const std::string &value);
+
+  // Set Host of connection value
+  void setHost(const std::string &host);
+
+  // Set Database of connection value
+  void setDatabase(const std::string &db);
+
   // Get GUC value
   std::string getGUCValue(const std::string &guc);
 
   // execute given query and return query result
   // @param query the given query
   // @return the query result
-  std::string getQueryResult(const std::string &query);
+  std::string getQueryResult(const std::string &query, bool check = true);
 
   std::string getQueryResultSetString(const std::string &query);
 
@@ -122,26 +313,94 @@ class SQLUtility {
   // @param sql the given sql command
   // @param errmsg the expected sql error message
   // @return void
-  void executeExpectErrorMsgStartWith(const std::string &sql, const std::string &errmsg);
+  void executeExpectErrorMsgStartWith(const std::string &sql,
+                                      const std::string &errmsg);
 
   const hawq::test::PSQLQueryResult &executeQuery(const std::string &sql);
 
+  // return hdfs path : hawq_default
+  std::string getHdfsPath();
+
+  // return hdfs namenode and port: localhost:8020
+  std::string getHdfsNamenode();
+
+  // return hdfs dfs url: localhost:8020/hawq_default
+  std::string getHawqDfsURL();
+
+  // return hdfs dfs host: localhost:8020
+  std::string getHawqDfsHost();
+
+  // return magma service url: hostname:50001
+  std::string getHawqMagmaURL();
+  void runParallelSQLFile_sqllist(std::vector<std::string> &sqlfile,
+                                  std::vector<std::string> &ansfile);
+  // return true if OS is Darwin
+  bool isDarwin();
+
  private:
   std::unique_ptr<hawq::test::PSQL> getConnection();
-  const std::string generateSQLFile(const std::string &sqlFile, bool usingDefaultSchema);
-  FilePath splitFilePath(const std::string &filePath) const;
+  const std::string generateSQLFile(const std::string &sqlFile,
+                                    bool usingDefaultSchema,
+                                    const std::string &sqlFileSurfix = "",
+                                    const std::string &appendString = "");
+
+  bool __beforeBoolExecAppendSQLFile(const std::string &sqlFile,
+                                     const std::string &outputFile,
+                                     const std::string &sqlFilePrefix,
+                                     const std::string &appendString,
+                                     std::string &newSqlFile);
+
+  void __beforeRunSqlfile(const std::string &sqlFile,
+                          const std::string &outputFile,
+                          const std::string &ansFile,
+                          const std::string &sqlFilePrefix,
+                          const std::string &appendString,
+                          bool usingDefaultSchema, FilePath &fp,
+                          std::string &ansFileAbsPath, std::string &newSqlFile,
+                          std::string &outputAbsPath);
+
+  bool __afterRunSqlfile(const std::string &initFile, bool sortoutfile,
+                         FilePath &fp, std::string &ansFileAbsPath,
+                         std::string &outputAbsPath, std::string &newSqlFile);
+
+  bool __beforeExecSpecificSQLFile(const std::string &sqlFile,
+                                   const std::string &outputFile,
+                                   const std::string &sqlFilePrefix,
+                                   std::string &newSqlFile,
+                                   std::string &outputAbsPath);
+
+  void __beforeExecSpecificSQLFile(
+      const std::string &sqlFile, const std::string &outputFile,
+      const std::string &ansFile, const std::string &sqlFilePrefix,
+      bool usingDefaultSchema, FilePath &fp, std::string &ansFileAbsPath,
+      std::string &newSqlFile, std::string &outputAbsPath);
+
   void exec(const std::string &sql);
 
+  void execIgnore(const std::string &sql);
+
+ public:
+  static FilePath splitFilePath(const std::string &filePath);
+  bool checkAnsFile(const std::string &ansFileAbsPath,
+                    const std::string &outFileAbsPath,
+                    const std::string &initFile, const std::string &newSqlFile,
+                    FilePath &fp);
+
+  std::string getConnectionString() { return conn->getConnectionString(); }
+
+ protected:
+  std::string testRootPath;
+  const ::testing::TestInfo *const test_info;
+
  private:
-  std::string schemaName;
-  std::string databaseName;
   SQLUtilityMode sql_util_mode;
   std::unique_ptr<hawq::test::PSQL> conn;
-  std::string testRootPath;
-  const ::testing::TestInfo *const test_info;
-}; // class SQLUtility
+  std::string databaseName;
+  std::string schemaName;
+
+};  // class SQLUtility
 
-} // namespace test
-} // namespace hawq
+}  // namespace test
+}  // namespace hawq
 
 #endif  // SRC_TEST_FEATURE_LIB_SQL_UTIL_H_
diff --git a/src/test/feature/lib/sql_util_parallel.cpp b/src/test/feature/lib/sql_util_parallel.cpp
new file mode 100644
index 0000000..1a5665b
--- /dev/null
+++ b/src/test/feature/lib/sql_util_parallel.cpp
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include <fstream>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "sql_util_parallel.h"
+#include "string_util.h"
+
+
+
+using std::string;
+
+namespace hawq {
+namespace test {
+// Builds the message slot for session `tid` in its idle state.
+ThreadMessage::ThreadMessage(uint16_t tid)
+{
+    // Which session this slot belongs to.
+    thread_id = tid;
+
+    // Idle: no pending command, no payload, no query in flight.
+    isrunning = false;
+    messageinfo = "";
+    command = NULLCOMMAND;
+
+    // 1 stands for "commit".
+    submit = 1;
+
+    // No dependency edges yet: nobody waits on this session and it waits
+    // on nobody.
+    waitfortids.clear();
+    waittids.clear();
+}
+
+// Drops the recorded wait relationships when the slot goes away.
+ThreadMessage::~ThreadMessage()
+{
+    waitfortids.clear();
+    waittids.clear();
+}
+
+// Creates a runner with no output file open and exclusive mode switched off,
+// and seeds the pseudo-random generators from the current time.
+SQLUtilityParallel::SQLUtilityParallel() {
+    outputstream = NULL;
+    exclusive = false;
+    // srand() only seeds rand(); the commit/rollback choice in
+    // runParallelControlFile_exclusive() draws from random(), which has its
+    // own state and must be seeded through srandom(). Seed both so either
+    // generator behaves as intended.
+    const unsigned seed = (unsigned)time(NULL);
+    srand(seed);
+    srandom(seed);
+}
+
+// Intentionally empty: the run functions in this file close `outputstream`
+// themselves before they return.
+SQLUtilityParallel::~SQLUtilityParallel() {
+}
+
+// Reads the session-count header of a control file.
+// Lines starting with '#' are skipped; the first other line must start with
+// "total" (case-insensitive) and have the form "total:<number>".
+// @param in the opened control file; on return it is positioned just after
+//           the header line, so the caller can keep reading statements
+// @return the number of sessions, or 0 if the header is missing or malformed
+int32_t SQLUtilityParallel::__getTotalSession(std::fstream &in) {
+    string line;
+    while (getline(in, line))
+    {
+        if (hawq::test::startsWith(line, "#"))
+            continue;
+        if (!hawq::test::startsWith(hawq::test::lower(line), "total"))
+        {
+            EXPECT_TRUE(false) << "The file format is error, "
+                    "it should begin with total:number of sessions ";
+            return 0;
+        }
+        // Exactly one ':' is expected: "total" on the left, the count on
+        // the right.
+        auto totalstring = hawq::test::split(line, ':');
+        if (totalstring.size() != 2 )
+            return 0;
+        try
+        {
+            return std::stoi(hawq::test::trim(totalstring[1]));
+        }
+        catch(...)
+        {
+            // The count is not a parsable integer.
+            return 0;
+        }
+    }
+    // Reached end of file without finding a header line.
+    return 0;
+}
+
+
+// Splits one control-file line on ':' and extracts its parts.
+// Accepted forms:
+//   <session>:<sql>
+//   <session>:<blocking session>[:<blocking session>...]:<sql>
+// @param line         the raw line
+// @param totalsession highest valid session id (0 is also accepted)
+// @param sessionid    out: the session that should run the statement
+// @param waittid      out: set to 1 when the line names blocking sessions,
+//                     otherwise left untouched
+// @param message      out: the trimmed last field, i.e. the SQL text
+// @return false when the line has fewer than two fields or names a session
+//         id outside [0, totalsession]; true otherwise
+// Throws whatever std::stoi throws on a non-numeric session field.
+// In non-exclusive mode the wait relationships are recorded in
+// `threadmessage` (both directions) under `thread_mutex`.
+bool SQLUtilityParallel::__processLine(const string &line, const int32_t totalsession,
+                                               int32_t &sessionid, int32_t &waittid, string &message)
+{
+    auto querystring = hawq::test::split(line, ':');
+    if (querystring.size() < 2 )
+        return false;
+    sessionid = std::stoi(hawq::test::trim(querystring[0])) ;
+    if (sessionid < 0 || sessionid > totalsession)
+        return false;
+    message = hawq::test::trim(querystring[querystring.size() - 1]);
+
+    // Plain "<session>:<sql>" line: nothing else to record.
+    if (querystring.size() < 3)
+        return true;
+
+    waittid = 1;
+    // Exclusive mode does not track who waits for whom.
+    if (this->exclusive)
+        return true;
+
+    // Parse and validate every blocking-session id BEFORE touching shared
+    // state, so a bad field can neither index past `threadmessage` nor
+    // throw while the mutex is held.
+    std::vector<int> blockers;
+    for (size_t i = 1; i + 1 < querystring.size(); i++) {
+        const int wait_tid = std::stoi(hawq::test::trim(querystring[i]));
+        if (wait_tid < 0 || wait_tid > totalsession)
+            return false;
+        blockers.push_back(wait_tid);
+    }
+
+    // The guard releases the mutex on every exit path.
+    std::lock_guard<decltype(thread_mutex)> guard(thread_mutex);
+    for (size_t i = 0; i < blockers.size(); i++) {
+        threadmessage[blockers[i]]->waittids.push_back(sessionid);
+        threadmessage[sessionid]->waitfortids.push_back(blockers[i]);
+    }
+    return true;
+}
+
+
+// Runs a control file in exclusive mode.
+// One worker thread (thread_execute_exclusive) is started per session. Each
+// non-comment line is handed to its session and the dispatcher waits until
+// slot 0 leaves NULLCOMMAND before reading the next line. Lines for session 0
+// are executed directly on the calling thread. A line starting with '$'
+// only sets the `commit` flag used in step 5. After the last line every
+// session is sent a final "commit;" or "rollback;", then told to exit.
+// @param sqlFile    absolute path of the control file
+// @param outputFile absolute path of the file that receives the output
+// @return true when every line was dispatched and no session ended up
+//         running, with a leftover message, or in BADSTATUS
+//
+// NOTE(review): the `goto clear` exits taken after the worker threads were
+// started return without joining them; destroying a joinable std::thread
+// calls std::terminate. The busy-wait loops also read `command` without
+// holding thread_mutex. Both are left as they were.
+bool SQLUtilityParallel::runParallelControlFile_exclusive(const string &sqlFile, const string &outputFile)
+{
+    string line;
+    bool result = true;
+    int32_t totalsession = 0 , i = 0 , commitnum = 0 ,blocknum = 0,commit=0;
+    std::vector<std::unique_ptr<std::thread>> querythread;
+
+    // Step 1: Open input and output file
+    std::fstream in;
+    in.open(sqlFile, std::ios::in);
+    outputstream = fopen(outputFile.c_str(), "w");
+    if (!in.is_open() || !outputstream) {
+        EXPECT_TRUE(false) << "Error opening input/output file ";
+        result = false;
+        goto clear;
+    }
+    // Step 2: clear env and get total session number
+    threadmessage.clear();
+    totalsession = __getTotalSession(in);
+    if (totalsession <= 0 )
+    {
+        result = false;
+        goto clear;
+    }
+    try
+    {
+        // Step 3: Initialize thread message, query thread, and connection pool
+        // Slot 0 belongs to the dispatcher itself and starts as NEXTLINE.
+        std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
+        thrm->command = NEXTLINE;
+        threadmessage.push_back(std::move(thrm));
+        string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
+        string connstr = this->getConnectionString();
+        // All slots are created before any thread starts, so the vector is
+        // never resized while workers index into it.
+        for(i = 1 ; i <= totalsession; i++ ){
+            std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
+            threadmessage.push_back(std::move(thrm));
+        }
+        for(i = 1 ; i <= totalsession; i++ ) {
+            std::unique_ptr<std::thread> thr(new std::thread
+                                                     (&SQLUtilityParallel::thread_execute_exclusive, this,  i, schemamessage, connstr , totalsession));
+            querythread.push_back(std::move(thr));
+        }
+        // Step 4: Read every line of file and pass to sub thread
+        while (getline(in, line))
+        {
+            if (hawq::test::startsWith(line, "#"))
+                continue;
+            if (hawq::test::startsWith(line, "$"))
+            {
+                // '$' marker: allow "commit;" (not only "rollback;") in step 5.
+                commit=1;
+                continue;
+            }
+            int32_t sessionid = 0 , waittid = 0;
+            string message = "";
+            result = __processLine(line, totalsession, sessionid, waittid, message);
+            if (!result)
+                goto clear;
+            if (sessionid == 0)
+            {
+                this->execute(message, true);
+            }
+            else
+            {
+                // The target session must be idle before it gets new work.
+                if ( threadmessage[sessionid]->command != NULLCOMMAND )
+                {
+                    EXPECT_TRUE(false) << "The file format is error or query handling error\n";
+                    result = false;
+                    goto clear;
+                }
+                MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
+                if(cmd == SQLSTRING){
+                    blocknum++;
+                }
+                //lock
+                thread_mutex.lock();
+                threadmessage[sessionid]->command = cmd;
+                threadmessage[sessionid]->messageinfo = message;
+                threadmessage[0]->command = NULLCOMMAND;
+                if(threadmessage[sessionid]->messageinfo.find("vacuum") != string::npos)
+                {
+                    threadmessage[sessionid]->submit = -2; //vacuum
+                }
+                else if(threadmessage[sessionid]->messageinfo.find("drop table") != string::npos){
+                    threadmessage[sessionid]->submit = -1; //rollback
+                }
+                else{
+                    int32_t n = random() % 2;
+                    if(n == 0)
+                        threadmessage[sessionid]->submit = 1; //commit
+                    else
+                        threadmessage[sessionid]->submit = -1; //rollback
+                }
+                thread_mutex.unlock();
+            }
+
+            // Spin until the worker (or the initial NEXTLINE) lets us go on.
+            while (threadmessage[0]->command == NULLCOMMAND  ) {}
+            if (threadmessage[0]->command == BADSTATUS)
+                break;
+        }
+
+        // Step 5: Set commit/rollback command
+        // Cycle over sessions 1..totalsession until both counters reach
+        // totalsession.
+        int32_t j = 0;
+        while(commitnum < totalsession || blocknum < totalsession){
+            j = (j+1) % totalsession ;
+            if( j == 0 )
+            {
+                j = totalsession;
+            }
+            // A blocked statement has returned: let its session print.
+            if(threadmessage[j]->command == BLOCKOVER){
+                thread_mutex.lock();
+                threadmessage[j]->command = PRINTRESULT;
+                thread_mutex.unlock();
+                while(threadmessage[j]->command == PRINTRESULT){}
+                blocknum++;
+            }
+            // submit == -2 (vacuum): counted as finished, nothing is sent.
+            if(threadmessage[j]->submit == -2 && threadmessage[j]->command == NULLCOMMAND)
+            {
+                commitnum ++;
+                thread_mutex.lock();
+                threadmessage[j]->submit = 0;
+                thread_mutex.unlock();
+            }
+            else if(threadmessage[j]->submit != 0 && threadmessage[j]->command == NULLCOMMAND){
+                commitnum ++;
+                thread_mutex.lock();
+                threadmessage[j]->command = SQLSTRING;
+                // Only odd sessions commit, and only if '$' was seen.
+                if(commit&&j%2==1)
+                    threadmessage[j]->messageinfo = "commit;";
+                else
+                    threadmessage[j]->messageinfo = "rollback;";
+                threadmessage[0]->command = NULLCOMMAND;
+                thread_mutex.unlock();
+                while (threadmessage[0]->command == NULLCOMMAND  ) {}
+                thread_mutex.lock();
+                threadmessage[j]->submit = 0;
+                thread_mutex.unlock();
+            }
+        }
+
+        // Step 6: Setup exit signal to every sub-thread
+        thread_mutex.lock();
+        for(i = 1 ; i <= totalsession; i++ )
+        {
+            threadmessage[i]->command = EXITTHREAD;
+            threadmessage[i]->messageinfo = "";
+        }
+        thread_mutex.unlock();
+        for(i = 0 ; i < totalsession; i++ ){
+            querythread[i]->join();
+        }
+        // Step 7: Check status of every thread
+        for(i = 0 ; i <= totalsession; i++ )
+        {
+            if (threadmessage[i]->isrunning ||
+                threadmessage[i]->messageinfo.size() > 0 ||
+                threadmessage[i]->command == BADSTATUS)
+            {
+                EXPECT_TRUE(false) << "There is not finished or error job";
+                result = false;
+                goto clear;
+            }
+        }
+    }
+
+    catch(...)
+    {
+        EXPECT_TRUE(false) << "There is exception \n";
+        result = false;
+        goto clear;
+    }
+    clear:
+    threadmessage.clear();
+    in.close();
+    // fopen() may have failed in step 1; fclose() on a null stream is
+    // undefined, so only close a stream that was actually opened.
+    if (outputstream)
+        fclose(outputstream);
+    outputstream = NULL;
+    return result;
+}
+
+// Worker loop of one session in exclusive mode.
+// Opens its own connection, applies the search path, then keeps polling its
+// slot threadmessage[tid] and acts on the command found there until it sees
+// EXITTHREAD or hits an error. On error both slot 0 and its own slot are set
+// to BADSTATUS so the dispatcher stops.
+// @param tid          session id, also the index of this worker's slot
+// @param schemapath   statement executed right after connecting
+// @param connstr      connection string for the session's own connection
+// @param totalsession not used by this function
+void SQLUtilityParallel::thread_execute_exclusive(uint16_t tid, const std::string &schemapath, const std::string &connstr , int32_t totalsession)
+{
+    std::unique_ptr<LibPQSQL> libpqconn;
+    libpqconn.reset(new LibPQSQL(connstr));
+    if (!libpqconn->connectDb())
+        goto errorclear;
+    if (!libpqconn->execQuery(schemapath,outputstream))
+        goto errorclear;
+    do
+    {
+        // Take a snapshot of the slot under the lock.
+        thread_mutex.lock();
+        MessageCommandType cmd = threadmessage[tid]->command;
+        string message = threadmessage[tid]->messageinfo;
+        thread_mutex.unlock();
+
+        switch (cmd)
+        {
+            case EXITTHREAD:
+            {
+                libpqconn->disconnectDb();
+                // reset() destroys the connection object; release() would
+                // only drop ownership and leak it.
+                libpqconn.reset();
+                return;
+            }
+            case BLOCKEDSQL:
+            {
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" Blocked "<<threadmessage[tid]->messageinfo<<std::endl;
+                threadmessage[tid]->isrunning = true;
+                fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
+                // Let the dispatcher read the next line while we block.
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+
+                //wait for result from database
+                bool result = libpqconn->execQuery(message,outputstream);
+                if (!result) {
+                    goto errorclear;
+                }
+
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = false;
+                threadmessage[tid]->command = BLOCKOVER;
+                std::cout<<"Tid: "<<tid<<" Block over "<<std::endl;
+                thread_mutex.unlock();
+                break;
+            }
+            case BLOCKOVER :
+                // Result is ready; wait until we are told to print it.
+                break;
+            case PRINTRESULT:
+            {
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" Print "<<std::endl;
+                fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                thread_mutex.unlock();
+                break;
+            }
+            case SQLSTRING:
+            {
+                // The mutex stays held for the whole statement in this mode.
+                thread_mutex.lock();
+                std::cout<<"Tid: "<<tid<<" SQLstring "<<threadmessage[tid]->messageinfo<<std::endl;
+                if (threadmessage[tid]->isrunning)
+                    goto errorclearlock;
+                fprintf(outputstream, "%d:%s\n", tid, message.c_str());
+                //submit sql
+                if (message.compare("commit;") == 0 ||
+                    message.compare("rollback;") == 0 ||
+                    threadmessage[tid]->messageinfo.find("vacuum") != string::npos ||
+                    threadmessage[tid]->messageinfo.find("VACUUM") != string::npos)
+                {
+                    // Every session recorded as waiting on us must still be
+                    // running its blocked statement.
+                    for (uint16_t i = 0; i < threadmessage[tid]->waittids.size(); i++)
+                    {
+                        if (!threadmessage[threadmessage[tid]->waittids[i]]->isrunning)
+                        {
+                            goto errorclearlock;
+                        }
+                    }
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                else
+                {
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                break;
+            }
+            case NULLCOMMAND:
+                // Idle: keep polling.
+                break;
+            case NEXTLINE:
+            case BADSTATUS:
+            {
+                goto errorclear;
+            }
+        }
+    } while (true);
+
+    // Entered with the mutex held.
+    errorclearlock:
+    thread_mutex.unlock();
+    // Entered with the mutex NOT held.
+    errorclear:
+    thread_mutex.lock();
+    threadmessage[0]->command = BADSTATUS;
+    threadmessage[tid]->command = BADSTATUS;
+    thread_mutex.unlock();
+    libpqconn->disconnectDb();
+    libpqconn.reset();
+}
+
+// Runs a control file in normal (non-exclusive) mode.
+// One worker thread (thread_execute) is started per session. Each
+// non-comment line is handed to its session and the dispatcher waits until
+// slot 0 leaves NULLCOMMAND before reading the next line. Lines for session 0
+// are executed directly on the calling thread.
+// @param sqlFile    absolute path of the control file
+// @param outputFile absolute path of the file that receives the output
+// @return true when every line was dispatched and no session ended up
+//         running, with a leftover message, or in BADSTATUS
+//
+// NOTE(review): the `goto clear` exits taken after the worker threads were
+// started return without joining them; destroying a joinable std::thread
+// calls std::terminate. The busy-wait loop also reads `command` without
+// holding thread_mutex. Both are left as they were.
+bool SQLUtilityParallel::runParallelControlFile(const string &sqlFile, const string &outputFile)
+{
+    string line;
+    bool result = true;
+    int32_t totalsession = 0, i = 0;
+    std::vector<std::unique_ptr<std::thread>> querythread;
+    // Step 1: Open input and output file
+    std::fstream in;
+    in.open(sqlFile, std::ios::in);
+    outputstream = fopen(outputFile.c_str(), "w");
+    if (!in.is_open() || !outputstream) {
+        EXPECT_TRUE(false) << "Error opening input/output file ";
+        result = false;
+        goto clear;
+    }
+    // Step 2: clear env and get total session number
+    threadmessage.clear();
+    totalsession = __getTotalSession(in);
+    if (totalsession <= 0 )
+    {
+        result = false;
+        goto clear;
+    }
+    try
+    {
+        // Step 3: Initialize thread message, query thread, and connection pool
+        // Slot 0 belongs to the dispatcher itself and starts as NEXTLINE.
+        std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
+        thrm->command = NEXTLINE;
+        threadmessage.push_back(std::move(thrm));
+        string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
+        string connstr = this->getConnectionString();
+        // All slots are created before any thread starts, so the vector is
+        // never resized while workers index into it.
+        for(i = 1 ; i <= totalsession; i++ ){
+            std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
+            threadmessage.push_back(std::move(thrm));
+        }
+        for(i = 1 ; i <= totalsession; i++ ) {
+            std::unique_ptr<std::thread> thr(new std::thread
+                                                     (&SQLUtilityParallel::thread_execute, this,  i, schemamessage, connstr));
+            querythread.push_back(std::move(thr));
+        }
+
+        // Step 4: Read every line of file and pass to sub thread
+        while (getline(in, line))
+        {
+            if (hawq::test::startsWith(line, "#"))
+                continue;
+            int32_t sessionid = 0 , waittid = 0;
+            string message = "";
+
+            result = __processLine(line, totalsession, sessionid, waittid, message);
+            if (!result)
+                goto clear;
+
+            if (sessionid == 0)
+            {
+                this->execute(message, true);
+            }
+            else
+            {
+                // The target session must be idle before it gets new work.
+                if ( threadmessage[sessionid]->command != NULLCOMMAND )
+                {
+                    EXPECT_TRUE(false) << "The file format is error or query handling error\n";
+                    result = false;
+                    goto clear;
+                }
+                MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
+                //lock
+                thread_mutex.lock();
+                threadmessage[sessionid]->command = cmd;
+                threadmessage[sessionid]->messageinfo = message;
+                threadmessage[0]->command = NULLCOMMAND;
+                thread_mutex.unlock();
+            }
+            // Spin until the worker (or the initial NEXTLINE) lets us go on.
+            while (threadmessage[0]->command == NULLCOMMAND  ) {}
+            if (threadmessage[0]->command == BADSTATUS)
+                break;
+        }
+
+        // Step 5: Setup exit signal to every sub-thread
+        thread_mutex.lock();
+        for(i = 1 ; i <= totalsession; i++ )
+        {
+            threadmessage[i]->command = EXITTHREAD;
+            threadmessage[i]->messageinfo = "";
+        }
+        thread_mutex.unlock();
+        for(i = 0 ; i < totalsession; i++ ){
+            querythread[i]->join();
+        }
+        // Step 6: Check status of every thread
+        for(i = 0 ; i <= totalsession; i++ )
+        {
+            if (threadmessage[i]->isrunning ||
+                threadmessage[i]->messageinfo.size() > 0 ||
+                threadmessage[i]->command == BADSTATUS)
+            {
+                EXPECT_TRUE(false) << "There is not finished or error job";
+                result = false;
+                goto clear;
+            }
+        }
+    }
+
+    catch(...)
+    {
+        EXPECT_TRUE(false) << "There is exception \n";
+        result = false;
+        goto clear;
+    }
+    clear:
+    threadmessage.clear();
+    in.close();
+    // fopen() may have failed in step 1; fclose() on a null stream is
+    // undefined, so only close a stream that was actually opened.
+    if (outputstream)
+        fclose(outputstream);
+    outputstream = NULL;
+    return result;
+}
+
+// Worker loop of one session in normal (non-exclusive) mode.
+// Opens its own connection, applies the search path, then keeps polling its
+// slot threadmessage[tid] and acts on the command found there until it sees
+// EXITTHREAD or hits an error. After a commit/rollback/vacuum statement it
+// also walks the sessions recorded as waiting on it and, once a waiter has
+// nothing left to wait for, tells it to print its result. On error both slot
+// 0 and its own slot are set to BADSTATUS so the dispatcher stops.
+// @param tid        session id, also the index of this worker's slot
+// @param schemapath statement executed right after connecting
+// @param connstr    connection string for the session's own connection
+void SQLUtilityParallel::thread_execute(uint16_t tid, const std::string &schemapath, const std::string &connstr)
+{
+    std::unique_ptr<LibPQSQL> libpqconn;
+    libpqconn.reset(new LibPQSQL(connstr));
+    if (!libpqconn->connectDb())
+        goto errorclear;
+    if (!libpqconn->execQuery(schemapath,outputstream))
+        goto errorclear;
+    do
+    {
+        // Take a snapshot of the slot under the lock.
+        thread_mutex.lock();
+        MessageCommandType cmd = threadmessage[tid]->command;
+        string message = threadmessage[tid]->messageinfo;
+        thread_mutex.unlock();
+
+        switch (cmd)
+        {
+            case EXITTHREAD:
+            {
+                libpqconn->disconnectDb();
+                // reset() destroys the connection object; release() would
+                // only drop ownership and leak it.
+                libpqconn.reset();
+                return;
+            }
+            case BLOCKEDSQL:
+            {
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = true;
+                std::cout<<"BLOCK Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
+                // Let the dispatcher read the next line while we block.
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                bool result = libpqconn->execQuery(message,outputstream);
+                if (!result) {
+                    std::cout<<"Error"<<std::endl;
+                    goto errorclear;
+                }
+
+                thread_mutex.lock();
+                threadmessage[tid]->isrunning = false;
+                threadmessage[tid]->command = BLOCKOVER;
+                std::cout<<"BLOCKOVER Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                thread_mutex.unlock();
+                break;
+            }
+            case BLOCKOVER :
+                // Result is ready; wait until we are told to print it.
+                break;
+            case PRINTRESULT:
+            {
+                thread_mutex.lock();
+                std::cout<<"PRINT Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                thread_mutex.unlock();
+                break;
+            }
+            case SQLSTRING:
+            {
+                thread_mutex.lock();
+                std::cout<<"SQL Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
+                if (threadmessage[tid]->isrunning)
+                    goto errorclearlock;
+                fprintf(outputstream, "%d:%s\n", tid, message.c_str());
+
+                if (message.compare("commit;") == 0 ||
+                    message.compare("rollback;") == 0 || threadmessage[tid]->messageinfo.find("vacuum") != string::npos) {
+                    // Every session recorded as waiting on us must still be
+                    // running its blocked statement.
+                    for (uint16_t i = 0; i < threadmessage[tid]->waittids.size(); i++) {
+                        if (!threadmessage[threadmessage[tid]->waittids[i]]->isrunning) {
+                            goto errorclearlock;
+                        }
+                    }
+
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                    thread_mutex.unlock();
+
+                    // Release the waiters one by one. The mutex is only taken
+                    // for the short updates, never across the spin loops.
+                    while (threadmessage[tid]->waittids.size() > 0) {
+                        thread_mutex.lock();
+                        int waitpid = threadmessage[tid]->waittids[0];
+
+                        // We no longer block this waiter.
+                        for (std::vector<int>::iterator iter = threadmessage[waitpid]->waitfortids.begin();
+                             iter != threadmessage[waitpid]->waitfortids.end(); iter++) {
+                            if (*iter == tid) {
+                                threadmessage[waitpid]->waitfortids.erase(iter);
+                                break;
+                            }
+                        }
+
+                        thread_mutex.unlock();
+
+                        if (threadmessage[waitpid]->waitfortids.size() == 0) {
+                            // The mutex was already released just above; a
+                            // second unlock() here would be undefined.
+                            while (threadmessage[waitpid]->command != BLOCKOVER) {}
+                            thread_mutex.lock();
+                            threadmessage[waitpid]->command = PRINTRESULT;
+                            thread_mutex.unlock();
+                        }
+
+                        // Wait until the waiter has finished printing.
+                        while (threadmessage[waitpid]->command == PRINTRESULT) {}
+                        thread_mutex.lock();
+                        threadmessage[tid]->waittids.erase(threadmessage[tid]->waittids.begin());
+                        thread_mutex.unlock();
+                    }
+
+                    // Re-take the mutex for the common tail below.
+                    thread_mutex.lock();
+                }
+                else
+                {
+                    bool result = libpqconn->execQuery(message,outputstream);
+                    if (!result)
+                        goto errorclearlock;
+                }
+                libpqconn->printQueryResult(outputstream);
+                threadmessage[tid]->command = NULLCOMMAND;
+                threadmessage[tid]->messageinfo = "";
+                threadmessage[0]->command = NEXTLINE;
+                thread_mutex.unlock();
+                break;
+            }
+            case NULLCOMMAND:
+                // Idle: keep polling.
+                break;
+            case NEXTLINE:
+            case BADSTATUS:
+            {
+                goto errorclear;
+            }
+        }
+    } while (true);
+
+    // Entered with the mutex held.
+    errorclearlock:
+    thread_mutex.unlock();
+    // Entered with the mutex NOT held.
+    errorclear:
+    thread_mutex.lock();
+    threadmessage[0]->command = BADSTATUS;
+    threadmessage[tid]->command = BADSTATUS;
+    thread_mutex.unlock();
+    libpqconn->disconnectDb();
+    libpqconn.reset();
+}
+
+
+// Runs a parallel control file and, in non-exclusive mode, compares the
+// produced output with the answer file.
+// @param sqlFile   control file, relative to the test root
+// @param ansFile   answer file, relative to the test root; the .out file is
+//                  written next to it using the same base name
+// @param exclusive selects runParallelControlFile_exclusive over
+//                  runParallelControlFile; the answer check is skipped then
+// @param initFile  forwarded to checkAnsFile
+void SQLUtilityParallel::execParallelControlSQLFile(const string &sqlFile,
+                                                            const string &ansFile,
+                                                            bool exclusive,
+                                                            const string &initFile)
+{
+    const string expectedPath = testRootPath + "/" + ansFile;
+    const string controlPath = testRootPath + "/" + sqlFile;
+
+    // Both inputs have to be readable before anything is executed.
+    if (!std::ifstream(expectedPath)) {
+        ASSERT_TRUE(false) << expectedPath << " doesn't exist";
+    }
+    if (!std::ifstream(controlPath)) {
+        ASSERT_TRUE(false) << controlPath << " doesn't exist";
+    }
+
+    // The answer file's directory and base name decide where output goes.
+    FilePath fp = splitFilePath(expectedPath);
+    if (fp.fileBaseName.empty()) {
+        ASSERT_TRUE(false) << expectedPath << " is invalid";
+    }
+    const string producedPath = fp.path + "/" + fp.fileBaseName + ".out";
+
+    this->exclusive = exclusive ;
+    const bool succeeded = this->exclusive
+        ? this->runParallelControlFile_exclusive(controlPath, producedPath)
+        : this->runParallelControlFile(controlPath, producedPath);
+    if (!succeeded) {
+        ASSERT_TRUE(false) << " Execute query is failed";
+    }
+
+    // Exclusive runs are not compared against the answer file.
+    if (this->exclusive)
+        return;
+    checkAnsFile(expectedPath, producedPath, initFile, "", fp);
+}
+    } // namespace test
+} // namespace hawq
diff --git a/src/test/feature/lib/sql_util_parallel.h b/src/test/feature/lib/sql_util_parallel.h
new file mode 100644
index 0000000..46b9e9c
--- /dev/null
+++ b/src/test/feature/lib/sql_util_parallel.h
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ *
+ * This utility controls the order in which statements from different sessions are executed.
+ * The control file format should look like this:
+     # This is basic transaction test demo    //# is used for comment
+     Total:2                                  // indicates how many sessions there are in total
+     0:DROP TABLE IF EXISTS TEST;             // 0 indicates a preparation statement
+     0:CREATE TABLE TEST(A INT);
+     1:BEGIN;
+     2:BEGIN;
+     1:SELECT * FROM  TEST;
+     2:INSERT INTO TEST VALUES(1);
+     2:COMMIT;
+     1:SELECT * FROM  TEST;
+     1:COMMIT;
+
+* Here is another example, for a blocked-session test
+     # For a blocked session: in the example below,
+     # session 1 locks the whole table, so session 2 is blocked until session 1 commits
+     Total:2                                  // indicates how many sessions there are in total
+     0:DROP TABLE IF EXISTS TEST;             // 0 indicates a preparation statement
+     0:CREATE TABLE TEST(A INT);
+     1:BEGIN;
+     2:BEGIN;
+     1:TRUNCATE TEST;
+     2:1:SELECT * FROM TEST;               // indicates that session 2 is blocked by session 1
+     1:COMMIT;                             // session 2 is no longer blocked here, so its result is output
+     2:COMMIT;
+ *
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
+#define HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
+
+#include "sql_util.h"
+#include <string>
+#include <mutex>
+
+namespace hawq {
+namespace test {
+ 
+// Command / state codes stored in ThreadMessage::command.
+// NOTE(review): the meaning of most enumerators is not visible in this
+// header; confirm against sql_util_parallel.cpp before relying on the names.
+enum MessageCommandType {
+    NULLCOMMAND,
+    NEXTLINE,
+    BLOCKEDSQL,
+    SQLSTRING,
+    EXITTHREAD,
+    PRINTRESULT,
+    BADSTATUS,    // assigned on a worker thread's error-cleanup path
+    BLOCKOVER
+};
+
+// Per-session message record. SQLUtilityParallel owns a vector of these
+// (its threadmessage member) and indexes it by thread id.
+class ThreadMessage{
+public:
+    // @param tid thread id for this record; defaults to 0
+    ThreadMessage(uint16_t tid = 0 );
+    ~ThreadMessage();
+    int thread_id;                // presumably initialised from the constructor's tid -- confirm in the .cpp
+    int vacuum ;                  // NOTE(review): meaning not evident from this header
+    std::string messageinfo;      // text payload; presumably the SQL or result text -- confirm
+    MessageCommandType command;   // current command / state code for this session
+    bool isrunning;               // presumably true while the session is executing -- confirm
+    int32_t submit; // 1: commit ; 0: submit ; -1: rollback; -2 vacuum;
+    std::vector<int> waittids;   // thread ids which are blocked by this thread
+    std::vector<int> waitfortids; // thread ids which this thread is waiting for
+};
+
+// SQLUtility extension that drives several sessions from one control file
+// (format described in the comment at the top of this header).
+class SQLUtilityParallel : public SQLUtility {
+public:
+    SQLUtilityParallel();
+    ~SQLUtilityParallel();
+    // Whether the access-exclusive-lock variant of the test is run; overwritten
+    // by every call to execParallelControlSQLFile.
+    bool exclusive;
+
+  // Execute the control sql file and diff the output with the ans file.
+  // @param sqlFile The control file, given as a path relative to the test root dir.
+  //                It controls how the SQL statements are executed.
+  // @param ansFile The expected-answer file, given as a path relative to the test root dir
+  // @param exclusive when true, runParallelControlFile_exclusive is used and the
+  //                  output is NOT diffed against ansFile; otherwise
+  //                  runParallelControlFile is used and the output is diffed
+  // @param initFile forwarded to checkAnsFile (defaults to "")
+  // @return void; failures are reported through ASSERT_TRUE
+  void execParallelControlSQLFile(const std::string &sqlFile, const std::string &ansFile,bool exclusive,
+		                          const std::string &initFile = "");
+  //int32_t getWaitNumber(const std::string &schemapath, const std::string &connstr);
+
+  // Run the control sql file and generate the output file.
+  // @return false when execution failed (the caller asserts on this value)
+    bool runParallelControlFile(const std::string &sqlFile, const std::string &outputFile);
+    bool runParallelControlFile_exclusive(const std::string &sqlFile, const std::string &outputFile);
+private:
+
+  // Handle the SQL commands for one thread
+  // @param tid the thread id
+  // @param schemapath the schema to use; every case should have its own schema
+  // @param connstr the connection string
+  void thread_execute(uint16_t tid, const std::string &schemapath, const std::string &connstr);
+  // Same as thread_execute, for the exclusive variant
+  // @param totalsession the total number of sessions
+  void thread_execute_exclusive(uint16_t tid, const std::string &schemapath, const std::string &connstr, const int32_t totalsession);
+
+  // Get the total session number from the input sql file
+  int32_t __getTotalSession(std::fstream &in);
+
+  // Process a line to get the session id, the blocking thread id and the sql to run
+  // @param line  input line
+  // @param totalsession the total number of sessions for this sql file
+  // @param(output) sessionid id of the thread that executes the statement
+  // @param(output) waittid the thread which blocks the current thread. 0 if it is not blocked.
+  // @param(output) message the sql that will be executed
+  // @return whether the line was processed successfully
+  bool __processLine(const std::string &line, const int32_t totalsession,
+                     int32_t &sessionid, int32_t &waittid, std::string &message);
+
+private:
+  // One record per session, indexed by thread id; entry 0 is also written on
+  // a worker's error path.
+  std::vector<std::unique_ptr<ThreadMessage>> threadmessage;
+  // Locked around accesses to threadmessage.
+  std::mutex thread_mutex;
+  // NOTE(review): presumably the stream the .out file is written to -- confirm.
+  FILE* outputstream;
+}; // class SQLUtilityParallel
+
+} // namespace test
+} // namespace hawq
+
+#endif  // HAWQ_SRC_TEST_FEATURE_LIB_SQL_UTIL_PARALL_H_
diff --git a/src/test/feature/lib/sqlfile-parsebase.cpp b/src/test/feature/lib/sqlfile-parsebase.cpp
new file mode 100644
index 0000000..90c70fe
--- /dev/null
+++ b/src/test/feature/lib/sqlfile-parsebase.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include "sqlfile-parsebase.h"
+
+// Returns a copy of `str` with leading and trailing whitespace removed.
+// Whitespace is whatever isspace() reports for the current C locale.
+// Each character is converted to unsigned char before the isspace() call:
+// passing a negative plain-char value (any byte >= 0x80 where char is signed)
+// is undefined behaviour.
+std::string SqlFileParseBase::trim(std::string str) {
+  const auto isBlank = [](const char c) {
+    return isspace(static_cast<unsigned char>(c)) != 0;
+  };
+
+  // Drop the leading run of whitespace.
+  size_t leading = 0;
+  while (leading != str.size() && isBlank(str[leading])) ++leading;
+  str.erase(0, leading);
+  if (str.empty()) return str;
+
+  // str[0] is now a non-space character, so this backwards scan always stops
+  // at index 0 at the latest.
+  size_t last = str.size() - 1;
+  while (isBlank(str[last])) --last;
+  str.erase(last + 1);
+  return str;
+}
+
+// Returns the index of the first non-whitespace character in `line`, or
+// line.size() when the line is empty or consists only of whitespace.
+size_t SqlFileParseBase::__find_is_not_space(const std::string &line) {
+  const auto firstVisible = std::find_if_not(
+      line.cbegin(), line.cend(), [](const char c) {
+        // isspace() requires a value representable as unsigned char; a
+        // negative plain char would be undefined behaviour.
+        return isspace(static_cast<unsigned char>(c)) != 0;
+      });
+  return static_cast<size_t>(firstVisible - line.cbegin());
+}
+
+// True when `target`, read from index `pos`, begins with `tag`.
+// Throws std::out_of_range when pos > target.size().
+bool SqlFileParseBase::__sub_equal(const std::string &target, size_t pos,
+                                   const std::string &tag) {
+  // compare() matches at most tag.size() characters starting at pos; a
+  // shorter remainder compares unequal, so no separate length check is needed.
+  const int ordering = target.compare(pos, tag.size(), tag);
+  return ordering == 0;
+}
+
+// Iterator flavour: true when `target`, read from `pos`, begins with `tag`.
+// `pos` must be an iterator into `target`.
+bool SqlFileParseBase::__sub_equal(const std::string &target,
+                                   std::string::const_iterator pos,
+                                   const std::string &tag) {
+  // Turn the iterator into an offset and reuse the index-based overload.
+  const size_t offset = pos - target.cbegin();
+  return __sub_equal(target, offset, tag);
+}
+
+// True when `target` begins with `tag` (an empty tag always matches).
+bool SqlFileParseBase::__sub_equal(const std::string &target,
+                                   const std::string &tag) {
+  // A target shorter than tag yields a shorter prefix, which compares unequal.
+  const bool hasPrefix = target.compare(0, tag.size(), tag) == 0;
+  return hasPrefix;
+}
+
+bool SqlFileParseBase::__open_error(const std::string &filename) {
+  return __error(filename + " open error");
+}
+
+// Reports "line <index> not found <tag>" through __error(); always returns
+// false.
+bool SqlFileParseBase::__not_found_tag_error(const size_t index,
+                                             const std::string &tag) {
+  std::string message = "line ";
+  message += std::to_string(index);
+  message += " not found ";
+  message += tag;
+  return SqlFileParseBase::__error(message);
+}
+
+// Reads `infile` line by line into `v_inf`.
+// @param infile path of the file to read
+// @param v_inf  receives the lines (without terminators); it is replaced only
+//               after the file has been opened, so it is left untouched when
+//               the open fails
+// @return true on success; on open failure, the result of __open_error()
+bool SqlFileParseBase::__trans_file_to_vector(const std::string &infile,
+                                              std::vector<std::string> &v_inf) {
+  std::ifstream input(infile);
+  if (!input) {
+    return __open_error(infile);
+  }
+
+  std::vector<std::string> lines;
+  std::string current;
+  // getline() overwrites `current` each time, so reusing it after the move is
+  // safe.
+  while (getline(input, current)) {
+    lines.push_back(std::move(current));
+  }
+
+  v_inf.swap(lines);
+  input.close();
+  return true;
+}
+
+// True when the first non-whitespace text of `line` begins with `tag`.
+bool SqlFileParseBase::__is_tag(const std::string &line,
+                                const std::string &tag) {
+  const size_t firstVisible = __find_is_not_space(line);
+  return __sub_equal(line, firstVisible, tag);
+}
+
+bool SqlFileParseBase::__error(const std::string &error_message) {
+  std::cerr << parseName_ << ": error: " << error_message << std::endl;
+  return false;
+}
\ No newline at end of file
diff --git a/src/test/feature/lib/sqlfile-parsebase.h b/src/test/feature/lib/sqlfile-parsebase.h
new file mode 100644
index 0000000..4434422
--- /dev/null
+++ b/src/test/feature/lib/sqlfile-parsebase.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAWQ_SRC_TEST_FEATURE_SQLFILE_PARSEBASE_OUT_H_
+#define HAWQ_SRC_TEST_FEATURE_SQLFILE_PARSEBASE_OUT_H_
+
+#include <fstream>
+#include <iostream>
+#include <string>
+#include <vector>
+
+// Shared helpers for parsers of SQL test files: whitespace trimming,
+// prefix/tag matching, file loading and error reporting.
+class SqlFileParseBase {
+ public:
+  // @param parseName label printed in front of every message emitted by
+  //                  __error()
+  SqlFileParseBase(const std::string &parseName) : parseName_(parseName) {}
+
+  // Returns a copy of str without leading and trailing whitespace
+  // (as classified by isspace()).
+  static std::string trim(std::string str);
+
+ protected:
+  // Index of the first non-whitespace character of line, or line.size() when
+  // there is none.
+  size_t __find_is_not_space(const std::string &line);
+
+  // True when target, read from index pos, begins with tag.
+  bool __sub_equal(const std::string &target, size_t pos,
+                   const std::string &tag);
+
+  // Same as above, with the start position given as an iterator into target.
+  bool __sub_equal(const std::string &target, std::string::const_iterator pos,
+                   const std::string &tag);
+
+  // True when target begins with tag.
+  bool __sub_equal(const std::string &target, const std::string &tag);
+
+  // Prints "<parseName_>: error: <error_message>" to std::cerr and returns
+  // false.
+  bool __error(const std::string &error_message);
+
+  // Reports "<filename> open error" via __error(); returns false.
+  bool __open_error(const std::string &filename);
+
+  // Reports "line <index> not found <tag>" via __error(); returns false.
+  bool __not_found_tag_error(const size_t index, const std::string &tag);
+
+  // Loads every line of infile into v_inf. Returns true on success; when the
+  // file cannot be opened, reports the error, leaves v_inf unchanged and
+  // returns false.
+  bool __trans_file_to_vector(const std::string &infile,
+                              std::vector<std::string> &v_inf);
+
+  // True when the first non-whitespace text of line begins with tag.
+  bool __is_tag(const std::string &line, const std::string &tag);
+
+  // True when the first non-whitespace text of line is the "--" marker.
+  bool __is_comment(const std::string &line) { return __is_tag(line, "--"); }
+
+ private:
+  // Prefix used by __error() to identify which parser produced a message.
+  const std::string parseName_;
+};
+
+#endif
\ No newline at end of file