You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/27 09:29:38 UTC
[3/4] TAJO-475: Table partition catalog recap. (Min Zhou and hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 60c88e9..5d6f747 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -21,19 +21,21 @@
*/
package org.apache.tajo.catalog.store;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.CatalogConstants;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.exception.InternalException;
@@ -43,7 +45,6 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
-import java.util.Map.Entry;
public abstract class AbstractDBStore extends CatalogConstants implements CatalogStore {
protected final Log LOG = LogFactory.getLog(getClass());
@@ -60,9 +61,9 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
protected abstract Connection createConnection(final Configuration conf) throws SQLException;
- protected abstract boolean isInitialized() throws SQLException;
+ protected abstract boolean isInitialized() throws IOException;
- protected abstract void createBaseTable() throws SQLException;
+ protected abstract void createBaseTable() throws IOException;
public AbstractDBStore(Configuration conf)
throws InternalException {
@@ -117,7 +118,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
} else {
LOG.info("The base tables of CatalogServer already is initialized.");
}
- } catch (SQLException se) {
+ } catch (IOException se) {
throw new InternalException(
"Cannot initialize the persistent storage of Catalog", se);
}
@@ -125,7 +126,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
int dbVersion = 0;
try {
dbVersion = needUpgrade();
- } catch (SQLException e) {
+ } catch (IOException e) {
throw new InternalException(
"Cannot check if the DB need to be upgraded", e);
}
@@ -145,250 +146,189 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
return catalogUri;
}
- public Connection getConnection() throws SQLException{
- boolean isValid = false;
- try{
- isValid = conn.isValid(100);
- } catch (SQLException e){
- }
-
- if(!isValid){
+ public Connection getConnection() throws IOException {
+ try {
+ boolean isValid = conn.isValid(100);
+ if (!isValid) {
+ CatalogUtil.closeSQLWrapper(conn);
+ conn = createConnection(conf);
+ }
+ } catch (SQLException e) {
CatalogUtil.closeSQLWrapper(conn);
- conn = createConnection(conf);
+ throw new IOException(e);
}
return conn;
}
- private int needUpgrade() throws SQLException {
- String sql = "SELECT VERSION FROM " + TB_META;
- Statement stmt = getConnection().createStatement();
- ResultSet res = stmt.executeQuery(sql);
+ private int needUpgrade() throws IOException {
+ Statement stmt = null;
+ try {
+ String sql = "SELECT VERSION FROM " + TB_META;
+ stmt = getConnection().createStatement();
+ ResultSet res = stmt.executeQuery(sql);
- if (!res.next()) { // if this db version is 0
- insertVersion();
- return 0;
- } else {
- return res.getInt(1);
+ if (!res.next()) { // if this db version is 0
+ insertVersion();
+ return 0;
+ } else {
+ return res.getInt(1);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
}
}
- private void insertVersion() throws SQLException {
- String sql = "INSERT INTO " + TB_META + " values (0)";
- Statement stmt = getConnection().createStatement();
- stmt.executeUpdate(sql);
- stmt.close();
+ private void insertVersion() throws IOException {
+ Statement stmt = null;
+ try {
+ String sql = "INSERT INTO " + TB_META + " values (0)";
+ stmt = getConnection().createStatement();
+ stmt.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
}
- private void upgrade(int from, int to) throws SQLException {
- String sql;
- Statement stmt;
- if (from == 0) {
- if (to == 1) {
- sql = "DROP INDEX idx_options_key";
- LOG.info(sql);
+ private void upgrade(int from, int to) throws IOException {
+ Statement stmt = null;
+ try {
+ if (from == 0) {
+ if (to == 1) {
+ String sql = "DROP INDEX idx_options_key";
+ LOG.info(sql);
- stmt = getConnection().createStatement();
- stmt.addBatch(sql);
+ stmt = getConnection().createStatement();
+ stmt.addBatch(sql);
- sql =
- "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
- stmt.addBatch(sql);
- LOG.info(sql);
- stmt.executeBatch();
- stmt.close();
+ sql =
+ "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+ stmt.addBatch(sql);
+ LOG.info(sql);
+ stmt.executeBatch();
- LOG.info("DB Upgraded from " + from + " to " + to);
- } else {
- LOG.info("DB Upgraded from " + from + " to " + to);
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ } else {
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ }
}
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
}
}
@Override
- public void addTable(final TableDesc table) throws IOException {
- Statement stmt = null;
+ public void addTable(final CatalogProtos.TableDescProto table) throws IOException {
PreparedStatement pstmt = null;
+ ResultSet res = null;
+ Connection conn = getConnection();
+ try {
+ conn.setAutoCommit(false);
- ResultSet res;
-
- String sql =
- "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
- + "VALUES('" + table.getName() + "', "
- + "'" + table.getPath() + "', "
- + "'" + table.getMeta().getStoreType() + "'"
- + ")";
+ String tableName = table.getId().toLowerCase();
- try {
- stmt = getConnection().createStatement();
+ String sql = String.format("INSERT INTO %s (%s, path, store_type) VALUES(?, ?, ?) ", TB_TABLES, C_TABLE_ID);
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, table.getPath());
+ pstmt.setString(3, table.getMeta().getStoreType().name());
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
- stmt.executeUpdate(sql);
+ pstmt.executeUpdate();
-
- stmt = getConnection().createStatement();
- sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
- + " = '" + table.getName() + "'";
+ String tidSql = String.format("SELECT TID from %s WHERE %s = ?", TB_TABLES, C_TABLE_ID);
+ pstmt = conn.prepareStatement(tidSql);
+ pstmt.setString(1, tableName);
if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
+ LOG.debug(tidSql);
}
- res = stmt.executeQuery(sql);
+ res = pstmt.executeQuery();
if (!res.next()) {
throw new IOException("ERROR: there is no tid matched to "
- + table.getName());
+ + table.getId());
}
int tid = res.getInt("TID");
- String colSql;
- int columnId = 0;
- for (Column col : table.getSchema().getColumns()) {
- colSql = columnToSQL(tid, table, columnId, col);
+ String colSql = String.format("INSERT INTO %s (TID, %s, column_id, column_name, data_type, type_length)"
+ + " VALUES(?, ?, ?, ?, ?, ?) ", TB_COLUMNS, C_TABLE_ID);
+ pstmt = conn.prepareStatement(colSql);
+ for(int i = 0; i < table.getSchema().getFieldsCount(); i++) {
+ ColumnProto col = table.getSchema().getFields(i);
+ pstmt.setInt(1, tid);
+ pstmt.setString(2, tableName);
+ pstmt.setInt(3, i);
+ pstmt.setString(4, col.getColumnName());
+ pstmt.setString(5, col.getDataType().getType().name());
+ pstmt.setInt(6, (col.getDataType().hasLength() ? col.getDataType().getLength() : 0));
+ pstmt.addBatch();
if (LOG.isDebugEnabled()) {
LOG.debug(colSql);
}
- stmt.addBatch(colSql);
- columnId++;
}
+ pstmt.executeBatch();
-
- String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
- pstmt = getConnection().prepareStatement(optSql);
- try {
- for (Entry<String, String> entry : table.getMeta().toMap().entrySet()) {
- pstmt.setString(1, table.getName());
+ if(table.getMeta().hasParams()) {
+ String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
+ pstmt = conn.prepareStatement(optSql);
+ for (CatalogProtos.KeyValueProto entry : table.getMeta().getParams().getKeyvalList()) {
+ pstmt.setString(1, tableName);
pstmt.setString(2, entry.getKey());
pstmt.setString(3, entry.getValue());
+ pstmt.addBatch();
if (LOG.isDebugEnabled()) {
LOG.debug(optSql);
}
- pstmt.addBatch();
}
pstmt.executeBatch();
- } finally {
- CatalogUtil.closeSQLWrapper(pstmt);
}
- if (table.getStats() != null) {
- sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
- + "VALUES ('" + table.getName() + "', "
- + table.getStats().getNumRows() + ","
- + table.getStats().getNumBytes() + ")";
+
+ if (table.hasStats()) {
+ String statSql =
+ String.format("INSERT INTO %s (%s, num_rows, num_bytes) VALUES(?, ?, ?)", TB_STATISTICS, C_TABLE_ID);
+ pstmt = conn.prepareStatement(statSql);
+ pstmt.setString(1, tableName);
+ pstmt.setLong(2, table.getStats().getNumRows());
+ pstmt.setLong(3, table.getStats().getNumBytes());
+ pstmt.addBatch();
if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
+ LOG.debug(statSql);
}
- stmt.addBatch(sql);
- stmt.executeBatch();
- }
-
- //Partition
- if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
- try {
- PartitionDesc partitionDesc = table.getPartitions();
- List<Column> columnList = partitionDesc.getColumns();
-
- // Find columns which used for a partitioned table.
- StringBuffer columns = new StringBuffer();
- for(Column eachColumn : columnList) {
- sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
- + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
- if (!res.next()) {
- throw new IOException("ERROR: there is no columnId matched to "
- + table.getName());
- }
- columnId = res.getInt("column_id");
-
- if (columns.length() > 0) {
- columns.append(",");
- }
- columns.append(columnId);
- }
-
- // Set default partition name. But if user named to subpartition, it would be updated.
-// String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
-
- sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
- + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
- pstmt = getConnection().prepareStatement(sql);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
+ pstmt.executeBatch();
+ }
- // Find information for subpartitions
- if (partitionDesc.getSpecifiers() != null) {
- int count = 1;
- if (partitionDesc.getSpecifiers().size() == 0) {
- pstmt.clearParameters();
- pstmt.setString(1, null);
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, null);
- pstmt.addBatch();
- } else {
- for(Specifier specifier: partitionDesc.getSpecifiers()) {
- pstmt.clearParameters();
- if (specifier.getName() != null && !specifier.getName().equals("")) {
- pstmt.setString(1, specifier.getName());
- } else {
- pstmt.setString(1, null);
- }
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, specifier.getExpressions());
- pstmt.addBatch();
- count++;
- }
- }
- } else {
- pstmt.clearParameters();
- pstmt.setString(1, null);
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, null);
- pstmt.addBatch();
- }
- pstmt.executeBatch();
- } finally {
- CatalogUtil.closeSQLWrapper(pstmt);
- }
+ if(table.hasPartition()) {
+ String partSql =
+ String.format("INSERT INTO %s (%s, partition_type, expression, expression_schema) VALUES(?, ?, ?, ?)",
+ TB_PARTITION_METHODS, C_TABLE_ID);
+ pstmt = conn.prepareStatement(partSql);
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, table.getPartition().getPartitionType().name());
+ pstmt.setString(3, table.getPartition().getExpression());
+ pstmt.setBytes(4, table.getPartition().getExpressionSchema().toByteArray());
+ pstmt.executeUpdate();
}
+ conn.commit();
} catch (SQLException se) {
+ try {
+ conn.rollback();
+ } catch (SQLException rbe) {
+ throw new IOException(se.getMessage(), rbe);
+ }
throw new IOException(se.getMessage(), se);
} finally {
CatalogUtil.closeSQLWrapper(pstmt);
- CatalogUtil.closeSQLWrapper(stmt);
}
}
- private String columnToSQL(final int tid, final TableDesc desc,
- final int columnId, final Column col) {
- String sql =
- "INSERT INTO " + TB_COLUMNS
- + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
- + "VALUES("
- + tid + ","
- + "'" + desc.getName() + "',"
- + columnId + ", "
- + "'" + col.getColumnName() + "',"
- + "'" + col.getDataType().getType().name() + "',"
- + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
- + ")";
-
- return sql;
- }
-
@Override
public boolean existTable(final String name) throws IOException {
StringBuilder sql = new StringBuilder();
@@ -423,6 +363,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
String sql = null;
try {
+ getConnection().setAutoCommit(false);
+
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
+ try {
stmt = getConnection().createStatement();
sql = "DELETE FROM " + TB_COLUMNS +
" WHERE " + C_TABLE_ID + " = '" + name + "'";
@@ -460,9 +408,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
try {
- sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
- + " SELECT TID FROM " + TB_TABLES
- + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+ sql = "DELETE FROM " + TB_PARTTIONS +
+ " WHERE " + C_TABLE_ID + " = '" + name + "'";
LOG.info(sql);
stmt = getConnection().createStatement();
stmt.execute(sql);
@@ -487,21 +434,15 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
@Override
- public TableDesc getTable(final String name) throws IOException {
+ public CatalogProtos.TableDescProto getTable(final String name) throws IOException {
ResultSet res = null;
Statement stmt = null;
- String tableName = null;
- Path path = null;
+ CatalogProtos.TableDescProto.Builder tableBuilder = CatalogProtos.TableDescProto.newBuilder();
StoreType storeType = null;
- Options options;
- TableStats stat = null;
- PartitionDesc partitionDesc = null;
- int tid = 0;
-
try {
String sql =
- "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
+ "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+ " WHERE " + C_TABLE_ID + "='" + name + "'";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
@@ -511,17 +452,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
if (!res.next()) { // there is no table of the given name.
return null;
}
- tableName = res.getString(C_TABLE_ID).trim();
- path = new Path(res.getString("path").trim());
+ tableBuilder.setId(res.getString(C_TABLE_ID).trim());
+ tableBuilder.setPath(res.getString("path").trim());
+
storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
- tid = res.getInt("TID");
+
} catch (SQLException se) {
throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
}
- Schema schema = null;
+ CatalogProtos.SchemaProto.Builder schemaBuilder = CatalogProtos.SchemaProto.newBuilder();
try {
String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
+ " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
@@ -531,27 +473,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
LOG.debug(sql);
}
res = stmt.executeQuery(sql);
-
- schema = new Schema();
while (res.next()) {
- String columnName = tableName + "."
- + res.getString("column_name").trim();
- Type dataType = getDataType(res.getString("data_type")
- .trim());
- int typeLength = res.getInt("type_length");
- if (typeLength > 0) {
- schema.addColumn(columnName, dataType, typeLength);
- } else {
- schema.addColumn(columnName, dataType);
- }
+ schemaBuilder.addFields(resultToColumnProto(res));
}
} catch (SQLException se) {
throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
}
+ tableBuilder.setSchema(CatalogUtil.getQualfiedSchema(name, schemaBuilder.build()));
- options = Options.create();
+ CatalogProtos.TableProto.Builder metaBuilder = CatalogProtos.TableProto.newBuilder();
+ metaBuilder.setStoreType(storeType);
try {
String sql = "SELECT key_, value_ from " + TB_OPTIONS
+ " WHERE " + C_TABLE_ID + "='" + name + "'";
@@ -560,17 +493,13 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
LOG.debug(sql);
}
res = stmt.executeQuery(sql);
-
- while (res.next()) {
- options.put(
- res.getString("key_"),
- res.getString("value_"));
- }
+ metaBuilder.setParams(resultToKeyValueSetProto(res));
} catch (SQLException se) {
throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
}
+ tableBuilder.setMeta(metaBuilder);
try {
String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
@@ -582,9 +511,10 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
res = stmt.executeQuery(sql);
if (res.next()) {
- stat = new TableStats();
- stat.setNumRows(res.getLong("num_rows"));
- stat.setNumBytes(res.getLong("num_bytes"));
+ TableStatsProto.Builder statBuilder = TableStatsProto.newBuilder();
+ statBuilder.setNumRows(res.getLong("num_rows"));
+ statBuilder.setNumBytes(res.getLong("num_bytes"));
+ tableBuilder.setStats(statBuilder);
}
} catch (SQLException se) {
throw new IOException(se);
@@ -593,53 +523,30 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
try {
- String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
- + " WHERE TID =" + tid + "";
- stmt = getConnection().createStatement();
+ String sql =
+ "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+ + " WHERE " + C_TABLE_ID + "='" + name + "'";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
+ stmt = getConnection().createStatement();
res = stmt.executeQuery(sql);
- while (res.next()) {
- if (partitionDesc == null) {
- partitionDesc = new PartitionDesc();
- String[] columns = res.getString("columns").split(",");
- for(String eachColumn: columns) {
- partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
- }
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
- partitionDesc.setNumPartitions(res.getInt("quantity"));
- }
-
- Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
- partitionDesc.addSpecifier(specifier);
+ if (res.next()) {
+ tableBuilder.setPartition(resultToPartitionMethodProto(name, res));
}
-
} catch (SQLException se) {
throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
}
- TableMeta meta = new TableMeta(storeType, options);
- TableDesc table = new TableDesc(tableName, schema, meta, path);
- if (stat != null) {
- table.setStats(stat);
- }
-
- if (partitionDesc != null) {
- table.setPartitions(partitionDesc);
- }
-
- return table;
+ return tableBuilder.build();
}
- private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+ private ColumnProto getColumn(String tableName, int tid, String columnId) throws IOException {
ResultSet res = null;
- Column column = null;
Statement stmt = null;
-
try {
String sql = "SELECT column_name, data_type, type_length from "
+ TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
@@ -651,23 +558,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
res = stmt.executeQuery(sql);
if (res.next()) {
- String columnName = tableName + "."
- + res.getString("column_name").trim();
- Type dataType = getDataType(res.getString("data_type")
- .trim());
- int typeLength = res.getInt("type_length");
- if (typeLength > 0) {
- column = new Column(columnName, dataType, typeLength);
- } else {
- column = new Column(columnName, dataType);
- }
+ return resultToColumnProto(res);
}
} catch (SQLException se) {
throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
}
- return column;
+ return null;
}
private Type getDataType(final String typeStr) {
@@ -706,6 +604,266 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
@Override
+ public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws IOException {
+ PreparedStatement pstmt = null;
+ String sql = "INSERT INTO " + TB_PARTTIONS + " (" + C_TABLE_ID
+ + ", partition_name, ordinal_position, path, cache_nodes) "
+ + "VALUES (?, ?, ?, ?, ?, ?) ";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ try {
+ pstmt = getConnection().prepareStatement(sql);
+ for (CatalogProtos.PartitionDescProto proto : partitionsProto.getPartitionList()) {
+ pstmt.setString(1, proto.getTableId());
+ pstmt.setString(2, proto.getPartitionName());
+ pstmt.setInt(3, proto.getOrdinalPosition());
+ pstmt.setString(4, proto.getPartitionValue());
+ pstmt.setString(5, proto.getPath());
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
+ }
+ }
+
+ @Override
+ public void addPartitionMethod(CatalogProtos.PartitionMethodProto proto) throws IOException {
+ PreparedStatement pstmt = null;
+ String sql = "INSERT INTO " + TB_PARTITION_METHODS + " (" + C_TABLE_ID
+ + ", partition_type, expression, expression_schema) "
+ + "VALUES (?, ?, ?, ?) ";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ try {
+ pstmt = getConnection().prepareStatement(sql);
+ pstmt.setString(1, proto.getTableId().toLowerCase());
+ pstmt.setString(2, proto.getPartitionType().name());
+ pstmt.setString(3, proto.getExpression());
+ pstmt.setBytes(4, proto.getExpressionSchema().toByteArray());
+ pstmt.executeUpdate();
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
+ }
+ }
+
+ @Override
+ public void delPartitionMethod(String tableName) throws IOException {
+ String sql =
+ "DELETE FROM " + TB_PARTITION_METHODS
+ + " WHERE " + C_TABLE_ID + " ='" + tableName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ Statement stmt = null;
+
+ try {
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
+ }
+
+ @Override
+ public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException {
+ ResultSet res = null;
+ Statement stmt = null;
+
+ try {
+ String sql =
+ "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+ + " WHERE " + C_TABLE_ID + "='" + tableName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt = getConnection().createStatement();
+ res = stmt.executeQuery(sql);
+
+ if (res.next()) {
+ return resultToPartitionMethodProto(tableName, res);
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean existPartitionMethod(String tableName) throws IOException {
+ ResultSet res = null;
+ Statement stmt = null;
+ boolean exist = false;
+ try {
+ String sql =
+ "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+ + " WHERE " + C_TABLE_ID + "='" + tableName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt = getConnection().createStatement();
+ res = stmt.executeQuery(sql);
+ exist = res.next();
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+ return exist;
+ }
+
+ @Override
+ public void addPartition(CatalogProtos.PartitionDescProto proto) throws IOException {
+ PreparedStatement pstmt = null;
+ String sql = "INSERT INTO " + TB_PARTTIONS + " (" + C_TABLE_ID
+ + ", partition_method_id, partition_name, ordinal_position, path, cache_nodes) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?) ";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ try {
+ pstmt = getConnection().prepareStatement(sql);
+ pstmt.setString(1, proto.getTableId().toLowerCase());
+ pstmt.setString(2, proto.getPartitionName().toLowerCase());
+ pstmt.setInt(3, proto.getOrdinalPosition());
+ pstmt.setString(4, proto.getPartitionValue());
+ pstmt.setString(5, proto.getPath());
+ pstmt.executeUpdate();
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
+ }
+ }
+
+ @Override
+ public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException {
+ ResultSet res = null;
+ PreparedStatement stmt = null;
+ CatalogProtos.PartitionDescProto proto = null;
+
+ String sql = "SELECT " + C_TABLE_ID
+ + ", partition_name, ordinal_position, partition_value, path, cache_nodes FROM "
+ + TB_PARTTIONS + "where partition_name = ?";
+
+ try {
+ stmt = getConnection().prepareStatement(sql);
+ stmt.setString(1, partitionName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stmt.toString());
+ }
+ res = stmt.executeQuery();
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no index matched to " + partitionName);
+ }
+
+ proto = resultToPartitionDescProto(res);
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+
+ return proto;
+ }
+
+
+ @Override
+ public CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException {
+ ResultSet res = null;
+ PreparedStatement stmt = null;
+ CatalogProtos.PartitionsProto proto = null;
+
+ String partsSql = "SELECT " + C_TABLE_ID
+ + ", partition_name, ordinal_position, partition_value, path, cache_nodes FROM "
+ + TB_PARTTIONS + "where " + C_TABLE_ID + " = ?";
+
+ try {
+ // PARTITIONS
+ stmt = getConnection().prepareStatement(partsSql);
+ stmt.setString(1, tableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stmt.toString());
+ }
+ res = stmt.executeQuery();
+ CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
+ while(res.next()) {
+ builder.addPartition(resultToPartitionDescProto(res));
+ }
+ proto = builder.build();
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+
+ return proto;
+ }
+
+
+ @Override
+ public void delPartition(String partitionName) throws IOException {
+ String sql =
+ "DELETE FROM " + TB_PARTTIONS
+ + " WHERE partition_name ='" + partitionName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ Statement stmt = null;
+
+ try {
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
+ }
+
+ @Override
+ public void delPartitions(String tableName) throws IOException {
+ String sql =
+ "DELETE FROM " + TB_PARTTIONS
+ + " WHERE " + C_TABLE_ID + " ='" + tableName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ Statement stmt = null;
+
+ try {
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
+ }
+
+
+ @Override
public void addIndex(final IndexDescProto proto) throws IOException {
String sql =
"INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, "
@@ -781,7 +939,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
if (!res.next()) {
throw new IOException("ERROR: there is no index matched to " + indexName);
}
- proto = resultToProto(res);
+ proto = resultToIndexDescProto(res);
} catch (SQLException se) {
} finally {
CatalogUtil.closeSQLWrapper(res, stmt);
@@ -814,7 +972,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
throw new IOException("ERROR: there is no index matched to "
+ tableName + "." + columnName);
}
- proto = resultToProto(res);
+ proto = resultToIndexDescProto(res);
} catch (SQLException se) {
new IOException(se);
} finally {
@@ -903,7 +1061,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
res = stmt.executeQuery();
while (res.next()) {
- protos.add(resultToProto(res));
+ protos.add(resultToIndexDescProto(res));
}
} catch (SQLException se) {
} finally {
@@ -913,11 +1071,11 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
return protos.toArray(new IndexDescProto[protos.size()]);
}
- private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+ private IndexDescProto resultToIndexDescProto(final ResultSet res) throws SQLException {
IndexDescProto.Builder builder = IndexDescProto.newBuilder();
builder.setName(res.getString("index_name"));
builder.setTableId(res.getString(C_TABLE_ID));
- builder.setColumn(resultToColumnProto(res));
+ builder.setColumn(indexResultToColumnProto(res));
builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
builder.setIsUnique(res.getBoolean("is_unique"));
builder.setIsClustered(res.getBoolean("is_clustered"));
@@ -925,10 +1083,60 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
return builder.build();
}
+ /**
+ * The indexes table doesn't store type_length, so index rows need their own variant of resultToColumnProto.
+ */
+ private ColumnProto indexResultToColumnProto(final ResultSet res) throws SQLException {
+ ColumnProto.Builder builder = ColumnProto.newBuilder();
+ builder.setColumnName(res.getString("column_name").trim());
+
+ Type type = getDataType(res.getString("data_type").trim());
+ builder.setDataType(CatalogUtil.newSimpleDataType(type));
+
+ return builder.build();
+ }
+
private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
ColumnProto.Builder builder = ColumnProto.newBuilder();
- builder.setColumnName(res.getString("column_name"));
- builder.setDataType(CatalogUtil.newSimpleDataType(getDataType(res.getString("data_type").trim())));
+ builder.setColumnName(res.getString("column_name").trim());
+
+ Type type = getDataType(res.getString("data_type").trim());
+ int typeLength = res.getInt("type_length");
+ if(typeLength > 0 ) {
+ builder.setDataType(CatalogUtil.newDataTypeWithLen(type, typeLength));
+ } else {
+ builder.setDataType(CatalogUtil.newSimpleDataType(type));
+ }
+
+ return builder.build();
+ }
+
+ private CatalogProtos.KeyValueSetProto resultToKeyValueSetProto(final ResultSet res) throws SQLException {
+ CatalogProtos.KeyValueSetProto.Builder setBuilder = CatalogProtos.KeyValueSetProto.newBuilder();
+ CatalogProtos.KeyValueProto.Builder builder = CatalogProtos.KeyValueProto.newBuilder();
+ while (res.next()) {
+ builder.setKey(res.getString("key_"));
+ builder.setValue(res.getString("value_"));
+ setBuilder.addKeyval(builder.build());
+ }
+ return setBuilder.build();
+ }
+
+ private ColumnProto resultToQualifiedColumnProto(String tableName, final ResultSet res) throws SQLException {
+ ColumnProto.Builder builder = ColumnProto.newBuilder();
+
+ String columnName = tableName + "."
+ + res.getString("column_name").trim();
+ builder.setColumnName(columnName);
+
+ Type type = getDataType(res.getString("data_type").trim());
+ int typeLength = res.getInt("type_length");
+ if(typeLength > 0 ) {
+ builder.setDataType(CatalogUtil.newDataTypeWithLen(type, typeLength));
+ } else {
+ builder.setDataType(CatalogUtil.newSimpleDataType(type));
+ }
+
return builder.build();
}
@@ -943,9 +1151,52 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
}
+ private CatalogProtos.PartitionMethodProto resultToPartitionMethodProto(final String tableName, final ResultSet res)
+ throws SQLException, InvalidProtocolBufferException {
+ CatalogProtos.PartitionMethodProto.Builder partBuilder = CatalogProtos.PartitionMethodProto.newBuilder();
+ partBuilder.setTableId(tableName);
+ partBuilder.setPartitionType(CatalogProtos.PartitionType.valueOf(res.getString("partition_type")));
+ partBuilder.setExpression(res.getString("expression"));
+ partBuilder.setExpressionSchema(SchemaProto.parseFrom(res.getBytes("expression_schema")));
+ return partBuilder.build();
+ }
+
+ private CatalogProtos.PartitionDescProto resultToPartitionDescProto(ResultSet res) throws SQLException {
+ CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+ builder.setTableId(res.getString(1));
+ builder.setPartitionName(res.getString(2));
+ builder.setOrdinalPosition(res.getInt(3));
+ builder.setPartitionValue(res.getString(4));
+ builder.setPath(res.getString(5));
+ return builder.build();
+ }
+
@Override
public void close() {
CatalogUtil.closeSQLWrapper(conn);
LOG.info("Shutdown database (" + catalogUri + ")");
}
+
+ @Override
+ public final void addFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final void deleteFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final void existFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final List<String> getAllFunctionNames() throws IOException {
+ // TODO - not implemented yet
+ return null;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index 44f4e7a..8a824bd 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -19,7 +19,7 @@
package org.apache.tajo.catalog.store;
import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import java.io.Closeable;
@@ -27,16 +27,48 @@ import java.io.IOException;
import java.util.List;
public interface CatalogStore extends Closeable {
- void addTable(TableDesc desc) throws IOException;
+ /*************************** TABLE ******************************/
+ void addTable(CatalogProtos.TableDescProto desc) throws IOException;
boolean existTable(String name) throws IOException;
void deleteTable(String name) throws IOException;
- TableDesc getTable(String name) throws IOException;
+ CatalogProtos.TableDescProto getTable(String name) throws IOException;
List<String> getAllTableNames() throws IOException;
-
+
+
+ /************************ PARTITION METHOD **************************/
+ void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws IOException;
+
+ CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException;
+
+ boolean existPartitionMethod(String tableName) throws IOException;
+
+ void delPartitionMethod(String tableName) throws IOException;
+
+
+ /************************** PARTITIONS *****************************/
+ void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws IOException;
+
+ void addPartition(CatalogProtos.PartitionDescProto partitionDescProto) throws IOException;
+
+ /**
+ * Get all partitions of a table
+ * @param tableName the table name
+ * @return a PartitionsProto holding all partitions of the given table
+ * @throws IOException
+ */
+ CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException;
+
+ CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException;
+
+ void delPartition(String partitionName) throws IOException;
+
+ void delPartitions(String tableName) throws IOException;
+
+ /**************************** INDEX *******************************/
void addIndex(IndexDescProto proto) throws IOException;
void delIndex(String indexName) throws IOException;
@@ -49,7 +81,8 @@ public interface CatalogStore extends Closeable {
boolean existIndex(String indexName) throws IOException;
boolean existIndex(String tableName, String columnName) throws IOException;
-
+
+ /************************** FUNCTION *****************************/
IndexDescProto [] getIndexes(String tableName) throws IOException;
void addFunction(FunctionDesc func) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index f21ab0e..6f1b612 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -22,34 +22,18 @@
package org.apache.tajo.catalog.store;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.InternalException;
import java.io.IOException;
import java.sql.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DerbyStore extends AbstractDBStore {
private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
- protected Lock rlock;
- protected Lock wlock;
protected String getCatalogDriverName(){
return CATALOG_DRIVER;
@@ -57,7 +41,6 @@ public class DerbyStore extends AbstractDBStore {
public DerbyStore(final Configuration conf)
throws InternalException {
-
super(conf);
}
@@ -66,8 +49,7 @@ public class DerbyStore extends AbstractDBStore {
}
// TODO - DDL and index statements should be renamed
- protected void createBaseTable() throws SQLException {
- wlock.lock();
+ protected void createBaseTable() throws IOException {
Statement stmt = null;
try {
// META
@@ -220,43 +202,72 @@ public class DerbyStore extends AbstractDBStore {
LOG.debug(idx_stats_fk_table_name);
}
stmt.addBatch(idx_stats_fk_table_name);
+ stmt.executeBatch();
LOG.info("Table '" + TB_STATISTICS + "' is created.");
}
- // PARTITION
+ // PARTITION_METHODS
+ if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
+ String partition_method_ddl = "CREATE TABLE " + TB_PARTITION_METHODS + " ("
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "partition_type VARCHAR(10) NOT NULL,"
+ + "expression VARCHAR(1024) NOT NULL,"
+ + "expression_schema VARCHAR(1024) FOR BIT DATA NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(partition_method_ddl);
+ }
+ stmt.addBatch(partition_method_ddl);
+
+ String idx_partition_methods_table_name = "CREATE INDEX idx_partition_methods_table_name ON "
+ + TB_PARTITION_METHODS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_partition_methods_table_name);
+ }
+ stmt.addBatch(idx_partition_methods_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
+ }
+
+ // PARTITIONS
if (!baseTableMaps.get(TB_PARTTIONS)) {
String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+ "PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
- + "name VARCHAR(255), "
- + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
- + "type VARCHAR(10) NOT NULL,"
- + "quantity INT ,"
- + "columns VARCHAR(255),"
- + "expressions VARCHAR(1024)"
- + ", CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "partition_name VARCHAR(255), "
+ + "ordinal_position INT NOT NULL,"
+ + "partition_value VARCHAR(1024),"
+ + "path VARCHAR(1024),"
+ + "cache_nodes VARCHAR(255), "
+ + " CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+ " )";
-
if (LOG.isDebugEnabled()) {
LOG.debug(partition_ddl);
}
stmt.addBatch(partition_ddl);
- LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+
+ String idx_partitions_table_name = "CREATE INDEX idx_partitions_table_name ON "
+ + TB_PARTTIONS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_partitions_table_name);
+ }
+ stmt.addBatch(idx_partitions_table_name);
stmt.executeBatch();
+ LOG.info("Table '" + TB_PARTTIONS + "' is created.");
}
+ } catch (SQLException se) {
+ throw new IOException(se);
} finally {
- wlock.unlock();
CatalogUtil.closeSQLWrapper(stmt);
}
}
@Override
- protected boolean isInitialized() throws SQLException {
+ protected boolean isInitialized() throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- this.rlock = lock.readLock();
- this.wlock = lock.writeLock();
- wlock.lock();
ResultSet res = null;
int foundCount = 0;
try {
@@ -269,13 +280,15 @@ public class DerbyStore extends AbstractDBStore {
baseTableMaps.put(TB_OPTIONS, false);
baseTableMaps.put(TB_STATISTICS, false);
baseTableMaps.put(TB_INDEXES, false);
+ baseTableMaps.put(TB_PARTITION_METHODS, false);
baseTableMaps.put(TB_PARTTIONS, false);
while (res.next()) {
baseTableMaps.put(res.getString("TABLE_NAME"), true);
}
+ } catch (SQLException se){
+ throw new IOException(se);
} finally {
- wlock.unlock();
CatalogUtil.closeSQLWrapper(res);
}
@@ -288,8 +301,7 @@ public class DerbyStore extends AbstractDBStore {
return true;
}
- final boolean checkInternalTable(final String tableName) throws SQLException {
- rlock.lock();
+ final boolean checkInternalTable(final String tableName) throws IOException {
ResultSet res = null;
try {
boolean found = false;
@@ -301,786 +313,13 @@ public class DerbyStore extends AbstractDBStore {
}
return found;
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res);
- }
- }
-
- @Override
- public final void addTable(final TableDesc table) throws IOException {
- PreparedStatement pstmt = null;
- Statement stmt = null;
- ResultSet res = null;
-
- String sql =
- "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
- + "VALUES('" + table.getName() + "', "
- + "'" + table.getPath() + "', "
- + "'" + table.getMeta().getStoreType() + "'"
- + ")";
-
- wlock.lock();
- try {
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- stmt.executeUpdate(sql);
-
- sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
- + " = '" + table.getName() + "'";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
- if (!res.next()) {
- throw new IOException("ERROR: there is no tid matched to "
- + table.getName());
- }
- int tid = res.getInt("TID");
-
- String colSql;
- int columnId = 0;
- for (Column col : table.getSchema().getColumns()) {
- colSql = columnToSQL(tid, table, columnId, col);
- if (LOG.isDebugEnabled()) {
- LOG.debug(colSql);
- }
- stmt.addBatch(colSql);
- columnId++;
- }
-
- Iterator<Entry<String,String>> it = table.getMeta().toMap().entrySet().iterator();
- String optSql;
- while (it.hasNext()) {
- optSql = keyvalToSQL(table, it.next());
- if (LOG.isDebugEnabled()) {
- LOG.debug(optSql);
- }
- stmt.addBatch(optSql);
- }
- stmt.executeBatch();
- if (table.getStats() != null) {
- sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
- + "VALUES ('" + table.getName() + "', "
- + table.getStats().getNumRows() + ","
- + table.getStats().getNumBytes() + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- stmt.executeUpdate(sql);
- }
-
- //Partition
- if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
- try {
- PartitionDesc partitionDesc = table.getPartitions();
- List<Column> columnList = partitionDesc.getColumns();
-
- // Find columns which used for a partitioned table.
- StringBuffer columns = new StringBuffer();
- for(Column eachColumn : columnList) {
- sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
- + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
- if (!res.next()) {
- throw new IOException("ERROR: there is no columnId matched to "
- + table.getName());
- }
- columnId = res.getInt("column_id");
-
- if(columns.length() > 0) {
- columns.append(",");
- }
- columns.append(columnId);
- }
-
- // Set default partition name. But if user named to subpartition, it would be updated.
-// String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
-
- sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
- + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
- pstmt = getConnection().prepareStatement(sql);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- // Find information for subpartitions
- if (partitionDesc.getSpecifiers() != null) {
- int count = 1;
- if (partitionDesc.getSpecifiers().size() == 0) {
- pstmt.clearParameters();
- pstmt.setString(1, null);
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, null);
- pstmt.addBatch();
- } else {
- for(Specifier eachValue: partitionDesc.getSpecifiers()) {
- pstmt.clearParameters();
- if (eachValue.getName() != null && !eachValue.getName().equals("")) {
- pstmt.setString(1, eachValue.getName());
- } else {
- pstmt.setString(1, null);
- }
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, eachValue.getExpressions());
- pstmt.addBatch();
- count++;
- }
- }
- } else {
- pstmt.clearParameters();
- pstmt.setString(1, null);
- pstmt.setInt(2, tid);
- pstmt.setString(3, partitionDesc.getPartitionsType().name());
- pstmt.setInt(4, partitionDesc.getNumPartitions());
- pstmt.setString(5, columns.toString());
- pstmt.setString(6, null);
- pstmt.addBatch();
- }
- pstmt.executeBatch();
- } finally {
- CatalogUtil.closeSQLWrapper(pstmt);
- }
- }
-
- } catch (SQLException se) {
- throw new IOException(se.getMessage(), se);
- } finally {
- wlock.unlock();
- CatalogUtil.closeSQLWrapper(res, pstmt);
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
- }
-
- private String columnToSQL(final int tid, final TableDesc desc,
- final int columnId, final Column col) {
- String sql =
- "INSERT INTO " + TB_COLUMNS
- + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
- + "VALUES("
- + tid + ","
- + "'" + desc.getName() + "',"
- + columnId + ", "
- + "'" + col.getColumnName() + "',"
- + "'" + col.getDataType().getType().name() + "',"
- + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
- + ")";
-
- return sql;
- }
-
- private String keyvalToSQL(final TableDesc desc,
- final Entry<String,String> keyVal) {
- String sql =
- "INSERT INTO " + TB_OPTIONS
- + " (" + C_TABLE_ID + ", key_, value_) "
- + "VALUES("
- + "'" + desc.getName() + "',"
- + "'" + keyVal.getKey() + "',"
- + "'" + keyVal.getValue() + "'"
- + ")";
-
- return sql;
- }
-
- @Override
- public final boolean existTable(final String name) throws IOException {
- StringBuilder sql = new StringBuilder();
- sql.append("SELECT " + C_TABLE_ID + " from ")
- .append(TB_TABLES)
- .append(" WHERE " + C_TABLE_ID + " = '")
- .append(name)
- .append("'");
-
- Statement stmt = null;
- ResultSet res = null;
- boolean exist = false;
- rlock.lock();
- try {
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql.toString());
- }
- res = stmt.executeQuery(sql.toString());
- exist = res.next();
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return exist;
- }
-
- @Override
- public final void deleteTable(final String name) throws IOException {
- Statement stmt = null;
- String sql = null;
- try {
- wlock.lock();
- stmt = getConnection().createStatement();
- try {
- sql = "DELETE FROM " + TB_COLUMNS +
- " WHERE " + C_TABLE_ID + " = '" + name + "'";
- LOG.info(sql);
- stmt.execute(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- }
-
- try {
- sql = "DELETE FROM " + TB_OPTIONS +
- " WHERE " + C_TABLE_ID + " = '" + name + "'";
- LOG.info(sql);
- stmt.execute(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- }
-
- try {
- sql = "DELETE FROM " + TB_STATISTICS +
- " WHERE " + C_TABLE_ID + " = '" + name + "'";
- LOG.info(sql);
- stmt.execute(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- }
-
- try {
- sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
- + " SELECT TID FROM " + TB_TABLES
- + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
- LOG.info(sql);
- stmt = getConnection().createStatement();
- stmt.execute(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- }
-
- try {
- sql = "DELETE FROM " + TB_TABLES +
- " WHERE " + C_TABLE_ID +" = '" + name + "'";
- LOG.info(sql);
- stmt.execute(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- }
} catch (SQLException se) {
throw new IOException(se);
} finally {
- wlock.unlock();
- CatalogUtil.closeSQLWrapper(stmt);
- }
- }
-
- @Override
- public final TableDesc getTable(final String name) throws IOException {
- ResultSet res = null;
- Statement stmt = null;
-
- String tableName = null;
- Path path = null;
- StoreType storeType = null;
- Options options;
- TableStats stat = null;
- PartitionDesc partitionDesc = null;
- int tid = 0;
-
- try {
- rlock.lock();
- stmt = getConnection().createStatement();
-
- try {
- String sql =
- "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
- + " WHERE " + C_TABLE_ID + "='" + name + "'";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- res = stmt.executeQuery(sql);
- if (!res.next()) { // there is no table of the given name.
- return null;
- }
- tableName = res.getString(C_TABLE_ID).trim();
- path = new Path(res.getString("path").trim());
- storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
- tid = res.getInt("TID");
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res);
- }
-
- Schema schema = null;
- try {
- String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
- + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
-
- schema = new Schema();
- while (res.next()) {
- String columnName = tableName + "."
- + res.getString("column_name").trim();
- Type dataType = getDataType(res.getString("data_type")
- .trim());
- int typeLength = res.getInt("type_length");
- if (typeLength > 0) {
- schema.addColumn(columnName, dataType, typeLength);
- } else {
- schema.addColumn(columnName, dataType);
- }
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res);
- }
-
- options = Options.create();
- try {
- String sql = "SELECT key_, value_ from " + TB_OPTIONS
- + " WHERE " + C_TABLE_ID + "='" + name + "'";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
-
- while (res.next()) {
- options.put(
- res.getString("key_"),
- res.getString("value_"));
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res);
- }
-
- try {
- String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
- + " WHERE " + C_TABLE_ID + "='" + name + "'";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- res = stmt.executeQuery(sql);
-
- if (res.next()) {
- stat = new TableStats();
- stat.setNumRows(res.getLong("num_rows"));
- stat.setNumBytes(res.getLong("num_bytes"));
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res);
- }
-
-
- try {
- String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
- + " WHERE TID =" + tid + "";
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
-
- while (res.next()) {
- if (partitionDesc == null) {
- partitionDesc = new PartitionDesc();
- String[] columns = res.getString("columns").split(",");
- for(String eachColumn: columns) {
- partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
- }
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
- ("type")));
- partitionDesc.setNumPartitions(res.getInt("quantity"));
- }
-
- Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
- partitionDesc.addSpecifier(specifier);
- }
-
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- TableMeta meta = new TableMeta(storeType, options);
- TableDesc table = new TableDesc(tableName, schema, meta, path);
- if (stat != null) {
- table.setStats(stat);
- }
-
- if (partitionDesc != null) {
- table.setPartitions(partitionDesc);
- }
-
- return table;
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(stmt);
- }
- }
-
- private Column getColumn(String tableName, int tid, String columnId) throws IOException {
- ResultSet res = null;
- Column column = null;
- Statement stmt = null;
-
- try {
- String sql = "SELECT column_name, data_type, type_length from "
- + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
-
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
-
- if (res.next()) {
- String columnName = tableName + "."
- + res.getString("column_name").trim();
- Type dataType = getDataType(res.getString("data_type")
- .trim());
- int typeLength = res.getInt("type_length");
- if (typeLength > 0) {
- column = new Column(columnName, dataType, typeLength);
- } else {
- column = new Column(columnName, dataType);
- }
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
- return column;
- }
-
- private Type getDataType(final String typeStr) {
- try {
- return Enum.valueOf(Type.class, typeStr);
- } catch (IllegalArgumentException iae) {
- LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
- return null;
- }
- }
-
- @Override
- public final List<String> getAllTableNames() throws IOException {
- String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
-
- Statement stmt = null;
- ResultSet res = null;
-
- List<String> tables = new ArrayList<String>();
- rlock.lock();
- try {
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery(sql);
- while (res.next()) {
- tables.add(res.getString(C_TABLE_ID).trim());
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
- return tables;
- }
-
- public final void addIndex(final IndexDescProto proto) throws IOException {
- String sql =
- "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, "
- +"data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
- +"(?,?,?,?,?,?,?,?)";
-
- PreparedStatement stmt = null;
-
- wlock.lock();
- try {
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, proto.getName());
- stmt.setString(2, proto.getTableId());
- stmt.setString(3, proto.getColumn().getColumnName());
- stmt.setString(4, proto.getColumn().getDataType().getType().name());
- stmt.setString(5, proto.getIndexMethod().toString());
- stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
- stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
- stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
- stmt.executeUpdate();
- if (LOG.isDebugEnabled()) {
- LOG.debug(stmt.toString());
- }
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- wlock.unlock();
- CatalogUtil.closeSQLWrapper(stmt);
- }
- }
-
- public final void delIndex(final String indexName) throws IOException {
- String sql =
- "DELETE FROM " + TB_INDEXES
- + " WHERE index_name='" + indexName + "'";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- Statement stmt = null;
- wlock.lock();
- try {
- stmt = getConnection().createStatement();
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- stmt.executeUpdate(sql);
- } catch (SQLException se) {
- throw new IOException(se);
- } finally {
- wlock.unlock();
- CatalogUtil.closeSQLWrapper(stmt);
- }
- }
-
- public final IndexDescProto getIndex(final String indexName)
- throws IOException {
- ResultSet res = null;
- PreparedStatement stmt = null;
-
- IndexDescProto proto = null;
-
- rlock.lock();
- try {
- String sql =
- "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
- + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
- + "where index_name = ?";
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, indexName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(stmt.toString());
- }
- res = stmt.executeQuery();
- if (!res.next()) {
- throw new IOException("ERROR: there is no index matched to " + indexName);
- }
- proto = resultToProto(res);
- } catch (SQLException se) {
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return proto;
- }
-
- public final IndexDescProto getIndex(final String tableName,
- final String columnName) throws IOException {
- ResultSet res = null;
- PreparedStatement stmt = null;
-
- IndexDescProto proto = null;
-
- rlock.lock();
- try {
- String sql =
- "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
- + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
- + "where " + C_TABLE_ID + " = ? AND column_name = ?";
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, tableName);
- stmt.setString(2, columnName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery();
- if (!res.next()) {
- throw new IOException("ERROR: there is no index matched to "
- + tableName + "." + columnName);
- }
- proto = resultToProto(res);
- } catch (SQLException se) {
- new IOException(se);
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return proto;
- }
-
- public final boolean existIndex(final String indexName) throws IOException {
- String sql = "SELECT index_name from " + TB_INDEXES
- + " WHERE index_name = ?";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- PreparedStatement stmt = null;
- ResultSet res = null;
- boolean exist = false;
- rlock.lock();
- try {
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, indexName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery();
- exist = res.next();
- } catch (SQLException se) {
-
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return exist;
- }
-
- @Override
- public boolean existIndex(String tableName, String columnName)
- throws IOException {
- String sql = "SELECT index_name from " + TB_INDEXES
- + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
-
- PreparedStatement stmt = null;
- ResultSet res = null;
- boolean exist = false;
- rlock.lock();
- try {
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, tableName);
- stmt.setString(2, columnName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery();
- exist = res.next();
- } catch (SQLException se) {
-
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return exist;
- }
-
- public final IndexDescProto [] getIndexes(final String tableName)
- throws IOException {
- ResultSet res = null;
- PreparedStatement stmt = null;
-
- List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
-
- rlock.lock();
- try {
- String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
- + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
- + "where " + C_TABLE_ID + "= ?";
- stmt = getConnection().prepareStatement(sql);
- stmt.setString(1, tableName);
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
- res = stmt.executeQuery();
- while (res.next()) {
- protos.add(resultToProto(res));
- }
- } catch (SQLException se) {
- } finally {
- rlock.unlock();
- CatalogUtil.closeSQLWrapper(res, stmt);
- }
-
- return protos.toArray(new IndexDescProto [protos.size()]);
- }
-
- private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
- IndexDescProto.Builder builder = IndexDescProto.newBuilder();
- builder.setName(res.getString("index_name"));
- builder.setTableId(res.getString(C_TABLE_ID));
- builder.setColumn(resultToColumnProto(res));
- builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
- builder.setIsUnique(res.getBoolean("is_unique"));
- builder.setIsClustered(res.getBoolean("is_clustered"));
- builder.setIsAscending(res.getBoolean("is_ascending"));
- return builder.build();
- }
-
- private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
- ColumnProto.Builder builder = ColumnProto.newBuilder();
- builder.setColumnName(res.getString("column_name"));
- builder.setDataType(CatalogUtil.newSimpleDataType(
- getDataType(res.getString("data_type").trim())));
- return builder.build();
- }
-
- private IndexMethod getIndexMethod(final String typeStr) {
- if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
- return IndexMethod.TWO_LEVEL_BIN_TREE;
- } else {
- LOG.error("Cannot find a matched type aginst from '"
- + typeStr + "'");
- // TODO - needs exception handling
- return null;
+ CatalogUtil.closeSQLWrapper(res);
}
}
- @Override
- public final void addFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final void deleteFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final void existFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final List<String> getAllFunctionNames() throws IOException {
- // TODO - not implemented yet
- return null;
- }
@Override
public final void close() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 77b258e..984ba6a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -23,8 +23,9 @@ package org.apache.tajo.catalog.store;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import java.io.IOException;
@@ -33,8 +34,8 @@ import java.util.List;
import java.util.Map;
public class MemStore implements CatalogStore {
- private final Map<String,TableDesc> tables = Maps.newHashMap();
- private final Map<String, FunctionDesc> functions = Maps.newHashMap();
+ private final Map<String,CatalogProtos.TableDescProto> tables = Maps.newHashMap();
+ private final Map<String, CatalogProtos.FunctionDescProto> functions = Maps.newHashMap();
private final Map<String, IndexDescProto> indexes = Maps.newHashMap();
private final Map<String, IndexDescProto> indexesByColumn = Maps.newHashMap();
@@ -55,9 +56,10 @@ public class MemStore implements CatalogStore {
* @see CatalogStore#addTable(TableDesc)
*/
@Override
- public void addTable(TableDesc desc) throws IOException {
+ public void addTable(CatalogProtos.TableDescProto desc) throws IOException {
synchronized(tables) {
- tables.put(desc.getName(), desc);
+ String tableId = desc.getId().toLowerCase();
+ tables.put(tableId, desc);
}
}
@@ -67,7 +69,8 @@ public class MemStore implements CatalogStore {
@Override
public boolean existTable(String name) throws IOException {
synchronized(tables) {
- return tables.containsKey(name);
+ String tableId = name.toLowerCase();
+ return tables.containsKey(tableId);
}
}
@@ -77,7 +80,8 @@ public class MemStore implements CatalogStore {
@Override
public void deleteTable(String name) throws IOException {
synchronized(tables) {
- tables.remove(name);
+ String tableId = name.toLowerCase();
+ tables.remove(tableId);
}
}
@@ -85,8 +89,16 @@ public class MemStore implements CatalogStore {
* @see CatalogStore#getTable(java.lang.String)
*/
@Override
- public TableDesc getTable(String name) throws IOException {
- return tables.get(name);
+ public CatalogProtos.TableDescProto getTable(String name) throws IOException {
+ String tableId = name.toLowerCase();
+ CatalogProtos.TableDescProto unqualified = tables.get(tableId);
+ if(unqualified == null)
+ return null;
+ CatalogProtos.TableDescProto.Builder builder = CatalogProtos.TableDescProto.newBuilder();
+ CatalogProtos.SchemaProto schemaProto = CatalogUtil.getQualfiedSchema(tableId, unqualified.getSchema());
+ builder.mergeFrom(unqualified);
+ builder.setSchema(schemaProto);
+ return builder.build();
}
/* (non-Javadoc)
@@ -97,6 +109,60 @@ public class MemStore implements CatalogStore {
return new ArrayList<String>(tables.keySet());
}
+ @Override
+ public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException {
+ String tableId = tableName.toLowerCase();
+ CatalogProtos.TableDescProto table = tables.get(tableId);
+ return (table != null && table.hasPartition()) ? table.getPartition() : null;
+ }
+
+ @Override
+ public boolean existPartitionMethod(String tableName) throws IOException {
+ String tableId = tableName.toLowerCase();
+ CatalogProtos.TableDescProto table = tables.get(tableId);
+ return (table != null && table.hasPartition());
+ }
+
+ @Override
+ public void delPartitionMethod(String tableName) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public void addPartitions(CatalogProtos.PartitionsProto partitionDescList) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public void addPartition(CatalogProtos.PartitionDescProto partitionDesc) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public void delPartition(String partitionName) throws IOException {
+ throw new IOException("not supported!");
+ }
+
+ @Override
+ public void delPartitions(String tableName) throws IOException {
+ throw new IOException("not supported!");
+ }
+
/* (non-Javadoc)
* @see CatalogStore#addIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
*/
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index c368969..e9c5a03 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -23,12 +23,10 @@ package org.apache.tajo.catalog.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.exception.InternalException;
import java.io.IOException;
import java.sql.*;
-import java.util.List;
import java.util.Map;
public class MySQLStore extends AbstractDBStore {
@@ -50,13 +48,14 @@ public class MySQLStore extends AbstractDBStore {
}
// TODO - DDL and index statements should be renamed
- protected void createBaseTable() throws SQLException {
-
+ @Override
+ protected void createBaseTable() throws IOException {
int result;
Statement stmt = null;
- try {
- stmt = getConnection().createStatement();
+ Connection conn = getConnection();
+ try {
+ stmt = conn.createStatement();
// META
if (!baseTableMaps.get(TB_META)) {
String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
@@ -88,12 +87,10 @@ public class MySQLStore extends AbstractDBStore {
if (!baseTableMaps.get(TB_COLUMNS)) {
String columns_ddl =
"CREATE TABLE " + TB_COLUMNS + " ("
- + "TID INT NOT NULL,"
+ C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ "column_id INT NOT NULL,"
+ "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+ "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
- + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+ "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
if (LOG.isDebugEnabled()) {
LOG.debug(columns_ddl);
@@ -152,38 +149,61 @@ public class MySQLStore extends AbstractDBStore {
result = stmt.executeUpdate(stats_ddl);
}
- // PARTITION
+ // PARTITION_METHODS
+ if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
+ String partition_method_ddl = "CREATE TABLE " + TB_PARTITION_METHODS + " ("
+ + C_TABLE_ID + " VARCHAR(255) PRIMARY KEY,"
+ + "partition_type VARCHAR(10) NOT NULL,"
+ + "expression TEXT NOT NULL,"
+ + "expression_schema VARBINARY(1024) NOT NULL, "
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(partition_method_ddl);
+ }
+ LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
+ result = stmt.executeUpdate(partition_method_ddl);
+ }
+
+ // PARTITIONS
if (!baseTableMaps.get(TB_PARTTIONS)) {
String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+ "PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
- + "name VARCHAR(255), "
- + "TID INT NOT NULL,"
- + "type VARCHAR(10) NOT NULL,"
- + "quantity INT ,"
- + "columns VARCHAR(255),"
- + "expressions TEXT )";
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ + "partition_name VARCHAR(255), "
+ + "ordinal_position INT NOT NULL,"
+ + "partition_value TEXT,"
+ + "path TEXT,"
+ + "cache_nodes VARCHAR(255), "
+ + "UNIQUE KEY(" + C_TABLE_ID + ", partition_name),"
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
if (LOG.isDebugEnabled()) {
LOG.debug(partition_ddl);
}
LOG.info("Table '" + TB_PARTTIONS + "' is created.");
result = stmt.executeUpdate(partition_ddl);
}
+ } catch (SQLException se) {
+ throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(stmt);
}
}
- protected boolean isInitialized() throws SQLException {
- ResultSet res = getConnection().getMetaData().getTables(null, null, null,
- new String[]{"TABLE"});
-
+ @Override
+ protected boolean isInitialized() throws IOException {
+ ResultSet res = null;
+ Connection conn = getConnection();
try {
+ res = conn.getMetaData().getTables(null, null, null,
+ new String[]{"TABLE"});
+
baseTableMaps.put(TB_META, false);
baseTableMaps.put(TB_TABLES, false);
baseTableMaps.put(TB_COLUMNS, false);
baseTableMaps.put(TB_OPTIONS, false);
baseTableMaps.put(TB_STATISTICS, false);
baseTableMaps.put(TB_INDEXES, false);
+ baseTableMaps.put(TB_PARTITION_METHODS, false);
baseTableMaps.put(TB_PARTTIONS, false);
if (res.wasNull())
@@ -192,6 +212,8 @@ public class MySQLStore extends AbstractDBStore {
while (res.next()) {
baseTableMaps.put(res.getString("TABLE_NAME"), true);
}
+ } catch(SQLException se) {
+ throw new IOException(se);
} finally {
CatalogUtil.closeSQLWrapper(res);
}
@@ -206,24 +228,4 @@ public class MySQLStore extends AbstractDBStore {
// return false;
}
- @Override
- public final void addFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final void deleteFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final void existFunction(final FunctionDesc func) throws IOException {
- // TODO - not implemented yet
- }
-
- @Override
- public final List<String> getAllFunctionNames() throws IOException {
- // TODO - not implemented yet
- return null;
- }
}