You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:13 UTC
[10/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
new file mode 100644
index 0000000..88d913d
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Configures and starts the Tajo-specific components in the YARN cluster.
+ *
+ */
+public class MiniTajoYarnCluster extends MiniYARNCluster {
+
+ public static final String APPJAR = JarFinder
+ .getJar(LocalContainerLauncher.class);
+
+ private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class);
+
+ public MiniTajoYarnCluster(String testName) {
+ this(testName, 1);
+ }
+
+ public MiniTajoYarnCluster(String testName, int noOfNMs) {
+ super(testName, noOfNMs, 1, 1);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+
+ conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+ conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+ if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+ "apps_staging_dir/").getAbsolutePath());
+ }
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+
+ try {
+ Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+ FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+ if (fc.util().exists(stagingPath)) {
+ LOG.info(stagingPath + " exists! deleting...");
+ fc.delete(stagingPath, true);
+ }
+ LOG.info("mkdir: " + stagingPath);
+ //mkdir the staging directory so that right permissions are set while running as proxy user
+ fc.mkdir(stagingPath, null, true);
+ //mkdir done directory as well
+ String doneDir = JobHistoryUtils
+ .getConfiguredHistoryServerDoneDirPrefix(conf);
+ Path doneDirPath = fc.makeQualified(new Path(doneDir));
+ fc.mkdir(doneDirPath, null, true);
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Could not create staging directory. ", e);
+ }
+ conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
+ // which shuffle doesn't happen
+ //configure the shuffle service in NM
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
+ Service.class);
+
+ // Non-standard shuffle port
+ conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
+
+ // local directory
+ conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");
+
+ conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+ DefaultContainerExecutor.class, ContainerExecutor.class);
+
+ // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+ // for corresponding uberized tests.
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+ conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+
+ LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
+ }
+
+ private class JobHistoryServerWrapper extends AbstractService {
+ public JobHistoryServerWrapper() {
+ super(JobHistoryServerWrapper.class.getName());
+ }
+
+ @Override
+ public synchronized void start() {
+ try {
+ if (!getConfig().getBoolean(
+ JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+ // pick free random ports.
+ getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
+ getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+ MiniYARNCluster.getHostname() + ":0");
+ }
+ super.start();
+ } catch (Throwable t) {
+ throw new YarnRuntimeException(t);
+ }
+
+ LOG.info("MiniMRYARN ResourceManager address: " +
+ getConfig().get(YarnConfiguration.RM_ADDRESS));
+ LOG.info("MiniMRYARN ResourceManager web address: " +
+ getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+ LOG.info("MiniMRYARN HistoryServer web address: " +
+ getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ }
+ }
+
+ public static void main(String [] args) {
+ MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName());
+ cluster.init(new TajoConf());
+ cluster.start();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
new file mode 100644
index 0000000..961184c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.cli.ParsedResult;
+import org.apache.tajo.cli.SimpleParser;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.FileUtil;
+import org.junit.*;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * (Note that this class is not thread safe. Do not execute maven test in any parallel mode.)
+ * <br />
+ * <code>QueryTestCaseBase</code> provides useful methods to easily execute queries and verify their results.
+ *
+ * This class basically uses four resource directories:
+ * <ul>
+ * <li>src/test/resources/dataset - contains a set of data files. It contains sub directories, each of which
+ * corresponds each test class. All data files in each sub directory can be used in the corresponding test class.</li>
+ *
+ * <li>src/test/resources/queries - This is the query directory. It contains sub directories, each of which
+ * corresponds each test class. All query files in each sub directory can be used in the corresponding test
+ * class.</li>
+ *
+ * <li>src/test/resources/results - This is the result directory. It contains sub directories, each of which
+ * corresponds each test class. All result files in each sub directory can be used in the corresponding test class.
+ * </li>
+ * </ul>
+ *
+ * For example, if you create a test class named <code>TestJoinQuery</code>, you should create a pair of query and
+ * result set directories as follows:
+ *
+ * <pre>
+ * src-|
+ * |- resources
+ * |- dataset
+ * | |- TestJoinQuery
+ * | |- table1.tbl
+ * | |- table2.tbl
+ * |
+ * |- queries
+ * | |- TestJoinQuery
+ * | |- TestInnerJoin.sql
+ * | |- table1_ddl.sql
+ * | |- table2_ddl.sql
+ * |
+ * |- results
+ * |- TestJoinQuery
+ * |- TestInnerJoin.result
+ * </pre>
+ *
+ * <code>QueryTestCaseBase</code> basically provides the following methods:
+ * <ul>
+ * <li><code>{@link #executeQuery()}</code> - executes a corresponding query and returns an ResultSet instance</li>
+ * <li><code>{@link #executeFile(String)}</code> - executes a given query file included in the corresponding query
+ * file in the current class's query directory</li>
+ * <li><code>assertResultSet()</code> - check if the query result is equivalent to the expected result included
+ * in the corresponding result file in the current class's result directory.</li>
+ * <li><code>cleanQuery()</code> - clean up all resources</li>
+ * <li><code>executeDDL()</code> - execute a DDL query like create or drop table.</li>
+ * </ul>
+ *
+ * In order to make use of the above methods, query files and results file must be as follows:
+ * <ul>
+ * <li>Each query file must be located on the subdirectory whose structure must be src/resources/queries/${ClassName},
+ * where ${ClassName} indicates an actual test class's simple name.</li>
+ * <li>Each result file must be located on the subdirectory whose structure must be src/resources/results/${ClassName},
+ * where ${ClassName} indicates an actual test class's simple name.</li>
+ * </ul>
+ *
+ * Especially, {@link #executeQuery() and {@link #assertResultSet(java.sql.ResultSet)} methods automatically finds
+ * a query file to be executed and a result to be compared, which are corresponding to the running class and method.
+ * For them, query and result files additionally must be follows as:
+ * <ul>
+ * <li>Each result file must have the file extension '.result'</li>
+ * <li>Each query file must have the file extension '.sql'.</li>
+ * </ul>
+ */
+public class QueryTestCaseBase {
+ private static final Log LOG = LogFactory.getLog(QueryTestCaseBase.class);
+ protected static final TpchTestBase testBase;
+ protected static final TajoTestingCluster testingCluster;
+ protected static TajoConf conf;
+ protected static TajoClient client;
+ protected static final CatalogService catalog;
+ protected static final SQLAnalyzer sqlParser = new SQLAnalyzer();
+
+ /** the base path of dataset directories */
+ protected static final Path datasetBasePath;
+ /** the base path of query directories */
+ protected static final Path queryBasePath;
+ /** the base path of result directories */
+ protected static final Path resultBasePath;
+
+ static {
+ testBase = TpchTestBase.getInstance();
+ testingCluster = testBase.getTestingCluster();
+ conf = testBase.getTestingCluster().getConfiguration();
+ catalog = testBase.getTestingCluster().getMaster().getCatalog();
+ URL datasetBaseURL = ClassLoader.getSystemResource("dataset");
+ datasetBasePath = new Path(datasetBaseURL.toString());
+ URL queryBaseURL = ClassLoader.getSystemResource("queries");
+ queryBasePath = new Path(queryBaseURL.toString());
+ URL resultBaseURL = ClassLoader.getSystemResource("results");
+ resultBasePath = new Path(resultBaseURL.toString());
+ }
+
+ /** It transiently contains created tables for the running test class. */
+ private static String currentDatabase;
+ private static Set<String> createdTableGlobalSet = new HashSet<String>();
+ // queries and results directory corresponding to subclass class.
+ private Path currentQueryPath;
+ private Path currentResultPath;
+ private Path currentDatasetPath;
+
+ // for getting a method name
+ @Rule public TestName name = new TestName();
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ conf = testBase.getTestingCluster().getConfiguration();
+ client = new TajoClient(conf);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws ServiceException {
+ for (String tableName : createdTableGlobalSet) {
+ client.updateQuery("DROP TABLE IF EXISTS " + CatalogUtil.denormalizeIdentifier(tableName));
+ }
+ createdTableGlobalSet.clear();
+
+ // if the current database is "default", shouldn't drop it.
+ if (!currentDatabase.equals(TajoConstants.DEFAULT_DATABASE_NAME)) {
+ for (String tableName : catalog.getAllTableNames(currentDatabase)) {
+ client.updateQuery("DROP TABLE IF EXISTS " + tableName);
+ }
+
+ client.selectDatabase(TajoConstants.DEFAULT_DATABASE_NAME);
+ client.dropDatabase(currentDatabase);
+ }
+ client.close();
+ }
+
+ public QueryTestCaseBase() {
+ // hive 0.12 does not support quoted identifier.
+ // So, we use lower case database names when Tajo uses HCatalogStore.
+ if (testingCluster.isHCatalogStoreRunning()) {
+ this.currentDatabase = getClass().getSimpleName().toLowerCase();
+ } else {
+ this.currentDatabase = getClass().getSimpleName();
+ }
+ init();
+ }
+
+ public QueryTestCaseBase(String currentDatabase) {
+ this.currentDatabase = currentDatabase;
+ init();
+ }
+
+ private void init() {
+ String className = getClass().getSimpleName();
+ currentQueryPath = new Path(queryBasePath, className);
+ currentResultPath = new Path(resultBasePath, className);
+ currentDatasetPath = new Path(datasetBasePath, className);
+
+ try {
+ // if the current database is "default", we don't need create it because it is already prepated at startup time.
+ if (!currentDatabase.equals(TajoConstants.DEFAULT_DATABASE_NAME)) {
+ client.updateQuery("CREATE DATABASE IF NOT EXISTS " + CatalogUtil.denormalizeIdentifier(currentDatabase));
+ }
+ client.selectDatabase(currentDatabase);
+ } catch (ServiceException e) {
+ e.printStackTrace();
+ }
+ testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "false");
+ }
+
+ protected TajoClient getClient() {
+ return client;
+ }
+
+ public String getCurrentDatabase() {
+ return currentDatabase;
+ }
+
+ protected ResultSet executeString(String sql) throws Exception {
+ return testBase.execute(sql);
+ }
+
+ /**
+ * Execute a query contained in the file located in src/test/resources/results/<i>ClassName</i>/<i>MethodName</i>.
+ * <i>ClassName</i> and <i>MethodName</i> will be replaced by actual executed class and methods.
+ *
+ * @return ResultSet of query execution.
+ */
+ public ResultSet executeQuery() throws Exception {
+ return executeFile(name.getMethodName() + ".sql");
+ }
+
+ /**
+ * Execute a query contained in the given named file. This methods tries to find the given file within the directory
+ * src/test/resources/results/<i>ClassName</i>.
+ *
+ * @param queryFileName The file name to be used to execute a query.
+ * @return ResultSet of query execution.
+ */
+ public ResultSet executeFile(String queryFileName) throws Exception {
+ Path queryFilePath = getQueryFilePath(queryFileName);
+ FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+ assertTrue(queryFilePath.toString() + " existence check", fs.exists(queryFilePath));
+
+ List<ParsedResult> parsedResults = SimpleParser.parseScript(FileUtil.readTextFile(new File(queryFilePath.toUri())));
+ if (parsedResults.size() > 1) {
+ assertNotNull("This script \"" + queryFileName + "\" includes two or more queries");
+ }
+ ResultSet result = client.executeQueryAndGetResult(parsedResults.get(0).getStatement());
+ assertNotNull("Query succeeded test", result);
+ return result;
+ }
+
+ /**
+ * Assert the equivalence between the expected result and an actual query result.
+ * If it isn't it throws an AssertionError.
+ *
+ * @param result Query result to be compared.
+ */
+ public final void assertResultSet(ResultSet result) throws IOException {
+ assertResultSet("Result Verification", result, name.getMethodName() + ".result");
+ }
+
+ /**
+ * Assert the equivalence between the expected result and an actual query result.
+ * If it isn't it throws an AssertionError.
+ *
+ * @param result Query result to be compared.
+ * @param resultFileName The file name containing the result to be compared
+ */
+ public final void assertResultSet(ResultSet result, String resultFileName) throws IOException {
+ assertResultSet("Result Verification", result, resultFileName);
+ }
+
+ /**
+ * Assert the equivalence between the expected result and an actual query result.
+ * If it isn't it throws an AssertionError with the given message.
+ *
+ * @param message message The message to printed if the assertion is failed.
+ * @param result Query result to be compared.
+ */
+ public final void assertResultSet(String message, ResultSet result, String resultFileName) throws IOException {
+ FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+ Path resultFile = getResultFile(resultFileName);
+ assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+ try {
+ verifyResultText(message, result, resultFile);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public final void assertStrings(String actual) throws IOException {
+ assertStrings(actual, name.getMethodName() + ".result");
+ }
+
+ public final void assertStrings(String actual, String resultFileName) throws IOException {
+ assertStrings("Result Verification", actual, resultFileName);
+ }
+
+ public final void assertStrings(String message, String actual, String resultFileName) throws IOException {
+ FileSystem fs = currentQueryPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+ Path resultFile = getResultFile(resultFileName);
+ assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+
+ String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
+ assertEquals(message, expectedResult, actual);
+ }
+
+ /**
+ * Release all resources
+ *
+ * @param resultSet ResultSet
+ */
+ public final void cleanupQuery(ResultSet resultSet) throws IOException {
+ if (resultSet == null) {
+ return;
+ }
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Assert that the database exists.
+ * @param databaseName The database name to be checked. This name is case sensitive.
+ */
+ public void assertDatabaseExists(String databaseName) throws ServiceException {
+ assertTrue(client.existDatabase(databaseName));
+ }
+
+ /**
+ * Assert that the database does not exists.
+ * @param databaseName The database name to be checked. This name is case sensitive.
+ */
+ public void assertDatabaseNotExists(String databaseName) throws ServiceException {
+ assertTrue(!client.existDatabase(databaseName));
+ }
+
+ /**
+ * Assert that the table exists.
+ *
+ * @param tableName The table name to be checked. This name is case sensitive.
+ * @throws ServiceException
+ */
+ public void assertTableExists(String tableName) throws ServiceException {
+ assertTrue(client.existTable(tableName));
+ }
+
+ /**
+ * Assert that the table does not exist.
+ *
+ * @param tableName The table name to be checked. This name is case sensitive.
+ */
+ public void assertTableNotExists(String tableName) throws ServiceException {
+ assertTrue(!client.existTable(tableName));
+ }
+
+ public void assertColumnExists(String tableName,String columnName) throws ServiceException {
+ TableDesc tableDesc = fetchTableMetaData(tableName);
+ assertTrue(tableDesc.getSchema().containsByName(columnName));
+ }
+
+ private TableDesc fetchTableMetaData(String tableName) throws ServiceException {
+ return client.getTableDesc(tableName);
+ }
+
+ /**
+ * It transforms a ResultSet instance to rows represented as strings.
+ *
+ * @param resultSet ResultSet that contains a query result
+ * @return String
+ * @throws SQLException
+ */
+ public String resultSetToString(ResultSet resultSet) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ ResultSetMetaData rsmd = resultSet.getMetaData();
+ int numOfColumns = rsmd.getColumnCount();
+
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sb.append(",");
+ String columnName = rsmd.getColumnName(i);
+ sb.append(columnName);
+ }
+ sb.append("\n-------------------------------\n");
+
+ while (resultSet.next()) {
+ for (int i = 1; i <= numOfColumns; i++) {
+ if (i > 1) sb.append(",");
+ String columnValue = resultSet.getObject(i).toString();
+ sb.append(columnValue);
+ }
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ private void verifyResultText(String message, ResultSet res, Path resultFile) throws SQLException, IOException {
+ String actualResult = resultSetToString(res);
+ String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
+ assertEquals(message, expectedResult.trim(), actualResult.trim());
+ }
+
+ private Path getQueryFilePath(String fileName) {
+ return StorageUtil.concatPath(currentQueryPath, fileName);
+ }
+
+ private Path getResultFile(String fileName) {
+ return StorageUtil.concatPath(currentResultPath, fileName);
+ }
+
+ private Path getDataSetFile(String fileName) {
+ return StorageUtil.concatPath(currentDatasetPath, fileName);
+ }
+
+ public List<String> executeDDL(String ddlFileName, @Nullable String [] args) throws Exception {
+ return executeDDL(ddlFileName, null, true, args);
+ }
+
+ /**
+ *
+ * Execute a data definition language (DDL) template. A general SQL DDL statement can be included in this file. But,
+ * for user-specified table name or exact external table path, you must use some format string to indicate them.
+ * The format string will be replaced by the corresponding arguments.
+ *
+ * The below is predefined format strings:
+ * <ul>
+ * <li>${table.path} - It is replaced by the absolute file path that <code>dataFileName</code> points. </li>
+ * <li>${i} - It is replaced by the corresponding element of <code>args</code>. For example, ${0} and ${1} are
+ * replaced by the first and second elements of <code>args</code> respectively</li>. It uses zero-based index.
+ * </ul>
+ *
+ * @param ddlFileName A file name, containing a data definition statement.
+ * @param dataFileName A file name, containing data rows, which columns have to be separated by vertical bar '|'.
+ * This file name is used for replacing some format string indicating an external table location.
+ * @param args A list of arguments, each of which is used to replace corresponding variable which has a form of ${i}.
+ * @return The table names created
+ */
+ public List<String> executeDDL(String ddlFileName, @Nullable String dataFileName, @Nullable String ... args)
+ throws Exception {
+
+ return executeDDL(ddlFileName, dataFileName, true, args);
+ }
+
+ private List<String> executeDDL(String ddlFileName, @Nullable String dataFileName, boolean isLocalTable,
+ @Nullable String[] args) throws Exception {
+
+ Path ddlFilePath = new Path(currentQueryPath, ddlFileName);
+ FileSystem fs = ddlFilePath.getFileSystem(conf);
+ assertTrue(ddlFilePath + " existence check", fs.exists(ddlFilePath));
+
+ String template = FileUtil.readTextFile(new File(ddlFilePath.toUri()));
+ String dataFilePath = null;
+ if (dataFileName != null) {
+ dataFilePath = getDataSetFile(dataFileName).toString();
+ }
+ String compiled = compileTemplate(template, dataFilePath, args);
+
+ List<ParsedResult> parsedResults = SimpleParser.parseScript(compiled);
+ List<String> createdTableNames = new ArrayList<String>();
+
+ for (ParsedResult parsedResult : parsedResults) {
+ // parse a statement
+ Expr expr = sqlParser.parse(parsedResult.getStatement());
+ assertNotNull(ddlFilePath + " cannot be parsed", expr);
+
+ if (expr.getType() == OpType.CreateTable) {
+ CreateTable createTable = (CreateTable) expr;
+ String tableName = createTable.getTableName();
+ assertTrue("Table [" + tableName + "] creation is failed.", client.updateQuery(parsedResult.getStatement()));
+
+ TableDesc createdTable = client.getTableDesc(tableName);
+ String createdTableName = createdTable.getName();
+
+ assertTrue("table '" + createdTableName + "' creation check", client.existTable(createdTableName));
+ if (isLocalTable) {
+ createdTableGlobalSet.add(createdTableName);
+ createdTableNames.add(tableName);
+ }
+ } else if (expr.getType() == OpType.DropTable) {
+ DropTable dropTable = (DropTable) expr;
+ String tableName = dropTable.getTableName();
+ assertTrue("table '" + tableName + "' existence check",
+ client.existTable(CatalogUtil.buildFQName(currentDatabase, tableName)));
+ assertTrue("table drop is failed.", client.updateQuery(parsedResult.getStatement()));
+ assertFalse("table '" + tableName + "' dropped check",
+ client.existTable(CatalogUtil.buildFQName(currentDatabase, tableName)));
+ if (isLocalTable) {
+ createdTableGlobalSet.remove(tableName);
+ }
+ } else if (expr.getType() == OpType.AlterTable) {
+ AlterTable alterTable = (AlterTable) expr;
+ String tableName = alterTable.getTableName();
+ assertTrue("table '" + tableName + "' existence check", client.existTable(tableName));
+ client.updateQuery(compiled);
+ if (isLocalTable) {
+ createdTableGlobalSet.remove(tableName);
+ }
+ } else {
+ assertTrue(ddlFilePath + " is not a Create or Drop Table statement", false);
+ }
+ }
+
+ return createdTableNames;
+ }
+
+ /**
+ * Replace format strings by a given parameters.
+ *
+ * @param template
+ * @param dataFileName The data file name to replace <code>${table.path}</code>
+ * @param args The list argument to replace each corresponding format string ${i}. ${i} uses zero-based index.
+ * @return A string compiled
+ */
+ private String compileTemplate(String template, @Nullable String dataFileName, @Nullable String ... args) {
+ String result;
+ if (dataFileName != null) {
+ result = template.replace("${table.path}", "\'" + dataFileName + "'");
+ } else {
+ result = template;
+ }
+
+ if (args != null) {
+ for (int i = 0; i < args.length; i++) {
+ result = result.replace("${" + i + "}", args[i]);
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
new file mode 100644
index 0000000..ed5e4bc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.YarnTajoResourceManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class TajoTestingCluster {
+ private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
+ private TajoConf conf;
+
+ protected MiniTajoYarnCluster yarnCluster;
+ private FileSystem defaultFS;
+ private MiniDFSCluster dfsCluster;
+ private MiniCatalogServer catalogServer;
+
+ private TajoMaster tajoMaster;
+ private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
+ private boolean standbyWorkerMode = false;
+
+ // If non-null, then already a cluster running.
+ private File clusterTestBuildDir = null;
+
+ /**
+ * System property key to get test directory value.
+ * Name is as it is because mini dfs has hard-codings to put test data here.
+ */
+ public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
+
+ /**
+ * Default parent directory for test output.
+ */
+ public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
+
+ /**
+ * True If HCatalogStore is used. Otherwise, it is FALSE.
+ */
+ public Boolean isHCatalogStoreUse = false;
+
+ public TajoTestingCluster() {
+ this.conf = new TajoConf();
+ initPropertiesAndConfigs();
+ }
+
+ void initPropertiesAndConfigs() {
+ if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
+ String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
+ Preconditions.checkState(
+ testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
+ testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
+ ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
+ YarnTajoResourceManager.class.getCanonicalName() +"."
+ );
+ conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
+ }
+ conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+ conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
+
+ this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
+ .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
+ conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
+ }
+
+ public TajoConf getConfiguration() {
+ return this.conf;
+ }
+
+ public void initTestDir() {
+ if (System.getProperty(TEST_DIRECTORY_KEY) == null) {
+ clusterTestBuildDir = setupClusterTestBuildDir();
+ System.setProperty(TEST_DIRECTORY_KEY,
+ clusterTestBuildDir.getAbsolutePath());
+ }
+ }
+
+ /**
+ * @return Where to write test data on local filesystem; usually
+ * {@link #DEFAULT_TEST_DIRECTORY}
+ * @see #setupClusterTestBuildDir()
+ */
+ public static File getTestDir() {
+ return new File(System.getProperty(TEST_DIRECTORY_KEY,
+ DEFAULT_TEST_DIRECTORY));
+ }
+
+ /**
+ * @param subdirName
+ * @return Path to a subdirectory named <code>subdirName</code> under
+ * {@link #getTestDir()}.
+ * @see #setupClusterTestBuildDir()
+ */
+ public static File getTestDir(final String subdirName) {
+ return new File(getTestDir(), subdirName);
+ }
+
+ public File setupClusterTestBuildDir() {
+ String randomStr = UUID.randomUUID().toString();
+ String dirStr = getTestDir(randomStr).toString();
+ File dir = new File(dirStr).getAbsoluteFile();
+ // Have it cleaned up on exit
+ dir.deleteOnExit();
+ return dir;
+ }
+
+ ////////////////////////////////////////////////////////
+ // HDFS Section
+ ////////////////////////////////////////////////////////
+ /**
+ * Start a minidfscluster.
+ * @param servers How many DNs to start.
+ * @throws Exception
+ * @see {@link #shutdownMiniDFSCluster()}
+ * @return The mini dfs cluster created.
+ */
+ public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
+ return startMiniDFSCluster(servers, null, null);
+ }
+
+ /**
+ * Start a minidfscluster.
+ * Can only create one.
+ * @param servers How many DNs to start.
+ * @param dir Where to home your dfs cluster.
+ * @param hosts hostnames DNs to run on.
+ * @throws Exception
+ * @see {@link #shutdownMiniDFSCluster()}
+ * @return The mini dfs cluster created.
+ * @throws java.io.IOException
+ */
+ public MiniDFSCluster startMiniDFSCluster(int servers,
+ final File dir,
+ final String hosts[])
+ throws IOException {
+ if (dir == null) {
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+ } else {
+ this.clusterTestBuildDir = dir;
+ }
+
+ System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
+ this.clusterTestBuildDir.toString());
+
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
+ builder.hosts(hosts);
+ builder.numDataNodes(servers);
+ builder.format(true);
+ builder.manageNameDfsDirs(true);
+ builder.manageDataDfsDirs(true);
+ builder.waitSafeMode(true);
+ this.dfsCluster = builder.build();
+
+ // Set this just-started cluser as our filesystem.
+ this.defaultFS = this.dfsCluster.getFileSystem();
+ this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString());
+ this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo");
+
+ return this.dfsCluster;
+ }
+
+ public void shutdownMiniDFSCluster() throws Exception {
+ if (this.dfsCluster != null) {
+ try {
+ FileSystem fs = this.dfsCluster.getFileSystem();
+ if (fs != null) fs.close();
+ } catch (IOException e) {
+ System.err.println("error closing file system: " + e);
+ }
+ // The below throws an exception per dn, AsynchronousCloseException.
+ this.dfsCluster.shutdown();
+ }
+ }
+
+ public boolean isRunningDFSCluster() {
+ return this.defaultFS != null;
+ }
+
+ public MiniDFSCluster getMiniDFSCluster() {
+ return this.dfsCluster;
+ }
+
+ public FileSystem getDefaultFileSystem() {
+ return this.defaultFS;
+ }
+
+ ////////////////////////////////////////////////////////
+ // Catalog Section
+ ////////////////////////////////////////////////////////
+ public MiniCatalogServer startCatalogCluster() throws Exception {
+ TajoConf c = getConfiguration();
+
+ if(clusterTestBuildDir == null) {
+ clusterTestBuildDir = setupClusterTestBuildDir();
+ }
+
+ conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+ conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
+ LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+ conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+
+ catalogServer = new MiniCatalogServer(conf);
+ CatalogServer catServer = catalogServer.getCatalogServer();
+ InetSocketAddress sockAddr = catServer.getBindAddress();
+ c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
+
+ return this.catalogServer;
+ }
+
+ public void shutdownCatalogCluster() {
+ if (catalogServer != null) {
+ this.catalogServer.shutdown();
+ }
+ }
+
+ public MiniCatalogServer getMiniCatalogCluster() {
+ return this.catalogServer;
+ }
+
+ public boolean isHCatalogStoreRunning() {
+ return isHCatalogStoreUse;
+ }
+
+ ////////////////////////////////////////////////////////
+ // Tajo Cluster Section
+ ////////////////////////////////////////////////////////
+ private void startMiniTajoCluster(File testBuildDir,
+ final int numSlaves,
+ boolean local) throws Exception {
+ TajoConf c = getConfiguration();
+ c.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.WORKER_TEMPORAL_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo-localdir");
+
+ LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
+
+ if (!local) {
+ c.setVar(ConfVars.ROOT_DIR,
+ getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
+ } else {
+ c.setVar(ConfVars.ROOT_DIR, clusterTestBuildDir.getAbsolutePath() + "/tajo");
+ }
+
+ setupCatalogForTesting(c, clusterTestBuildDir);
+
+ tajoMaster = new TajoMaster();
+ tajoMaster.init(c);
+ tajoMaster.start();
+
+ this.conf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, c.getVar(ConfVars.WORKER_PEER_RPC_ADDRESS));
+ this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, c.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS));
+
+ InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
+
+ this.conf.setVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+ tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+ this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
+ this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
+
+ if(standbyWorkerMode) {
+ startTajoWorkers(numSlaves);
+ }
+ LOG.info("Mini Tajo cluster is up");
+ LOG.info("====================================================================================");
+ LOG.info("= MiniTajoCluster starts up =");
+ LOG.info("====================================================================================");
+ LOG.info("= * Master Address: " + tajoMaster.getMasterName());
+ LOG.info("= * CatalogStore: " + tajoMaster.getCatalogServer().getStoreClassName());
+ LOG.info("------------------------------------------------------------------------------------");
+ LOG.info("= * Warehouse Dir: " + TajoConf.getWarehouseDir(c));
+ LOG.info("= * Worker Tmp Dir: " + c.getVar(ConfVars.WORKER_TEMPORAL_DIR));
+ LOG.info("====================================================================================");
+ }
+
+ private void setupCatalogForTesting(TajoConf c, File testBuildDir) throws IOException {
+ final String HCATALOG_CLASS_NAME = "org.apache.tajo.catalog.store.HCatalogStore";
+ boolean hcatalogClassExists = false;
+ try {
+ getClass().getClassLoader().loadClass(HCATALOG_CLASS_NAME);
+ hcatalogClassExists = true;
+ } catch (ClassNotFoundException e) {
+ LOG.info("HCatalogStore is not available.");
+ }
+ String driverClass = System.getProperty(CatalogConstants.STORE_CLASS);
+
+ if (hcatalogClassExists &&
+ driverClass != null && driverClass.equals(HCATALOG_CLASS_NAME)) {
+ try {
+ getClass().getClassLoader().loadClass(HCATALOG_CLASS_NAME);
+ String jdbcUri = "jdbc:derby:;databaseName="+ testBuildDir.toURI().getPath() + "/metastore_db;create=true";
+ c.set("hive.metastore.warehouse.dir", TajoConf.getWarehouseDir(c).toString() + "/default");
+ c.set("javax.jdo.option.ConnectionURL", jdbcUri);
+ c.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, conf.getVar(ConfVars.WAREHOUSE_DIR));
+ c.set(CatalogConstants.STORE_CLASS, HCATALOG_CLASS_NAME);
+ Path defaultDatabasePath = new Path(TajoConf.getWarehouseDir(c).toString() + "/default");
+ FileSystem fs = defaultDatabasePath.getFileSystem(c);
+ if (!fs.exists(defaultDatabasePath)) {
+ fs.mkdirs(defaultDatabasePath);
+ }
+ isHCatalogStoreUse = true;
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ } else { // for derby
+ c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
+ c.set(CatalogConstants.CATALOG_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
+ }
+ c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
+ }
+
+ private void startTajoWorkers(int numSlaves) throws Exception {
+ for(int i = 0; i < 1; i++) {
+ TajoWorker tajoWorker = new TajoWorker();
+
+ TajoConf workerConf = new TajoConf(this.conf);
+
+ workerConf.setVar(ConfVars.WORKER_INFO_ADDRESS, "localhost:0");
+ workerConf.setVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS, "localhost:0");
+ workerConf.setVar(ConfVars.WORKER_PEER_RPC_ADDRESS, "localhost:0");
+
+ workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
+
+ tajoWorker.startWorker(workerConf, new String[]{"standby"});
+
+ LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
+ tajoWorkers.add(tajoWorker);
+ }
+ }
+
+ public void restartTajoCluster(int numSlaves) throws Exception {
+ tajoMaster.stop();
+ tajoMaster.start();
+
+ LOG.info("Minicluster has been restarted");
+ }
+
+ public TajoMaster getMaster() {
+ return this.tajoMaster;
+ }
+
+ public List<TajoWorker> getTajoWorkers() {
+ return this.tajoWorkers;
+ }
+
+ public void shutdownMiniTajoCluster() {
+ if(this.tajoMaster != null) {
+ this.tajoMaster.stop();
+ }
+ for(TajoWorker eachWorker: tajoWorkers) {
+ eachWorker.stopWorkerForce();
+ }
+ tajoWorkers.clear();
+ this.tajoMaster= null;
+ }
+
+ ////////////////////////////////////////////////////////
+ // Meta Cluster Section
+ ////////////////////////////////////////////////////////
+ /**
+ * @throws java.io.IOException If a cluster -- dfs or engine -- already running.
+ */
+ void isRunningCluster(String passedBuildPath) throws IOException {
+ if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
+ throw new IOException("Cluster already running at " +
+ this.clusterTestBuildDir);
+ }
+
+ /**
+ * This method starts up a tajo cluster with a given number of clusters in
+ * distributed mode.
+ *
+ * @param numSlaves the number of tajo cluster to start up
+ * @throws Exception
+ */
+ public void startMiniCluster(final int numSlaves)
+ throws Exception {
+ startMiniCluster(numSlaves, null);
+ }
+
+ public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
+
+ int numDataNodes = numSlaves;
+ if(dataNodeHosts != null && dataNodeHosts.length != 0) {
+ numDataNodes = dataNodeHosts.length;
+ }
+
+ LOG.info("Starting up minicluster with 1 master(s) and " +
+ numSlaves + " worker(s) and " + numDataNodes + " datanode(s)");
+
+ // If we already put up a cluster, fail.
+ String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
+ isRunningCluster(testBuildPath);
+ if (testBuildPath != null) {
+ LOG.info("Using passed path: " + testBuildPath);
+ }
+
+ // Make a new random dir to home everything in. Set it as system property.
+ // minidfs reads home from system property.
+ this.clusterTestBuildDir = testBuildPath == null?
+ setupClusterTestBuildDir() : new File(testBuildPath);
+
+ System.setProperty(TEST_DIRECTORY_KEY,
+ this.clusterTestBuildDir.getAbsolutePath());
+
+ startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
+ this.dfsCluster.waitClusterUp();
+
+ if(!standbyWorkerMode) {
+ startMiniYarnCluster();
+ }
+
+ startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
+ }
+
+ private void startMiniYarnCluster() throws Exception {
+ LOG.info("Starting up YARN cluster");
+ // Scheduler properties required for YARN to work
+ conf.set("yarn.scheduler.capacity.root.queues", "default");
+ conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2);
+
+ if (yarnCluster == null) {
+ yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ ResourceManager resourceManager = yarnCluster.getResourceManager();
+ InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
+ InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
+ conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));
+
+ URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
+ if (url == null) {
+ throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
+ }
+ yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
+ OutputStream os = new FileOutputStream(new File(url.getPath()));
+ yarnCluster.getConfig().writeXml(os);
+ os.close();
+ }
+ }
+
+ public void startMiniClusterInLocal(final int numSlaves) throws Exception {
+ // If we already put up a cluster, fail.
+ String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
+ isRunningCluster(testBuildPath);
+ if (testBuildPath != null) {
+ LOG.info("Using passed path: " + testBuildPath);
+ }
+
+ // Make a new random dir to home everything in. Set it as system property.
+ // minidfs reads home from system property.
+ this.clusterTestBuildDir = testBuildPath == null?
+ setupClusterTestBuildDir() : new File(testBuildPath);
+
+ System.setProperty(TEST_DIRECTORY_KEY,
+ this.clusterTestBuildDir.getAbsolutePath());
+
+ startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, true);
+ }
+
+ public void shutdownMiniCluster() throws IOException {
+ LOG.info("========================================");
+ LOG.info("Minicluster is stopping");
+ LOG.info("========================================");
+
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ shutdownMiniTajoCluster();
+
+ if(this.catalogServer != null) {
+ shutdownCatalogCluster();
+ }
+
+ if(this.yarnCluster != null) {
+ this.yarnCluster.stop();
+ }
+
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ if(this.dfsCluster != null) {
+
+ try {
+ FileSystem fs = this.dfsCluster.getFileSystem();
+ if (fs != null) fs.close();
+ this.dfsCluster.shutdown();
+ } catch (IOException e) {
+ System.err.println("error closing file system: " + e);
+ }
+ }
+
+ if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
+ if(!ShutdownHookManager.get().isShutdownInProgress()) {
+ //TODO clean test dir when ShutdownInProgress
+ LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
+ localFS.delete(new Path(clusterTestBuildDir.toString()), true);
+ localFS.close();
+ }
+ this.clusterTestBuildDir = null;
+ }
+ LOG.info("Minicluster is down");
+ }
+
+ public static ResultSet run(String[] names,
+ Schema[] schemas,
+ Options option,
+ String[][] tables,
+ String query) throws Exception {
+ TpchTestBase instance = TpchTestBase.getInstance();
+ TajoTestingCluster util = instance.getTestingCluster();
+ while(true) {
+ if(util.getMaster().isMasterRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ TajoConf conf = util.getConfiguration();
+ TajoClient client = new TajoClient(conf);
+
+ FileSystem fs = util.getDefaultFileSystem();
+ Path rootDir = util.getMaster().
+ getStorageManager().getWarehouseDir();
+ fs.mkdirs(rootDir);
+ for (int i = 0; i < names.length; i++) {
+ Path tablePath = new Path(rootDir, names[i]);
+ fs.mkdirs(tablePath);
+ Path dfsPath = new Path(tablePath, names[i] + ".tbl");
+ FSDataOutputStream out = fs.create(dfsPath);
+ for (int j = 0; j < tables[i].length; j++) {
+ out.write((tables[i][j]+"\n").getBytes());
+ }
+ out.close();
+ TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, option);
+ client.createExternalTable(names[i], schemas[i], tablePath, meta);
+ }
+ Thread.sleep(1000);
+ ResultSet res = client.executeQueryAndGetResult(query);
+ return res;
+ }
+
+ /**
+ * Write lines to a file.
+ *
+ * @param file File to write lines to
+ * @param lines Strings written to the file
+ * @throws java.io.IOException
+ */
+ private static void writeLines(File file, String... lines)
+ throws IOException {
+ Writer writer = Files.newWriter(file, Charsets.UTF_8);
+ try {
+ for (String line : lines) {
+ writer.write(line);
+ writer.write('\n');
+ }
+ } finally {
+ Closeables.closeQuietly(writer);
+ }
+ }
+
+ public void setAllTajoDaemonConfValue(String key, String value) {
+ tajoMaster.getContext().getConf().set(key, value);
+ for (TajoWorker eachWorker: tajoWorkers) {
+ eachWorker.getConfig().set(key, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
new file mode 100644
index 0000000..912400b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestQueryIdFactory {
+
+ @Before
+ public void setup() {
+ }
+
+ @Test
+ public void testNewQueryId() {
+ QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+ QueryId qid2 = LocalTajoTestingUtility.newQueryId();
+ assertTrue(qid1.compareTo(qid2) < 0);
+ }
+
+ @Test
+ public void testNewSubQueryId() {
+ QueryId qid = LocalTajoTestingUtility.newQueryId();
+ MasterPlan plan = new MasterPlan(qid, null, null);
+ ExecutionBlockId subqid1 = plan.newExecutionBlockId();
+ ExecutionBlockId subqid2 = plan.newExecutionBlockId();
+ assertTrue(subqid1.compareTo(subqid2) < 0);
+ }
+
+ @Test
+ public void testNewQueryUnitId() {
+ QueryId qid = LocalTajoTestingUtility.newQueryId();
+ MasterPlan plan = new MasterPlan(qid, null, null);
+ ExecutionBlockId subid = plan.newExecutionBlockId();
+ QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
+ QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
+ assertTrue(quid1.compareTo(quid2) < 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java b/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
new file mode 100644
index 0000000..60b6c22
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TestTajoIds.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.util.TajoIdUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestTajoIds {
+ @Test
+ public void testQueryId() {
+ long ts1 = 1315890136000l;
+ long ts2 = 1315890136001l;
+
+ QueryId j1 = createQueryId(ts1, 2);
+ QueryId j2 = createQueryId(ts1, 1);
+ QueryId j3 = createQueryId(ts2, 1);
+ QueryId j4 = createQueryId(ts1, 2);
+
+ assertTrue(j1.equals(j4));
+ assertFalse(j1.equals(j2));
+ assertFalse(j1.equals(j3));
+
+ assertTrue(j1.compareTo(j4) == 0);
+ assertTrue(j1.compareTo(j2) > 0);
+ assertTrue(j1.compareTo(j3) < 0);
+
+ assertTrue(j1.hashCode() == j4.hashCode());
+ assertFalse(j1.hashCode() == j2.hashCode());
+ assertFalse(j1.hashCode() == j3.hashCode());
+
+ QueryId j5 = createQueryId(ts1, 231415);
+ assertEquals("q_" + ts1 + "_0002", j1.toString());
+ assertEquals("q_" + ts1 + "_231415", j5.toString());
+ }
+
+ @Test
+ public void testQueryIds() {
+ long timeId = 1315890136000l;
+
+ QueryId queryId = createQueryId(timeId, 1);
+ assertEquals("q_" + timeId + "_0001", queryId.toString());
+
+ ExecutionBlockId subId = QueryIdFactory.newExecutionBlockId(queryId, 2);
+ assertEquals("eb_" + timeId +"_0001_000002", subId.toString());
+
+ QueryUnitId qId = new QueryUnitId(subId, 5);
+ assertEquals("t_" + timeId + "_0001_000002_000005", qId.toString());
+
+ QueryUnitAttemptId attemptId = new QueryUnitAttemptId(qId, 4);
+ assertEquals("ta_" + timeId + "_0001_000002_000005_04", attemptId.toString());
+ }
+
+ @Test
+ public void testEqualsObject() {
+ long timeId = System.currentTimeMillis();
+
+ QueryId queryId1 = createQueryId(timeId, 1);
+ QueryId queryId2 = createQueryId(timeId, 2);
+ assertNotSame(queryId1, queryId2);
+ QueryId queryId3 = createQueryId(timeId, 1);
+ assertEquals(queryId1, queryId3);
+
+ ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+ assertNotSame(sid1, sid2);
+ ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ assertEquals(sid1, sid3);
+
+ QueryUnitId qid1 = new QueryUnitId(sid1, 9);
+ QueryUnitId qid2 = new QueryUnitId(sid1, 10);
+ assertNotSame(qid1, qid2);
+ QueryUnitId qid3 = new QueryUnitId(sid1, 9);
+ assertEquals(qid1, qid3);
+ }
+
+ @Test
+ public void testCompareTo() {
+ long time = System.currentTimeMillis();
+
+ QueryId queryId1 = createQueryId(time, 1);
+ QueryId queryId2 = createQueryId(time, 2);
+ QueryId queryId3 = createQueryId(time, 1);
+ assertEquals(-1, queryId1.compareTo(queryId2));
+ assertEquals(1, queryId2.compareTo(queryId1));
+ assertEquals(0, queryId3.compareTo(queryId1));
+
+ ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+ ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ assertEquals(-1, sid1.compareTo(sid2));
+ assertEquals(1, sid2.compareTo(sid1));
+ assertEquals(0, sid3.compareTo(sid1));
+
+ QueryUnitId qid1 = new QueryUnitId(sid1, 9);
+ QueryUnitId qid2 = new QueryUnitId(sid1, 10);
+ QueryUnitId qid3 = new QueryUnitId(sid1, 9);
+ assertEquals(-1, qid1.compareTo(qid2));
+ assertEquals(1, qid2.compareTo(qid1));
+ assertEquals(0, qid3.compareTo(qid1));
+ }
+
+ @Test
+ public void testConstructFromString() {
+ QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+ QueryId qid2 = TajoIdUtils.parseQueryId(qid1.toString());
+ assertEquals(qid1, qid2);
+
+ MasterPlan plan1 = new MasterPlan(qid1, null, null);
+ ExecutionBlockId sub1 = plan1.newExecutionBlockId();
+ ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
+ assertEquals(sub1, sub2);
+
+ QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
+ QueryUnitId u2 = new QueryUnitId(u1.getProto());
+ assertEquals(u1, u2);
+
+ QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
+ QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
+ assertEquals(attempt1, attempt2);
+ }
+
+ @Test
+ public void testConstructFromPB() {
+ QueryId qid1 = LocalTajoTestingUtility.newQueryId();
+ QueryId qid2 = new QueryId(qid1.getProto());
+ assertEquals(qid1, qid2);
+
+ MasterPlan plan = new MasterPlan(qid1, null, null);
+ ExecutionBlockId sub1 = plan.newExecutionBlockId();
+ ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
+ assertEquals(sub1, sub2);
+
+ QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
+ QueryUnitId u2 = new QueryUnitId(u1.getProto());
+ assertEquals(u1, u2);
+
+ QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
+ QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
+ assertEquals(attempt1, attempt2);
+ }
+
+ public static QueryId createQueryId(long timestamp, int id) {
+ ApplicationId appId = BuilderUtils.newApplicationId(timestamp, id);
+
+ return QueryIdFactory.newQueryId(appId.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
new file mode 100644
index 0000000..8995d81
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+public class TpchTestBase {
+ private static final Log LOG = LogFactory.getLog(TpchTestBase.class);
+
+ String [] names;
+ String [] paths;
+ String [][] tables;
+ Schema[] schemas;
+ Map<String, Integer> nameMap = Maps.newHashMap();
+ protected TPCH tpch;
+ protected LocalTajoTestingUtility util;
+
+ private static TpchTestBase testBase;
+
+ static {
+ try {
+ testBase = new TpchTestBase();
+ testBase.setUp();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private TpchTestBase() throws IOException {
+ names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier", "empty_orders"};
+ paths = new String[names.length];
+ for (int i = 0; i < names.length; i++) {
+ nameMap.put(names[i], i);
+ }
+
+ tpch = new TPCH();
+ tpch.loadSchemas();
+ tpch.loadQueries();
+
+ schemas = new Schema[names.length];
+ for (int i = 0; i < names.length; i++) {
+ schemas[i] = tpch.getSchema(names[i]);
+ }
+
+ tables = new String[names.length][];
+ File file;
+ for (int i = 0; i < names.length; i++) {
+ file = new File("src/test/tpch/" + names[i] + ".tbl");
+ if(!file.exists()) {
+ file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
+ + ".tbl");
+ }
+ tables[i] = FileUtil.readTextFile(file).split("\n");
+ paths[i] = file.getAbsolutePath();
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void setUp() throws Exception {
+ util = new LocalTajoTestingUtility();
+ Options opt = new Options();
+ opt.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ util.setup(names, paths, schemas, opt);
+ }
+
+ public static TpchTestBase getInstance() {
+ return testBase;
+ }
+
+ public ResultSet execute(String query) throws Exception {
+ return util.execute(query);
+ }
+
+ public TajoTestingCluster getTestingCluster() {
+ return util.getTestingCluster();
+ }
+
+ public void tearDown() throws IOException {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ }
+ util.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java b/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
new file mode 100644
index 0000000..4ca9b3e
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/benchmark/TestTPCH.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.benchmark;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoConstants;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+
+@Category(IntegrationTest.class)
+public class TestTPCH extends QueryTestCaseBase {
+
+ public TestTPCH() {
+ super(TajoConstants.DEFAULT_DATABASE_NAME);
+ }
+
+ @Test
+ public void testQ1OrderBy() throws Exception {
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+
+ @Test
+ public void testQ2FourJoins() throws Exception {
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+
+ @Test
+ public void testTPCH14Expr() throws Exception {
+ ResultSet res = executeQuery();
+ assertResultSet(res);
+ cleanupQuery(res);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
new file mode 100644
index 0000000..9c6e760
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestExecExternalShellCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.conf.TajoConf;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestExecExternalShellCommand {
+ @Test
+ public void testCommand() throws Exception {
+ TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+
+ cli.executeMetaCommand("\\! echo \"this is test\"");
+ String consoleResult = new String(out.toByteArray());
+ assertEquals("this is test\n", consoleResult);
+
+ cli.executeMetaCommand("\\! error_command");
+ consoleResult = new String(out.toByteArray());
+ assertEquals("this is test\nERROR: /bin/bash: error_command: command not found\n", consoleResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
new file mode 100644
index 0000000..b51835f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestHdfsCommand.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.conf.TajoConf;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHdfsCommand {
+ @Test
+ public void testHdfCommand() throws Exception {
+ TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ System.setOut(new PrintStream(out));
+ System.setErr(new PrintStream(out));
+ TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out);
+
+ cli.executeMetaCommand("\\dfs -test");
+ String consoleResult = new String(out.toByteArray());
+ assertEquals("-test: Not enough arguments: expected 1 but got 0\n" +
+ "Usage: hadoop fs [generic options] -test -[defsz] <path>\n", consoleResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java b/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
new file mode 100644
index 0000000..9c02b65
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/TestSimpleParser.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cli;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSimpleParser {
+
+ @Test
+ public final void testSpecialCases() throws InvalidStatementException {
+ List<ParsedResult> res1 = SimpleParser.parseScript("");
+ assertEquals(0, res1.size());
+
+ List<ParsedResult> res2 = SimpleParser.parseScript("a");
+ assertEquals(1, res2.size());
+
+ List<ParsedResult> res3 = SimpleParser.parseScript("?");
+ assertEquals(0, res3.size());
+
+ List<ParsedResult> res4 = SimpleParser.parseScript("\\");
+ assertEquals(1, res4.size());
+ }
+
+ @Test
+ public final void testMetaCommands() throws InvalidStatementException {
+ List<ParsedResult> res1 = SimpleParser.parseScript("\\d");
+ assertEquals(1, res1.size());
+ assertEquals(ParsedResult.StatementType.META, res1.get(0).getType());
+ assertEquals("\\d", res1.get(0).getStatement());
+
+ List<ParsedResult> res2 = SimpleParser.parseScript("\\d;\\c;\\f;");
+ assertEquals(3, res2.size());
+ assertEquals(ParsedResult.StatementType.META, res2.get(0).getType());
+ assertEquals("\\d", res2.get(0).getStatement());
+ assertEquals(ParsedResult.StatementType.META, res2.get(1).getType());
+ assertEquals("\\c", res2.get(1).getStatement());
+ assertEquals(ParsedResult.StatementType.META, res2.get(2).getType());
+ assertEquals("\\f", res2.get(2).getStatement());
+
+ List<ParsedResult> res3 = SimpleParser.parseScript("\n\t\t \\d;\n\\c;\t\t\\f ;");
+ assertEquals(3, res3.size());
+ assertEquals(ParsedResult.StatementType.META, res3.get(0).getType());
+ assertEquals("\\d", res3.get(0).getStatement());
+ assertEquals(ParsedResult.StatementType.META, res3.get(1).getType());
+ assertEquals("\\c", res3.get(1).getStatement());
+ assertEquals(ParsedResult.StatementType.META, res3.get(2).getType());
+ assertEquals("\\f", res3.get(2).getStatement());
+
+ List<ParsedResult> res4 = SimpleParser.parseScript("\\\td;");
+ assertEquals(1, res4.size());
+ assertEquals("\\\td", res4.get(0).getStatement());
+ }
+
+ @Test
+ public final void testStatements() throws InvalidStatementException {
+ List<ParsedResult> res1 = SimpleParser.parseScript("select * from test;");
+ assertEquals(1, res1.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res1.get(0).getType());
+ assertEquals("select * from test", res1.get(0).getStatement());
+
+ List<ParsedResult> res2 = SimpleParser.parseScript("select * from test;");
+ assertEquals(1, res2.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res2.get(0).getType());
+ assertEquals("select * from test", res2.get(0).getStatement());
+
+ List<ParsedResult> res3 = SimpleParser.parseScript("select * from test1;select * from test2;");
+ assertEquals(2, res3.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res3.get(0).getType());
+ assertEquals("select * from test1", res3.get(0).getStatement());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res3.get(1).getType());
+ assertEquals("select * from test2", res3.get(1).getStatement());
+
+ List<ParsedResult> res4 = SimpleParser.parseScript("\t\t\n\rselect * from \ntest1;select * from test2\n;");
+ assertEquals(2, res4.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res4.get(0).getType());
+ assertEquals("select * from \ntest1", res4.get(0).getStatement());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res4.get(1).getType());
+ assertEquals("select * from test2", res4.get(1).getStatement());
+
+ List<ParsedResult> res5 =
+ SimpleParser.parseScript("\t\t\n\rselect * from \ntest1;\\d test;select * from test2;\n\nselect 1;");
+ assertEquals(4, res5.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(0).getType());
+ assertEquals("select * from \ntest1", res5.get(0).getStatement());
+ assertEquals(ParsedResult.StatementType.META, res5.get(1).getType());
+ assertEquals("\\d test", res5.get(1).getStatement());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(2).getType());
+ assertEquals("select * from test2", res5.get(2).getStatement());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res5.get(3).getType());
+ assertEquals("select 1", res5.get(3).getStatement());
+ }
+
+ @Test
+ public final void testQuoted() throws InvalidStatementException {
+ List<ParsedResult> res1 = SimpleParser.parseScript("select '\n;' from test;");
+ assertEquals(1, res1.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res1.get(0).getType());
+ assertEquals("select '\n;' from test", res1.get(0).getStatement());
+
+ List<ParsedResult> res2 = SimpleParser.parseScript("select 'abc\nbbc\nddf' from test;");
+ assertEquals(1, res2.size());
+ assertEquals(ParsedResult.StatementType.STATEMENT, res2.get(0).getType());
+ assertEquals("select 'abc\nbbc\nddf' from test", res2.get(0).getStatement());
+
+ try {
+ SimpleParser.parseScript("select 'abc");
+ assertTrue(false);
+ } catch (InvalidStatementException is) {
+ assertTrue(true);
+ }
+ }
+
+ @Test
+ public final void testParseLines1() throws InvalidStatementException {
+ String [] lines = {
+ "select abc, ",
+ "bbc from test"
+ };
+ SimpleParser parser = new SimpleParser();
+ List<ParsedResult> result1 = parser.parseLines(lines[0]);
+ assertEquals(0, result1.size());
+ List<ParsedResult> result2 = parser.parseLines(lines[1]);
+ assertEquals(0, result2.size());
+ List<ParsedResult> result3 = parser.EOF();
+ assertEquals(1, result3.size());
+ assertEquals(lines[0] + lines[1], result3.get(0).getStatement());
+ }
+
+ @Test
+ public final void testParseLines2() throws InvalidStatementException {
+ String [] lines = {
+ "select abc, '",
+ "bbc' from test; select * from test3;"
+ };
+ SimpleParser parser = new SimpleParser();
+ List<ParsedResult> result1 = parser.parseLines(lines[0]);
+ assertEquals(0, result1.size());
+ List<ParsedResult> result2 = parser.parseLines(lines[1]);
+ assertEquals(2, result2.size());
+ assertEquals("select abc, 'bbc' from test", result2.get(0).getStatement());
+ assertEquals("select * from test3", result2.get(1).getStatement());
+ }
+
+ @Test
+ public final void testParseLines3() throws InvalidStatementException {
+ String [] lines = {
+ "select abc, 'bbc",
+ "' from test; select * from test3;"
+ };
+ SimpleParser parser = new SimpleParser();
+ List<ParsedResult> result1 = parser.parseLines(lines[0]);
+ assertEquals(0, result1.size());
+ List<ParsedResult> result2 = parser.parseLines(lines[1]);
+ assertEquals(2, result2.size());
+ assertEquals("select abc, 'bbc' from test", result2.get(0).getStatement());
+ assertEquals("select * from test3", result2.get(1).getStatement());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java b/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
new file mode 100644
index 0000000..1855217
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestDDLBuilder {
+ private static final Schema schema1;
+ private static final TableMeta meta1;
+ private static final PartitionMethodDesc partitionMethod1;
+
+ static {
+ schema1 = new Schema();
+ schema1.addColumn("name", TajoDataTypes.Type.BLOB);
+ schema1.addColumn("addr", TajoDataTypes.Type.TEXT);
+
+ meta1 = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+ meta1.putOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+ meta1.putOption(StorageConstants.COMPRESSION_CODEC, GzipCodec.class.getName());
+
+ Schema expressionSchema = new Schema();
+ expressionSchema.addColumn("key", TajoDataTypes.Type.INT4);
+ expressionSchema.addColumn("key2", TajoDataTypes.Type.TEXT);
+ partitionMethod1 = new PartitionMethodDesc(
+ "db1",
+ "table1",
+ CatalogProtos.PartitionType.COLUMN,
+ "key,key2",
+ expressionSchema);
+ }
+
+ @Test
+ public void testBuildDDLForExternalTable() throws Exception {
+ TableDesc desc = new TableDesc("db1.table1", schema1, meta1, new Path("/table1"));
+ desc.setPartitionMethod(partitionMethod1);
+ desc.setExternal(true);
+ assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLForExternalTable.result"),
+ DDLBuilder.buildDDLForExternalTable(desc));
+ }
+
+ @Test
+ public void testBuildDDLQuotedTableName() throws Exception {
+ Schema schema2 = new Schema();
+ schema2.addColumn("name", TajoDataTypes.Type.BLOB);
+ schema2.addColumn("addr", TajoDataTypes.Type.TEXT);
+ schema2.addColumn("FirstName", TajoDataTypes.Type.TEXT);
+ schema2.addColumn("LastName", TajoDataTypes.Type.TEXT);
+ schema2.addColumn("with", TajoDataTypes.Type.TEXT);
+
+ Schema expressionSchema2 = new Schema();
+ expressionSchema2.addColumn("BirthYear", TajoDataTypes.Type.INT4);
+
+ PartitionMethodDesc partitionMethod2 = new PartitionMethodDesc(
+ "db1",
+ "table1",
+ CatalogProtos.PartitionType.COLUMN,
+ "key,key2",
+ expressionSchema2);
+
+ TableDesc desc = new TableDesc("db1.TABLE2", schema2, meta1, new Path("/table1"));
+ desc.setPartitionMethod(partitionMethod2);
+ desc.setExternal(true);
+ assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLQuotedTableName1.result"),
+ DDLBuilder.buildDDLForExternalTable(desc));
+
+ desc = new TableDesc("db1.TABLE1", schema2, meta1, new Path("/table1"));
+ desc.setPartitionMethod(partitionMethod2);
+ desc.setExternal(false);
+ assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLQuotedTableName2.result"),
+ DDLBuilder.buildDDLForBaseTable(desc));
+ }
+
+ @Test
+ public void testBuildDDLForBaseTable() throws Exception {
+ TableDesc desc = new TableDesc("db1.table2", schema1, meta1, new Path("/table1"));
+ assertEquals(FileUtil.readTextFileFromResource("results/testDDLBuilder/testBuildDDLForBaseTable.result"),
+ DDLBuilder.buildDDLForBaseTable(desc));
+ }
+
+ @Test
+ public void testBuildColumn() throws Exception {
+ String [] tobeUnquoted = {
+ "column_name",
+ "columnname",
+ "column_1",
+ };
+
+ for (String columnName : tobeUnquoted) {
+ assertFalse(CatalogUtil.isShouldBeQuoted(columnName));
+ }
+
+ String [] quoted = {
+ "Column_Name",
+ "COLUMN_NAME",
+ "컬럼",
+ "$column_name",
+ "Column_Name1",
+ "with",
+ "when"
+ };
+
+ for (String columnName : quoted) {
+ assertTrue(CatalogUtil.isShouldBeQuoted(columnName));
+ }
+ }
+}