You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/10/07 08:37:03 UTC
git commit: TAJO-179: Support MySQL CatalogStore. (jinho)
Updated Branches:
refs/heads/master 14ed2ea23 -> c0c6d47f3
TAJO-179: Support MySQL CatalogStore. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c0c6d47f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c0c6d47f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c0c6d47f
Branch: refs/heads/master
Commit: c0c6d47f3d0694458dfabf3d113b8fa941680622
Parents: 14ed2ea
Author: jinossy <ji...@gmail.com>
Authored: Mon Oct 7 15:35:32 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Oct 7 15:35:32 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tajo/catalog/CatalogConstants.java | 17 +-
.../org/apache/tajo/catalog/CatalogUtil.java | 20 +
.../src/main/resources/catalog-default.xml | 4 +-
.../org/apache/tajo/catalog/CatalogServer.java | 4 +-
.../tajo/catalog/store/AbstractDBStore.java | 738 +++++++++++++++
.../apache/tajo/catalog/store/DerbyStore.java | 886 +++++++++++++++++++
.../apache/tajo/catalog/store/MySQLStore.java | 184 ++++
.../org/apache/tajo/catalog/TestDBStore.java | 15 +-
.../src/test/resources/catalog-default.xml | 2 +-
10 files changed, 1855 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 32685cb..f990530 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.2.0 - unreleased
NEW FEATURES
+ TAJO-179: Support MySQL CatalogStore. (jinho)
+
TAJO-147: Implement trim(text), ltrim(text), and rtrim(text) function.
(hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 6625c7d..94736f7 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -20,11 +20,18 @@ package org.apache.tajo.catalog;
public class CatalogConstants {
public static final String STORE_CLASS="tajo.catalog.store.class";
-
- public static final String JDBC_DRIVER="tajo.catalog.jdbc.driver";
- public static final String DEFAULT_JDBC_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
-
+
+ //public static final String JDBC_DRIVER = "tajo.catalog.jdbc.driver";
+ public static final String CONNECTION_ID = "tajo.catalog.jdbc.connection.id";
+ public static final String CONNECTION_PASSWORD = "tajo.catalog.jdbc.connection.password";
+
public static final String JDBC_URI="tajo.catalog.jdbc.uri";
+ public static final String TB_META = "META";
+ public static final String TB_TABLES = "TABLES";
+ public static final String TB_COLUMNS = "COLUMNS";
+ public static final String TB_OPTIONS = "OPTIONS";
+ public static final String TB_INDEXES = "INDEXES";
+ public static final String TB_STATISTICS = "STATS";
+ public static final String C_TABLE_ID = "TABLE_ID";
- private CatalogConstants() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index fb896f1..6e17f42 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -29,6 +29,10 @@ import org.apache.tajo.util.FileUtil;
import java.io.File;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.Wrapper;
import java.util.Collection;
import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -178,4 +182,20 @@ public class CatalogUtil {
public static DataType newDataTypeWithLen(Type type, int length) {
return DataType.newBuilder().setType(type).setLength(length).build();
}
+
+  /**
+   * Closes the given JDBC handles one after another, in the order given.
+   * Only {@link Statement}, {@link ResultSet} and {@link Connection}
+   * instances are closed; any other entry (including null) is skipped.
+   * Exceptions raised while closing are ignored so the remaining handles
+   * still get closed.
+   *
+   * @param wrapper the handles to close; may be null
+   */
+  public static void closeSQLWrapper(Wrapper... wrapper) {
+    if (wrapper == null) {
+      return;
+    }
+
+    for (int i = 0; i < wrapper.length; i++) {
+      final Wrapper handle = wrapper[i];
+      try {
+        if (handle instanceof Statement) {
+          ((Statement) handle).close();
+          continue;
+        }
+        if (handle instanceof ResultSet) {
+          ((ResultSet) handle).close();
+          continue;
+        }
+        if (handle instanceof Connection) {
+          ((Connection) handle).close();
+        }
+      } catch (Exception ignored) {
+        // best-effort cleanup: a failed close must not stop the others
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
index 6308410..0e9e109 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
+++ b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
@@ -22,11 +22,11 @@
<configuration>
<property>
<name>tajo.catalog.store.class</name>
- <value>org.apache.tajo.catalog.store.DBStore</value>
+ <value>org.apache.tajo.catalog.store.DerbyStore</value>
</property>
<property>
<name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:/tmp/tcat-${user.name}/db</value>
+ <value>jdbc:derby:/tmp/tcat-${user.name}/db;create=true</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 9c86d41..f810431 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -29,7 +29,7 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.store.CatalogStore;
-import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.catalog.store.DerbyStore;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -105,7 +105,7 @@ public class CatalogServer extends AbstractService {
Constructor<?> cons;
try {
Class<?> storeClass =
- this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
+ this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
cons = storeClass.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
new file mode 100644
index 0000000..d6413f8
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+public abstract class AbstractDBStore extends CatalogConstants implements CatalogStore {
+ protected final Log LOG = LogFactory.getLog(getClass());
+ protected Configuration conf;
+ protected String jdbcUri;
+ protected Connection conn;
+
+ protected static final int VERSION = 1;
+
+ protected abstract String getJDBCDriverName();
+
+ protected abstract Connection createConnection(final Configuration conf) throws SQLException;
+
+ protected abstract boolean isInitialized() throws SQLException;
+
+ protected abstract void createBaseTable() throws SQLException;
+
+ public AbstractDBStore(Configuration conf)
+ throws InternalException {
+
+ this.conf = conf;
+ this.jdbcUri = conf.get(JDBC_URI);
+ String jdbcDriver = getJDBCDriverName();
+ try {
+ Class.forName(getJDBCDriverName()).newInstance();
+ LOG.info("Loaded the JDBC driver (" + jdbcDriver + ")");
+ } catch (Exception e) {
+ throw new InternalException("Cannot load JDBC driver " + jdbcDriver, e);
+ }
+
+ try {
+ LOG.info("Trying to connect database (" + jdbcUri + ")");
+ conn = createConnection(conf);
+ LOG.info("Connected to database (" + jdbcUri + ")");
+ } catch (SQLException e) {
+ throw new InternalException("Cannot connect to database (" + jdbcUri
+ + ")", e);
+ }
+
+ try {
+ if (!isInitialized()) {
+ LOG.info("The base tables of CatalogServer are created.");
+ createBaseTable();
+ } else {
+ LOG.info("The base tables of CatalogServer already is initialized.");
+ }
+ } catch (SQLException se) {
+ throw new InternalException(
+ "Cannot initialize the persistent storage of Catalog", se);
+ }
+
+ int dbVersion = 0;
+ try {
+ dbVersion = needUpgrade();
+ } catch (SQLException e) {
+ throw new InternalException(
+ "Cannot check if the DB need to be upgraded", e);
+ }
+
+// if (dbVersion < VERSION) {
+// LOG.info("DB Upgrade is needed");
+// try {
+// upgrade(dbVersion, VERSION);
+// } catch (SQLException e) {
+// LOG.error(e.getMessage());
+// throw new InternalException("DB upgrade is failed.", e);
+// }
+// }
+ }
+
+  /**
+   * @return the JDBC URI this store was configured with
+   */
+  protected String getJDBCUri() {
+    return this.jdbcUri;
+  }
+
+  /**
+   * Reads the schema version stored in the META table. When the table holds
+   * no row yet, a version-0 row is inserted first.
+   *
+   * @return the stored version, or 0 when no version row existed
+   * @throws SQLException if the query or the insert fails
+   */
+  private int needUpgrade() throws SQLException {
+    String sql = "SELECT VERSION FROM " + TB_META;
+    Statement stmt = null;
+    ResultSet res = null;
+
+    try {
+      stmt = conn.createStatement();
+      res = stmt.executeQuery(sql);
+
+      if (!res.next()) { // if this db version is 0
+        insertVersion();
+        return 0;
+      }
+      return res.getInt(1);
+    } finally {
+      // Release both handles on every path; they were previously never closed.
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Inserts the initial version row (version 0) into the META table.
+   *
+   * @throws SQLException if the insert fails
+   */
+  private void insertVersion() throws SQLException {
+    String sql = "INSERT INTO " + TB_META + " values (0)";
+    Statement stmt = null;
+    try {
+      stmt = conn.createStatement();
+      stmt.executeUpdate(sql);
+    } finally {
+      // Close even when executeUpdate throws.
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+ private void upgrade(int from, int to) throws SQLException {
+ String sql;
+ Statement stmt;
+ if (from == 0) {
+ if (to == 1) {
+ sql = "DROP INDEX idx_options_key";
+ LOG.info(sql);
+
+ stmt = conn.createStatement();
+ stmt.addBatch(sql);
+
+ sql =
+ "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+ stmt.addBatch(sql);
+ LOG.info(sql);
+ stmt.executeBatch();
+ stmt.close();
+
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ } else {
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ }
+ }
+ }
+
+
+  /**
+   * Stores a table description: one row in the tables table, one row per
+   * column, one row per option and, when statistics are present, one
+   * statistics row.
+   *
+   * @param table the table description to persist
+   * @throws IOException if any SQL statement fails, or if the generated TID
+   *         of the freshly inserted table cannot be read back
+   */
+  @Override
+  public void addTable(final TableDesc table) throws IOException {
+    Statement stmt = null;
+    ResultSet res = null;
+
+    String sql =
+        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+            + "VALUES('" + table.getName() + "', "
+            + "'" + table.getPath() + "', "
+            + "'" + table.getMeta().getStoreType() + "'"
+            + ")";
+
+    try {
+      // A single Statement is reused for every step. The previous code
+      // created a second one and dropped the first without closing it.
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+
+      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
+          + " = '" + table.getName() + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no tid matched to "
+            + table.getName());
+      }
+      int tid = res.getInt("TID");
+      // The TID is all we need; release the cursor before batching.
+      res.close();
+      res = null;
+
+      int columnId = 0;
+      for (Column col : table.getMeta().getSchema().getColumns()) {
+        String colSql = columnToSQL(tid, table, columnId, col);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(colSql);
+        }
+        stmt.addBatch(colSql);
+        columnId++;
+      }
+
+      for (Entry<String, String> option : table.getMeta().toMap().entrySet()) {
+        String optSql = keyvalToSQL(table, option);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(optSql);
+        }
+        stmt.addBatch(optSql);
+      }
+      stmt.executeBatch();
+
+      if (table.getMeta().getStat() != null) {
+        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+            + "VALUES ('" + table.getName() + "', "
+            + table.getMeta().getStat().getNumRows() + ","
+            + table.getMeta().getStat().getNumBytes() + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.addBatch(sql);
+        stmt.executeBatch();
+      }
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+ private String columnToSQL(final int tid, final TableDesc desc,
+ final int columnId, final Column col) {
+ String sql =
+ "INSERT INTO " + TB_COLUMNS
+ + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
+ + "VALUES("
+ + tid + ","
+ + "'" + desc.getName() + "',"
+ + columnId + ", "
+ + "'" + col.getColumnName() + "',"
+ + "'" + col.getDataType().getType().name() + "',"
+ + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
+ + ")";
+
+ return sql;
+ }
+
+  /**
+   * Builds the INSERT statement that stores one option (key/value pair) of
+   * a table.
+   *
+   * @param desc the owning table; its name is stored as the table id
+   * @param keyVal the option to store
+   * @return the SQL text of the INSERT statement
+   */
+  private String keyvalToSQL(final TableDesc desc,
+                             final Entry<String, String> keyVal) {
+    StringBuilder insert = new StringBuilder();
+    insert.append("INSERT INTO ").append(TB_OPTIONS);
+    insert.append(" (").append(C_TABLE_ID).append(", key_, value_) ");
+    insert.append("VALUES(");
+    insert.append("'").append(desc.getName()).append("',");
+    insert.append("'").append(keyVal.getKey()).append("',");
+    insert.append("'").append(keyVal.getValue()).append("'");
+    insert.append(")");
+    return insert.toString();
+  }
+
+  /**
+   * Checks whether a table with the given name is registered.
+   *
+   * @param name the table name to look up
+   * @return true if a row with that table id exists
+   * @throws IOException if the lookup fails
+   */
+  @Override
+  public boolean existTable(final String name) throws IOException {
+    // The name is bound as a parameter instead of being spliced into the SQL,
+    // so quotes in it cannot alter the statement.
+    final String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES
+        + " WHERE " + C_TABLE_ID + " = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Removes every catalog row that belongs to the given table: its columns,
+   * options and statistics first, then the table entry itself. The deletes
+   * run one after another; the first failure aborts the remaining ones.
+   *
+   * @param name the name of the table to remove
+   * @throws IOException if one of the DELETE statements fails
+   */
+  @Override
+  public void deleteTable(final String name) throws IOException {
+    deleteRowsByTableId(TB_COLUMNS, name);
+    deleteRowsByTableId(TB_OPTIONS, name);
+    deleteRowsByTableId(TB_STATISTICS, name);
+    deleteRowsByTableId(TB_TABLES, name);
+  }
+
+  /** Deletes all rows of one catalog table whose table id equals the given name. */
+  private void deleteRowsByTableId(final String catalogTable, final String name)
+      throws IOException {
+    // The name is bound as a parameter so quotes in it cannot alter the statement.
+    final String sql = "DELETE FROM " + catalogTable
+        + " WHERE " + C_TABLE_ID + " = ?";
+
+    PreparedStatement stmt = null;
+    try {
+      LOG.info(sql + " [" + name + "]");
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      stmt.execute();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  /**
+   * Loads a complete table description: the table entry, its columns in
+   * column-id order, its options and, if present, its statistics.
+   *
+   * @param name the table name to look up
+   * @return the table description, or null if no table has that name
+   * @throws IOException if any of the queries fails
+   */
+  @Override
+  public TableDesc getTable(final String name) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    String tableName = null;
+    Path path = null;
+    StoreType storeType = null;
+    Options options;
+    TableStat stat = null;
+
+    // All four queries bind the name as a parameter instead of splicing it
+    // into the SQL text, so quotes in it cannot alter the statements.
+    try {
+      String sql =
+          "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+              + " WHERE " + C_TABLE_ID + "=?";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      res = stmt.executeQuery();
+      if (!res.next()) { // there is no table of the given name.
+        return null;
+      }
+      tableName = res.getString(C_TABLE_ID).trim();
+      path = new Path(res.getString("path").trim());
+      storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    Schema schema = null;
+    try {
+      String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
+          + " WHERE " + C_TABLE_ID + "=? ORDER by column_id asc";
+
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+
+      schema = new Schema();
+      while (res.next()) {
+        // Column names are qualified with the table name.
+        String columnName = tableName + "."
+            + res.getString("column_name").trim();
+        Type dataType = getDataType(res.getString("data_type")
+            .trim());
+        int typeLength = res.getInt("type_length");
+        if (typeLength > 0) {
+          schema.addColumn(columnName, dataType, typeLength);
+        } else {
+          schema.addColumn(columnName, dataType);
+        }
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    options = Options.create();
+    try {
+      String sql = "SELECT key_, value_ from " + TB_OPTIONS
+          + " WHERE " + C_TABLE_ID + "=?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+
+      while (res.next()) {
+        options.put(
+            res.getString("key_"),
+            res.getString("value_"));
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    try {
+      String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+          + " WHERE " + C_TABLE_ID + "=?";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      res = stmt.executeQuery();
+
+      // Statistics are optional; stat stays null when no row exists.
+      if (res.next()) {
+        stat = new TableStat();
+        stat.setNumRows(res.getLong("num_rows"));
+        stat.setNumBytes(res.getLong("num_bytes"));
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    TableMeta meta = new TableMetaImpl(schema, storeType, options);
+    if (stat != null) {
+      meta.setStat(stat);
+    }
+    TableDesc table = new TableDescImpl(tableName, meta, path);
+
+    return table;
+  }
+
+  /**
+   * Maps a stored type name to the matching {@code Type} constant.
+   *
+   * @param typeStr the exact name of a {@code Type} constant
+   * @return the matching constant, or null (after logging an error) when no
+   *         constant has that name
+   */
+  private Type getDataType(final String typeStr) {
+    Type matched;
+    try {
+      matched = Type.valueOf(typeStr);
+    } catch (IllegalArgumentException iae) {
+      LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+      matched = null;
+    }
+    return matched;
+  }
+
+  /**
+   * Lists the names of all registered tables.
+   *
+   * @return the trimmed table ids, in the order the database returns them
+   * @throws IOException if the query fails
+   */
+  @Override
+  public List<String> getAllTableNames() throws IOException {
+    final String query = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+    final List<String> names = new ArrayList<String>();
+
+    Statement statement = null;
+    ResultSet rows = null;
+    try {
+      statement = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(query);
+      }
+      rows = statement.executeQuery(query);
+      for (boolean more = rows.next(); more; more = rows.next()) {
+        names.add(rows.getString(C_TABLE_ID).trim());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(rows, statement);
+    }
+
+    return names;
+  }
+
+  /**
+   * Stores an index description.
+   *
+   * @param proto the index to persist; unset optional flags are stored as false
+   * @throws IOException if the insert fails
+   */
+  @Override
+  public void addIndex(final IndexDescProto proto) throws IOException {
+    // Use the TB_INDEXES constant like delIndex/existIndex do, so that every
+    // index method addresses the table by exactly the same identifier.
+    String sql =
+        "INSERT INTO " + TB_INDEXES + " (index_name, " + C_TABLE_ID + ", column_name, "
+            + "data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+            + "(?,?,?,?,?,?,?,?)";
+
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, proto.getName());
+      stmt.setString(2, proto.getTableId());
+      stmt.setString(3, proto.getColumn().getColumnName());
+      stmt.setString(4, proto.getColumn().getDataType().getType().name());
+      stmt.setString(5, proto.getIndexMethod().toString());
+      stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+      stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+      stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+      stmt.executeUpdate();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  /**
+   * Deletes the index with the given name. Deleting a name that does not
+   * exist is not treated as an error here.
+   *
+   * @param indexName the name of the index to delete
+   * @throws IOException if the delete fails
+   */
+  @Override
+  public void delIndex(final String indexName) throws IOException {
+    // Bound as a parameter, matching the other index methods, so quotes in
+    // the name cannot alter the statement.
+    final String sql =
+        "DELETE FROM " + TB_INDEXES
+            + " WHERE index_name = ?";
+
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  /**
+   * Loads the index with the given name.
+   *
+   * @param indexName the index name to look up
+   * @return the index description
+   * @throws IOException if no index has that name, or if the query fails
+   */
+  @Override
+  public IndexDescProto getIndex(final String indexName)
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    try {
+      String sql =
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+              + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+              + "where index_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " + indexName);
+      }
+      return resultToProto(res);
+    } catch (SQLException se) {
+      // Report database failures instead of silently returning null.
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Loads the index defined on the given column of the given table.
+   *
+   * @param tableName the table the index belongs to
+   * @param columnName the indexed column
+   * @return the index description
+   * @throws IOException if no such index exists, or if the query fails
+   */
+  @Override
+  public IndexDescProto getIndex(final String tableName,
+                                 final String columnName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    try {
+      String sql =
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+              + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+              + "where " + C_TABLE_ID + " = ? AND column_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to "
+            + tableName + "." + columnName);
+      }
+      return resultToProto(res);
+    } catch (SQLException se) {
+      // The exception used to be constructed but never thrown, so database
+      // failures ended up as a null return value.
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Checks whether an index with the given name exists.
+   *
+   * @param indexName the index name to look up
+   * @return true if such an index is registered
+   * @throws IOException if the lookup fails
+   */
+  @Override
+  public boolean existIndex(final String indexName) throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES
+        + " WHERE index_name = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      // A database failure must not be reported as "index does not exist".
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Checks whether an index exists on the given column of the given table.
+   *
+   * @param tableName the table the index belongs to
+   * @param columnName the indexed column
+   * @return true if such an index is registered
+   * @throws IOException if the lookup fails
+   */
+  @Override
+  public boolean existIndex(String tableName, String columnName)
+      throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES
+        + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      // A database failure must not be reported as "index does not exist".
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+
+  /**
+   * Loads all indexes defined on the given table.
+   *
+   * @param tableName the table whose indexes are requested
+   * @return the index descriptions; empty when the table has no index
+   * @throws IOException if the query fails
+   */
+  @Override
+  public IndexDescProto[] getIndexes(final String tableName)
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+
+    try {
+      String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+          + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+          + "where " + C_TABLE_ID + "= ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      while (res.next()) {
+        protos.add(resultToProto(res));
+      }
+    } catch (SQLException se) {
+      // Report the failure instead of returning a partial or empty result
+      // that looks like a legitimate answer.
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return protos.toArray(new IndexDescProto[protos.size()]);
+  }
+
+  /**
+   * Converts the current row of an index query into an index description.
+   *
+   * @param res a result set positioned on a row of the indexes table
+   * @return the index description built from that row
+   * @throws SQLException if a column cannot be read
+   */
+  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+    return IndexDescProto.newBuilder()
+        .setName(res.getString("index_name"))
+        .setTableId(res.getString(C_TABLE_ID))
+        .setColumn(resultToColumnProto(res))
+        .setIndexMethod(getIndexMethod(res.getString("index_type").trim()))
+        .setIsUnique(res.getBoolean("is_unique"))
+        .setIsClustered(res.getBoolean("is_clustered"))
+        .setIsAscending(res.getBoolean("is_ascending"))
+        .build();
+  }
+
+ private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+ ColumnProto.Builder builder = ColumnProto.newBuilder();
+ builder.setColumnName(res.getString("column_name"));
+ builder.setDataType(CatalogUtil.newSimpleDataType(getDataType(res.getString("data_type").trim())));
+ return builder.build();
+ }
+
+ private IndexMethod getIndexMethod(final String typeStr) {
+ if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
+ return IndexMethod.TWO_LEVEL_BIN_TREE;
+ } else {
+ LOG.error("Cannot find a matched type aginst from '"
+ + typeStr + "'");
+ // TODO - needs exception handling
+ return null;
+ }
+ }
+
+  /**
+   * Closes the catalog connection and logs the shutdown. Errors raised while
+   * closing are ignored by {@code CatalogUtil.closeSQLWrapper}.
+   */
+  @Override
+  public void close() {
+    CatalogUtil.closeSQLWrapper(this.conn);
+    LOG.info("Shutdown database (" + this.jdbcUri + ")");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
new file mode 100644
index 0000000..21d8a02
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -0,0 +1,886 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DerbyStore extends AbstractDBStore {
+ // Class name of the embedded Derby JDBC driver, returned by getJDBCDriverName().
+ private static final String JDBC_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
+ // Read and write halves of one ReentrantReadWriteLock; both are assigned in isInitialized().
+ protected Lock rlock;
+ protected Lock wlock;
+
+ /**
+  * @return the class name of the embedded Derby JDBC driver
+  */
+ protected String getJDBCDriverName(){
+ return JDBC_DRIVER;
+ }
+
+ /**
+  * Creates a Derby-backed catalog store; all set-up is delegated to the superclass
+  * constructor.
+  *
+  * @param conf configuration handed to the superclass
+  * @throws InternalException propagated from the superclass constructor
+  */
+ public DerbyStore(final Configuration conf)
+ throws InternalException {
+
+ super(conf);
+ }
+
+ /**
+  * Opens a connection to the URI returned by {@code getJDBCUri()}. No user name or
+  * password is passed, and the {@code conf} argument is not used here.
+  *
+  * @throws SQLException if the driver manager cannot open the connection
+  */
+ protected Connection createConnection(Configuration conf) throws SQLException {
+ return DriverManager.getConnection(getJDBCUri());
+ }
+
+ // TODO - DDL and index statements should be renamed
+ /**
+  * Creates the catalog's base tables (meta, tables, columns, options, indexes and
+  * statistics) together with their indexes while holding the write lock.
+  *
+  * The meta table is created immediately; the DDL of every other table is batched
+  * with its index statements and executed as one batch per table.
+  *
+  * @throws SQLException if any DDL statement fails
+  */
+ protected void createBaseTable() throws SQLException {
+   wlock.lock();
+   Statement stmt = null;
+   try {
+     stmt = conn.createStatement();
+
+     // META
+     String metaDdl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(metaDdl);
+     }
+     stmt.executeUpdate(metaDdl);
+     LOG.info("Table '" + TB_META + "' is created.");
+
+     // TABLES
+     addDdlToBatch(stmt, "CREATE TABLE "
+         + TB_TABLES + " ("
+         + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+         + "path VARCHAR(1024), "
+         + "store_type CHAR(16), "
+         + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)"
+         + ")");
+     addDdlToBatch(stmt,
+         "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)");
+     addDdlToBatch(stmt, "CREATE UNIQUE INDEX idx_tables_name on "
+         + TB_TABLES + "(" + C_TABLE_ID + ")");
+     stmt.executeBatch();
+     LOG.info("Table '" + TB_TABLES + "' is created.");
+
+     // COLUMNS
+     addDdlToBatch(stmt, "CREATE TABLE " + TB_COLUMNS + " ("
+         + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
+         + C_TABLE_ID + ") ON DELETE CASCADE, "
+         + "column_id INT NOT NULL,"
+         + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+         + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))");
+     addDdlToBatch(stmt, "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+         + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)");
+     stmt.executeBatch();
+     LOG.info("Table '" + TB_COLUMNS + "' is created.");
+
+     // OPTIONS
+     // The foreign keys below reference TB_TABLES, as the columns DDL does, instead
+     // of a hard-coded table name.
+     addDdlToBatch(stmt, "CREATE TABLE " + TB_OPTIONS + " ("
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES
+         + " (" + C_TABLE_ID + ") "
+         + "ON DELETE CASCADE, "
+         + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)");
+     // NOTE(review): the next two indexes cover the same single column; both are kept
+     // so the created schema objects stay the same - confirm whether one can go.
+     addDdlToBatch(stmt,
+         "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")");
+     addDdlToBatch(stmt, "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+         + "(" + C_TABLE_ID + ")");
+     stmt.executeBatch();
+     LOG.info("Table '" + TB_OPTIONS + "' is created.");
+
+     // INDEXES
+     addDdlToBatch(stmt, "CREATE TABLE " + TB_INDEXES + "("
+         + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES
+         + " (" + C_TABLE_ID + ") "
+         + "ON DELETE CASCADE, "
+         + "column_name VARCHAR(255) NOT NULL, "
+         + "data_type VARCHAR(255) NOT NULL, "
+         + "index_type CHAR(32) NOT NULL, "
+         + "is_unique BOOLEAN NOT NULL, "
+         + "is_clustered BOOLEAN NOT NULL, "
+         + "is_ascending BOOLEAN NOT NULL)");
+     addDdlToBatch(stmt, "CREATE UNIQUE INDEX idx_indexes_key ON "
+         + TB_INDEXES + " (index_name)");
+     addDdlToBatch(stmt, "CREATE INDEX idx_indexes_columns ON "
+         + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)");
+     stmt.executeBatch();
+     LOG.info("Table '" + TB_INDEXES + "' is created.");
+
+     // STATISTICS
+     addDdlToBatch(stmt, "CREATE TABLE " + TB_STATISTICS + "("
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES
+         + " (" + C_TABLE_ID + ") "
+         + "ON DELETE CASCADE, "
+         + "num_rows BIGINT, "
+         + "num_bytes BIGINT)");
+     addDdlToBatch(stmt, "CREATE INDEX idx_stats_table_name ON "
+         + TB_STATISTICS + " (" + C_TABLE_ID + ")");
+     stmt.executeBatch();
+     LOG.info("Table '" + TB_STATISTICS + "' is created.");
+
+   } finally {
+     wlock.unlock();
+     CatalogUtil.closeSQLWrapper(stmt);
+   }
+ }
+
+ /** Logs the given DDL at debug level and appends it to the statement's batch. */
+ private void addDdlToBatch(final Statement stmt, final String ddl) throws SQLException {
+   if (LOG.isDebugEnabled()) {
+     LOG.debug(ddl);
+   }
+   stmt.addBatch(ddl);
+ }
+
+ /**
+  * Reports whether the catalog schema already exists, i.e. whether the database
+  * metadata lists a table named like the meta, tables, columns or options table.
+  *
+  * As a side effect this method creates the read/write lock pair used by every
+  * other method of this class.
+  * NOTE(review): presumably the locks are created here because this method is the
+  * first one the superclass calls during construction - confirm against
+  * AbstractDBStore.
+  *
+  * @return {@code true} if at least one of the checked tables is present
+  * @throws SQLException if the database metadata cannot be read
+  */
+ @Override
+ protected boolean isInitialized() throws SQLException {
+   ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+   this.rlock = lock.readLock();
+   this.wlock = lock.writeLock();
+
+   wlock.lock();
+   ResultSet res = null;
+   try {
+     res = conn.getMetaData().getTables(null, null, null,
+         new String [] {"TABLE"});
+
+     // The former 'found' flag was never set, so the loop simply scans every row.
+     while (res.next()) {
+       String resName = res.getString("TABLE_NAME");
+       if (TB_META.equals(resName)
+           || TB_TABLES.equals(resName)
+           || TB_COLUMNS.equals(resName)
+           || TB_OPTIONS.equals(resName)) {
+         return true;
+       }
+     }
+     return false;
+   } finally {
+     wlock.unlock();
+     CatalogUtil.closeSQLWrapper(res);
+   }
+ }
+
+ /**
+  * Checks, under the read lock, whether the database metadata lists a table whose
+  * name equals {@code tableName} exactly.
+  *
+  * @param tableName the table name to look for
+  * @return {@code true} if such a table is listed
+  * @throws SQLException if the database metadata cannot be read
+  */
+ final boolean checkInternalTable(final String tableName) throws SQLException {
+   rlock.lock();
+   ResultSet tables = null;
+   try {
+     tables = conn.getMetaData().getTables(null, null, null,
+         new String [] {"TABLE"});
+     boolean matched = false;
+     while (tables.next() && !matched) {
+       matched = tableName.equals(tables.getString("TABLE_NAME"));
+     }
+     return matched;
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(tables);
+   }
+ }
+
+ @Override
+ public final void addTable(final TableDesc table) throws IOException {
+ Statement stmt = null;
+ ResultSet res = null;
+
+ String sql =
+ "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+ + "VALUES('" + table.getName() + "', "
+ + "'" + table.getPath() + "', "
+ + "'" + table.getMeta().getStoreType() + "'"
+ + ")";
+
+ wlock.lock();
+ try {
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+
+ sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
+ + " = '" + table.getName() + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no tid matched to "
+ + table.getName());
+ }
+ int tid = res.getInt("TID");
+
+ String colSql;
+ int columnId = 0;
+ for (Column col : table.getMeta().getSchema().getColumns()) {
+ colSql = columnToSQL(tid, table, columnId, col);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(colSql);
+ }
+ stmt.addBatch(colSql);
+ columnId++;
+ }
+
+ Iterator<Entry<String,String>> it = table.getMeta().toMap().entrySet().iterator();
+ String optSql;
+ while (it.hasNext()) {
+ optSql = keyvalToSQL(table, it.next());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(optSql);
+ }
+ stmt.addBatch(optSql);
+ }
+ stmt.executeBatch();
+ if (table.getMeta().getStat() != null) {
+ sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+ + "VALUES ('" + table.getName() + "', "
+ + table.getMeta().getStat().getNumRows() + ","
+ + table.getMeta().getStat().getNumBytes() + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+ }
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ wlock.unlock();
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+ }
+
+ private String columnToSQL(final int tid, final TableDesc desc,
+ final int columnId, final Column col) {
+ String sql =
+ "INSERT INTO " + TB_COLUMNS
+ + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
+ + "VALUES("
+ + tid + ","
+ + "'" + desc.getName() + "',"
+ + columnId + ", "
+ + "'" + col.getColumnName() + "',"
+ + "'" + col.getDataType().getType().name() + "',"
+ + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
+ + ")";
+
+ return sql;
+ }
+
+ /**
+  * Renders the INSERT statement that stores one option (key/value pair) of a table.
+  *
+  * @param desc   owning table; only its name is used
+  * @param keyVal the option to store
+  * @return the complete INSERT statement as text
+  */
+ private String keyvalToSQL(final TableDesc desc,
+     final Entry<String,String> keyVal) {
+   StringBuilder insert = new StringBuilder();
+   insert.append("INSERT INTO ").append(TB_OPTIONS);
+   insert.append(" (").append(C_TABLE_ID).append(", key_, value_) ");
+   insert.append("VALUES(");
+   insert.append("'").append(desc.getName()).append("',");
+   insert.append("'").append(keyVal.getKey()).append("',");
+   insert.append("'").append(keyVal.getValue()).append("'");
+   insert.append(")");
+   return insert.toString();
+ }
+
+ /**
+  * Checks, under the read lock, whether a table with the given name is stored.
+  *
+  * The name is bound as a statement parameter, so quotes in it can neither break
+  * nor alter the query.
+  *
+  * @param name the table name to look up
+  * @return {@code true} if a row with that table id exists
+  * @throws IOException if the query fails
+  */
+ @Override
+ public final boolean existTable(final String name) throws IOException {
+   final String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES
+       + " WHERE " + C_TABLE_ID + " = ?";
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   rlock.lock();
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, name);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     res = stmt.executeQuery();
+     return res.next();
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Removes a table from the catalog by deleting its rows from the columns,
+  * options, statistics and tables tables, in that order, under the write lock.
+  *
+  * The name is bound as a statement parameter instead of being concatenated into
+  * the SQL text.
+  *
+  * @param name the name of the table to delete
+  * @throws IOException if any of the DELETE statements fails
+  */
+ @Override
+ public final void deleteTable(final String name) throws IOException {
+   // Dependent rows first, the table row itself last.
+   final String[] targets = {TB_COLUMNS, TB_OPTIONS, TB_STATISTICS, TB_TABLES};
+
+   PreparedStatement stmt = null;
+   wlock.lock();
+   try {
+     for (String target : targets) {
+       String sql = "DELETE FROM " + target
+           + " WHERE " + C_TABLE_ID + " = ?";
+       LOG.info(sql + " (" + name + ")");
+       stmt = conn.prepareStatement(sql);
+       stmt.setString(1, name);
+       stmt.executeUpdate();
+       CatalogUtil.closeSQLWrapper(stmt);
+       stmt = null;
+     }
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     wlock.unlock();
+     CatalogUtil.closeSQLWrapper(stmt);
+   }
+ }
+
+ /**
+  * Loads a complete table description: the table row, its columns ordered by
+  * column id, its options and, if present, its statistics.
+  *
+  * Every query binds the table name as a statement parameter instead of
+  * concatenating it into the SQL text.
+  *
+  * @param name the name of the table to load
+  * @return the table description, or {@code null} if no table has that name
+  * @throws IOException if any query fails
+  */
+ @Override
+ public final TableDesc getTable(final String name) throws IOException {
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+
+   rlock.lock();
+   try {
+     String sql =
+         "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+         + " WHERE " + C_TABLE_ID + " = ?";
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, name);
+     res = stmt.executeQuery();
+     if (!res.next()) { // there is no table of the given name.
+       return null;
+     }
+     String tableName = res.getString(C_TABLE_ID).trim();
+     Path path = new Path(res.getString("path").trim());
+     StoreType storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+
+     // The table row is fully read; release it before running the follow-up queries.
+     CatalogUtil.closeSQLWrapper(res, stmt);
+     res = null;
+     stmt = null;
+
+     Schema schema = readSchema(name, tableName);
+     Options options = readOptions(name);
+     TableStat stat = readStat(name);
+
+     TableMeta meta = new TableMetaImpl(schema, storeType, options);
+     if (stat != null) {
+       meta.setStat(stat);
+     }
+     return new TableDescImpl(tableName, meta, path);
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /** Reads the columns of the table {@code name}, qualifying each with {@code tableName}. */
+ private Schema readSchema(final String name, final String tableName)
+     throws SQLException {
+   String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
+       + " WHERE " + C_TABLE_ID + " = ? ORDER by column_id asc";
+   if (LOG.isDebugEnabled()) {
+     LOG.debug(sql);
+   }
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, name);
+     res = stmt.executeQuery();
+
+     Schema schema = new Schema();
+     while (res.next()) {
+       String columnName = tableName + "."
+           + res.getString("column_name").trim();
+       Type dataType = getDataType(res.getString("data_type").trim());
+       int typeLength = res.getInt("type_length");
+       // A stored length of 0 means the type has no length.
+       if (typeLength > 0) {
+         schema.addColumn(columnName, dataType, typeLength);
+       } else {
+         schema.addColumn(columnName, dataType);
+       }
+     }
+     return schema;
+   } finally {
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /** Reads the key/value options stored for the table {@code name}. */
+ private Options readOptions(final String name) throws SQLException {
+   String sql = "SELECT key_, value_ from " + TB_OPTIONS
+       + " WHERE " + C_TABLE_ID + " = ?";
+   if (LOG.isDebugEnabled()) {
+     LOG.debug(sql);
+   }
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, name);
+     res = stmt.executeQuery();
+
+     Options options = Options.create();
+     while (res.next()) {
+       options.put(
+           res.getString("key_"),
+           res.getString("value_"));
+     }
+     return options;
+   } finally {
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /** Reads the statistics row of the table {@code name}; {@code null} if there is none. */
+ private TableStat readStat(final String name) throws SQLException {
+   String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+       + " WHERE " + C_TABLE_ID + " = ?";
+   if (LOG.isDebugEnabled()) {
+     LOG.debug(sql);
+   }
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, name);
+     res = stmt.executeQuery();
+     if (!res.next()) {
+       return null;
+     }
+     TableStat stat = new TableStat();
+     stat.setNumRows(res.getLong("num_rows"));
+     stat.setNumBytes(res.getLong("num_bytes"));
+     return stat;
+   } finally {
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Resolves a stored type name to the {@link Type} constant of the same name.
+  *
+  * @param typeStr the name of the data type
+  * @return the matching type, or {@code null} (after logging an error) when no
+  *         constant has that name
+  */
+ private Type getDataType(final String typeStr) {
+   Type resolved;
+   try {
+     resolved = Type.valueOf(typeStr);
+   } catch (IllegalArgumentException iae) {
+     LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+     resolved = null;
+   }
+   return resolved;
+ }
+
+ /**
+  * Lists the (trimmed) ids of all tables stored in the catalog, read under the
+  * read lock.
+  *
+  * @return the table names in the order the database returns them
+  * @throws IOException if the query fails
+  */
+ @Override
+ public final List<String> getAllTableNames() throws IOException {
+   final String query = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+   final List<String> names = new ArrayList<String>();
+
+   Statement statement = null;
+   ResultSet rows = null;
+   rlock.lock();
+   try {
+     statement = conn.createStatement();
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(query);
+     }
+     for (rows = statement.executeQuery(query); rows.next(); ) {
+       names.add(rows.getString(C_TABLE_ID).trim());
+     }
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(rows, statement);
+   }
+   return names;
+ }
+
+ /**
+  * Stores an index description as one row of the indexes table, under the write
+  * lock.
+  *
+  * Optional boolean fields that are not set in the proto are stored as
+  * {@code false}.
+  *
+  * @param proto the index description to store
+  * @throws IOException if the INSERT fails
+  */
+ public final void addIndex(final IndexDescProto proto) throws IOException {
+   // TB_INDEXES is the name the table is created, queried and deleted under elsewhere
+   // in this class; the literal used here before could drift from it.
+   String sql =
+       "INSERT INTO " + TB_INDEXES + " (index_name, " + C_TABLE_ID + ", column_name, "
+       + "data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+       + "(?,?,?,?,?,?,?,?)";
+
+   PreparedStatement stmt = null;
+
+   wlock.lock();
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, proto.getName());
+     stmt.setString(2, proto.getTableId());
+     stmt.setString(3, proto.getColumn().getColumnName());
+     stmt.setString(4, proto.getColumn().getDataType().getType().name());
+     stmt.setString(5, proto.getIndexMethod().toString());
+     stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+     stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+     stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+     // Log before executing so a failing statement still shows up in the debug log.
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(stmt.toString());
+     }
+     stmt.executeUpdate();
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     wlock.unlock();
+     CatalogUtil.closeSQLWrapper(stmt);
+   }
+ }
+
+ /**
+  * Deletes the index with the given name from the indexes table, under the write
+  * lock.
+  *
+  * The name is bound as a statement parameter instead of being concatenated into
+  * the SQL text.
+  *
+  * @param indexName the name of the index to delete
+  * @throws IOException if the DELETE fails
+  */
+ public final void delIndex(final String indexName) throws IOException {
+   String sql =
+       "DELETE FROM " + TB_INDEXES
+       + " WHERE index_name = ?";
+
+   PreparedStatement stmt = null;
+   wlock.lock();
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, indexName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     stmt.executeUpdate();
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     wlock.unlock();
+     CatalogUtil.closeSQLWrapper(stmt);
+   }
+ }
+
+ /**
+  * Loads the index with the given name, under the read lock.
+  *
+  * @param indexName the name of the index
+  * @return the index description
+  * @throws IOException if no index has that name, or if the query fails (a database
+  *         error used to be swallowed, making this method return {@code null})
+  */
+ public final IndexDescProto getIndex(final String indexName)
+     throws IOException {
+   ResultSet res = null;
+   PreparedStatement stmt = null;
+
+   rlock.lock();
+   try {
+     String sql =
+         "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+         + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+         + "where index_name = ?";
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, indexName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(stmt.toString());
+     }
+     res = stmt.executeQuery();
+     if (!res.next()) {
+       throw new IOException("ERROR: there is no index matched to " + indexName);
+     }
+     return resultToProto(res);
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Loads the index defined on the given column of the given table, under the read
+  * lock.
+  *
+  * @param tableName  the id of the indexed table
+  * @param columnName the name of the indexed column
+  * @return the index description
+  * @throws IOException if no such index exists, or if the query fails (the exception
+  *         used to be constructed but never thrown)
+  */
+ public final IndexDescProto getIndex(final String tableName,
+     final String columnName) throws IOException {
+   ResultSet res = null;
+   PreparedStatement stmt = null;
+
+   rlock.lock();
+   try {
+     String sql =
+         "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+         + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+         + "where " + C_TABLE_ID + " = ? AND column_name = ?";
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, tableName);
+     stmt.setString(2, columnName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     res = stmt.executeQuery();
+     if (!res.next()) {
+       throw new IOException("ERROR: there is no index matched to "
+           + tableName + "." + columnName);
+     }
+     return resultToProto(res);
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Checks, under the read lock, whether an index with the given name is stored.
+  *
+  * @param indexName the name of the index
+  * @return {@code true} if a row with that index name exists
+  * @throws IOException if the query fails (a database error used to be swallowed
+  *         and reported as "does not exist")
+  */
+ public final boolean existIndex(final String indexName) throws IOException {
+   String sql = "SELECT index_name from " + TB_INDEXES
+       + " WHERE index_name = ?";
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   rlock.lock();
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, indexName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     res = stmt.executeQuery();
+     return res.next();
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Checks, under the read lock, whether an index is stored for the given column of
+  * the given table.
+  *
+  * @param tableName  the id of the indexed table
+  * @param columnName the name of the indexed column
+  * @return {@code true} if such an index row exists
+  * @throws IOException if the query fails (a database error used to be swallowed
+  *         and reported as "does not exist")
+  */
+ @Override
+ public boolean existIndex(String tableName, String columnName)
+     throws IOException {
+   String sql = "SELECT index_name from " + TB_INDEXES
+       + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+
+   PreparedStatement stmt = null;
+   ResultSet res = null;
+   rlock.lock();
+   try {
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, tableName);
+     stmt.setString(2, columnName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     res = stmt.executeQuery();
+     return res.next();
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+ }
+
+ /**
+  * Loads all indexes defined on the given table, under the read lock.
+  *
+  * @param tableName the id of the indexed table
+  * @return the index descriptions; an empty array if the table has no index
+  * @throws IOException if the query fails (a database error used to be swallowed,
+  *         yielding an empty or partial array)
+  */
+ public final IndexDescProto [] getIndexes(final String tableName)
+     throws IOException {
+   ResultSet res = null;
+   PreparedStatement stmt = null;
+
+   List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+
+   rlock.lock();
+   try {
+     String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+         + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES + " "
+         + "where " + C_TABLE_ID + "= ?";
+     stmt = conn.prepareStatement(sql);
+     stmt.setString(1, tableName);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(sql);
+     }
+     res = stmt.executeQuery();
+     while (res.next()) {
+       protos.add(resultToProto(res));
+     }
+   } catch (SQLException se) {
+     throw new IOException(se);
+   } finally {
+     rlock.unlock();
+     CatalogUtil.closeSQLWrapper(res, stmt);
+   }
+
+   return protos.toArray(new IndexDescProto [protos.size()]);
+ }
+
+ /**
+  * Converts the row the given result set is currently positioned on into an
+  * {@link IndexDescProto}.
+  *
+  * @param res result set exposing the index_name, table id, column_name, data_type,
+  *            index_type, is_unique, is_clustered and is_ascending columns
+  * @return the index description built from the current row
+  * @throws SQLException if one of the columns cannot be read
+  */
+ private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+   // The columns are read in the same order as before; only the builder usage is chained.
+   return IndexDescProto.newBuilder()
+       .setName(res.getString("index_name"))
+       .setTableId(res.getString(C_TABLE_ID))
+       .setColumn(resultToColumnProto(res))
+       .setIndexMethod(getIndexMethod(res.getString("index_type").trim()))
+       .setIsUnique(res.getBoolean("is_unique"))
+       .setIsClustered(res.getBoolean("is_clustered"))
+       .setIsAscending(res.getBoolean("is_ascending"))
+       .build();
+ }
+
+ /**
+  * Builds the {@link ColumnProto} of the indexed column from the current row.
+  *
+  * @param res result set exposing the column_name and data_type columns
+  * @return the column description with a simple (length-less) data type
+  * @throws SQLException if one of the columns cannot be read
+  */
+ private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+   return ColumnProto.newBuilder()
+       .setColumnName(res.getString("column_name"))
+       .setDataType(CatalogUtil.newSimpleDataType(
+           getDataType(res.getString("data_type").trim())))
+       .build();
+ }
+
+ private IndexMethod getIndexMethod(final String typeStr) {
+ if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
+ return IndexMethod.TWO_LEVEL_BIN_TREE;
+ } else {
+ LOG.error("Cannot find a matched type aginst from '"
+ + typeStr + "'");
+ // TODO - needs exception handling
+ return null;
+ }
+ }
+
+ /** Placeholder: the body is empty, so {@code func} is not stored anywhere. */
+ @Override
+ public final void addFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /** Placeholder: the body is empty, so nothing is deleted. */
+ @Override
+ public final void deleteFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /**
+  * Placeholder: the body is empty. Note that the method returns {@code void}, so it
+  * cannot report whether {@code func} exists.
+  */
+ @Override
+ public final void existFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /**
+  * Placeholder: always returns {@code null}, not an empty list, so callers must
+  * null-check the result.
+  */
+ @Override
+ public final List<String> getAllFunctionNames() throws IOException {
+ // TODO - not implemented yet
+ return null;
+ }
+
+ @Override
+ public final void close() {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ } catch (SQLException e) {
+ // TODO - to be fixed
+ //LOG.error(e.getMessage(), e);
+ }
+
+ LOG.info("Shutdown database (" + jdbcUri + ")");
+ }
+
+
+ /**
+  * Manual entry point: constructs a {@code DerbyStore} from a default
+  * {@code TajoConf} and does nothing else with it.
+  */
+ public static void main(final String[] args) throws IOException {
+ @SuppressWarnings("unused")
+ DerbyStore store = new DerbyStore(new TajoConf());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
new file mode 100644
index 0000000..c638205
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.List;
+
+public class MySQLStore extends AbstractDBStore {
+
+ private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
+ /**
+  * @return the class name of the MySQL JDBC driver
+  */
+ protected String getJDBCDriverName(){
+ return JDBC_DRIVER;
+ }
+
+ /**
+  * Creates a MySQL-backed catalog store; all set-up is delegated to the superclass
+  * constructor.
+  *
+  * @param conf configuration handed to the superclass
+  * @throws InternalException propagated from the superclass constructor
+  */
+ public MySQLStore(Configuration conf) throws InternalException {
+ super(conf);
+ }
+
+ /**
+  * Opens a connection to the URI returned by {@code getJDBCUri()}, using the values
+  * stored in {@code conf} under CONNECTION_ID and CONNECTION_PASSWORD as user name
+  * and password.
+  *
+  * @throws SQLException if the driver manager cannot open the connection
+  */
+ protected Connection createConnection(Configuration conf) throws SQLException {
+ Connection con = DriverManager.getConnection(getJDBCUri(), conf.get(CONNECTION_ID), conf.get(CONNECTION_PASSWORD));
+ //TODO con.setAutoCommit(false);
+ return con;
+ }
+
+ // TODO - DDL and index statements should be renamed
+ /**
+  * Creates the catalog's base tables (meta, tables, columns, options, indexes and
+  * statistics), one DDL statement per table.
+  *
+  * Each "is created" message is now logged only after its statement succeeded;
+  * previously most of them were logged before the DDL ran.
+  *
+  * @throws SQLException if any DDL statement fails
+  */
+ protected void createBaseTable() throws SQLException {
+   Statement stmt = conn.createStatement();
+   try {
+     // META
+     executeDdl(stmt, TB_META,
+         "CREATE TABLE " + TB_META + " (version int NOT NULL)");
+
+     // TABLES
+     executeDdl(stmt, TB_TABLES, "CREATE TABLE "
+         + TB_TABLES + " ("
+         + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
+         + "path TEXT, "
+         + "store_type CHAR(16)"
+         + ")");
+
+     // COLUMNS
+     executeDdl(stmt, TB_COLUMNS, "CREATE TABLE " + TB_COLUMNS + " ("
+         + "TID INT NOT NULL,"
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+         + "column_id INT NOT NULL,"
+         + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+         + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
+         + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+         + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)");
+
+     // OPTIONS
+     executeDdl(stmt, TB_OPTIONS, "CREATE TABLE " + TB_OPTIONS + " ("
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+         + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
+         + "INDEX("+C_TABLE_ID+", key_),"
+         + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)");
+
+     // INDEXES
+     executeDdl(stmt, TB_INDEXES, "CREATE TABLE " + TB_INDEXES + "("
+         + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+         + "column_name VARCHAR(255) NOT NULL, "
+         + "data_type VARCHAR(255) NOT NULL, "
+         + "index_type CHAR(32) NOT NULL, "
+         + "is_unique BOOLEAN NOT NULL, "
+         + "is_clustered BOOLEAN NOT NULL, "
+         + "is_ascending BOOLEAN NOT NULL,"
+         + "INDEX(" + C_TABLE_ID + ", column_name),"
+         + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)");
+
+     // STATISTICS
+     executeDdl(stmt, TB_STATISTICS, "CREATE TABLE " + TB_STATISTICS + "("
+         + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+         + "num_rows BIGINT, "
+         + "num_bytes BIGINT,"
+         + "INDEX("+C_TABLE_ID+"),"
+         + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)");
+   } finally {
+     CatalogUtil.closeSQLWrapper(stmt);
+   }
+ }
+
+ /** Debug-logs and executes one CREATE TABLE statement, then reports the table as created. */
+ private void executeDdl(final Statement stmt, final String tableName, final String ddl)
+     throws SQLException {
+   if (LOG.isDebugEnabled()) {
+     LOG.debug(ddl);
+   }
+   stmt.executeUpdate(ddl);
+   LOG.info("Table '" + tableName + "' is created.");
+ }
+
+ /**
+  * Reports whether the catalog schema already exists, i.e. whether the database
+  * metadata lists a table named like the meta, tables, columns or options table.
+  *
+  * @return {@code true} if at least one of the checked tables is present
+  * @throws SQLException if the database metadata cannot be read
+  */
+ protected boolean isInitialized() throws SQLException {
+   ResultSet res = conn.getMetaData().getTables(null, null, null,
+       new String[]{"TABLE"});
+   try {
+     // The former 'found' flag was never set, so the loop simply scans every row.
+     while (res.next()) {
+       String resName = res.getString("TABLE_NAME");
+       if (TB_META.equals(resName)
+           || TB_TABLES.equals(resName)
+           || TB_COLUMNS.equals(resName)
+           || TB_OPTIONS.equals(resName)) {
+         return true;
+       }
+     }
+     return false;
+   } finally {
+     CatalogUtil.closeSQLWrapper(res);
+   }
+ }
+
+ /** Placeholder: the body is empty, so {@code func} is not stored anywhere. */
+ @Override
+ public final void addFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /** Placeholder: the body is empty, so nothing is deleted. */
+ @Override
+ public final void deleteFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /**
+  * Placeholder: the body is empty. Note that the method returns {@code void}, so it
+  * cannot report whether {@code func} exists.
+  */
+ @Override
+ public final void existFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ /**
+  * Placeholder: always returns {@code null}, not an empty list, so callers must
+  * null-check the result.
+  */
+ @Override
+ public final List<String> getAllFunctionNames() throws IOException {
+ // TODO - not implemented yet
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 47ec4dd..183746d 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,15 +22,16 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.catalog.store.AbstractDBStore;
+import org.apache.tajo.catalog.store.DerbyStore;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
import java.io.File;
@@ -39,16 +40,16 @@ import static org.junit.Assert.*;
public class TestDBStore {
private static final Log LOG = LogFactory.getLog(TestDBStore.class);
private static Configuration conf;
- private static DBStore store;
+ private static AbstractDBStore store;
@BeforeClass
public static void setUp() throws Exception {
conf = new TajoConf();
Path testDir = CommonTestingUtil.getTestDir("target/test-data/TestDBSTore");
File absolutePath = new File(testDir.toUri());
- conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:"+absolutePath.getAbsolutePath()+"/db");
+ conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:"+absolutePath.getAbsolutePath()+"/db;create=true");
LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
- store = new DBStore(conf);
+ store = new DerbyStore(conf);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
index 5349618..a6b2183 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
+++ b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
@@ -32,6 +32,6 @@
<property>
<name>tajo.catalog.jdbc.uri</name>
- <value>jdbc:derby:target/test-data/tcat/db</value>
+ <value>jdbc:derby:target/test-data/tcat/db;create=true</value>
</property>
</configuration>