You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:13 UTC

[10/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
new file mode 100644
index 0000000..88d913d
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Configures and starts the Tajo-specific components in the YARN cluster.
+ *
+ */
+public class MiniTajoYarnCluster extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder
+      .getJar(LocalContainerLauncher.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class);
+
+  public MiniTajoYarnCluster(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTajoYarnCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 1, 1);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+
+    conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir/").getAbsolutePath());
+    }
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      //mkdir the staging directory so that right permissions are set while running as proxy user
+      fc.mkdir(stagingPath, null, true);
+      //mkdir done directory as well
+      String doneDir = JobHistoryUtils
+          .getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Could not create staging directory. ", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
+    // which shuffle doesn't happen
+    //configure the shuffle service in NM
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
+        Service.class);
+
+    // Non-standard shuffle port
+    conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
+
+    // local directory
+    conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+    // for corresponding uberized tests.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+
+    LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
+  }
+
+  private class JobHistoryServerWrapper extends AbstractService {
+    public JobHistoryServerWrapper() {
+      super(JobHistoryServerWrapper.class.getName());
+    }
+
+    @Override
+    public synchronized void start() {
+      try {
+        if (!getConfig().getBoolean(
+            JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
+            JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+          // pick free random ports.
+          getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+          getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+        }
+        super.start();
+      } catch (Throwable t) {
+        throw new YarnRuntimeException(t);
+      }
+
+      LOG.info("MiniMRYARN ResourceManager address: " +
+          getConfig().get(YarnConfiguration.RM_ADDRESS));
+      LOG.info("MiniMRYARN ResourceManager web address: " +
+          getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer address: " +
+          getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer web address: " +
+          getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+    }
+
+    @Override
+    public synchronized void stop() {
+      super.stop();
+    }
+  }
+
+  public static void main(String [] args) {
+    MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName());
+    cluster.init(new TajoConf());
+    cluster.start();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
new file mode 100644
index 0000000..961184c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.cli.ParsedResult;
+import org.apache.tajo.cli.SimpleParser;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.*;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * (Note that this class is not thread safe. Do not execute maven test in any parallel mode.)
+ * <br />
+ * <code>QueryTestCaseBase</code> provides useful methods to easily execute queries and verify their results.
+ *
+ * This class basically uses four resource directories:
+ * <ul>
+ *   <li>src/test/resources/dataset - contains a set of data files. It contains sub directories, each of which
+ *   corresponds each test class. All data files in each sub directory can be used in the corresponding test class.</li>
+ *
+ *   <li>src/test/resources/queries - This is the query directory. It contains sub directories, each of which
+ *   corresponds each test class. All query files in each sub directory can be used in the corresponding test
+ *   class.</li>
+ *
+ *   <li>src/test/resources/results - This is the result directory. It contains sub directories, each of which
+ *   corresponds each test class. All result files in each sub directory can be used in the corresponding test class.
+ *   </li>
+ * </ul>
+ *
+ * For example, if you create a test class named <code>TestJoinQuery</code>, you should create a pair of query and
+ * result set directories as follows:
+ *
+ * <pre>
+ *   src-|
+ *       |- resources
+ *             |- dataset
+ *             |     |- TestJoinQuery
+ *             |              |- table1.tbl
+ *             |              |- table2.tbl
+ *             |
+ *             |- queries
+ *             |     |- TestJoinQuery
+ *             |              |- TestInnerJoin.sql
+ *             |              |- table1_ddl.sql
+ *             |              |- table2_ddl.sql
+ *             |
+ *             |- results
+ *                   |- TestJoinQuery
+ *                            |- TestInnerJoin.result
+ * </pre>
+ *
+ * <code>QueryTestCaseBase</code> basically provides the following methods:
+ * <ul>
+ *  <li><code>{@link #executeQuery()}</code> - executes a corresponding query and returns an ResultSet instance</li>
+ *  <li><code>{@link #executeFile(String)}</code> - executes a given query file included in the corresponding query
+ *  file in the current class's query directory</li>
+ *  <li><code>assertResultSet()</code> - check if the query result is equivalent to the expected result included
+ *  in the corresponding result file in the current class's result directory.</li>
+ *  <li><code>cleanQuery()</code> - clean up all resources</li>
+ *  <li><code>executeDDL()</code> - execute a DDL query like create or drop table.</li>
+ * </ul>
+ *
+ * In order to make use of the above methods, query files and results file must be as follows:
+ * <ul>
+ *  <li>Each query file must be located on the subdirectory whose structure must be src/resources/queries/${ClassName},
+ *  where ${ClassName} indicates an actual test class's simple name.</li>
+ *  <li>Each result file must be located on the subdirectory whose structure must be src/resources/results/${ClassName},
+ *  where ${ClassName} indicates an actual test class's simple name.</li>
+ * </ul>
+ *
+ * Especially, {@link #executeQuery() and {@link #assertResultSet(java.sql.ResultSet)} methods automatically finds
+ * a query file to be executed and a result to be compared, which are corresponding to the running class and method.
+ * For them, query and result files additionally must be follows as:
+ * <ul>
+ *  <li>Each result file must have the file extension '.result'</li>
+ *  <li>Each query file must have the file extension '.sql'.</li>
+ * </ul>
+ */
+public class QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(QueryTestCaseBase.class);
+  protected static final TpchTestBase testBase;
+  protected static final TajoTestingCluster testingCluster;
+  protected static TajoConf conf;
+  protected static TajoClient client;
+  protected static final CatalogService catalog;
+  protected static final SQLAnalyzer sqlParser = new SQLAnalyzer();
+
+  /** the base path of dataset directories */
+  protected static final Path datasetBasePath;
+  /** the base path of query directories */
+  protected static final Path queryBasePath;
+  /** the base path of result directories */
+  protected static final Path resultBasePath;
+
+  static {
+    testBase = TpchTestBase.getInstance();
+    testingCluster = testBase.getTestingCluster();
+    conf = testBase.getTestingCluster().getConfiguration();
+    catalog = testBase.getTestingCluster().getMaster().getCatalog();
+    URL datasetBaseURL = ClassLoader.getSystemResource("dataset");
+    datasetBasePath = new Path(datasetBaseURL.toString());
+    URL queryBaseURL = ClassLoader.getSystemResource("queries");
+    queryBasePath = new Path(queryBaseURL.toString());
+    URL resultBaseURL = ClassLoader.getSystemResource("results");
+    resultBasePath = new Path(resultBaseURL.toString());
+  }
+
+  /** It transiently contains created tables for the running test class. */
+  private static String currentDatabase;
+  private static Set<String> createdTableGlobalSet = new HashSet<String>();
+  // queries and results directory corresponding to subclass class.
+  private Path currentQueryPath;
+  private Path currentResultPath;
+  private Path currentDatasetPath;
+
+  // for getting a method name
+  @Rule public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpClass() throws IOException {
+    conf = testBase.getTestingCluster().getConfiguration();
+    client = new TajoClient(conf);
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws ServiceException {
+    for (String tableName : createdTableGlobalSet) {
+      client.updateQuery("DROP TABLE IF EXISTS " + CatalogUtil.denormalizeIdentifier(tableName));
+    }
+    createdTableGlobalSet.clear();
+
+    // if the current database is "default", shouldn't drop it.
+    if (!currentDatabase.equals(TajoConstants.DEFAULT_DATABASE_NAME)) {
+      for (String tableName : catalog.getAllTableNames(currentDatabase)) {
+        client.updateQuery("DROP TABLE IF EXISTS " + tableName);
+      }
+
+      client.selectDatabase(TajoConstants.DEFAULT_DATABASE_NAME);
+      client.dropDatabase(currentDatabase);
+    }
+    client.close();
+  }
+
+  public QueryTestCaseBase() {
+    // hive 0.12 does not support quoted identifier.
+    // So, we use lower case database names when Tajo uses HCatalogStore.
+    if (testingCluster.isHCatalogStoreRunning()) {
+      this.currentDatabase = getClass().getSimpleName().toLowerCase();
+    } else {
+      this.currentDatabase = getClass().getSimpleName();
+    }
+    init();
+  }
+
+  public QueryTestCaseBase(String currentDatabase) {
+    this.currentDatabase = currentDatabase;
+    init();
+  }
+
+  private void init() {
+    String className = getClass().getSimpleName();
+    currentQueryPath = new Path(queryBasePath, className);
+    currentResultPath = new Path(resultBasePath, className);
+    currentDatasetPath = new Path(datasetBasePath, className);
+
+    try {
+      // if the current database is "default", we don't need create it because it is already prepated at startup time.
+      if (!currentDatabase.equals(TajoConstants.DEFAULT_DATABASE_NAME)) {
+        client.updateQuery("CREATE DATABASE IF NOT EXISTS " + CatalogUtil.denormalizeIdentifier(currentDatabase));
+      }
+      client.selectDatabase(currentDatabase);
+    } catch (ServiceException e) {
+      e.printStackTrace();
+    }
+    testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "false");
+  }
+
+  protected TajoClient getClient() {
+    return client;
+  }
+
+  public String getCurrentDatabase() {
+    return currentDatabase;
+  }
+
+  protected ResultSet executeString(String sql) throws Exception {
+    return testBase.execute(sql);
+  }
+
+  /**
+   * Execute a query contained in the file located in src/test/resources/results/<i>ClassName</i>/<i>MethodName</i>.
+   * <i>ClassName</i> and <i>MethodName</i> will be replaced by actual executed class and methods.
+   *
+   * @return ResultSet of query execution.
+   */
+  public ResultSet executeQuery() throws Exception {
+    return executeFile(name.getMethodName() + ".sql");
+  }
+
+  /**
+   * Execute a query contained in the given named file. This methods tries to find the given file within the directory
+   * src/test/resources/results/<i>ClassName</i>.
+   *
+   * @param queryFileName The file name to be used to execute a query.
+   * @return ResultSet of query execution.
+   */
+  public ResultSet executeFile(String queryFileName) throws Exception {
+    Path queryFilePath = getQueryFilePath(queryFileName);
+    FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+    assertTrue(queryFilePath.toString() + " existence check", fs.exists(queryFilePath));
+
+    List<ParsedResult> parsedResults = SimpleParser.parseScript(FileUtil.readTextFile(new File(queryFilePath.toUri())));
+    if (parsedResults.size() > 1) {
+      assertNotNull("This script \"" + queryFileName + "\" includes two or more queries");
+    }
+    ResultSet result = client.executeQueryAndGetResult(parsedResults.get(0).getStatement());
+    assertNotNull("Query succeeded test", result);
+    return result;
+  }
+
+  /**
+   * Assert the equivalence between the expected result and an actual query result.
+   * If it isn't it throws an AssertionError.
+   *
+   * @param result Query result to be compared.
+   */
+  public final void assertResultSet(ResultSet result) throws IOException {
+    assertResultSet("Result Verification", result, name.getMethodName() + ".result");
+  }
+
+  /**
+   * Assert the equivalence between the expected result and an actual query result.
+   * If it isn't it throws an AssertionError.
+   *
+   * @param result Query result to be compared.
+   * @param resultFileName The file name containing the result to be compared
+   */
+  public final void assertResultSet(ResultSet result, String resultFileName) throws IOException {
+    assertResultSet("Result Verification", result, resultFileName);
+  }
+
+  /**
+   * Assert the equivalence between the expected result and an actual query result.
+   * If it isn't it throws an AssertionError with the given message.
+   *
+   * @param message message The message to printed if the assertion is failed.
+   * @param result Query result to be compared.
+   */
+  public final void assertResultSet(String message, ResultSet result, String resultFileName) throws IOException {
+    FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+    Path resultFile = getResultFile(resultFileName);
+    assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+    try {
+      verifyResultText(message, result, resultFile);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public final void assertStrings(String actual) throws IOException {
+    assertStrings(actual, name.getMethodName() + ".result");
+  }
+
+  public final void assertStrings(String actual, String resultFileName) throws IOException {
+    assertStrings("Result Verification", actual, resultFileName);
+  }
+
+  public final void assertStrings(String message, String actual, String resultFileName) throws IOException {
+    FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+    Path resultFile = getResultFile(resultFileName);
+    assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+
+    String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
+    assertEquals(message, expectedResult, actual);
+  }
+
+  /**
+   * Release all resources
+   *
+   * @param resultSet ResultSet
+   */
+  public final void cleanupQuery(ResultSet resultSet) throws IOException {
+    if (resultSet == null) {
+      return;
+    }
+    try {
+      resultSet.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Assert that the database exists.
+   * @param databaseName The database name to be checked. This name is case sensitive.
+   */
+  public void assertDatabaseExists(String databaseName) throws ServiceException {
+    assertTrue(client.existDatabase(databaseName));
+  }
+
+  /**
+   * Assert that the database does not exists.
+   * @param databaseName The database name to be checked. This name is case sensitive.
+   */
+  public void assertDatabaseNotExists(String databaseName) throws ServiceException {
+    assertTrue(!client.existDatabase(databaseName));
+  }
+
+  /**
+   * Assert that the table exists.
+   *
+   * @param tableName The table name to be checked. This name is case sensitive.
+   * @throws ServiceException
+   */
+  public void assertTableExists(String tableName) throws ServiceException {
+    assertTrue(client.existTable(tableName));
+  }
+
+  /**
+   * Assert that the table does not exist.
+   *
+   * @param tableName The table name to be checked. This name is case sensitive.
+   */
+  public void assertTableNotExists(String tableName) throws ServiceException {
+    assertTrue(!client.existTable(tableName));
+  }
+
+  public void assertColumnExists(String tableName,String columnName) throws ServiceException {
+    TableDesc tableDesc = fetchTableMetaData(tableName);
+    assertTrue(tableDesc.getSchema().containsByName(columnName));
+  }
+
+  private TableDesc fetchTableMetaData(String tableName) throws ServiceException {
+    return client.getTableDesc(tableName);
+  }
+
+  /**
+   * It transforms a ResultSet instance to rows represented as strings.
+   *
+   * @param resultSet ResultSet that contains a query result
+   * @return String
+   * @throws SQLException
+   */
+  public String resultSetToString(ResultSet resultSet) throws SQLException {
+    StringBuilder sb = new StringBuilder();
+    ResultSetMetaData rsmd = resultSet.getMetaData();
+    int numOfColumns = rsmd.getColumnCount();
+
+    for (int i = 1; i <= numOfColumns; i++) {
+      if (i > 1) sb.append(",");
+      String columnName = rsmd.getColumnName(i);
+      sb.append(columnName);
+    }
+    sb.append("\n-------------------------------\n");
+
+    while (resultSet.next()) {
+      for (int i = 1; i <= numOfColumns; i++) {
+        if (i > 1) sb.append(",");
+        String columnValue = resultSet.getObject(i).toString();
+        sb.append(columnValue);
+      }
+      sb.append("\n");
+    }
+    return sb.toString();
+  }
+
+  private void verifyResultText(String message, ResultSet res, Path resultFile) throws SQLException, IOException {
+    String actualResult = resultSetToString(res);
+    String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
+    assertEquals(message, expectedResult.trim(), actualResult.trim());
+  }
+
+  private Path getQueryFilePath(String fileName) {
+    return StorageUtil.concatPath(currentQueryPath, fileName);
+  }
+
+  private Path getResultFile(String fileName) {
+    return StorageUtil.concatPath(currentResultPath, fileName);
+  }
+
+  private Path getDataSetFile(String fileName) {
+    return StorageUtil.concatPath(currentDatasetPath, fileName);
+  }
+
+  public List<String> executeDDL(String ddlFileName, @Nullable String [] args) throws Exception {
+    return executeDDL(ddlFileName, null, true, args);
+  }
+
+  /**
+   *
+   * Execute a data definition language (DDL) template. A general SQL DDL statement can be included in this file. But,
+   * for user-specified table name or exact external table path, you must use some format string to indicate them.
+   * The format string will be replaced by the corresponding arguments.
+   *
+   * The below is predefined format strings:
+   * <ul>
+   *   <li>${table.path} - It is replaced by the absolute file path that <code>dataFileName</code> points. </li>
+   *   <li>${i} - It is replaced by the corresponding element of <code>args</code>. For example, ${0} and ${1} are
+   *   replaced by the first and second elements of <code>args</code> respectively</li>. It uses zero-based index.
+   * </ul>
+   *
+   * @param ddlFileName A file name, containing a data definition statement.
+   * @param dataFileName A file name, containing data rows, which columns have to be separated by vertical bar '|'.
+   *                     This file name is used for replacing some format string indicating an external table location.
+   * @param args A list of arguments, each of which is used to replace corresponding variable which has a form of ${i}.
+   * @return The table names created
+   */
+  public List<String> executeDDL(String ddlFileName, @Nullable String dataFileName, @Nullable String ... args)
+      throws Exception {
+
+    return executeDDL(ddlFileName, dataFileName, true, args);
+  }
+
+  private List<String> executeDDL(String ddlFileName, @Nullable String dataFileName, boolean isLocalTable,
+                                  @Nullable String[] args) throws Exception {
+
+    Path ddlFilePath = new Path(currentQueryPath, ddlFileName);
+    FileSystem fs = ddlFilePath.getFileSystem(conf);
+    assertTrue(ddlFilePath + " existence check", fs.exists(ddlFilePath));
+
+    String template = FileUtil.readTextFile(new File(ddlFilePath.toUri()));
+    String dataFilePath = null;
+    if (dataFileName != null) {
+      dataFilePath = getDataSetFile(dataFileName).toString();
+    }
+    String compiled = compileTemplate(template, dataFilePath, args);
+
+    List<ParsedResult> parsedResults = SimpleParser.parseScript(compiled);
+    List<String> createdTableNames = new ArrayList<String>();
+
+    for (ParsedResult parsedResult : parsedResults) {
+      // parse a statement
+      Expr expr = sqlParser.parse(parsedResult.getStatement());
+      assertNotNull(ddlFilePath + " cannot be parsed", expr);
+
+      if (expr.getType() == OpType.CreateTable) {
+        CreateTable createTable = (CreateTable) expr;
+        String tableName = createTable.getTableName();
+        assertTrue("Table [" + tableName + "] creation is failed.", client.updateQuery(parsedResult.getStatement()));
+
+        TableDesc createdTable = client.getTableDesc(tableName);
+        String createdTableName = createdTable.getName();
+
+        assertTrue("table '" + createdTableName + "' creation check", client.existTable(createdTableName));
+        if (isLocalTable) {
+          createdTableGlobalSet.add(createdTableName);
+          createdTableNames.add(tableName);
+        }
+      } else if (expr.getType() == OpType.DropTable) {
+        DropTable dropTable = (DropTable) expr;
+        String tableName = dropTable.getTableName();
+        assertTrue("table '" + tableName + "' existence check",
+            client.existTable(CatalogUtil.buildFQName(currentDatabase, tableName)));
+        assertTrue("table drop is failed.", client.updateQuery(parsedResult.getStatement()));
+        assertFalse("table '" + tableName + "' dropped check",
+            client.existTable(CatalogUtil.buildFQName(currentDatabase, tableName)));
+        if (isLocalTable) {
+          createdTableGlobalSet.remove(tableName);
+        }
+      } else if (expr.getType() == OpType.AlterTable) {
+        AlterTable alterTable = (AlterTable) expr;
+        String tableName = alterTable.getTableName();
+        assertTrue("table '" + tableName + "' existence check", client.existTable(tableName));
+        client.updateQuery(compiled);
+        if (isLocalTable) {
+          createdTableGlobalSet.remove(tableName);
+        }
+      } else {
+        assertTrue(ddlFilePath + " is not a Create or Drop Table statement", false);
+      }
+    }
+
+    return createdTableNames;
+  }
+
+  /**
+   * Replace format strings by a given parameters.
+   *
+   * @param template
+   * @param dataFileName The data file name to replace <code>${table.path}</code>
+   * @param args The list argument to replace each corresponding format string ${i}. ${i} uses zero-based index.
+   * @return A string compiled
+   */
+  private String compileTemplate(String template, @Nullable String dataFileName, @Nullable String ... args) {
+    String result;
+    if (dataFileName != null) {
+      result = template.replace("${table.path}", "\'" + dataFileName + "'");
+    } else {
+      result = template;
+    }
+
+    if (args != null) {
+      for (int i = 0; i < args.length; i++) {
+        result = result.replace("${" + i + "}", args[i]);
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
new file mode 100644
index 0000000..ed5e4bc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.YarnTajoResourceManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class TajoTestingCluster {
+	private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
+	private TajoConf conf;
+
+  protected MiniTajoYarnCluster yarnCluster;
+  private FileSystem defaultFS;
+  private MiniDFSCluster dfsCluster;
+	private MiniCatalogServer catalogServer;
+
+  private TajoMaster tajoMaster;
+  private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
+  private boolean standbyWorkerMode = false;
+
+	// If non-null, then already a cluster running.
+	private File clusterTestBuildDir = null;
+
+	/**
+	 * System property key to get test directory value.
+	 * Name is as it is because mini dfs has hard-codings to put test data here.
+	 */
+	public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
+
+	/**
+	 * Default parent directory for test output.
+	 */
+	public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
+
+  /**
+   * True If HCatalogStore is used. Otherwise, it is FALSE.
+   */
+  public Boolean isHCatalogStoreUse = false;
+
+  public TajoTestingCluster() {
+    this.conf = new TajoConf();
+    initPropertiesAndConfigs();
+	}
+
+  void initPropertiesAndConfigs() {
+    if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
+      String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
+      Preconditions.checkState(
+          testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
+              testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
+          ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
+              YarnTajoResourceManager.class.getCanonicalName() +"."
+      );
+      conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
+    }
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+    conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
+
+    this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
+        .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
+    conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
+  }
+
+	public TajoConf getConfiguration() {
+		return this.conf;
+	}
+
+	public void initTestDir() {
+		if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+			clusterTestBuildDir = setupClusterTestBuildDir();
+			System.setProperty(TEST_DIRECTORY_KEY,
+          clusterTestBuildDir.getAbsolutePath());
+		}
+	}
+
+	/**
+	 * @return Where to write test data on local filesystem; usually
+	 * {@link #DEFAULT_TEST_DIRECTORY}
+	 * @see #setupClusterTestBuildDir()
+	 */
+	public static File getTestDir() {
+		return new File(System.getProperty(TEST_DIRECTORY_KEY,
+			DEFAULT_TEST_DIRECTORY));
+	}
+
+	/**
+	 * @param subdirName
+	 * @return Path to a subdirectory named <code>subdirName</code> under
+	 * {@link #getTestDir()}.
+	 * @see #setupClusterTestBuildDir()
+	 */
+	public static File getTestDir(final String subdirName) {
+		return new File(getTestDir(), subdirName);
+  }
+
+	public File setupClusterTestBuildDir() {
+		String randomStr = UUID.randomUUID().toString();
+		String dirStr = getTestDir(randomStr).toString();
+		File dir = new File(dirStr).getAbsoluteFile();
+		// Have it cleaned up on exit
+		dir.deleteOnExit();
+		return dir;
+	}
+
+  ////////////////////////////////////////////////////////
+  // HDFS Section
+  ////////////////////////////////////////////////////////
+  /**
+   * Start a minidfscluster.
+   * @param servers How many DNs to start.
+   * @throws Exception
+   * @see {@link #shutdownMiniDFSCluster()}
+   * @return The mini dfs cluster created.
+   */
+  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
+    return startMiniDFSCluster(servers, null, null);
+  }
+
+  /**
+   * Start a minidfscluster.
+   * Can only create one.
+   * @param servers How many DNs to start.
+   * @param dir Where to home your dfs cluster.
+   * @param hosts hostnames DNs to run on.
+   * @throws Exception
+   * @see {@link #shutdownMiniDFSCluster()}
+   * @return The mini dfs cluster created.
+   * @throws java.io.IOException
+   */
+  public MiniDFSCluster startMiniDFSCluster(int servers,
+                                            final File dir,
+                                            final String hosts[])
+      throws IOException {
+    if (dir == null) {
+      this.clusterTestBuildDir = setupClusterTestBuildDir();
+    } else {
+      this.clusterTestBuildDir = dir;
+    }
+
+    System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
+        this.clusterTestBuildDir.toString());
+
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
+    builder.hosts(hosts);
+    builder.numDataNodes(servers);
+    builder.format(true);
+    builder.manageNameDfsDirs(true);
+    builder.manageDataDfsDirs(true);
+    builder.waitSafeMode(true);
+    this.dfsCluster = builder.build();
+
+    // Set this just-started cluser as our filesystem.
+    this.defaultFS = this.dfsCluster.getFileSystem();
+    this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
+    this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
+
+    return this.dfsCluster;
+  }
+
+  public void shutdownMiniDFSCluster() throws Exception {
+    if (this.dfsCluster != null) {
+      try {
+        FileSystem fs = this.dfsCluster.getFileSystem();
+        if (fs != null) fs.close();
+      } catch (IOException e) {
+        System.err.println("error closing file system: " + e);
+      }
+      // The below throws an exception per dn, AsynchronousCloseException.
+      this.dfsCluster.shutdown();
+    }
+  }
+
+  public boolean isRunningDFSCluster() {
+    return this.defaultFS != null;
+  }
+
+  public MiniDFSCluster getMiniDFSCluster() {
+    return this.dfsCluster;
+  }
+
+  public FileSystem getDefaultFileSystem() {
+    return this.defaultFS;
+  }
+
+  ////////////////////////////////////////////////////////
+  // Catalog Section
+  ////////////////////////////////////////////////////////
+  public MiniCatalogServer startCatalogCluster() throws Exception {
+    TajoConf c = getConfiguration();
+
+    if(clusterTestBuildDir == null) {
+      clusterTestBuildDir = setupClusterTestBuildDir();
+    }
+
+    conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+    conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
+    LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+    conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+
+    catalogServer = new MiniCatalogServer(conf);
+    CatalogServer catServer = catalogServer.getCatalogServer();
+    InetSocketAddress sockAddr = catServer.getBindAddress();
+    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
+
+    return this.catalogServer;
+  }
+
+  public void shutdownCatalogCluster() {
+    if (catalogServer != null) {
+      this.catalogServer.shutdown();
+    }
+  }
+
+  public MiniCatalogServer getMiniCatalogCluster() {
+    return this.catalogServer;
+  }
+
+  public boolean isHCatalogStoreRunning() {
+    return isHCatalogStoreUse;
+  }
+
+  ////////////////////////////////////////////////////////
+  // Tajo Cluster Section
+  ////////////////////////////////////////////////////////
+  private void startMiniTajoCluster(File testBuildDir,
+                                               final int numSlaves,
+                                               boolean local) throws Exception {
+    TajoConf c = getConfiguration();
+    c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");
+
+    LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+
+    if (!local) {
+      c.setVar(ConfVars.ROOT_DIR,
+          getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
+    } else {
+      c.setVar(ConfVars.ROOT_DIR, clusterTestBuildDir.getAbsolutePath() + "/tajo");
+    }
+
+    setupCatalogForTesting(c, clusterTestBuildDir);
+
+    tajoMaster = new TajoMaster();
+    tajoMaster.init(c);
+    tajoMaster.start();
+
+    this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
+    this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+
+    InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
+
+    this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+        tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+    this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+    this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
+
+    if(standbyWorkerMode) {
+      startTajoWorkers(numSlaves);
+    }
+    LOG.info("Mini Tajo cluster is up");
+    LOG.info("====================================================================================");
+    LOG.info("=                           MiniTajoCluster starts up                              =");
+    LOG.info("====================================================================================");
+    LOG.info("= * Master Address: " + tajoMaster.getMasterName());
+    LOG.info("= * CatalogStore: " + tajoMaster.getCatalogServer().getStoreClassName());
+    LOG.info("------------------------------------------------------------------------------------");
+    LOG.info("= * Warehouse Dir: " + TajoConf.getWarehouseDir(c));
+    LOG.info("= * Worker Tmp Dir: " + c.getVar(ConfVars.WORKER_TEMPORAL_DIR));
+    LOG.info("====================================================================================");
+  }
+
+  private void setupCatalogForTesting(TajoConf c, File testBuildDir) throws IOException {
+    final String HCATALOG_CLASS_NAME = "org.apache.tajo.catalog.store.HCatalogStore";
+    boolean hcatalogClassExists = false;
+    try {
+      getClass().getClassLoader().loadClass(HCATALOG_CLASS_NAME);
+      hcatalogClassExists = true;
+    } catch (ClassNotFoundException e) {
+      LOG.info("HCatalogStore is not available.");
+    }
+    String driverClass = System.getProperty(CatalogConstants.STORE_CLASS);
+
+    if (hcatalogClassExists &&
+        driverClass != null && driverClass.equals(HCATALOG_CLASS_NAME)) {
+      try {
+        getClass().getClassLoader().loadClass(HCATALOG_CLASS_NAME);
+        String jdbcUri = "jdbc:derby:;databaseName="+ testBuildDir.toURI().getPath()  + "/metastore_db;create=true";
+        c.set("hive.metastore.warehouse.dir", TajoConf.getWarehouseDir(c).toString() + "/default");
+        c.set("javax.jdo.option.ConnectionURL", jdbcUri);
+        c.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, conf.getVar(ConfVars.WAREHOUSE_DIR));
+        c.set(CatalogConstants.STORE_CLASS, HCATALOG_CLASS_NAME);
+        Path defaultDatabasePath = new Path(TajoConf.getWarehouseDir(c).toString() + "/default");
+        FileSystem fs = defaultDatabasePath.getFileSystem(c);
+        if (!fs.exists(defaultDatabasePath)) {
+          fs.mkdirs(defaultDatabasePath);
+        }
+        isHCatalogStoreUse = true;
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else { // for derby
+      c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+      c.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
+    }
+    c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+  }
+
+  private void startTajoWorkers(int numSlaves) throws Exception {
+    for(int i = 0; i < 1; i++) {
+      TajoWorker tajoWorker = new TajoWorker();
+
+      TajoConf workerConf  = new TajoConf(this.conf);
+
+      workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
+      workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
+      workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+
+      workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
+      
+      tajoWorker.startWorker(workerConf, new String[]{"standby"});
+
+      LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
+      tajoWorkers.add(tajoWorker);
+    }
+  }
+
+  public void restartTajoCluster(int numSlaves) throws Exception {
+    tajoMaster.stop();
+    tajoMaster.start();
+
+    LOG.info("Minicluster has been restarted");
+  }
+
+  public TajoMaster getMaster() {
+    return this.tajoMaster;
+  }
+
+  public List<TajoWorker> getTajoWorkers() {
+    return this.tajoWorkers;
+  }
+
+  public void shutdownMiniTajoCluster() {
+    if(this.tajoMaster != null) {
+      this.tajoMaster.stop();
+    }
+    for(TajoWorker eachWorker: tajoWorkers) {
+      eachWorker.stopWorkerForce();
+    }
+    tajoWorkers.clear();
+    this.tajoMaster= null;
+  }
+
+  ////////////////////////////////////////////////////////
+  // Meta Cluster Section
+  ////////////////////////////////////////////////////////
+  /**
+   * @throws java.io.IOException If a cluster -- dfs or engine -- already running.
+   */
+  void isRunningCluster(String passedBuildPath) throws IOException {
+    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
+    throw new IOException("Cluster already running at " +
+        this.clusterTestBuildDir);
+  }
+
+  /**
+   * This method starts up a tajo cluster with a given number of clusters in
+   * distributed mode.
+   *
+   * @param numSlaves the number of tajo cluster to start up
+   * @throws Exception
+   */
+  public void startMiniCluster(final int numSlaves)
+      throws Exception {
+    startMiniCluster(numSlaves, null);
+  }
+
+  public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
+
+    int numDataNodes = numSlaves;
+    if(dataNodeHosts != null && dataNodeHosts.length != 0) {
+      numDataNodes = dataNodeHosts.length;
+    }
+
+    LOG.info("Starting up minicluster with 1 master(s) and " +
+        numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
+
+    // If we already put up a cluster, fail.
+    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
+    isRunningCluster(testBuildPath);
+    if (testBuildPath != null) {
+      LOG.info("Using passed path: " + testBuildPath);
+    }
+
+    // Make a new random dir to home everything in.  Set it as system property.
+    // minidfs reads home from system property.
+    this.clusterTestBuildDir = testBuildPath == null?
+        setupClusterTestBuildDir() : new File(testBuildPath);
+
+    System.setProperty(TEST_DIRECTORY_KEY,
+        this.clusterTestBuildDir.getAbsolutePath());
+
+    startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
+    this.dfsCluster.waitClusterUp();
+
+    if(!standbyWorkerMode) {
+      startMiniYarnCluster();
+    }
+
+    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
+  }
+
+  private void startMiniYarnCluster() throws Exception {
+    LOG.info("Starting up YARN cluster");
+    // Scheduler properties required for YARN to work
+    conf.set("yarn.scheduler.capacity.root.queues", "default");
+    conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2);
+
+    if (yarnCluster == null) {
+      yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+
+      ResourceManager resourceManager = yarnCluster.getResourceManager();
+      InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
+      InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
+      conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
+      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));
+
+      URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
+      if (url == null) {
+        throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
+      }
+      yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
+      OutputStream os = new FileOutputStream(new File(url.getPath()));
+      yarnCluster.getConfig().writeXml(os);
+      os.close();
+    }
+  }
+
+  public void startMiniClusterInLocal(final int numSlaves) throws Exception {
+    // If we already put up a cluster, fail.
+    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
+    isRunningCluster(testBuildPath);
+    if (testBuildPath != null) {
+      LOG.info("Using passed path: " + testBuildPath);
+    }
+
+    // Make a new random dir to home everything in.  Set it as system property.
+    // minidfs reads home from system property.
+    this.clusterTestBuildDir = testBuildPath == null?
+        setupClusterTestBuildDir() : new File(testBuildPath);
+
+    System.setProperty(TEST_DIRECTORY_KEY,
+        this.clusterTestBuildDir.getAbsolutePath());
+
+    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
+  }
+
+  public void shutdownMiniCluster() throws IOException {
+    LOG.info("========================================");
+    LOG.info("Minicluster is stopping");
+    LOG.info("========================================");
+
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    shutdownMiniTajoCluster();
+
+    if(this.catalogServer != null) {
+      shutdownCatalogCluster();
+    }
+
+    if(this.yarnCluster != null) {
+      this.yarnCluster.stop();
+    }
+
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    if(this.dfsCluster != null) {
+
+      try {
+        FileSystem fs = this.dfsCluster.getFileSystem();
+        if (fs != null) fs.close();
+        this.dfsCluster.shutdown();
+      } catch (IOException e) {
+        System.err.println("error closing file system: " + e);
+      }
+    }
+
+    if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
+      if(!ShutdownHookManager.get().isShutdownInProgress()) {
+        //TODO clean test dir when ShutdownInProgress
+        LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
+        localFS.delete(new Path(clusterTestBuildDir.toString()), true);
+        localFS.close();
+      }
+      this.clusterTestBuildDir = null;
+    }
+    LOG.info("Minicluster is down");
+  }
+
+  public static ResultSet run(String[] names,
+                              Schema[] schemas,
+                              Options option,
+                              String[][] tables,
+                              String query) throws Exception {
+    TpchTestBase instance = TpchTestBase.getInstance();
+    TajoTestingCluster util = instance.getTestingCluster();
+    while(true) {
+      if(util.getMaster().isMasterRunning()) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    TajoConf conf = util.getConfiguration();
+    TajoClient client = new TajoClient(conf);
+
+    FileSystem fs = util.getDefaultFileSystem();
+    Path rootDir = util.getMaster().
+        getStorageManager().getWarehouseDir();
+    fs.mkdirs(rootDir);
+    for (int i = 0; i < names.length; i++) {
+      Path tablePath = new Path(rootDir, names[i]);
+      fs.mkdirs(tablePath);
+      Path dfsPath = new Path(tablePath, names[i] + ".tbl");
+      FSDataOutputStream out = fs.create(dfsPath);
+      for (int j = 0; j < tables[i].length; j++) {
+        out.write((tables[i][j]+"\n").getBytes());
+      }
+      out.close();
+      TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+      client.createExternalTable(names[i], schemas[i], tablePath, meta);
+    }
+    Thread.sleep(1000);
+    ResultSet res = client.executeQueryAndGetResult(query);
+    return res;
+  }
+
+    /**
+    * Write lines to a file.
+    *
+    * @param file File to write lines to
+    * @param lines Strings written to the file
+    * @throws java.io.IOException
+    */
+  private static void writeLines(File file, String... lines)
+      throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    try {
+      for (String line : lines) {
+        writer.write(line);
+        writer.write('\n');
+      }
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+  public void setAllTajoDaemonConfValue(String key, String value) {
+    tajoMaster.getContext().getConf().set(key, value);
+    for (TajoWorker eachWorker: tajoWorkers) {
+      eachWorker.getConfig().set(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
new file mode 100644
index 0000000..912400b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestQueryIdFactory {
+  
+  @Before
+  public void setup() {
+  }
+
+  @Test
+  public void testNewQueryId() {
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+    QueryId qid2 = LocalTajoTestingUtility.newQueryId();
+    assertTrue(qid1.compareTo(qid2) < 0);
+  }
+  
+  @Test
+  public void testNewSubQueryId() {
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    MasterPlan plan = new MasterPlan(qid, null, null);
+    ExecutionBlockId subqid1 = plan.newExecutionBlockId();
+    ExecutionBlockId subqid2 = plan.newExecutionBlockId();
+    assertTrue(subqid1.compareTo(subqid2) < 0);
+  }
+  
+  @Test
+  public void testNewQueryUnitId() {
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    MasterPlan plan = new MasterPlan(qid, null, null);
+    ExecutionBlockId subid = plan.newExecutionBlockId();
+    QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
+    QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
+    assertTrue(quid1.compareTo(quid2) < 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java b/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
new file mode 100644
index 0000000..60b6c22
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.util.TajoIdUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestTajoIds {
+  @Test
+  public void testQueryId() {
+    long ts1 = 1315890136000l;
+    long ts2 = 1315890136001l;
+
+    QueryId j1 = createQueryId(ts1, 2);
+    QueryId j2 = createQueryId(ts1, 1);
+    QueryId j3 = createQueryId(ts2, 1);
+    QueryId j4 = createQueryId(ts1, 2);
+
+    assertTrue(j1.equals(j4));
+    assertFalse(j1.equals(j2));
+    assertFalse(j1.equals(j3));
+
+    assertTrue(j1.compareTo(j4) == 0);
+    assertTrue(j1.compareTo(j2) > 0);
+    assertTrue(j1.compareTo(j3) < 0);
+
+    assertTrue(j1.hashCode() == j4.hashCode());
+    assertFalse(j1.hashCode() == j2.hashCode());
+    assertFalse(j1.hashCode() == j3.hashCode());
+
+    QueryId j5 = createQueryId(ts1, 231415);
+    assertEquals("q_" + ts1 + "_0002", j1.toString());
+    assertEquals("q_" + ts1 + "_231415", j5.toString());
+  }
+
+  @Test
+  public void testQueryIds() {
+    long timeId = 1315890136000l;
+    
+    QueryId queryId = createQueryId(timeId, 1);
+    assertEquals("q_" + timeId + "_0001", queryId.toString());
+    
+    ExecutionBlockId subId = QueryIdFactory.newExecutionBlockId(queryId, 2);
+    assertEquals("eb_" + timeId +"_0001_000002", subId.toString());
+    
+    QueryUnitId qId = new QueryUnitId(subId, 5);
+    assertEquals("t_" + timeId + "_0001_000002_000005", qId.toString());
+
+    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(qId, 4);
+    assertEquals("ta_" + timeId + "_0001_000002_000005_04", attemptId.toString());
+  }
+
+  @Test
+  public void testEqualsObject() {
+    long timeId = System.currentTimeMillis();
+    
+    QueryId queryId1 = createQueryId(timeId, 1);
+    QueryId queryId2 = createQueryId(timeId, 2);
+    assertNotSame(queryId1, queryId2);    
+    QueryId queryId3 = createQueryId(timeId, 1);
+    assertEquals(queryId1, queryId3);
+    
+    ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+    assertNotSame(sid1, sid2);
+    ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    assertEquals(sid1, sid3);
+    
+    QueryUnitId qid1 = new QueryUnitId(sid1, 9);
+    QueryUnitId qid2 = new QueryUnitId(sid1, 10);
+    assertNotSame(qid1, qid2);
+    QueryUnitId qid3 = new QueryUnitId(sid1, 9);
+    assertEquals(qid1, qid3);
+  }
+
+  @Test
+  public void testCompareTo() {
+    long time = System.currentTimeMillis();
+    
+    QueryId queryId1 = createQueryId(time, 1);
+    QueryId queryId2 = createQueryId(time, 2);
+    QueryId queryId3 = createQueryId(time, 1);
+    assertEquals(-1, queryId1.compareTo(queryId2));
+    assertEquals(1, queryId2.compareTo(queryId1));
+    assertEquals(0, queryId3.compareTo(queryId1));
+
+    ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+    ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    assertEquals(-1, sid1.compareTo(sid2));
+    assertEquals(1, sid2.compareTo(sid1));
+    assertEquals(0, sid3.compareTo(sid1));
+    
+    QueryUnitId qid1 = new QueryUnitId(sid1, 9);
+    QueryUnitId qid2 = new QueryUnitId(sid1, 10);
+    QueryUnitId qid3 = new QueryUnitId(sid1, 9);
+    assertEquals(-1, qid1.compareTo(qid2));
+    assertEquals(1, qid2.compareTo(qid1));
+    assertEquals(0, qid3.compareTo(qid1));
+  }
+  
+  @Test
+  public void testConstructFromString() {
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+    QueryId qid2 = TajoIdUtils.parseQueryId(qid1.toString());
+    assertEquals(qid1, qid2);
+
+    MasterPlan plan1 = new MasterPlan(qid1, null, null);
+    ExecutionBlockId sub1 = plan1.newExecutionBlockId();
+    ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
+    assertEquals(sub1, sub2);
+    
+    QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
+    QueryUnitId u2 = new QueryUnitId(u1.getProto());
+    assertEquals(u1, u2);
+
+    QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
+    QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
+    assertEquals(attempt1, attempt2);
+  }
+
+  @Test
+  public void testConstructFromPB() {
+    QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+    QueryId qid2 = new QueryId(qid1.getProto());
+    assertEquals(qid1, qid2);
+
+    MasterPlan plan = new MasterPlan(qid1, null, null);
+    ExecutionBlockId sub1 = plan.newExecutionBlockId();
+    ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
+    assertEquals(sub1, sub2);
+
+    QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
+    QueryUnitId u2 = new QueryUnitId(u1.getProto());
+    assertEquals(u1, u2);
+
+    QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
+    QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
+    assertEquals(attempt1, attempt2);
+  }
+
+  public static QueryId createQueryId(long timestamp, int id) {
+    ApplicationId appId = BuilderUtils.newApplicationId(timestamp, id);
+
+    return QueryIdFactory.newQueryId(appId.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
new file mode 100644
index 0000000..8995d81
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+public class TpchTestBase {
+  private static final Log LOG = LogFactory.getLog(TpchTestBase.class);
+
+  String [] names;
+  String [] paths;
+  String [][] tables;
+  Schema[] schemas;
+  Map<String, Integer> nameMap = Maps.newHashMap();
+  protected TPCH tpch;
+  protected LocalTajoTestingUtility util;
+
+  private static TpchTestBase testBase;
+
+  static {
+    try {
+      testBase = new TpchTestBase();
+      testBase.setUp();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  private TpchTestBase() throws IOException {
+    names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier", "empty_orders"};
+    paths = new String[names.length];
+    for (int i = 0; i < names.length; i++) {
+      nameMap.put(names[i], i);
+    }
+
+    tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadQueries();
+
+    schemas = new Schema[names.length];
+    for (int i = 0; i < names.length; i++) {
+      schemas[i] = tpch.getSchema(names[i]);
+    }
+
+    tables = new String[names.length][];
+    File file;
+    for (int i = 0; i < names.length; i++) {
+      file = new File("src/test/tpch/" + names[i] + ".tbl");
+      if(!file.exists()) {
+        file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
+            + ".tbl");
+      }
+      tables[i] = FileUtil.readTextFile(file).split("\n");
+      paths[i] = file.getAbsolutePath();
+    }
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void setUp() throws Exception {
+    util = new LocalTajoTestingUtility();
+    Options opt = new Options();
+    opt.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    util.setup(names, paths, schemas, opt);
+  }
+
+  public static TpchTestBase getInstance() {
+    return testBase;
+  }
+
+  public ResultSet execute(String query) throws Exception {
+    return util.execute(query);
+  }
+
+  public TajoTestingCluster getTestingCluster() {
+    return util.getTestingCluster();
+  }
+
+  public void tearDown() throws IOException {
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+    }
+    util.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java b/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
new file mode 100644
index 0000000..4ca9b3e
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.benchmark;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+
+@Category(IntegrationTest.class)
+public class TestTPCH extends QueryTestCaseBase {
+
+  public TestTPCH() {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+  }
+
+  @Test
+  public void testQ1OrderBy() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testQ2FourJoins() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testTPCH14Expr() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
new file mode 100644
index 0000000..9c6e760
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.conf.TajoConf;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecExternalShellCommand {
+  @Test
+  public void testCommand() throws Exception {
+    TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+
+    cli.executeMetaCommand("\\! echo \"this is test\"");
+    String consoleResult = new String(out.toByteArray());
+    assertEquals("this is test\n", consoleResult);
+
+    cli.executeMetaCommand("\\! error_command");
+    consoleResult = new String(out.toByteArray());
+    assertEquals("this is test\nERROR: /bin/bash: error_command: command not found\n", consoleResult);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
new file mode 100644
index 0000000..b51835f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.conf.TajoConf;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHdfsCommand {
+  @Test
+  public void testHdfCommand() throws Exception {
+    TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    System.setOut(new PrintStream(out));
+    System.setErr(new PrintStream(out));
+    TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+
+    cli.executeMetaCommand("\\dfs -test");
+    String consoleResult = new String(out.toByteArray());
+    assertEquals("-test: Not enough arguments: expected 1 but got 0\n" +
+        "Usage: hadoop fs [generic options] -test -[defsz] <path>\n", consoleResult);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
new file mode 100644
index 0000000..9c02b65
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSimpleParser {
+
+  @Test
+  public final void testSpecialCases() throws InvalidStatementException {
+    List<ParsedResult> res1 = SimpleParser.parseScript("");
+    assertEquals(0, res1.size());
+
+    List<ParsedResult> res2 = SimpleParser.parseScript("a");
+    assertEquals(1, res2.size());
+
+    List<ParsedResult> res3 = SimpleParser.parseScript("?");
+    assertEquals(0, res3.size());
+
+    List<ParsedResult> res4 = SimpleParser.parseScript("\\");
+    assertEquals(1, res4.size());
+  }
+
+  @Test
+  public final void testMetaCommands() throws InvalidStatementException {
+    List<ParsedResult> res1 = SimpleParser.parseScript("\\d");
+    assertEquals(1, res1.size());
+    assertEquals(ParsedResult.StatementType.META, res1.get(0).getType());
+    assertEquals("\\d", res1.get(0).getStatement());
+
+    List<ParsedResult> res2 = SimpleParser.parseScript("\\d;\\c;\\f;");
+    assertEquals(3, res2.size());
+    assertEquals(ParsedResult.StatementType.META, res2.get(0).getType());
+    assertEquals("\\d", res2.get(0).getStatement());
+    assertEquals(ParsedResult.StatementType.META, res2.get(1).getType());
+    assertEquals("\\c", res2.get(1).getStatement());
+    assertEquals(ParsedResult.StatementType.META, res2.get(2).getType());
+    assertEquals("\\f", res2.get(2).getStatement());
+
+    List<ParsedResult> res3 = SimpleParser.parseScript("\n\t\t  \\d;\n\\c;\t\t\\f  ;");
+    assertEquals(3, res3.size());
+    assertEquals(ParsedResult.StatementType.META, res3.get(0).getType());
+    assertEquals("\\d", res3.get(0).getStatement());
+    assertEquals(ParsedResult.StatementType.META, res3.get(1).getType());
+    assertEquals("\\c", res3.get(1).getStatement());
+    assertEquals(ParsedResult.StatementType.META, res3.get(2).getType());
+    assertEquals("\\f", res3.get(2).getStatement());
+
+    List<ParsedResult> res4 = SimpleParser.parseScript("\\\td;");
+    assertEquals(1, res4.size());
+    assertEquals("\\\td", res4.get(0).getStatement());
+  }
+
+  @Test
+  public final void testStatements() throws InvalidStatementException {
+    List<ParsedResult> res1 = SimpleParser.parseScript("select * from test;");
+    assertEquals(1, res1.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res1.get(0).getType());
+    assertEquals("select * from test", res1.get(0).getStatement());
+
+    List<ParsedResult> res2 = SimpleParser.parseScript("select * from test;");
+    assertEquals(1, res2.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res2.get(0).getType());
+    assertEquals("select * from test", res2.get(0).getStatement());
+
+    List<ParsedResult> res3 = SimpleParser.parseScript("select * from test1;select * from test2;");
+    assertEquals(2, res3.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res3.get(0).getType());
+    assertEquals("select * from test1", res3.get(0).getStatement());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res3.get(1).getType());
+    assertEquals("select * from test2", res3.get(1).getStatement());
+
+    List<ParsedResult> res4 = SimpleParser.parseScript("\t\t\n\rselect * from \ntest1;select * from test2\n;");
+    assertEquals(2, res4.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res4.get(0).getType());
+    assertEquals("select * from \ntest1", res4.get(0).getStatement());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res4.get(1).getType());
+    assertEquals("select * from test2", res4.get(1).getStatement());
+
+    List<ParsedResult> res5 =
+        SimpleParser.parseScript("\t\t\n\rselect * from \ntest1;\\d test;select * from test2;\n\nselect 1;");
+    assertEquals(4, res5.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(0).getType());
+    assertEquals("select * from \ntest1", res5.get(0).getStatement());
+    assertEquals(ParsedResult.StatementType.META, res5.get(1).getType());
+    assertEquals("\\d test", res5.get(1).getStatement());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(2).getType());
+    assertEquals("select * from test2", res5.get(2).getStatement());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(3).getType());
+    assertEquals("select 1", res5.get(3).getStatement());
+  }
+
+  @Test
+  public final void testQuoted() throws InvalidStatementException {
+    List<ParsedResult> res1 = SimpleParser.parseScript("select '\n;' from test;");
+    assertEquals(1, res1.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res1.get(0).getType());
+    assertEquals("select '\n;' from test", res1.get(0).getStatement());
+
+    List<ParsedResult> res2 = SimpleParser.parseScript("select 'abc\nbbc\nddf' from test;");
+    assertEquals(1, res2.size());
+    assertEquals(ParsedResult.StatementType.STATEMENT, res2.get(0).getType());
+    assertEquals("select 'abc\nbbc\nddf' from test", res2.get(0).getStatement());
+
+    try {
+      SimpleParser.parseScript("select 'abc");
+      assertTrue(false);
+    } catch (InvalidStatementException is) {
+      assertTrue(true);
+    }
+  }
+
+  @Test
+  public final void testParseLines1() throws InvalidStatementException {
+    String [] lines = {
+      "select abc, ",
+      "bbc from test"
+    };
+    SimpleParser parser = new SimpleParser();
+    List<ParsedResult> result1 = parser.parseLines(lines[0]);
+    assertEquals(0, result1.size());
+    List<ParsedResult> result2 = parser.parseLines(lines[1]);
+    assertEquals(0, result2.size());
+    List<ParsedResult> result3 = parser.EOF();
+    assertEquals(1, result3.size());
+    assertEquals(lines[0] + lines[1], result3.get(0).getStatement());
+  }
+
+  @Test
+  public final void testParseLines2() throws InvalidStatementException {
+    String [] lines = {
+        "select abc, '",
+        "bbc' from test; select * from test3;"
+    };
+    SimpleParser parser = new SimpleParser();
+    List<ParsedResult> result1 = parser.parseLines(lines[0]);
+    assertEquals(0, result1.size());
+    List<ParsedResult> result2 = parser.parseLines(lines[1]);
+    assertEquals(2, result2.size());
+    assertEquals("select abc, 'bbc' from test", result2.get(0).getStatement());
+    assertEquals("select * from test3", result2.get(1).getStatement());
+  }
+
+  @Test
+  public final void testParseLines3() throws InvalidStatementException {
+    String [] lines = {
+        "select abc, 'bbc",
+        "' from test; select * from test3;"
+    };
+    SimpleParser parser = new SimpleParser();
+    List<ParsedResult> result1 = parser.parseLines(lines[0]);
+    assertEquals(0, result1.size());
+    List<ParsedResult> result2 = parser.parseLines(lines[1]);
+    assertEquals(2, result2.size());
+    assertEquals("select abc, 'bbc' from test", result2.get(0).getStatement());
+    assertEquals("select * from test3", result2.get(1).getStatement());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java b/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
new file mode 100644
index 0000000..1855217
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestDDLBuilder {
+  private static final Schema schema1;
+  private static final TableMeta meta1;
+  private static final PartitionMethodDesc partitionMethod1;
+
+  static {
+    schema1 = new Schema();
+    schema1.addColumn("name", TajoDataTypes.Type.BLOB);
+    schema1.addColumn("addr", TajoDataTypes.Type.TEXT);
+
+    meta1 = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+    meta1.putOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    meta1.putOption(StorageConstants.COMPRESSION_CODEC, GzipCodec.class.getName());
+
+    Schema expressionSchema = new Schema();
+    expressionSchema.addColumn("key", TajoDataTypes.Type.INT4);
+    expressionSchema.addColumn("key2", TajoDataTypes.Type.TEXT);
+    partitionMethod1 = new PartitionMethodDesc(
+        "db1",
+        "table1",
+        CatalogProtos.PartitionType.COLUMN,
+        "key,key2",
+        expressionSchema);
+  }
+
+  @Test
+  public void testBuildDDLForExternalTable() throws Exception {
+    TableDesc desc = new TableDesc("db1.table1", schema1, meta1, new Path("/table1"));
+    desc.setPartitionMethod(partitionMethod1);
+    desc.setExternal(true);
+    assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLForExternalTable.result"),
+        DDLBuilder.buildDDLForExternalTable(desc));
+  }
+
+  @Test
+  public void testBuildDDLQuotedTableName() throws Exception {
+    Schema schema2 = new Schema();
+    schema2.addColumn("name", TajoDataTypes.Type.BLOB);
+    schema2.addColumn("addr", TajoDataTypes.Type.TEXT);
+    schema2.addColumn("FirstName", TajoDataTypes.Type.TEXT);
+    schema2.addColumn("LastName", TajoDataTypes.Type.TEXT);
+    schema2.addColumn("with", TajoDataTypes.Type.TEXT);
+
+    Schema expressionSchema2 = new Schema();
+    expressionSchema2.addColumn("BirthYear", TajoDataTypes.Type.INT4);
+
+    PartitionMethodDesc partitionMethod2 = new PartitionMethodDesc(
+        "db1",
+        "table1",
+        CatalogProtos.PartitionType.COLUMN,
+        "key,key2",
+        expressionSchema2);
+
+    TableDesc desc = new TableDesc("db1.TABLE2", schema2, meta1, new Path("/table1"));
+    desc.setPartitionMethod(partitionMethod2);
+    desc.setExternal(true);
+    assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLQuotedTableName1.result"),
+        DDLBuilder.buildDDLForExternalTable(desc));
+
+    desc = new TableDesc("db1.TABLE1", schema2, meta1, new Path("/table1"));
+    desc.setPartitionMethod(partitionMethod2);
+    desc.setExternal(false);
+    assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLQuotedTableName2.result"),
+        DDLBuilder.buildDDLForBaseTable(desc));
+  }
+
+  @Test
+  public void testBuildDDLForBaseTable() throws Exception {
+    TableDesc desc = new TableDesc("db1.table2", schema1, meta1, new Path("/table1"));
+    assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLForBaseTable.result"),
+        DDLBuilder.buildDDLForBaseTable(desc));
+  }
+
+  @Test
+  public void testBuildColumn() throws Exception {
+    String [] tobeUnquoted = {
+        "column_name",
+        "columnname",
+        "column_1",
+    };
+
+    for (String columnName : tobeUnquoted) {
+      assertFalse(CatalogUtil.isShouldBeQuoted(columnName));
+    }
+
+    String [] quoted = {
+        "Column_Name",
+        "COLUMN_NAME",
+        "컬럼",
+        "$column_name",
+        "Column_Name1",
+        "with",
+        "when"
+    };
+
+    for (String columnName : quoted) {
+      assertTrue(CatalogUtil.isShouldBeQuoted(columnName));
+    }
+  }
+}