You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2013/12/03 10:29:41 UTC
[1/2] TAJO-284: Add table partitioning entry to Catalog. (jaehwa)
Updated Branches:
refs/heads/master 29a0aa01c -> 0b0de13b2
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index b824756..d174e72 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -20,6 +20,9 @@ package org.apache.tajo.catalog;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.function.Function;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -70,19 +73,19 @@ public class TestCatalog {
schema1.addColumn(FieldName2, Type.INT4);
schema1.addColumn(FieldName3, Type.INT8);
Path path = new Path(CommonTestingUtil.getTestDir(), "table1");
- TableDesc meta = CatalogUtil.newTableDesc(
+ TableDesc meta = CatalogUtil.newTableDesc(
"getTable",
schema1,
StoreType.CSV,
new Options(),
path);
-
- assertFalse(catalog.existsTable("getTable"));
- catalog.addTable(meta);
- assertTrue(catalog.existsTable("getTable"));
-
- catalog.deleteTable("getTable");
+
assertFalse(catalog.existsTable("getTable"));
+ catalog.addTable(meta);
+ assertTrue(catalog.existsTable("getTable"));
+
+ catalog.deleteTable("getTable");
+ assertFalse(catalog.existsTable("getTable"));
}
@Test(expected = Throwable.class)
@@ -197,4 +200,212 @@ public class TestCatalog {
catalog.createFunction(overload);
assertTrue(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB)));
}
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByHash1() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitions.setNumPartitions(2);
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+
+ assertFalse(catalog.existsTable(tableName));
+ catalog.addTable(desc);
+ assertTrue(catalog.existsTable(tableName));
+ TableDesc retrieved = catalog.getTableDesc(tableName);
+
+ assertEquals(retrieved.getName(), tableName);
+ assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
+ assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
+
+ catalog.deleteTable(tableName);
+ assertFalse(catalog.existsTable(tableName));
+ }
+
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByHash2() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitions.setNumPartitions(2);
+
+ partitions.addSpecifier(new Specifier("sub_part1"));
+ partitions.addSpecifier(new Specifier("sub_part2"));
+ partitions.addSpecifier(new Specifier("sub_part3"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(catalog.existsTable(tableName));
+ catalog.addTable(desc);
+ assertTrue(catalog.existsTable(tableName));
+
+ TableDesc retrieved = catalog.getTableDesc(tableName);
+
+ assertEquals(retrieved.getName(), tableName);
+ assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
+ assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
+ "sub_part1");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
+ "sub_part2");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
+ "sub_part3");
+
+ catalog.deleteTable(tableName);
+ assertFalse(catalog.existsTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByList() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+
+ partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+ partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(catalog.existsTable(tableName));
+ catalog.addTable(desc);
+ assertTrue(catalog.existsTable(tableName));
+
+ TableDesc retrieved = catalog.getTableDesc(tableName);
+
+ assertEquals(retrieved.getName(), tableName);
+ assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.LIST);
+ assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
+ "sub_part1");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
+ "Seoul,서울");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
+ "sub_part2");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
+ "Busan,부산");
+
+ catalog.deleteTable(tableName);
+ assertFalse(catalog.existsTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByRange() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+
+ partitions.addSpecifier(new Specifier("sub_part1", "2"));
+ partitions.addSpecifier(new Specifier("sub_part2", "5"));
+ partitions.addSpecifier(new Specifier("sub_part3"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(catalog.existsTable(tableName));
+ catalog.addTable(desc);
+ assertTrue(catalog.existsTable(tableName));
+
+ TableDesc retrieved = catalog.getTableDesc(tableName);
+
+ assertEquals(retrieved.getName(), tableName);
+ assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.RANGE);
+ assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
+ "sub_part1");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
+ "2");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
+ "sub_part2");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
+ "5");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
+ "sub_part3");
+ assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getExpressions(),
+ "");
+
+ catalog.deleteTable(tableName);
+ assertFalse(catalog.existsTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByColumn() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(catalog.existsTable(tableName));
+ catalog.addTable(desc);
+ assertTrue(catalog.existsTable(tableName));
+
+ TableDesc retrieved = catalog.getTableDesc(tableName);
+
+ assertEquals(retrieved.getName(), tableName);
+ assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
+ assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+
+ catalog.deleteTable(tableName);
+ assertFalse(catalog.existsTable(tableName));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 260b4c7..d3671b3 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,6 +22,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.catalog.store.AbstractDBStore;
@@ -219,4 +222,175 @@ public class TestDBStore {
s2.getColumn(i).getColumnName());
}
}
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByHash1() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitions.setNumPartitions(2);
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(store.existTable(tableName));
+ store.addTable(desc);
+ assertTrue(store.existTable(tableName));
+
+ TableDesc retrieved = store.getTable(tableName);
+
+ // Schema order check
+ assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+ store.deleteTable(tableName);
+ assertFalse(store.existTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByHash2() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitions.setNumPartitions(2);
+
+ partitions.addSpecifier(new Specifier("sub_part1"));
+ partitions.addSpecifier(new Specifier("sub_part2"));
+ partitions.addSpecifier(new Specifier("sub_part3"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(store.existTable(tableName));
+ store.addTable(desc);
+ assertTrue(store.existTable(tableName));
+
+ TableDesc retrieved = store.getTable(tableName);
+
+ // Schema order check
+ assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+ store.deleteTable(tableName);
+ assertFalse(store.existTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByList() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+
+ partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+ partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(store.existTable(tableName));
+ store.addTable(desc);
+ assertTrue(store.existTable(tableName));
+
+ TableDesc retrieved = store.getTable(tableName);
+
+ // Schema order check
+ assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+ store.deleteTable(tableName);
+ assertFalse(store.existTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByRange() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+
+ partitions.addSpecifier(new Specifier("sub_part1", "2"));
+ partitions.addSpecifier(new Specifier("sub_part2", "5"));
+ partitions.addSpecifier(new Specifier("sub_part3"));
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(store.existTable(tableName));
+ store.addTable(desc);
+ assertTrue(store.existTable(tableName));
+
+ TableDesc retrieved = store.getTable(tableName);
+
+ // Schema order check
+ assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+ store.deleteTable(tableName);
+ assertFalse(store.existTable(tableName));
+ }
+
+ @Test
+ public final void testAddAndDeleteTablePartitionByColumn() throws Exception {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4)
+ .addColumn("name", Type.TEXT)
+ .addColumn("age", Type.INT4)
+ .addColumn("score", Type.FLOAT8);
+
+ String tableName = "addedtable";
+ Options opts = new Options();
+ opts.put("file.delimiter", ",");
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+ Partitions partitions = new Partitions();
+ partitions.addColumn(new Column("id", Type.INT4));
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+ desc.setPartitions(partitions);
+ assertFalse(store.existTable(tableName));
+ store.addTable(desc);
+ assertTrue(store.existTable(tableName));
+
+ TableDesc retrieved = store.getTable(tableName);
+
+ // Schema order check
+ assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+ store.deleteTable(tableName);
+ assertFalse(store.existTable(tableName));
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index c94260f..58eabfd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -27,8 +27,10 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.algebra.CreateTable;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
@@ -463,6 +465,40 @@ public class TajoCli {
}
sb.append("\n");
}
+
+ sb.append("\n");
+ sb.append("Partitions: \n");
+ if (desc.getPartitions() != null) {
+ sb.append("type:").append(desc.getPartitions().getPartitionsType().name()).append("\n");
+ if (desc.getPartitions().getNumPartitions() > 0)
+ sb.append("numbers:").append(desc.getPartitions().getNumPartitions()).append("\n");
+
+ sb.append("columns:").append("\n");
+ for(Column eachColumn: desc.getPartitions().getColumns()) {
+ sb.append(" ");
+ sb.append(eachColumn.getColumnName()).append("\t").append(eachColumn.getDataType().getType());
+ if (eachColumn.getDataType().hasLength()) {
+ sb.append("(").append(eachColumn.getDataType().getLength()).append(")");
+ }
+ sb.append("\n");
+ }
+
+ if (desc.getPartitions().getSpecifiers() != null) {
+ sb.append("specifier:").append("\n");
+ for(Specifier specifier :desc.getPartitions().getSpecifiers()) {
+ sb.append(" ");
+ sb.append("name:").append(specifier.getName());
+ if (!specifier.getExpressions().equals("")) {
+ sb.append(", expressions:").append(specifier.getExpressions());
+ } else {
+ if (desc.getPartitions().getPartitionsType().name().equals(CreateTable.PartitionType.RANGE))
+ sb.append(" expressions: MAXVALUE");
+ }
+ sb.append("\n");
+ }
+ }
+ }
+
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 4a305ae..5bba89b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -222,6 +222,18 @@ public class LogicalPlan {
return ensureUniqueColumn(candidates);
}
+ // Trying to find columns from schema in current block.
+ if (block.getSchema() != null) {
+ Column found = block.getSchema().getColumnByName(columnRef.getName());
+ if (found != null) {
+ candidates.add(found);
+ }
+ }
+
+ if (!candidates.isEmpty()) {
+ return ensureUniqueColumn(candidates);
+ }
+
throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName());
}
}
@@ -724,7 +736,6 @@ public class LogicalPlan {
// add target to list if a target can be evaluated at this node
List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
for (int i = 0; i < getTargetListNum(); i++) {
-
if (getTarget(i) != null && !isTargetResolved(i)) {
EvalNode expr = getTarget(i).getEvalTree();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 79f02d1..b5f8d2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -46,6 +48,7 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.util.TUtil;
import java.util.List;
import java.util.Stack;
@@ -732,7 +735,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public LogicalNode visitCreateTable(PlanContext context, Stack<OpType> stack, CreateTable expr)
throws PlanningException {
- String tableName = expr.getTableName();
+ String tableName = expr.getTableName();
if (expr.hasSubQuery()) {
stack.add(OpType.CreateTable);
@@ -789,10 +792,125 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
createTableNode.setPath(new Path(expr.getLocation()));
}
+ if (expr.hasPartition()) {
+ createTableNode.setPartitions(convertTableElementsPartition(context, expr));
+ }
return createTableNode;
}
}
+ /**
+ * convert table elements into Partition.
+ *
+ * @param context
+ * @param expr
+ * @return
+ * @throws PlanningException
+ */
+ private Partitions convertTableElementsPartition(PlanContext context,
+ CreateTable expr) throws PlanningException {
+ Schema schema = convertTableElementsSchema(expr.getTableElements());
+ Partitions partitions = null;
+ List<Specifier> specifiers = null;
+ if (expr.hasPartition()) {
+ partitions = new Partitions();
+ specifiers = TUtil.newList();
+
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+ .getPartitionType().name()));
+
+ if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+ CreateTable.HashPartition hashPartition = expr.getPartition();
+
+ partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ , hashPartition.getColumns()));
+
+ if (hashPartition.getColumns() != null) {
+ if (hashPartition.getQuantifier() != null) {
+ String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
+ partitions.setNumPartitions(Integer.parseInt(quantity));
+ }
+
+ if (hashPartition.getSpecifiers() != null) {
+ for(CreateTable.PartitionSpecifier eachSpec: hashPartition.getSpecifiers()) {
+ specifiers.add(new Specifier(eachSpec.getName()));
+ }
+ }
+
+ if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
+ for (int i = 0; i < partitions.getNumPartitions(); i++) {
+ String partitionName = partitions.getPartitionsType().name() + "_" + expr
+ .getTableName() + "_" + i;
+ specifiers.add(new Specifier(partitionName));
+ }
+ }
+
+ if (!specifiers.isEmpty())
+ partitions.setSpecifiers(specifiers);
+ }
+ } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+ CreateTable.ListPartition listPartition = expr.getPartition();
+
+ partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ , listPartition.getColumns()));
+
+ if (listPartition.getSpecifiers() != null) {
+ StringBuffer sb = new StringBuffer();
+
+ for(CreateTable.ListPartitionSpecifier eachSpec: listPartition.getSpecifiers()) {
+ Specifier specifier = new Specifier(eachSpec.getName());
+ sb.delete(0, sb.length());
+ for(Expr eachExpr : eachSpec.getValueList().getValues()) {
+ context.block.setSchema(schema);
+ EvalNode eval = createEvalTree(context.plan, context.block, eachExpr);
+ if(sb.length() > 1)
+ sb.append(",");
+
+ sb.append(eval.toString());
+ }
+ specifier.setExpressions(sb.toString());
+ specifiers.add(specifier);
+ }
+ if (!specifiers.isEmpty())
+ partitions.setSpecifiers(specifiers);
+ }
+ } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+ CreateTable.RangePartition rangePartition = expr.getPartition();
+
+ partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ , rangePartition.getColumns()));
+
+ if (rangePartition.getSpecifiers() != null) {
+ for(CreateTable.RangePartitionSpecifier eachSpec: rangePartition.getSpecifiers()) {
+ Specifier specifier = new Specifier();
+
+ if (eachSpec.getName() != null)
+ specifier.setName(eachSpec.getName());
+
+ if (eachSpec.getEnd() != null) {
+ context.block.setSchema(schema);
+ EvalNode eval = createEvalTree(context.plan, context.block, eachSpec.getEnd());
+ specifier.setExpressions(eval.toString());
+ }
+
+ if(eachSpec.isEndMaxValue()) {
+ specifier.setExpressions(null);
+ }
+ specifiers.add(specifier);
+ }
+ if (!specifiers.isEmpty())
+ partitions.setSpecifiers(specifiers);
+ }
+ } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
+ CreateTable.ColumnPartition columnPartition = expr.getPartition();
+
+ partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ , columnPartition.getColumns()));
+ }
+ }
+
+ return partitions;
+ }
/**
@@ -811,6 +929,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return schema;
}
+ private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+ ColumnReferenceExpr[] references) {
+ List<Column> columnList = TUtil.newList();
+
+ for(CreateTable.ColumnDefinition columnDefinition: elements) {
+ for(ColumnReferenceExpr eachReference: references) {
+ if (columnDefinition.getColumnName().equalsIgnoreCase(eachReference.getName())) {
+ columnList.add(convertColumn(columnDefinition));
+ }
+ }
+ }
+
+ return columnList;
+ }
+
private DataType convertDataType(org.apache.tajo.algebra.DataType dataType) {
TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 50656c5..942309d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.partition.Partitions;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -35,6 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
@Expose private Path path;
@Expose private Options options;
@Expose private boolean external;
+ @Expose private Partitions partitions;
public CreateTableNode(int pid, String tableName, Schema schema) {
super(pid, NodeType.CREATE_TABLE);
@@ -90,6 +92,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
this.external = external;
}
+ public Partitions getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(Partitions partitions) {
+ this.partitions = partitions;
+ }
+
+ public boolean hasPartition() {
+ return this.partitions != null;
+ }
@Override
public PlanString getPlanString() {
@@ -107,7 +120,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
&& this.external == other.external
&& TUtil.checkEquals(path, other.path)
&& TUtil.checkEquals(options, other.options)
- && TUtil.checkEquals(partitionKeys, other.partitionKeys);
+ && TUtil.checkEquals(partitionKeys, other.partitionKeys)
+ && TUtil.checkEquals(partitions, other.partitions);
} else {
return false;
}
@@ -123,6 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
store.path = path != null ? new Path(path.toString()) : null;
store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
store.options = (Options) (options != null ? options.clone() : null);
+ store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
return store;
}
@@ -142,6 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
sb.append(",\"storeType\": \"" + this.storageType);
sb.append(",\"path\" : \"" + this.path).append("\",");
sb.append(",\"external\" : \"" + this.external).append("\",");
+ sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 94447c0..b2bd937 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.partition.Partitions;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -38,12 +39,19 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private Options options;
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
+ @Expose private Partitions partitions;
public StoreTableNode(int pid, String tableName) {
super(pid, NodeType.STORE);
this.tableName = tableName;
}
+ public StoreTableNode(int pid, String tableName, Partitions partitions) {
+ super(pid, NodeType.STORE);
+ this.tableName = tableName;
+ this.partitions = partitions;
+ }
+
public final String getTableName() {
return this.tableName;
}
@@ -101,6 +109,13 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.options;
}
+ public Partitions getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(Partitions partitions) {
+ this.partitions = partitions;
+ }
@Override
public PlanString getPlanString() {
@@ -131,6 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
eq = eq && TUtil.checkEquals(options, other.options);
eq = eq && isCreatedTable == other.isCreatedTable;
eq = eq && isOverwritten == other.isOverwritten;
+ eq = eq && TUtil.checkEquals(partitions, other.partitions);
return eq;
} else {
return false;
@@ -147,6 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
store.options = options != null ? (Options) options.clone() : null;
store.isCreatedTable = isCreatedTable;
store.isOverwritten = isOverwritten;
+ store.partitions = partitions;
return store;
}
@@ -169,8 +186,13 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
}
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
- .append("\n \"in schema\": ").append(getInSchema())
- .append("}");
+ .append("\n \"in schema\": ").append(getInSchema());
+
+ if(partitions != null) {
+ sb.append(partitions.toString());
+ }
+
+ sb.append("}");
return sb.toString() + "\n"
+ getChild().toString();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 505fd71..4f18b11 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,6 +32,7 @@ import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
import org.apache.tajo.catalog.exception.NoSuchTableException;
+import org.apache.tajo.catalog.partition.Partitions;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
@@ -131,7 +132,7 @@ public class GlobalEngine extends AbstractService {
}
Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
-
+
LogicalPlan plan = createLogicalPlan(planningContext);
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
@@ -253,10 +254,11 @@ public class GlobalEngine extends AbstractService {
}
return createTableOnDirectory(createTable.getTableName(), createTable.getSchema(), meta,
- createTable.getPath(), true);
+ createTable.getPath(), true, createTable.getPartitions());
}
- public TableDesc createTableOnDirectory(String tableName, Schema schema, TableMeta meta, Path path, boolean isCreated)
+ public TableDesc createTableOnDirectory(String tableName, Schema schema, TableMeta meta,
+ Path path, boolean isCreated, Partitions partitions)
throws IOException {
if (catalog.existsTable(tableName)) {
throw new AlreadyExistsTableException(tableName);
@@ -284,6 +286,7 @@ public class GlobalEngine extends AbstractService {
stats.setNumBytes(totalSize);
TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
desc.setStats(stats);
+ desc.setPartitions(partitions);
catalog.addTable(desc);
LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index d1faf4f..03cb4d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.NoSuchTableException;
+import org.apache.tajo.catalog.partition.Partitions;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
@@ -307,11 +308,12 @@ public class TajoMasterClientService extends AbstractService {
Schema schema = new Schema(request.getSchema());
TableMeta meta = new TableMeta(request.getMeta());
+ Partitions partitions = new Partitions(request.getPartitions());
TableDesc desc;
try {
- desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), schema, meta, path,
- false);
+ desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), schema,
+ meta, path, false, partitions);
} catch (Exception e) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index 89c40c8..dc9c905 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -124,6 +124,7 @@ message CreateTableRequest {
required SchemaProto schema = 2;
required TableProto meta = 3;
required string path = 4;
+ optional PartitionsProto partitions = 5;
}
message AttachTableRequest {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index bd62f40..8b06ebf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -191,4 +191,126 @@ public class TestTajoClient {
assertEquals(tableName1, desc.getName());
assertTrue(desc.getStats().getNumBytes() > 0);
}
+
+ @Test
+ public final void testCreateAndDropTablePartitionedHash1ByExecuteQuery() throws IOException,
+ ServiceException, SQLException {
+ TajoConf conf = cluster.getConfiguration();
+ final String tableName = "testCreateAndDropTablePartitionedHash1";
+
+ assertFalse(client.existTable(tableName));
+
+ String sql = "create table " + tableName + " (deptname text, score int4)";
+ sql += " PARTITION BY HASH (deptname)";
+ sql += " (PARTITION sub_part1, PARTITION sub_part2, PARTITION sub_part3)";
+
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+
+ Path tablePath = client.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
+ assertTrue(hdfs.exists(tablePath));
+
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
+ assertFalse(hdfs.exists(tablePath));
+ }
+
+ @Test
+ public final void testCreateAndDropTablePartitionedHash2ByExecuteQuery() throws IOException,
+ ServiceException, SQLException {
+ TajoConf conf = cluster.getConfiguration();
+ final String tableName = "testCreateAndDropTablePartitionedHash2";
+
+ assertFalse(client.existTable(tableName));
+
+ String sql = "create table " + tableName + " (deptname text, score int4)";
+ sql += "PARTITION BY HASH (deptname)";
+ sql += "PARTITIONS 2";
+
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+
+ Path tablePath = client.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
+ assertTrue(hdfs.exists(tablePath));
+
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
+ assertFalse(hdfs.exists(tablePath));
+ }
+
+ @Test
+ public final void testCreateAndDropTablePartitionedListByExecuteQuery() throws IOException,
+ ServiceException, SQLException {
+ TajoConf conf = cluster.getConfiguration();
+ final String tableName = "testCreateAndDropTablePartitionedList";
+
+ assertFalse(client.existTable(tableName));
+
+ String sql = "create table " + tableName + " (deptname text, score int4)";
+ sql += "PARTITION BY LIST (deptname)";
+ sql += "( PARTITION sub_part1 VALUES('r&d', 'design'),";
+ sql += "PARTITION sub_part2 VALUES('sales', 'hr') )";
+
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+
+ Path tablePath = client.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
+ assertTrue(hdfs.exists(tablePath));
+
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
+ assertFalse(hdfs.exists(tablePath));
+ }
+
+ @Test
+ public final void testCreateAndDropTablePartitionedRangeByExecuteQuery() throws IOException,
+ ServiceException, SQLException {
+ TajoConf conf = cluster.getConfiguration();
+ final String tableName = "testCreateAndDropTablePartitionedRange";
+
+ assertFalse(client.existTable(tableName));
+
+ String sql = "create table " + tableName + " (deptname text, score int4)";
+ sql += "PARTITION BY RANGE (score)";
+ sql += "( PARTITION sub_part1 VALUES LESS THAN (2),";
+ sql += "PARTITION sub_part2 VALUES LESS THAN (5),";
+ sql += "PARTITION sub_part2 VALUES LESS THAN (MAXVALUE) )";
+
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+
+ Path tablePath = client.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
+ assertTrue(hdfs.exists(tablePath));
+
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
+ assertFalse(hdfs.exists(tablePath));
+ }
+ @Test
+ public final void testCreateAndDropTablePartitionedColumnByExecuteQuery() throws IOException,
+ ServiceException, SQLException {
+ TajoConf conf = cluster.getConfiguration();
+ final String tableName = "testCreateAndDropTablePartitionedColumn";
+
+ assertFalse(client.existTable(tableName));
+
+ String sql = "create table " + tableName + " (deptname text, score int4)";
+ sql += "PARTITION BY COLUMN (deptname)";
+
+ client.updateQuery(sql);
+ assertTrue(client.existTable(tableName));
+
+ Path tablePath = client.getTableDesc(tableName).getPath();
+ FileSystem hdfs = tablePath.getFileSystem(conf);
+ assertTrue(hdfs.exists(tablePath));
+
+ client.updateQuery("drop table " + tableName);
+ assertFalse(client.existTable(tableName));
+ assertFalse(hdfs.exists(tablePath));
+ }
+
}
[2/2] git commit: TAJO-284: Add table partitioning entry to Catalog.
(jaehwa)
Posted by bl...@apache.org.
TAJO-284: Add table partitioning entry to Catalog. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0b0de13b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0b0de13b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0b0de13b
Branch: refs/heads/master
Commit: 0b0de13b2444f9e75ad3e1f42cba51ddb1f86dc2
Parents: 29a0aa0
Author: blrunner <jh...@gruter.com>
Authored: Tue Dec 3 18:29:22 2013 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Tue Dec 3 18:29:22 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tajo/algebra/CreateTable.java | 11 +-
.../apache/tajo/catalog/CatalogConstants.java | 1 +
.../org/apache/tajo/catalog/CatalogUtil.java | 3 +
.../org/apache/tajo/catalog/DDLBuilder.java | 70 +++
.../java/org/apache/tajo/catalog/Schema.java | 2 +-
.../java/org/apache/tajo/catalog/TableDesc.java | 33 +-
.../tajo/catalog/partition/Partitions.java | 349 +++++++++++++
.../tajo/catalog/partition/Specifier.java | 128 +++++
.../src/main/proto/CatalogProtos.proto | 20 +
tajo-catalog/tajo-catalog-server/pom.xml | 4 +
.../org/apache/tajo/catalog/CatalogServer.java | 5 +-
.../tajo/catalog/store/AbstractDBStore.java | 184 ++++++-
.../apache/tajo/catalog/store/DerbyStore.java | 488 +++++++++++++------
.../apache/tajo/catalog/store/MySQLStore.java | 204 +++++---
.../org/apache/tajo/catalog/TestCatalog.java | 225 ++++++++-
.../org/apache/tajo/catalog/TestDBStore.java | 174 +++++++
.../main/java/org/apache/tajo/cli/TajoCli.java | 36 ++
.../apache/tajo/engine/planner/LogicalPlan.java | 13 +-
.../tajo/engine/planner/LogicalPlanner.java | 135 ++++-
.../engine/planner/logical/CreateTableNode.java | 18 +-
.../engine/planner/logical/StoreTableNode.java | 26 +-
.../org/apache/tajo/master/GlobalEngine.java | 9 +-
.../tajo/master/TajoMasterClientService.java | 6 +-
.../src/main/proto/ClientProtos.proto | 1 +
.../org/apache/tajo/client/TestTajoClient.java | 122 +++++
26 files changed, 2022 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ab0e893..8c53d9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-284: Add table partitioning entry to Catalog. (jaehwa)
+
TAJO-317: Improve TajoResourceManager to support more elaborate resource
management. (Keuntae Park via jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index 6e36f3a..41276ad 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -279,7 +279,6 @@ public class CreateTable extends Expr {
}
public static class RangePartitionSpecifier extends PartitionSpecifier {
- String name;
Expr end;
boolean maxValue;
@@ -293,10 +292,6 @@ public class CreateTable extends Expr {
maxValue = true;
}
- public String getName() {
- return name;
- }
-
public Expr getEnd() {
return end;
}
@@ -320,10 +315,14 @@ public class CreateTable extends Expr {
}
public static class PartitionSpecifier {
- String name;
+ private String name;
public PartitionSpecifier(String name) {
this.name = name;
}
+
+ public String getName() {
+ return this.name;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 4b1f794..ed23b08 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -35,6 +35,7 @@ public class CatalogConstants {
public static final String TB_OPTIONS = "OPTIONS";
public static final String TB_INDEXES = "INDEXES";
public static final String TB_STATISTICS = "STATS";
+ public static final String TB_PARTTIONS = "PARTITIONS";
public static final String C_TABLE_ID = "TABLE_ID";
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6b4848c..dc91035 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -27,6 +27,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.sql.PreparedStatement;
import java.sql.Wrapper;
import java.util.Collection;
@@ -148,6 +149,8 @@ public class CatalogUtil {
try{
if(w instanceof Statement){
((Statement)w).close();
+ } else if(w instanceof PreparedStatement){
+ ((PreparedStatement)w).close();
} else if(w instanceof ResultSet){
((ResultSet)w).close();
} else if(w instanceof Connection){
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index e6cc46d..a9d0f03 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -18,6 +18,9 @@
package org.apache.tajo.catalog;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import java.util.Map;
@@ -38,6 +41,10 @@ public class DDLBuilder {
buildWithClause(sb, desc.getMeta());
buildLocationClause(sb, desc);
+ if (desc.getPartitions() != null) {
+ buildPartitionClause(sb, desc);
+ }
+
sb.append(";");
return sb.toString();
}
@@ -87,4 +94,67 @@ public class DDLBuilder {
private static void buildLocationClause(StringBuilder sb, TableDesc desc) {
sb.append(" LOCATION '").append(desc.getPath()).append("'");
}
+
+ private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
+ Partitions partitions = desc.getPartitions();
+
+ sb.append(" PARTITION BY ");
+ sb.append(partitions.getPartitionsType().name());
+
+ // columns
+ sb.append("(");
+ int columnCount = 0;
+ for(Column column: partitions.getColumns()) {
+ for(Column targetColumn: desc.getSchema().getColumns()) {
+ if (column.getColumnName().equals(targetColumn.getColumnName())) {
+ if (columnCount > 0)
+ sb.append(",");
+
+ sb.append(column.getColumnName());
+ columnCount++;
+ }
+ }
+ }
+ sb.append(")");
+
+ // specifier
+ if (partitions.getSpecifiers() != null
+ && !partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+
+ sb.append(" (");
+ for(int i = 0; i < partitions.getSpecifiers().size(); i++) {
+ Specifier specifier = partitions.getSpecifiers().get(i);
+ if (i > 0)
+ sb.append(",");
+
+ sb.append(" PARTITION");
+
+ if (!specifier.getName().isEmpty())
+ sb.append(" ").append(specifier.getName());
+
+ if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
+ if (!specifier.getExpressions().isEmpty()) {
+ sb.append(" VALUES (");
+ String[] expressions = specifier.getExpressions().split("\\,");
+ for(int j = 0; j < expressions.length; j++) {
+ if (j > 0)
+ sb.append(",");
+ sb.append("'").append(expressions[j]).append("'");
+ }
+ sb.append(")");
+
+ }
+ } else if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
+ sb.append(" VALUES LESS THAN (");
+ if (!specifier.getExpressions().isEmpty()) {
+ sb.append(specifier.getExpressions());
+ } else {
+ sb.append("MAXVALUE");
+ }
+ sb.append(")");
+ }
+ }
+ sb.append(")");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..8a2d028 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,7 +212,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
}
- @Override
+ @Override
public boolean equals(Object o) {
if (o instanceof Schema) {
Schema other = (Schema) o;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..458a99a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -21,8 +21,11 @@ package org.apache.tajo.catalog;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.partition.Partitions;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -30,6 +33,8 @@ import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.GsonObject;
public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable {
+ private final Log LOG = LogFactory.getLog(TableDesc.class);
+
protected TableDescProto.Builder builder = null;
@Expose protected String tableName; // required
@@ -37,6 +42,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
@Expose protected TableMeta meta; // required
@Expose protected Path uri; // required
@Expose protected TableStats stats; // optional
+ @Expose protected Partitions partitions; //optional
public TableDesc() {
builder = TableDescProto.newBuilder();
@@ -48,7 +54,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
this.tableName = tableName.toLowerCase();
this.schema = schema;
this.meta = info;
- this.uri = path;
+ this.uri = path;
}
public TableDesc(String tableName, Schema schema, StoreType type, Options options, Path path) {
@@ -58,6 +64,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
public TableDesc(TableDescProto proto) {
this(proto.getId(), new Schema(proto.getSchema()), new TableMeta(proto.getMeta()), new Path(proto.getPath()));
this.stats = new TableStats(proto.getStats());
+ if (proto.getPartitions() != null && !proto.getPartitions().toString().isEmpty()) {
+ this.partitions = new Partitions(proto.getPartitions());
+ }
}
public void setName(String tableId) {
@@ -104,8 +113,20 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
public TableStats getStats() {
return this.stats;
}
-
- public boolean equals(Object object) {
+
+ public boolean hasPartitions() {
+ return this.partitions != null;
+ }
+
+ public Partitions getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(Partitions partitions) {
+ this.partitions = partitions;
+ }
+
+ public boolean equals(Object object) {
if(object instanceof TableDesc) {
TableDesc other = (TableDesc) object;
@@ -123,6 +144,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
desc.meta = (TableMeta) meta.clone();
desc.uri = uri;
desc.stats = stats != null ? (TableStats) stats.clone() : null;
+ desc.partitions = partitions != null ? (Partitions) partitions.clone() : null;
return desc;
}
@@ -154,7 +176,10 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
builder.setPath(this.uri.toString());
}
if (this.stats != null) {
- builder.setStats(stats.getProto());
+ builder.setStats(this.stats.getProto());
+ }
+ if (this.partitions != null) {
+ builder.setPartitions(this.partitions.getProto());
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
new file mode 100644
index 0000000..c82f0cb
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.exception.AlreadyExistsFieldException;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class Partitions implements ProtoObject<CatalogProtos.PartitionsProto>, Cloneable, GsonObject {
+
+ private static final Log LOG = LogFactory.getLog(Partitions.class);
+
+ @Expose protected CatalogProtos.PartitionsType partitionsType; //required
+ @Expose protected List<Column> columns; //required
+ @Expose protected int numPartitions; //optional
+ @Expose protected List<Specifier> specifiers; //optional
+ @Expose protected Map<String, Integer> columnsByQialifiedName = null;
+ @Expose protected Map<String, List<Integer>> columnsByName = null;
+
+ private CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
+
+ public Partitions() {
+ this.columns = new ArrayList<Column>();
+ this.columnsByQialifiedName = new TreeMap<String, Integer>();
+ this.columnsByName = new HashMap<String, List<Integer>>();
+ }
+
+ public Partitions(Partitions partition) {
+ this();
+ this.partitionsType = partition.partitionsType;
+ this.columns.addAll(partition.columns);
+ this.columnsByQialifiedName.putAll(partition.columnsByQialifiedName);
+ this.columnsByName.putAll(partition.columnsByName);
+ this.numPartitions = partition.numPartitions;
+ this.specifiers = partition.specifiers;
+ }
+
+ public Partitions(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
+ List<Specifier> specifiers) {
+ this();
+ this.partitionsType = partitionsType;
+ for (Column c : columns) {
+ addColumn(c);
+ }
+ this.numPartitions = numPartitions;
+ this.specifiers = specifiers;
+ }
+
+ public Partitions(CatalogProtos.PartitionsProto proto) {
+ this.partitionsType = proto.getPartitionsType();
+ this.columns = new ArrayList<Column>();
+ this.columnsByQialifiedName = new HashMap<String, Integer>();
+ this.columnsByName = new HashMap<String, List<Integer>>();
+ for (CatalogProtos.ColumnProto colProto : proto.getColumnsList()) {
+ Column tobeAdded = new Column(colProto);
+ columns.add(tobeAdded);
+ if (tobeAdded.hasQualifier()) {
+ columnsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(),
+ columns.size() - 1);
+ } else {
+ columnsByQialifiedName.put(tobeAdded.getColumnName(), columns.size() - 1);
+ }
+ if (columnsByName.containsKey(tobeAdded.getColumnName())) {
+ columnsByName.get(tobeAdded.getColumnName()).add(columns.size() - 1);
+ } else {
+ columnsByName.put(tobeAdded.getColumnName(), TUtil.newList(columns.size() - 1));
+ }
+ }
+ this.numPartitions = proto.getNumPartitions();
+ if(proto.getSpecifiersList() != null) {
+ this.specifiers = TUtil.newList();
+ for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
+ this.specifiers.add(new Specifier(specifier));
+ }
+ }
+ }
+
+ /**
+ * Set a qualifier to this schema.
+ * This changes the qualifier of all columns except for not-qualified columns.
+ *
+ * @param qualifier The qualifier
+ */
+ public void setQualifier(String qualifier) {
+ setQualifier(qualifier, false);
+ }
+
+ /**
+ * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
+ * Otherwise, it changes the qualifier of all columns except for non-qualified columns
+ *
+ * @param qualifier The qualifier
+ * @param force If true, all columns' qualifiers will be changed. Otherwise,
+ * only qualified columns' qualifiers will
+ * be changed.
+ */
+ public void setQualifier(String qualifier, boolean force) {
+ columnsByQialifiedName.clear();
+
+ for (int i = 0; i < getColumnNum(); i++) {
+ if (!force && columns.get(i).hasQualifier()) {
+ continue;
+ }
+ columns.get(i).setQualifier(qualifier);
+ columnsByQialifiedName.put(columns.get(i).getQualifiedName(), i);
+ }
+ }
+
+ public int getColumnNum() {
+ return this.columns.size();
+ }
+
+ public Column getColumn(int id) {
+ return columns.get(id);
+ }
+
+ public Column getColumnByFQN(String qualifiedName) {
+ Integer cid = columnsByQialifiedName.get(qualifiedName.toLowerCase());
+ return cid != null ? columns.get(cid) : null;
+ }
+
+ public Column getColumnByName(String colName) {
+ String normalized = colName.toLowerCase();
+ List<Integer> list = columnsByName.get(normalized);
+
+ if (list == null || list.size() == 0) {
+ return null;
+ }
+
+ if (list.size() == 1) {
+ return columns.get(list.get(0));
+ } else {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Integer id : list) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(columns.get(id));
+ }
+ throw new RuntimeException("Ambiguous Column Name: " + sb.toString());
+ }
+ }
+
+ public int getColumnId(String qualifiedName) {
+ return columnsByQialifiedName.get(qualifiedName.toLowerCase());
+ }
+
+ public int getColumnIdByName(String colName) {
+ for (Column col : columns) {
+ if (col.getColumnName().equals(colName.toLowerCase())) {
+ return columnsByQialifiedName.get(col.getQualifiedName());
+ }
+ }
+ return -1;
+ }
+
+ public List<Column> getColumns() {
+ return ImmutableList.copyOf(columns);
+ }
+
+ public void setColumns(List<Column> columns) {
+ this.columns = columns;
+ }
+
+ public boolean contains(String colName) {
+ return columnsByQialifiedName.containsKey(colName.toLowerCase());
+
+ }
+
+ public boolean containsAll(Collection<Column> columns) {
+ return columns.containsAll(columns);
+ }
+
+ public synchronized Partitions addColumn(String name, TajoDataTypes.Type type) {
+ if (type == TajoDataTypes.Type.CHAR) {
+ return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
+ }
+ return addColumn(name, CatalogUtil.newSimpleDataType(type));
+ }
+
+ public synchronized Partitions addColumn(String name, TajoDataTypes.Type type, int length) {
+ return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
+ }
+
+ public synchronized Partitions addColumn(String name, TajoDataTypes.DataType dataType) {
+ String normalized = name.toLowerCase();
+ if (columnsByQialifiedName.containsKey(normalized)) {
+ LOG.error("Already exists column " + normalized);
+ throw new AlreadyExistsFieldException(normalized);
+ }
+
+ Column newCol = new Column(normalized, dataType);
+ columns.add(newCol);
+ columnsByQialifiedName.put(newCol.getQualifiedName(), columns.size() - 1);
+ columnsByName.put(newCol.getColumnName(), TUtil.newList(columns.size() - 1));
+
+ return this;
+ }
+
+ public synchronized void addColumn(Column column) {
+ addColumn(column.getQualifiedName(), column.getDataType());
+ }
+
+ public synchronized void addColumns(Partitions schema) {
+ for (Column column : schema.getColumns()) {
+ addColumn(column);
+ }
+ }
+
+ public synchronized void addSpecifier(Specifier specifier) {
+ if(specifiers == null)
+ specifiers = TUtil.newList();
+
+ specifiers.add(specifier);
+ }
+
+ public CatalogProtos.PartitionsType getPartitionsType() {
+ return partitionsType;
+ }
+
+ public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
+ this.partitionsType = partitionsType;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public List<Specifier> getSpecifiers() {
+ return specifiers;
+ }
+
+ public void setSpecifiers(List<Specifier> specifiers) {
+ this.specifiers = specifiers;
+ }
+
+ public Map<String, Integer> getColumnsByQialifiedName() {
+ return columnsByQialifiedName;
+ }
+
+ public void setColumnsByQialifiedName(Map<String, Integer> columnsByQialifiedName) {
+ this.columnsByQialifiedName = columnsByQialifiedName;
+ }
+
+ public Map<String, List<Integer>> getColumnsByName() {
+ return columnsByName;
+ }
+
+ public void setColumnsByName(Map<String, List<Integer>> columnsByName) {
+ this.columnsByName = columnsByName;
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof Partitions) {
+ Partitions other = (Partitions) o;
+ return getProto().equals(other.getProto());
+ }
+ return false;
+ }
+
+ public Object clone() throws CloneNotSupportedException {
+ Partitions clone = (Partitions) super.clone();
+ clone.builder = CatalogProtos.PartitionsProto.newBuilder();
+ clone.setPartitionsType(this.partitionsType);
+ clone.setColumns(this.columns);
+ clone.setNumPartitions(this.numPartitions);
+ clone.specifiers = new ArrayList<Specifier>(this.specifiers);
+
+ return clone;
+ }
+
+ @Override
+ public CatalogProtos.PartitionsProto getProto() {
+ if (builder == null) {
+ builder = CatalogProtos.PartitionsProto.newBuilder();
+ }
+ if (this.partitionsType != null) {
+ builder.setPartitionsType(this.partitionsType);
+ }
+ builder.clearColumns();
+ if (this.columns != null) {
+ for (Column col : columns) {
+ builder.addColumns(col.getProto());
+ }
+ }
+ builder.setNumPartitions(numPartitions);
+
+ if (this.specifiers != null) {
+ for(Specifier specifier: specifiers) {
+ builder.addSpecifiers(specifier.getProto());
+ }
+ }
+ return builder.build();
+ }
+
+ public String toString() {
+ Gson gson = new GsonBuilder().setPrettyPrinting().
+ excludeFieldsWithoutExposeAnnotation().create();
+ return gson.toJson(this);
+ }
+
+ @Override
+ public String toJson() {
+ return CatalogGsonHelper.toJson(this, Partitions.class);
+
+ }
+
+ public Column[] toArray() {
+ return this.columns.toArray(new Column[this.columns.size()]);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
new file mode 100644
index 0000000..feb8a33
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.base.Objects;
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+public class Specifier implements ProtoObject<CatalogProtos.SpecifierProto>, Cloneable,
+ GsonObject {
+
+ private static final Log LOG = LogFactory.getLog(Specifier.class);
+ protected CatalogProtos.SpecifierProto.Builder builder = null;
+
+
+ @Expose protected String name;
+ @Expose protected String expressions;
+
+ public Specifier() {
+ builder = CatalogProtos.SpecifierProto.newBuilder();
+ }
+
+ public Specifier(String name) {
+ this();
+ this.name = name;
+ }
+
+ public Specifier(String name, String expressions) {
+ this();
+ this.name = name;
+ this.expressions = expressions;
+ }
+
+ public Specifier(CatalogProtos.SpecifierProto proto) {
+ this();
+ this.name = proto.getName().toLowerCase();
+ this.expressions = proto.getExpressions();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getExpressions() {
+ return expressions;
+ }
+
+ public void setExpressions(String expressions) {
+ this.expressions = expressions;
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof Specifier) {
+ Specifier other = (Specifier)o;
+ boolean eq = TUtil.checkEquals(this.name, other.name);
+ eq = eq && TUtil.checkEquals(this.expressions, other.expressions);
+ return eq;
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return Objects.hashCode(this.name, this.expressions);
+
+ }
+
+ public Object clone() throws CloneNotSupportedException {
+ Specifier clone = (Specifier) super.clone();
+ clone.builder = CatalogProtos.SpecifierProto.newBuilder();
+ clone.name = this.name;
+ clone.expressions = this.expressions;
+ return clone;
+ }
+
+ public String toString() {
+ Gson gson = CatalogGsonHelper.getPrettyInstance();
+ return gson.toJson(this);
+ }
+
+ @Override
+ public CatalogProtos.SpecifierProto getProto() {
+ if(builder == null) {
+ builder = CatalogProtos.SpecifierProto.newBuilder();
+ }
+
+ if(this.name != null) {
+ builder.setName(this.name);
+ }
+
+ if(this.expressions != null) {
+ builder.setExpressions(this.expressions);
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public String toJson() {
+ return CatalogGsonHelper.toJson(this, Specifier.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index acfa4fb..e5af491 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -50,6 +50,13 @@ enum CompressType {
LZ = 6;
}
+enum PartitionsType {
+ RANGE = 0;
+ HASH = 1;
+ LIST = 2;
+ COLUMN = 3;
+}
+
message ColumnMetaProto {
required DataType dataType = 1;
required bool compressed = 2;
@@ -106,6 +113,7 @@ message TableDescProto {
required TableProto meta = 3;
required SchemaProto schema = 4;
optional TableStatsProto stats = 5;
+ optional PartitionsProto partitions = 6;
}
enum FunctionType {
@@ -225,3 +233,15 @@ message SortSpecProto {
optional bool ascending = 2 [default = true];
optional bool nullFirst = 3 [default = false];
}
+
+message PartitionsProto {
+ required PartitionsType partitionsType = 1;
+ repeated ColumnProto columns = 2;
+ optional int32 numPartitions = 3;
+ repeated SpecifierProto specifiers = 4;
+}
+
+message SpecifierProto {
+ optional string name = 1;
+ optional string expressions = 2;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml
index ca8a3ef..e1105eb 100644
--- a/tajo-catalog/tajo-catalog-server/pom.xml
+++ b/tajo-catalog/tajo-catalog-server/pom.xml
@@ -127,6 +127,10 @@
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-rpc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-algebra</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index dc279bc..e6566af 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -253,9 +253,12 @@ public class CatalogServer extends AbstractService {
descBuilder.setMeta(tableDesc.getMeta());
descBuilder.setSchema(tableDesc.getSchema());
+ if( tableDesc.getPartitions() != null
+ && !tableDesc.getPartitions().toString().isEmpty()) {
+ descBuilder.setPartitions(tableDesc.getPartitions());
+ }
store.addTable(new TableDesc(descBuilder.build()));
-
} catch (IOException ioe) {
LOG.error(ioe.getMessage(), ioe);
return BOOL_FALSE;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 6a76794..3414e83 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -26,6 +26,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -37,6 +40,8 @@ import org.apache.tajo.exception.InternalException;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
@@ -47,6 +52,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
protected String connectionPassword;
protected String catalogUri;
private Connection conn;
+ protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>();
protected static final int VERSION = 1;
@@ -202,6 +208,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
@Override
public void addTable(final TableDesc table) throws IOException {
Statement stmt = null;
+ PreparedStatement pstmt = null;
+
ResultSet res;
String sql =
@@ -245,7 +253,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
- PreparedStatement pstmt = getConnection().prepareStatement(optSql);
+ pstmt = getConnection().prepareStatement(optSql);
try {
for (Entry<String, String> entry : table.getMeta().toMap().entrySet()) {
pstmt.setString(1, table.getName());
@@ -272,9 +280,94 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
stmt.addBatch(sql);
stmt.executeBatch();
}
+
+ //Partition
+ if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+ try {
+ Partitions partitions = table.getPartitions();
+ List<Column> columnList = partitions.getColumns();
+
+ // Find columns which used for a partitioned table.
+ StringBuffer columns = new StringBuffer();
+ for(Column eachColumn : columnList) {
+ sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+ + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no columnId matched to "
+ + table.getName());
+ }
+ columnId = res.getInt("column_id");
+
+ if (columns.length() > 0) {
+ columns.append(",");
+ }
+ columns.append(columnId);
+ }
+
+ // Set default partition name. But if user named to subpartition, it would be updated.
+// String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+ sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+ + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+ pstmt = getConnection().prepareStatement(sql);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ // Find information for subpartitions
+ if (partitions.getSpecifiers() != null) {
+ int count = 1;
+ if (partitions.getSpecifiers().size() == 0) {
+ pstmt.clearParameters();
+ pstmt.setString(1, null);
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, null);
+ pstmt.addBatch();
+ } else {
+ for(Specifier specifier: partitions.getSpecifiers()) {
+ pstmt.clearParameters();
+ if (specifier.getName() != null && !specifier.getName().equals("")) {
+ pstmt.setString(1, specifier.getName());
+ } else {
+ pstmt.setString(1, null);
+ }
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, specifier.getExpressions());
+ pstmt.addBatch();
+ count++;
+ }
+ }
+ } else {
+ pstmt.clearParameters();
+ pstmt.setString(1, null);
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, null);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ } finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
+ }
+ }
+
} catch (SQLException se) {
throw new IOException(se.getMessage(), se);
} finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
CatalogUtil.closeSQLWrapper(stmt);
}
}
@@ -365,6 +458,20 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
CatalogUtil.closeSQLWrapper(stmt);
}
+
+ try {
+ sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+ + " SELECT TID FROM " + TB_TABLES
+ + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+ LOG.info(sql);
+ stmt = getConnection().createStatement();
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(stmt);
+ }
+
try {
sql = "DELETE FROM " + TB_TABLES +
" WHERE " + C_TABLE_ID + " = '" + name + "'";
@@ -376,6 +483,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
} finally {
CatalogUtil.closeSQLWrapper(stmt);
}
+
}
@Override
@@ -388,10 +496,12 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
StoreType storeType = null;
Options options;
TableStats stat = null;
+ Partitions partitions = null;
+ int tid = 0;
try {
String sql =
- "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+ "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
+ " WHERE " + C_TABLE_ID + "='" + name + "'";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
@@ -404,6 +514,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
tableName = res.getString(C_TABLE_ID).trim();
path = new Path(res.getString("path").trim());
storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+ tid = res.getInt("TID");
} catch (SQLException se) {
throw new IOException(se);
} finally {
@@ -481,15 +592,84 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
CatalogUtil.closeSQLWrapper(res, stmt);
}
+ try {
+ String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+ + " WHERE TID =" + tid + "";
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ while (res.next()) {
+ if (partitions == null) {
+ partitions = new Partitions();
+ String[] columns = res.getString("columns").split(",");
+ for(String eachColumn: columns) {
+ partitions.addColumn(getColumn(tableName, tid, eachColumn));
+ }
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
+ partitions.setNumPartitions(res.getInt("quantity"));
+ }
+
+ Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+ partitions.addSpecifier(specifier);
+ }
+
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+
TableMeta meta = new TableMeta(storeType, options);
TableDesc table = new TableDesc(tableName, schema, meta, path);
if (stat != null) {
table.setStats(stat);
}
+ if (partitions != null) {
+ table.setPartitions(partitions);
+ }
+
return table;
}
+ private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+ ResultSet res = null;
+ Column column = null;
+ Statement stmt = null;
+
+ try {
+ String sql = "SELECT column_name, data_type, type_length from "
+ + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
+
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ if (res.next()) {
+ String columnName = tableName + "."
+ + res.getString("column_name").trim();
+ Type dataType = getDataType(res.getString("data_type")
+ .trim());
+ int typeLength = res.getInt("type_length");
+ if (typeLength > 0) {
+ column = new Column(columnName, dataType, typeLength);
+ } else {
+ column = new Column(columnName, dataType);
+ }
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+ return column;
+ }
+
private Type getDataType(final String typeStr) {
try {
return Enum.valueOf(Type.class, typeStr);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 6fee78b..06a701b 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -24,6 +24,9 @@ package org.apache.tajo.catalog.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -38,6 +41,7 @@ import java.sql.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -68,143 +72,177 @@ public class DerbyStore extends AbstractDBStore {
try {
// META
stmt = getConnection().createStatement();
- String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(meta_ddl);
+
+ if (!baseTableMaps.get(TB_META)) {
+ String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(meta_ddl);
+ }
+ stmt.executeUpdate(meta_ddl);
+ LOG.info("Table '" + TB_META + " is created.");
}
- stmt.executeUpdate(meta_ddl);
- LOG.info("Table '" + TB_META + " is created.");
// TABLES
- String tables_ddl = "CREATE TABLE "
- + TB_TABLES + " ("
- + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
- + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
- + "path VARCHAR(1024), "
- + "store_type CHAR(16), "
- + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
- ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(tables_ddl);
- }
- stmt.addBatch(tables_ddl);
- String idx_tables_tid =
- "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_tables_tid);
- }
- stmt.addBatch(idx_tables_tid);
-
- String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on "
- + TB_TABLES + "(" + C_TABLE_ID + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_tables_name);
+ if (!baseTableMaps.get(TB_TABLES)) {
+ String tables_ddl = "CREATE TABLE "
+ + TB_TABLES + " ("
+ + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+ + "path VARCHAR(1024), "
+ + "store_type CHAR(16), "
+ + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
+ ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(tables_ddl);
+ }
+ stmt.addBatch(tables_ddl);
+
+ String idx_tables_tid =
+ "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_tables_tid);
+ }
+ stmt.addBatch(idx_tables_tid);
+
+ String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on "
+ + TB_TABLES + "(" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_tables_name);
+ }
+ stmt.addBatch(idx_tables_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_TABLES + "' is created.");
}
- stmt.addBatch(idx_tables_name);
- stmt.executeBatch();
- LOG.info("Table '" + TB_TABLES + "' is created.");
// COLUMNS
- String columns_ddl =
- "CREATE TABLE " + TB_COLUMNS + " ("
- + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
- + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
- + C_TABLE_ID + ") ON DELETE CASCADE, "
- + "column_id INT NOT NULL,"
- + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
- + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
- if (LOG.isDebugEnabled()) {
- LOG.debug(columns_ddl);
- }
- stmt.addBatch(columns_ddl);
+ if (!baseTableMaps.get(TB_COLUMNS)) {
+ String columns_ddl =
+ "CREATE TABLE " + TB_COLUMNS + " ("
+ + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
+ + C_TABLE_ID + ") ON DELETE CASCADE, "
+ + "column_id INT NOT NULL,"
+ + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+ + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(columns_ddl);
+ }
+ stmt.addBatch(columns_ddl);
- String idx_fk_columns_table_name =
- "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
- + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_fk_columns_table_name);
+ String idx_fk_columns_table_name =
+ "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+ + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_fk_columns_table_name);
+ }
+ stmt.addBatch(idx_fk_columns_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_COLUMNS + " is created.");
}
- stmt.addBatch(idx_fk_columns_table_name);
- stmt.executeBatch();
- LOG.info("Table '" + TB_COLUMNS + " is created.");
// OPTIONS
- String options_ddl =
- "CREATE TABLE " + TB_OPTIONS +" ("
- + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
- + "ON DELETE CASCADE, "
- + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(options_ddl);
- }
- stmt.addBatch(options_ddl);
-
- String idx_options_key =
- "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_options_key);
- }
- stmt.addBatch(idx_options_key);
- String idx_options_table_name =
- "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
- + "(" + C_TABLE_ID + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_options_table_name);
+ if (!baseTableMaps.get(TB_OPTIONS)) {
+ String options_ddl =
+ "CREATE TABLE " + TB_OPTIONS +" ("
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
+ + "ON DELETE CASCADE, "
+ + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(options_ddl);
+ }
+ stmt.addBatch(options_ddl);
+
+ String idx_options_key =
+ "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_options_key);
+ }
+ stmt.addBatch(idx_options_key);
+ String idx_options_table_name =
+ "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+ + "(" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_options_table_name);
+ }
+ stmt.addBatch(idx_options_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_OPTIONS + " is created.");
}
- stmt.addBatch(idx_options_table_name);
- stmt.executeBatch();
- LOG.info("Table '" + TB_OPTIONS + " is created.");
-
+
// INDEXES
- String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
- + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
- + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
- + "ON DELETE CASCADE, "
- + "column_name VARCHAR(255) NOT NULL, "
- + "data_type VARCHAR(255) NOT NULL, "
- + "index_type CHAR(32) NOT NULL, "
- + "is_unique BOOLEAN NOT NULL, "
- + "is_clustered BOOLEAN NOT NULL, "
- + "is_ascending BOOLEAN NOT NULL)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(indexes_ddl);
+ if (!baseTableMaps.get(TB_INDEXES)) {
+ String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
+ + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "column_name VARCHAR(255) NOT NULL, "
+ + "data_type VARCHAR(255) NOT NULL, "
+ + "index_type CHAR(32) NOT NULL, "
+ + "is_unique BOOLEAN NOT NULL, "
+ + "is_clustered BOOLEAN NOT NULL, "
+ + "is_ascending BOOLEAN NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(indexes_ddl);
+ }
+ stmt.addBatch(indexes_ddl);
+
+ String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON "
+ + TB_INDEXES + " (index_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_indexes_key);
+ }
+ stmt.addBatch(idx_indexes_key);
+
+ String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON "
+ + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_indexes_columns);
+ }
+ stmt.addBatch(idx_indexes_columns);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_INDEXES + "' is created.");
}
- stmt.addBatch(indexes_ddl);
-
- String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON "
- + TB_INDEXES + " (index_name)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_indexes_key);
- }
- stmt.addBatch(idx_indexes_key);
-
- String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON "
- + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_indexes_columns);
- }
- stmt.addBatch(idx_indexes_columns);
- stmt.executeBatch();
- LOG.info("Table '" + TB_INDEXES + "' is created.");
- String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
- + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
- + "ON DELETE CASCADE, "
- + "num_rows BIGINT, "
- + "num_bytes BIGINT)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(stats_ddl);
+ if (!baseTableMaps.get(TB_STATISTICS)) {
+ String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "num_rows BIGINT, "
+ + "num_bytes BIGINT)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stats_ddl);
+ }
+ stmt.addBatch(stats_ddl);
+
+ String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
+ + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_stats_fk_table_name);
+ }
+ stmt.addBatch(idx_stats_fk_table_name);
+ LOG.info("Table '" + TB_STATISTICS + "' is created.");
}
- stmt.addBatch(stats_ddl);
- String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
- + TB_STATISTICS + " (" + C_TABLE_ID + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(idx_stats_fk_table_name);
+ // PARTITION
+ if (!baseTableMaps.get(TB_PARTTIONS)) {
+ String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+ + "PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ + "name VARCHAR(255), "
+ + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+ + "type VARCHAR(10) NOT NULL,"
+ + "quantity INT ,"
+ + "columns VARCHAR(255),"
+ + "expressions VARCHAR(1024)"
+ + ", CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+ + " )";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(partition_ddl);
+ }
+ stmt.addBatch(partition_ddl);
+ LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+ stmt.executeBatch();
}
- stmt.addBatch(idx_stats_fk_table_name);
- stmt.executeBatch();
- LOG.info("Table '" + TB_STATISTICS + "' is created.");
} finally {
wlock.unlock();
@@ -220,26 +258,34 @@ public class DerbyStore extends AbstractDBStore {
wlock.lock();
ResultSet res = null;
+ int foundCount = 0;
try {
- boolean found = false;
res = getConnection().getMetaData().getTables(null, null, null,
new String [] {"TABLE"});
-
- String resName;
- while (res.next() && !found) {
- resName = res.getString("TABLE_NAME");
- if (TB_META.equals(resName)
- || TB_TABLES.equals(resName)
- || TB_COLUMNS.equals(resName)
- || TB_OPTIONS.equals(resName)) {
- return true;
- }
+
+ baseTableMaps.put(TB_META, false);
+ baseTableMaps.put(TB_TABLES, false);
+ baseTableMaps.put(TB_COLUMNS, false);
+ baseTableMaps.put(TB_OPTIONS, false);
+ baseTableMaps.put(TB_STATISTICS, false);
+ baseTableMaps.put(TB_INDEXES, false);
+ baseTableMaps.put(TB_PARTTIONS, false);
+
+ while (res.next()) {
+ baseTableMaps.put(res.getString("TABLE_NAME"), true);
}
} finally {
wlock.unlock();
CatalogUtil.closeSQLWrapper(res);
- }
- return false;
+ }
+
+ for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+ if (!entry.getValue()) {
+ return false;
+ }
+ }
+
+ return true;
}
final boolean checkInternalTable(final String tableName) throws SQLException {
@@ -263,6 +309,7 @@ public class DerbyStore extends AbstractDBStore {
@Override
public final void addTable(final TableDesc table) throws IOException {
+ PreparedStatement pstmt = null;
Statement stmt = null;
ResultSet res = null;
@@ -324,10 +371,95 @@ public class DerbyStore extends AbstractDBStore {
}
stmt.executeUpdate(sql);
}
+
+ //Partition
+ if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+ try {
+ Partitions partitions = table.getPartitions();
+ List<Column> columnList = partitions.getColumns();
+
+ // Find columns which used for a partitioned table.
+ StringBuffer columns = new StringBuffer();
+ for(Column eachColumn : columnList) {
+ sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+ + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no columnId matched to "
+ + table.getName());
+ }
+ columnId = res.getInt("column_id");
+
+ if(columns.length() > 0) {
+ columns.append(",");
+ }
+ columns.append(columnId);
+ }
+
+ // Set default partition name. But if user named to subpartition, it would be updated.
+// String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+ sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+ + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+ pstmt = getConnection().prepareStatement(sql);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ // Find information for subpartitions
+ if (partitions.getSpecifiers() != null) {
+ int count = 1;
+ if (partitions.getSpecifiers().size() == 0) {
+ pstmt.clearParameters();
+ pstmt.setString(1, null);
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, null);
+ pstmt.addBatch();
+ } else {
+ for(Specifier eachValue: partitions.getSpecifiers()) {
+ pstmt.clearParameters();
+ if (eachValue.getName() != null && !eachValue.getName().equals("")) {
+ pstmt.setString(1, eachValue.getName());
+ } else {
+ pstmt.setString(1, null);
+ }
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, eachValue.getExpressions());
+ pstmt.addBatch();
+ count++;
+ }
+ }
+ } else {
+ pstmt.clearParameters();
+ pstmt.setString(1, null);
+ pstmt.setInt(2, tid);
+ pstmt.setString(3, partitions.getPartitionsType().name());
+ pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(5, columns.toString());
+ pstmt.setString(6, null);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ } finally {
+ CatalogUtil.closeSQLWrapper(pstmt);
+ }
+ }
+
} catch (SQLException se) {
throw new IOException(se.getMessage(), se);
} finally {
wlock.unlock();
+ CatalogUtil.closeSQLWrapper(res, pstmt);
CatalogUtil.closeSQLWrapper(res, stmt);
}
}
@@ -428,6 +560,17 @@ public class DerbyStore extends AbstractDBStore {
}
try {
+ sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+ + " SELECT TID FROM " + TB_TABLES
+ + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+ LOG.info(sql);
+ stmt = getConnection().createStatement();
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ }
+
+ try {
sql = "DELETE FROM " + TB_TABLES +
" WHERE " + C_TABLE_ID +" = '" + name + "'";
LOG.info(sql);
@@ -435,7 +578,6 @@ public class DerbyStore extends AbstractDBStore {
} catch (SQLException se) {
throw new IOException(se);
}
-
} catch (SQLException se) {
throw new IOException(se);
} finally {
@@ -454,6 +596,8 @@ public class DerbyStore extends AbstractDBStore {
StoreType storeType = null;
Options options;
TableStats stat = null;
+ Partitions partitions = null;
+ int tid = 0;
try {
rlock.lock();
@@ -461,7 +605,7 @@ public class DerbyStore extends AbstractDBStore {
try {
String sql =
- "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+ "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
+ " WHERE " + C_TABLE_ID + "='" + name + "'";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
@@ -474,6 +618,7 @@ public class DerbyStore extends AbstractDBStore {
tableName = res.getString(C_TABLE_ID).trim();
path = new Path(res.getString("path").trim());
storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+ tid = res.getInt("TID");
} catch (SQLException se) {
throw new IOException(se);
} finally {
@@ -550,12 +695,48 @@ public class DerbyStore extends AbstractDBStore {
CatalogUtil.closeSQLWrapper(res);
}
+
+ try {
+ String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+ + " WHERE TID =" + tid + "";
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ while (res.next()) {
+ if (partitions == null) {
+ partitions = new Partitions();
+ String[] columns = res.getString("columns").split(",");
+ for(String eachColumn: columns) {
+ partitions.addColumn(getColumn(tableName, tid, eachColumn));
+ }
+ partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
+ ("type")));
+ partitions.setNumPartitions(res.getInt("quantity"));
+ }
+
+ Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+ partitions.addSpecifier(specifier);
+ }
+
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+
TableMeta meta = new TableMeta(storeType, options);
TableDesc table = new TableDesc(tableName, schema, meta, path);
if (stat != null) {
table.setStats(stat);
}
+ if (partitions != null) {
+ table.setPartitions(partitions);
+ }
+
return table;
} catch (SQLException se) {
throw new IOException(se);
@@ -564,7 +745,42 @@ public class DerbyStore extends AbstractDBStore {
CatalogUtil.closeSQLWrapper(stmt);
}
}
-
+
+ private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+ ResultSet res = null;
+ Column column = null;
+ Statement stmt = null;
+
+ try {
+ String sql = "SELECT column_name, data_type, type_length from "
+ + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
+
+ stmt = getConnection().createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ if (res.next()) {
+ String columnName = tableName + "."
+ + res.getString("column_name").trim();
+ Type dataType = getDataType(res.getString("data_type")
+ .trim());
+ int typeLength = res.getInt("type_length");
+ if (typeLength > 0) {
+ column = new Column(columnName, dataType, typeLength);
+ } else {
+ column = new Column(columnName, dataType);
+ }
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ CatalogUtil.closeSQLWrapper(res, stmt);
+ }
+ return column;
+ }
+
private Type getDataType(final String typeStr) {
try {
return Enum.valueOf(Type.class, typeStr);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index 259d9d6..c368969 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -29,6 +29,7 @@ import org.apache.tajo.exception.InternalException;
import java.io.IOException;
import java.sql.*;
import java.util.List;
+import java.util.Map;
public class MySQLStore extends AbstractDBStore {
@@ -51,115 +52,158 @@ public class MySQLStore extends AbstractDBStore {
// TODO - DDL and index statements should be renamed
protected void createBaseTable() throws SQLException {
- // META
- Statement stmt = getConnection().createStatement();
- String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(meta_ddl);
- }
+ int result;
+ Statement stmt = null;
try {
- int result = stmt.executeUpdate(meta_ddl);
- LOG.info("Table '" + TB_META + " is created.");
+ stmt = getConnection().createStatement();
+
+ // META
+ if (!baseTableMaps.get(TB_META)) {
+ String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(meta_ddl);
+ }
+ result = stmt.executeUpdate(meta_ddl);
+ LOG.info("Table '" + TB_META + " is created.");
+ }
// TABLES
- String tables_ddl = "CREATE TABLE "
- + TB_TABLES + " ("
- + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
- + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
- + "path TEXT, "
- + "store_type CHAR(16)"
- + ")";
- if (LOG.isDebugEnabled()) {
- LOG.debug(tables_ddl);
+ if (!baseTableMaps.get(TB_TABLES)) {
+ String tables_ddl = "CREATE TABLE "
+ + TB_TABLES + " ("
+ + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
+ + "path TEXT, "
+ + "store_type CHAR(16)"
+ + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(tables_ddl);
+ }
+
+ LOG.info("Table '" + TB_TABLES + "' is created.");
+ result = stmt.executeUpdate(tables_ddl);
}
- LOG.info("Table '" + TB_TABLES + "' is created.");
- result = stmt.executeUpdate(tables_ddl);
// COLUMNS
+ if (!baseTableMaps.get(TB_COLUMNS)) {
+ String columns_ddl =
+ "CREATE TABLE " + TB_COLUMNS + " ("
+ + "TID INT NOT NULL,"
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ + "column_id INT NOT NULL,"
+ + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+ + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
+ + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(columns_ddl);
+ }
- String columns_ddl =
- "CREATE TABLE " + TB_COLUMNS + " ("
- + "TID INT NOT NULL,"
- + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
- + "column_id INT NOT NULL,"
- + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
- + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
- + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
- + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(columns_ddl);
+ LOG.info("Table '" + TB_COLUMNS + " is created.");
+ result = stmt.executeUpdate(columns_ddl);
}
- LOG.info("Table '" + TB_COLUMNS + " is created.");
- result = stmt.executeUpdate(columns_ddl);
// OPTIONS
-
- String options_ddl =
- "CREATE TABLE " + TB_OPTIONS + " ("
- + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
- + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
- + "INDEX("+C_TABLE_ID+", key_),"
- + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(options_ddl);
+ if (!baseTableMaps.get(TB_OPTIONS)) {
+ String options_ddl =
+ "CREATE TABLE " + TB_OPTIONS + " ("
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
+ + "INDEX("+C_TABLE_ID+", key_),"
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(options_ddl);
+ }
+ LOG.info("Table '" + TB_OPTIONS + " is created.");
+ result = stmt.executeUpdate(options_ddl);
}
- LOG.info("Table '" + TB_OPTIONS + " is created.");
- result = stmt.executeUpdate(options_ddl);
+
// INDEXES
+ if (!baseTableMaps.get(TB_INDEXES)) {
+ String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
+ + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ + "column_name VARCHAR(255) NOT NULL, "
+ + "data_type VARCHAR(255) NOT NULL, "
+ + "index_type CHAR(32) NOT NULL, "
+ + "is_unique BOOLEAN NOT NULL, "
+ + "is_clustered BOOLEAN NOT NULL, "
+ + "is_ascending BOOLEAN NOT NULL,"
+ + "INDEX(" + C_TABLE_ID + ", column_name),"
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(indexes_ddl);
+ }
+ LOG.info("Table '" + TB_INDEXES + "' is created.");
+ result = stmt.executeUpdate(indexes_ddl);
+ }
- String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
- + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
- + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
- + "column_name VARCHAR(255) NOT NULL, "
- + "data_type VARCHAR(255) NOT NULL, "
- + "index_type CHAR(32) NOT NULL, "
- + "is_unique BOOLEAN NOT NULL, "
- + "is_clustered BOOLEAN NOT NULL, "
- + "is_ascending BOOLEAN NOT NULL,"
- + "INDEX(" + C_TABLE_ID + ", column_name),"
- + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(indexes_ddl);
+ if (!baseTableMaps.get(TB_STATISTICS)) {
+ String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+ + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+ + "num_rows BIGINT, "
+ + "num_bytes BIGINT,"
+ + "INDEX("+C_TABLE_ID+"),"
+ + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stats_ddl);
+ }
+ LOG.info("Table '" + TB_STATISTICS + "' is created.");
+ result = stmt.executeUpdate(stats_ddl);
}
- LOG.info("Table '" + TB_INDEXES + "' is created.");
- result = stmt.executeUpdate(indexes_ddl);
-
- String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
- + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
- + "num_rows BIGINT, "
- + "num_bytes BIGINT,"
- + "INDEX("+C_TABLE_ID+"),"
- + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
- if (LOG.isDebugEnabled()) {
- LOG.debug(stats_ddl);
+
+ // PARTITION
+ if (!baseTableMaps.get(TB_PARTTIONS)) {
+ String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+ + "PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+ + "name VARCHAR(255), "
+ + "TID INT NOT NULL,"
+ + "type VARCHAR(10) NOT NULL,"
+ + "quantity INT ,"
+ + "columns VARCHAR(255),"
+ + "expressions TEXT )";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(partition_ddl);
+ }
+ LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+ result = stmt.executeUpdate(partition_ddl);
}
- LOG.info("Table '" + TB_STATISTICS + "' is created.");
- result = stmt.executeUpdate(stats_ddl);
} finally {
CatalogUtil.closeSQLWrapper(stmt);
}
}
protected boolean isInitialized() throws SQLException {
- boolean found = false;
ResultSet res = getConnection().getMetaData().getTables(null, null, null,
new String[]{"TABLE"});
- String resName;
try {
- while (res.next() && !found) {
- resName = res.getString("TABLE_NAME");
- if (TB_META.equals(resName)
- || TB_TABLES.equals(resName)
- || TB_COLUMNS.equals(resName)
- || TB_OPTIONS.equals(resName)) {
- return true;
- }
+ baseTableMaps.put(TB_META, false);
+ baseTableMaps.put(TB_TABLES, false);
+ baseTableMaps.put(TB_COLUMNS, false);
+ baseTableMaps.put(TB_OPTIONS, false);
+ baseTableMaps.put(TB_STATISTICS, false);
+ baseTableMaps.put(TB_INDEXES, false);
+ baseTableMaps.put(TB_PARTTIONS, false);
+
+ if (res.wasNull())
+ return false;
+
+ while (res.next()) {
+ baseTableMaps.put(res.getString("TABLE_NAME"), true);
}
} finally {
CatalogUtil.closeSQLWrapper(res);
}
- return false;
+
+ for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+ if (!entry.getValue()) {
+ return false;
+ }
+ }
+
+ return true;
+// return false;
}
@Override