You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/27 09:29:37 UTC
[2/4] TAJO-475: Table partition catalog recap. (Min Zhou and hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 9b10f90..4dfc97a 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -21,8 +21,7 @@ package org.apache.tajo.catalog;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.exception.NoSuchFunctionException;
import org.apache.tajo.catalog.function.Function;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -246,13 +245,16 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitionDesc.setNumPartitions(2);
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
@@ -260,9 +262,8 @@ public class TestCatalog {
TableDesc retrieved = catalog.getTableDesc(tableName);
assertEquals(retrieved.getName(), tableName);
- assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
- assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
- assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
+ assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
+ assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
@@ -282,17 +283,17 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitionDesc.setNumPartitions(2);
-
- partitionDesc.addSpecifier(new Specifier("sub_part1"));
- partitionDesc.addSpecifier(new Specifier("sub_part2"));
- partitionDesc.addSpecifier(new Specifier("sub_part3"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
+
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -300,15 +301,8 @@ public class TestCatalog {
TableDesc retrieved = catalog.getTableDesc(tableName);
assertEquals(retrieved.getName(), tableName);
- assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
- assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
- assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
- assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
- "sub_part1");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
- "sub_part2");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
- "sub_part3");
+ assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
+ assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
@@ -327,15 +321,16 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
-
- partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
- partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.LIST);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -343,16 +338,8 @@ public class TestCatalog {
TableDesc retrieved = catalog.getTableDesc(tableName);
assertEquals(retrieved.getName(), tableName);
- assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.LIST);
- assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
- "sub_part1");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
- "Seoul,서울");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
- "sub_part2");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
- "Busan,부산");
+ assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.LIST);
+ assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
@@ -371,16 +358,17 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
- partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
- partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
- partitionDesc.addSpecifier(new Specifier("sub_part3"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.RANGE);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -388,20 +376,8 @@ public class TestCatalog {
TableDesc retrieved = catalog.getTableDesc(tableName);
assertEquals(retrieved.getName(), tableName);
- assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.RANGE);
- assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
- "sub_part1");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
- "2");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
- "sub_part2");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
- "5");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
- "sub_part3");
- assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getExpressions(),
- "");
+ assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.RANGE);
+ assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
@@ -420,12 +396,16 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -433,8 +413,8 @@ public class TestCatalog {
TableDesc retrieved = catalog.getTableDesc(tableName);
assertEquals(retrieved.getName(), tableName);
- assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
- assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+ assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 1949afc..502daf0 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,8 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -75,10 +74,10 @@ public class TestDBStore {
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
store.deleteTable(tableName);
@@ -105,8 +104,8 @@ public class TestDBStore {
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "gettable"));
desc.setStats(stat);
- store.addTable(desc);
- TableDesc retrieved = store.getTable(tableName);
+ store.addTable(desc.getProto());
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
assertEquals(",", retrieved.getMeta().getOption("file.delimiter"));
assertEquals(desc, retrieved);
assertTrue(957685 == desc.getStats().getNumRows());
@@ -130,7 +129,7 @@ public class TestDBStore {
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
TableDesc desc = new TableDesc(tableName, schema, meta,
new Path(CommonTestingUtil.getTestDir(), "tableA_" + i));
- store.addTable(desc);
+ store.addTable(desc.getProto());
}
assertEquals(numTables, store.getAllTableNames().size());
@@ -139,7 +138,7 @@ public class TestDBStore {
@Test
public final void testAddAndDeleteIndex() throws Exception {
TableDesc table = prepareTable();
- store.addTable(table);
+ store.addTable(table.getProto());
store.addIndex(TestCatalog.desc1.getProto());
assertTrue(store.existIndex(TestCatalog.desc1.getName()));
@@ -153,7 +152,7 @@ public class TestDBStore {
public final void testGetIndex() throws Exception {
TableDesc table = prepareTable();
- store.addTable(table);
+ store.addTable(table.getProto());
store.addIndex(TestCatalog.desc2.getProto());
assertEquals(
@@ -168,7 +167,7 @@ public class TestDBStore {
public final void testGetIndexByTableAndColumn() throws Exception {
TableDesc table = prepareTable();
- store.addTable(table);
+ store.addTable(table.getProto());
store.addIndex(TestCatalog.desc2.getProto());
@@ -186,7 +185,7 @@ public class TestDBStore {
public final void testGetAllIndexes() throws Exception {
TableDesc table = prepareTable();
- store.addTable(table);
+ store.addTable(table.getProto());
store.addIndex(TestCatalog.desc1.getProto());
store.addIndex(TestCatalog.desc2.getProto());
@@ -236,18 +235,21 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitionDesc.setNumPartitions(2);
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -268,22 +270,22 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitionDesc.setNumPartitions(2);
- partitionDesc.addSpecifier(new Specifier("sub_part1"));
- partitionDesc.addSpecifier(new Specifier("sub_part2"));
- partitionDesc.addSpecifier(new Specifier("sub_part3"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -304,20 +306,21 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
-
- partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
- partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.LIST);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -338,21 +341,21 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
-
- partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
- partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
- partitionDesc.addSpecifier(new Specifier("sub_part3"));
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.RANGE);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -373,17 +376,21 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- PartitionDesc partitionDesc = new PartitionDesc();
- partitionDesc.addColumn(new Column("id", Type.INT4));
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+ PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+ partitionDesc.setTableId(tableName);
+ partitionDesc.setExpression("id");
+ Schema partSchema = new Schema();
+ partSchema.addColumn("id", Type.INT4);
+ partitionDesc.setExpressionSchema(partSchema);
+ partitionDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitionDesc);
+ desc.setPartitionMethod(partitionDesc);
assertFalse(store.existTable(tableName));
- store.addTable(desc);
+ store.addTable(desc.getProto());
assertTrue(store.existTable(tableName));
- TableDesc retrieved = store.getTable(tableName);
+ TableDesc retrieved = new TableDesc(store.getTable(tableName));
// Schema order check
assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
index b4678c0..8bf00e1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
@@ -20,9 +20,10 @@ package org.apache.tajo.cli;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
import java.io.PrintWriter;
import java.util.List;
@@ -99,36 +100,14 @@ public class DescTableCommand extends TajoShellCommand {
}
sb.append("\n");
- sb.append("Partitions: \n");
- if (desc.getPartitions() != null) {
- sb.append("type:").append(desc.getPartitions().getPartitionsType().name()).append("\n");
- if (desc.getPartitions().getNumPartitions() > 0)
- sb.append("numbers:").append(desc.getPartitions().getNumPartitions()).append("\n");
+ if (desc.getPartitionMethod() != null) {
+ PartitionMethodDesc partition = desc.getPartitionMethod();
+ sb.append("Partitions: \n");
- sb.append("columns:").append("\n");
- for(Column eachColumn: desc.getPartitions().getColumns()) {
- sb.append(" ");
- sb.append(eachColumn.getColumnName()).append("\t").append(eachColumn.getDataType().getType());
- if (eachColumn.getDataType().hasLength()) {
- sb.append("(").append(eachColumn.getDataType().getLength()).append(")");
- }
- sb.append("\n");
- }
+ sb.append("type:").append(partition.getPartitionType().name()).append("\n");
- if (desc.getPartitions().getSpecifiers() != null) {
- sb.append("specifier:").append("\n");
- for(Specifier specifier :desc.getPartitions().getSpecifiers()) {
- sb.append(" ");
- sb.append("name:").append(specifier.getName());
- if (!specifier.getExpressions().equals("")) {
- sb.append(", expressions:").append(specifier.getExpressions());
- } else {
- if (desc.getPartitions().getPartitionsType().name().equals("RANGE"));
- sb.append(" expressions: MAXVALUE");
- }
- sb.append("\n");
- }
- }
+ sb.append("columns:").append(":");
+ sb.append(TUtil.arrayToString(partition.getExpressionSchema().toArray()));
}
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index d75b5af..c0c9462 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -162,7 +162,7 @@ message CreateTableRequest {
required SchemaProto schema = 2;
required TableProto meta = 3;
required string path = 4;
- optional PartitionDescProto partitions = 5;
+ optional PartitionMethodProto partition = 5;
}
message DropTableRequest {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index abc217e..125a6d6 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -266,7 +266,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
+ <type>test-jar</type>
<exclusions>
<exclusion>
<groupId>commons-el</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 5507ba0..daa8f8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang.StringEscapeUtils;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.Aggregation.GroupType;
import org.apache.tajo.algebra.LiteralValue.LiteralType;
-import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.engine.parser.SQLParser.*;
import org.apache.tajo.storage.CSVFile;
@@ -966,9 +965,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
if (checkIfExist(ctx.table_partitioning_clauses())) {
- PartitionDescExpr partitionDesc =
+ PartitionMethodDescExpr partitionMethodDesc =
parseTablePartitioningClause(ctx.table_partitioning_clauses());
- createTable.setPartition(partitionDesc);
+ createTable.setPartitionMethod(partitionMethodDesc);
}
return createTable;
}
@@ -985,7 +984,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
return elements;
}
- public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+ public PartitionMethodDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
if (checkIfExist(ctx.range_partitions())) { // For Range Partition
Range_partitionsContext rangePartitionsContext = ctx.range_partitions();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index 3dd371b..d9645a4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -44,13 +44,17 @@ public class InsertNode extends StoreTableNode implements Cloneable {
public void setTargetTable(TableDesc desc) {
setTableName(desc.getName());
- tableSchema = desc.getSchema();
+ if (desc.hasPartition()) {
+ tableSchema = desc.getLogicalSchema();
+ } else {
+ tableSchema = desc.getSchema();
+ }
setPath(desc.getPath());
setOptions(desc.getMeta().getOptions());
setStorageType(desc.getMeta().getStoreType());
- if (desc.hasPartitions()) {
- this.setPartitions(desc.getPartitions());
+ if (desc.hasPartition()) {
+ this.setPartitionMethod(desc.getPartitionMethod());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 902cbee..0831b0a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -277,7 +277,7 @@ public class LogicalPlan {
// Trying to find columns from other relations in other blocks
for (QueryBlock eachBlock : queryBlocks.values()) {
for (RelationNode rel : eachBlock.getRelations()) {
- Column found = rel.getOutSchema().getColumnByName(columnRef.getName());
+ Column found = rel.getTableSchema().getColumnByName(columnRef.getName());
if (found != null) {
candidates.add(found);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 066a3bb..a5d9787 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -18,7 +18,9 @@
package org.apache.tajo.engine.planner;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -29,8 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.NullDatum;
@@ -44,7 +45,6 @@ import org.apache.tajo.util.TUtil;
import java.util.*;
-import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
import static org.apache.tajo.algebra.CreateTable.PartitionType;
import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
@@ -372,6 +372,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
}
}
+ } else if (projectable instanceof RelationNode) {
+ RelationNode relationNode = (RelationNode) projectable;
+ for (Target target : projectable.getTargets()) {
+ Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+ for (Column c : columns) {
+ if (!relationNode.getTableSchema().contains(c)) {
+ throw new PlanningException("Cannot get such a field: " + c);
+ }
+ }
+ }
} else {
for (Target target : projectable.getTargets()) {
Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
@@ -924,7 +934,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// Assume that each unique expr is evaluated once.
List<Target> targets = new ArrayList<Target>();
- for (Column column : scanNode.getInSchema().getColumns()) {
+ for (Column column : scanNode.getTableSchema().getColumns()) {
ColumnReferenceExpr columnRef = new ColumnReferenceExpr(column.getQualifier(), column.getColumnName());
if (block.namedExprsMgr.contains(columnRef)) {
String referenceName = block.namedExprsMgr.getName(columnRef);
@@ -1179,7 +1189,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
} else { // when a user do not specified target columns
// The output schema of select clause determines the target columns.
- Schema tableSchema = desc.getSchema();
+ Schema tableSchema = desc.getLogicalSchema();
Schema projectedSchema = insertNode.getChild().getOutSchema();
Schema targetColumns = new Schema();
@@ -1190,8 +1200,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
buildProjectedInsert(insertNode);
}
- if (desc.hasPartitions()) {
- insertNode.setPartitions(desc.getPartitions());
+ if (desc.hasPartition()) {
+ insertNode.setPartitionMethod(desc.getPartitionMethod());
}
return insertNode;
}
@@ -1254,223 +1264,122 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
throws PlanningException {
- // Get a table name to be created.
- String tableNameTobeCreated = expr.getTableName();
+ CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+
+ // Set a table name to be created.
+ createTableNode.setTableName(expr.getTableName());
+
+ if (expr.hasStorageType()) { // If storage type (using clause) is specified
+ createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+ } else { // otherwise, default type
+ createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+ }
+
+ if (expr.hasParams()) {
+ Options options = new Options();
+ options.putAll(expr.getParams());
+ createTableNode.setOptions(options);
+ }
+
+ if (expr.hasPartition()) {
+ if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
+ createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
+ } else {
+ throw new PlanningException(String.format("Not supported PartitionType: %s",
+ expr.getPartitionMethod().getPartitionType()));
+ }
+ }
if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
stack.add(expr);
LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
stack.pop();
- CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
- createTableNode.setTableName(tableNameTobeCreated);
createTableNode.setChild(subQuery);
createTableNode.setInSchema(subQuery.getOutSchema());
- // if no table definition, the select clause's output schema will be used.
- // ex) CREATE TABLE tbl AS SELECT ...
- if(!expr.hasTableElements()) {
-
- expr.setTableElements(convertSchemaToTableElements(subQuery.getOutSchema()));
- }
-
- // Otherwise, it uses the defined table elements.
+ // If the table schema is defined
// ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
- createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
- createTableNode.setSchema(convertTableElementsSchema(expr.getTableElements()));
-
- if (expr.hasStorageType()) { // If storage type (using clause) is specified
- createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- } else { // If no specified storage type
- // default type
- createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- }
-
- if (expr.hasParams()) { // if 'with clause' is specified
- Options options = new Options();
- options.putAll(expr.getParams());
- createTableNode.setOptions(options);
- }
-
- if (expr.hasPartition()) { // if 'partition by' is specified
- createTableNode.setPartitions(convertTableElementsPartition(context, expr));
- }
+ if (expr.hasTableElements()) {
+ createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
+ createTableNode.setTableSchema(convertTableElementsSchema(expr.getTableElements()));
+ } else {
+ // if no table definition, the select clause's output schema will be used.
+ // ex) CREATE TABLE tbl AS SELECT ...
- return createTableNode;
+ if (expr.hasPartition()) {
+ PartitionMethodDesc partitionMethod = createTableNode.getPartitionMethod();
- } else { // if CREATE AN EMPTY TABLE
- Schema tableSchema;
- boolean mergedPartition = false;
- if (expr.hasPartition()) {
- if (expr.getPartition().getPartitionType().equals(PartitionType.COLUMN)) {
- if (((ColumnPartition)expr.getPartition()).isOmitValues()) {
- mergedPartition = true;
+ Schema queryOutputSchema = subQuery.getOutSchema();
+ Schema partitionExpressionSchema = partitionMethod.getExpressionSchema();
+ if (partitionMethod.getPartitionType() == CatalogProtos.PartitionType.COLUMN &&
+ queryOutputSchema.getColumnNum() < partitionExpressionSchema.getColumnNum()) {
+ throw new VerifyException("Partition columns cannot be more than table columns.");
}
+ Schema tableSchema = new Schema();
+ for (int i = 0; i < queryOutputSchema.getColumnNum() - partitionExpressionSchema.getColumnNum(); i++) {
+ tableSchema.addColumn(queryOutputSchema.getColumn(i));
+ }
+ createTableNode.setOutSchema(tableSchema);
+ createTableNode.setTableSchema(tableSchema);
} else {
- throw new PlanningException(String.format("Not supported PartitonType: %s",
- expr.getPartition().getPartitionType()));
+ createTableNode.setOutSchema(subQuery.getOutSchema());
+ createTableNode.setTableSchema(subQuery.getOutSchema());
}
}
- if (mergedPartition) {
- ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
- ((ColumnPartition)expr.getPartition()).getColumns());
- tableSchema = convertTableElementsSchema(merged);
- } else {
- tableSchema = convertTableElementsSchema(expr.getTableElements());
- }
+ return createTableNode;
- CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
- createTableNode.setTableName(expr.getTableName());
- createTableNode.setSchema(tableSchema);
+ } else { // if CREATE AN EMPTY TABLE
+ Schema tableSchema = convertColumnsToSchema(expr.getTableElements());
+ createTableNode.setTableSchema(tableSchema);
if (expr.isExternal()) {
createTableNode.setExternal(true);
}
- if (expr.hasStorageType()) {
- createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
- } else {
- // default type
- // TODO - it should be configurable.
- createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- }
-
- if (expr.hasParams()) {
- Options options = new Options();
- options.putAll(expr.getParams());
- createTableNode.setOptions(options);
- }
-
if (expr.hasLocation()) {
createTableNode.setPath(new Path(expr.getLocation()));
}
- if (expr.hasPartition()) {
- if (expr.getPartition().getPartitionType().equals(PartitionType.COLUMN)) {
- createTableNode.setPartitions(convertTableElementsPartition(context, expr));
- } else {
- throw new PlanningException(String.format("Not supported PartitonType: %s",
- expr.getPartition().getPartitionType()));
- }
- }
-
return createTableNode;
}
}
+ private PartitionMethodDesc getPartitionMethod(PlanContext context,
+ String tableName,
+ CreateTable.PartitionMethodDescExpr expr) throws PlanningException {
+ PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc();
+ partitionMethodDesc.setTableId(tableName);
+
+ if(expr.getPartitionType() == PartitionType.COLUMN) {
+ CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
+ String partitionExpression = Joiner.on(',').join(partition.getColumns());
+ partitionMethodDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
+ partitionMethodDesc.setExpression(partitionExpression);
+ partitionMethodDesc.setExpressionSchema(convertColumnsToSchema(partition.getColumns()));
+ } else {
+ throw new PlanningException(String.format("Not supported PartitionType: %s",
+ expr.getPartitionType()));
+ }
+ return partitionMethodDesc;
+ }
+
/**
- * convert table elements into Partition.
+ * It transforms table definition elements to schema.
*
- * @param context
- * @param expr
- * @return
- * @throws PlanningException
+ * @param elements to be transformed
+ * @return schema transformed from table definition elements
*/
- private PartitionDesc convertTableElementsPartition(PlanContext context,
- CreateTable expr) throws PlanningException {
- Schema schema = convertTableElementsSchema(expr.getTableElements());
- PartitionDesc partitionDesc = null;
- List<Specifier> specifiers = null;
- if (expr.hasPartition()) {
- partitionDesc = new PartitionDesc();
- specifiers = TUtil.newList();
-
- partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
- .getPartitionType().name()));
-
- if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
- CreateTable.HashPartition hashPartition = expr.getPartition();
-
- partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
- , hashPartition.getColumns()));
-
- if (hashPartition.getColumns() != null) {
- if (hashPartition.getQuantifier() != null) {
- String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
- partitionDesc.setNumPartitions(Integer.parseInt(quantity));
- }
-
- if (hashPartition.getSpecifiers() != null) {
- for(CreateTable.PartitionSpecifier eachSpec: hashPartition.getSpecifiers()) {
- specifiers.add(new Specifier(eachSpec.getName()));
- }
- }
-
- if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
- for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
- String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
- .getTableName() + "_" + i;
- specifiers.add(new Specifier(partitionName));
- }
- }
-
- if (!specifiers.isEmpty())
- partitionDesc.setSpecifiers(specifiers);
- }
- } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
- CreateTable.ListPartition listPartition = expr.getPartition();
-
- partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
- , listPartition.getColumns()));
-
- if (listPartition.getSpecifiers() != null) {
- StringBuffer sb = new StringBuffer();
-
- for(CreateTable.ListPartitionSpecifier eachSpec: listPartition.getSpecifiers()) {
- Specifier specifier = new Specifier(eachSpec.getName());
- sb.delete(0, sb.length());
- for(Expr eachExpr : eachSpec.getValueList().getValues()) {
- context.queryBlock.setSchema(schema);
- EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachExpr);
- if(sb.length() > 1)
- sb.append(",");
-
- sb.append(eval.toString());
- }
- specifier.setExpressions(sb.toString());
- specifiers.add(specifier);
- }
- if (!specifiers.isEmpty())
- partitionDesc.setSpecifiers(specifiers);
- }
- } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
- CreateTable.RangePartition rangePartition = expr.getPartition();
-
- partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
- , rangePartition.getColumns()));
-
- if (rangePartition.getSpecifiers() != null) {
- for(CreateTable.RangePartitionSpecifier eachSpec: rangePartition.getSpecifiers()) {
- Specifier specifier = new Specifier();
-
- if (eachSpec.getName() != null)
- specifier.setName(eachSpec.getName());
-
- if (eachSpec.getEnd() != null) {
- context.queryBlock.setSchema(schema);
- EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachSpec.getEnd());
- specifier.setExpressions(eval.toString());
- }
+ private Schema convertColumnsToSchema(CreateTable.ColumnDefinition[] elements) {
+ Schema schema = new Schema();
- if(eachSpec.isEndMaxValue()) {
- specifier.setExpressions(null);
- }
- specifiers.add(specifier);
- }
- if (!specifiers.isEmpty())
- partitionDesc.setSpecifiers(specifiers);
- }
- } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
- ColumnPartition columnPartition = expr.getPartition();
- partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
- partitionDesc.setOmitValues(columnPartition.isOmitValues());
- }
+ for (CreateTable.ColumnDefinition columnDefinition: elements) {
+ schema.addColumn(convertColumn(columnDefinition));
}
- return partitionDesc;
+ return schema;
}
-
/**
* It transforms table definition elements to schema.
*
@@ -1487,32 +1396,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return schema;
}
- private ColumnDefinition[] convertSchemaToTableElements(Schema schema) {
- List<Column> columns = schema.getColumns();
- ColumnDefinition[] columnDefinitions = new ColumnDefinition[columns.size()];
- for(int i = 0; i < columns.size(); i ++) {
- Column col = columns.get(i);
- columnDefinitions[i] = new ColumnDefinition(col.getColumnName(), col.getDataType().getType().name());
- }
-
- return columnDefinitions;
- }
-
- private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
- ColumnReferenceExpr[] references) {
- List<Column> columnList = TUtil.newList();
-
- for(CreateTable.ColumnDefinition columnDefinition: elements) {
- for(ColumnReferenceExpr eachReference: references) {
- if (columnDefinition.getColumnName().equalsIgnoreCase(eachReference.getName())) {
- columnList.add(convertColumn(columnDefinition));
- }
- }
- }
-
- return columnList;
- }
-
private Column convertColumn(ColumnDefinition columnDefinition) {
return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
}
@@ -1541,7 +1424,25 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
===============================================================================================*/
public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode groupbyNode) {
- return checkIfBeEvaluateAtThis(evalNode, groupbyNode) && evalNode.getType() == EvalType.AGG_FUNCTION;
+ Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+
+ if (!groupbyNode.getInSchema().containsAll(columnRefs)) {
+ return false;
+ }
+
+ Set<String> tableIds = Sets.newHashSet();
+ // getting distinct table references
+ for (Column col : columnRefs) {
+ if (!tableIds.contains(col.getQualifier())) {
+ tableIds.add(col.getQualifier());
+ }
+ }
+
+ if (tableIds.size() > 1) {
+ return false;
+ }
+
+ return true;
}
public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode joinNode,
@@ -1560,7 +1461,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// at the topmost join operator.
// TODO - It's also valid that case-when is evaluated at the topmost outer operator.
// But, how can we know there is no further outer join operator after this node?
- return checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin);
+ if (checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin)) {
+ return true;
+ } else {
+ return false;
+ }
}
private static boolean checkCaseWhenWithOuterJoin(QueryBlock block, EvalNode evalNode, boolean isTopMostJoin) {
@@ -1580,7 +1485,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return false;
}
- if (node.getInSchema().containsAll(columnRefs)) {
+ if (node.getTableSchema().containsAll(columnRefs)) {
// Why? - When a {case when} is used with outer join, case when must be evaluated at topmost outer join.
if (block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER)) {
Collection<CaseWhenEval> found = EvalTreeUtil.findEvalsByType(evalNode, EvalType.CASE);
@@ -1596,6 +1501,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
public static boolean checkIfBeEvaluateAtThis(EvalNode evalNode, LogicalNode node) {
Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
- return node.getInSchema().containsAll(columnRefs);
+ if (!node.getInSchema().containsAll(columnRefs)) {
+ return false;
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 7de053b..202c59d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -91,7 +91,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
ShuffleFileWriteNode shuffleFileWriteNode = new ShuffleFileWriteNode(UNGENERATED_PID);
- shuffleFileWriteNode.setTableName(channel.getTargetId().toString());
shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
shuffleFileWriteNode.setInSchema(plan.getOutSchema());
shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
@@ -675,12 +674,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
public PhysicalExec createStorePlan(TaskAttemptContext ctx,
StoreTableNode plan, PhysicalExec subOp) throws IOException {
- if (plan.getPartitions() != null) {
- switch (plan.getPartitions().getPartitionsType()) {
+ if (plan.getPartitionMethod() != null) {
+ switch (plan.getPartitionMethod().getPartitionType()) {
case COLUMN:
return new ColumnPartitionedTableStoreExec(ctx, plan, subOp);
default:
- throw new IllegalStateException(plan.getPartitions().getPartitionsType() + " is not supported yet.");
+ throw new IllegalStateException(plan.getPartitionMethod().getPartitionType() + " is not supported yet.");
}
} else {
return new StoreTableExec(ctx, plan, subOp);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 663954e..ef4e60a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,7 +29,7 @@ import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
@@ -644,7 +644,7 @@ public class PlannerUtil {
}
public static Schema rewriteColumnPartitionedTableSchema(
- PartitionDesc partitionDesc,
+ PartitionMethodDesc partitionDesc,
Schema columnPartitionSchema,
Schema sourceSchema,
String qualifier) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 3e756d5..6e44bb8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
@@ -433,10 +433,10 @@ public class GlobalPlanner {
ExecutionBlock childBlock,
StoreTableNode currentNode)
throws PlanningException {
- PartitionDesc partitionDesc = currentNode.getPartitions();
+ PartitionMethodDesc partitionMethod = currentNode.getPartitionMethod();
// if result table is not a partitioned table, directly store it
- if(partitionDesc == null) {
+ if(partitionMethod == null) {
if (childBlock.getPlan() == null) { // when the below is union
for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) {
@@ -463,24 +463,22 @@ public class GlobalPlanner {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
DataChannel channel;
- CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+ CatalogProtos.PartitionType partitionsType = partitionMethod.getPartitionType();
- if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
+ if(partitionsType == CatalogProtos.PartitionType.COLUMN) {
channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
- Column[] columns = new Column[partitionDesc.getColumns().size()];
-
if (currentNode.getType() == NodeType.INSERT) {
InsertNode insertNode = (InsertNode) currentNode;
channel.setSchema(((InsertNode)currentNode).getProjectedSchema());
- Column [] shuffleKeys = new Column[partitionDesc.getColumns().size()];
+ Column [] shuffleKeys = new Column[partitionMethod.getExpressionSchema().getColumnNum()];
int i = 0;
- for (Column column : partitionDesc.getColumns()) {
+ for (Column column : partitionMethod.getExpressionSchema().getColumns()) {
int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
}
channel.setShuffleKeys(shuffleKeys);
} else {
- channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+ channel.setShuffleKeys(partitionMethod.getExpressionSchema().toArray());
}
channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 66a2355..c266bbc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -34,14 +34,24 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
super(pid, NodeType.CREATE_TABLE);
}
- public void setSchema(Schema schema) {
+ public void setTableSchema(Schema schema) {
this.schema = schema;
}
- public Schema getSchema() {
+ public Schema getTableSchema() {
return this.schema;
}
+ public Schema getLogicalSchema() {
+ if (hasPartition()) {
+ Schema logicalSchema = new Schema(schema);
+ logicalSchema.addColumns(getPartitionMethod().getExpressionSchema());
+ return logicalSchema;
+ } else {
+ return schema;
+ }
+ }
+
public boolean hasPath() {
return this.path != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
index d687829..824e858 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -35,7 +35,7 @@ public class PartitionedTableScanNode extends ScanNode {
this.setInSchema(scanNode.getInSchema());
this.setOutSchema(scanNode.getOutSchema());
this.alias = scanNode.alias;
- this.renamedSchema = scanNode.renamedSchema;
+ this.logicalSchema = scanNode.logicalSchema;
this.qual = scanNode.qual;
this.targets = scanNode.targets;
this.inputPaths = inputPaths;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
index 1bd3548..2a357c4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -32,7 +32,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
* This includes some basic information for materializing data.
*/
public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
- @Expose protected String tableName;
@Expose protected StoreType storageType = StoreType.CSV;
@Expose protected Options options;
@@ -40,18 +39,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
super(pid, nodeType);
}
- public boolean hasTargetTable() {
- return tableName != null;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public final String getTableName() {
- return this.tableName;
- }
-
public void setStorageType(StoreType storageType) {
this.storageType = storageType;
}
@@ -75,7 +62,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
@Override
public PlanString getPlanString() {
PlanString planStr = new PlanString("Store");
- planStr.appendTitle(" into ").appendTitle(tableName);
planStr.addExplan("Store type: " + storageType);
return planStr;
@@ -86,7 +72,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
if (obj instanceof PersistentStoreNode) {
PersistentStoreNode other = (PersistentStoreNode) obj;
boolean eq = super.equals(other);
- eq = eq && this.tableName.equals(other.tableName);
eq = eq && this.storageType.equals(other.storageType);
eq = eq && TUtil.checkEquals(options, other.options);
return eq;
@@ -98,25 +83,8 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
@Override
public Object clone() throws CloneNotSupportedException {
PersistentStoreNode store = (PersistentStoreNode) super.clone();
- store.tableName = tableName;
store.storageType = storageType != null ? storageType : null;
store.options = options != null ? (Options) options.clone() : null;
return store;
}
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("\"Store\": {\"table\": \""+tableName);
- if (storageType != null) {
- sb.append(", storage: "+ storageType.name());
- }
-
- sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
- .append("\n \"in schema\": ").append(getInSchema());
-
- sb.append("}");
-
- return sb.toString() + "\n"
- + getChild().toString();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 165e1e8..155c09f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -26,12 +26,13 @@ import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.util.TUtil;
public class ScanNode extends RelationNode implements Projectable {
@Expose protected TableDesc tableDesc;
@Expose protected String alias;
- @Expose protected Schema renamedSchema;
+ @Expose protected Schema logicalSchema;
@Expose protected EvalNode qual;
@Expose protected Target[] targets;
@@ -45,13 +46,16 @@ public class ScanNode extends RelationNode implements Projectable {
this.tableDesc = desc;
this.setInSchema(tableDesc.getSchema());
this.setOutSchema(tableDesc.getSchema());
+ logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, null);
}
public ScanNode(int pid, TableDesc desc, String alias) {
this(pid, desc);
this.alias = PlannerUtil.normalizeTableName(alias);
- renamedSchema = getOutSchema();
- renamedSchema.setQualifier(this.alias);
+ this.setInSchema(tableDesc.getSchema());
+ this.getInSchema().setQualifier(alias);
+ this.setOutSchema(new Schema(getInSchema()));
+ logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, alias);
}
public String getTableName() {
@@ -67,7 +71,11 @@ public class ScanNode extends RelationNode implements Projectable {
}
public Schema getTableSchema() {
- return hasAlias() ? renamedSchema : tableDesc.getSchema();
+ return logicalSchema;
+ }
+
+ public Schema getPhysicalSchema() {
+ return getInSchema();
}
public boolean hasQual() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
index 3abec9d..e4ef49b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -88,9 +88,9 @@ public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneab
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("\"Store\": {\"table\": \""+tableName);
+ sb.append("\"Store\":");
if (storageType != null) {
- sb.append(", storage: "+ storageType.name());
+ sb.append(" storage: "+ storageType.name());
}
sb.append(", partnum: ").append(numOutputs).append("}")
.append(", ");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 2aca6a7..47e33c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -19,12 +19,13 @@
package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
public class StoreTableNode extends PersistentStoreNode implements Cloneable {
- @Expose private PartitionDesc partitionDesc;
+ @Expose protected String tableName;
+ @Expose private PartitionMethodDesc partitionDesc;
public StoreTableNode(int pid) {
super(pid, NodeType.STORE);
@@ -34,15 +35,27 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
super(pid, nodeType);
}
+ public boolean hasTargetTable() {
+ return tableName != null;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public final String getTableName() {
+ return this.tableName;
+ }
+
public boolean hasPartition() {
return this.partitionDesc != null;
}
- public PartitionDesc getPartitions() {
+ public PartitionMethodDesc getPartitionMethod() {
return partitionDesc;
}
- public void setPartitions(PartitionDesc partitionDesc) {
+ public void setPartitionMethod(PartitionMethodDesc partitionDesc) {
this.partitionDesc = partitionDesc;
}
@@ -60,6 +73,7 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
if (obj instanceof StoreTableNode) {
StoreTableNode other = (StoreTableNode) obj;
boolean eq = super.equals(other);
+ eq = eq && this.tableName.equals(other.tableName);
eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
return eq;
} else {
@@ -70,7 +84,8 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
@Override
public Object clone() throws CloneNotSupportedException {
StoreTableNode store = (StoreTableNode) super.clone();
- store.partitionDesc = partitionDesc;
+ store.tableName = tableName;
+ store.partitionDesc = partitionDesc != null ? (PartitionMethodDesc) partitionDesc.clone() : null;
return store;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index 338d2f2..cee9bba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,16 +28,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.storage.StorageUtil;
@@ -50,8 +48,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
-
/**
* This class is a physical operator to store at column partitioned table.
*/
@@ -71,6 +67,10 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
+ if (plan.getType() == NodeType.CREATE_TABLE) {
+ this.outSchema = ((CreateTableNode)plan).getTableSchema();
+ }
+
// set table meta
if (this.plan.hasOptions()) {
meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
@@ -78,24 +78,22 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}
- // Rewrite a output schema because we don't have to store field values
- // corresponding to partition key columns.
- if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
- rewriteColumnPartitionedTableSchema();
- }
-
// Find column index to name subpartition directory path
- int partitionKeyNum = this.plan.getPartitions().getColumns().size();
+ int partitionKeyNum = this.plan.getPartitionMethod().getExpressionSchema().getColumnNum();
partitionColumnIndices = new int[partitionKeyNum];
partitionColumnNames = new String[partitionKeyNum];
for (int i = 0; i < partitionKeyNum; i++) {
- Column column = this.plan.getPartitions().getColumns().get(i);
+ Column column = this.plan.getPartitionMethod().getExpressionSchema().getColumn(i);
partitionColumnNames[i] = column.getColumnName();
if (this.plan.getType() == NodeType.INSERT) {
InsertNode insertNode = ((InsertNode)plan);
int idx = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
partitionColumnIndices[i] = idx;
+ } else if (this.plan.getType() == NodeType.CREATE_TABLE) {
+ CreateTableNode createTable = (CreateTableNode) plan;
+ int idx = createTable.getLogicalSchema().getColumnId(column.getQualifiedName());
+ partitionColumnIndices[i] = idx;
} else {
// We can get partition column from a logical schema.
// Don't use output schema because it is rewritten.
@@ -104,23 +102,6 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
}
}
- /**
- * This method rewrites an input schema of column-partitioned table because
- * there are no actual field values in data file in a column-partitioned table.
- * So, this method removes partition key columns from the input schema.
- */
- private void rewriteColumnPartitionedTableSchema() {
- PartitionDesc partitionDesc = plan.getPartitions();
- Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
- String qualifier = plan.getTableName();
-
- outSchema = PlannerUtil.rewriteColumnPartitionedTableSchema(
- partitionDesc,
- columnPartitionSchema,
- outSchema,
- qualifier);
- }
-
public void init() throws IOException {
super.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index d533b82..a4153dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -44,7 +44,7 @@ public class HashAggregateExec extends AggregationExec {
hashTable = new HashMap<Tuple, FunctionContext []>(100000);
this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
}
-
+
private void compute() throws IOException {
Tuple tuple;
Tuple keyTuple;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c38be92..31a944c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -20,14 +20,13 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.ConstEval;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.eval.FieldEval;
-import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -42,7 +41,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
public class SeqScanExec extends PhysicalExec {
private final ScanNode plan;
@@ -73,17 +71,14 @@ public class SeqScanExec extends PhysicalExec {
* indicate partition keys. In this time, it is right. Later, we have to fix it.
*/
private void rewriteColumnPartitionedTableSchema() throws IOException {
- PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
- Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+ PartitionMethodDesc partitionDesc = plan.getTableDesc().getPartitionMethod();
+ Schema columnPartitionSchema = (Schema) partitionDesc.getExpressionSchema().clone();
String qualifier = inSchema.getColumn(0).getQualifier();
columnPartitionSchema.setQualifier(qualifier);
// Remove partition key columns from an input schema.
- this.inSchema = PlannerUtil.rewriteColumnPartitionedTableSchema(
- partitionDesc,
- columnPartitionSchema,
- inSchema,
- qualifier);
+ this.inSchema = plan.getTableDesc().getSchema();
+
List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
@@ -97,6 +92,7 @@ public class SeqScanExec extends PhysicalExec {
FieldEval targetExpr = new FieldEval(column);
Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
ConstEval constExpr = new ConstEval(datum);
+
for (Target target : plan.getTargets()) {
if (target.getEvalTree().equals(targetExpr)) {
if (!target.hasAlias()) {
@@ -117,8 +113,8 @@ public class SeqScanExec extends PhysicalExec {
public void init() throws IOException {
Schema projected;
- if (plan.getTableDesc().hasPartitions()
- && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+ if (plan.getTableDesc().hasPartition()
+ && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
rewriteColumnPartitionedTableSchema();
}
@@ -146,12 +142,13 @@ public class SeqScanExec extends PhysicalExec {
this.projector = new Projector(inSchema, outSchema, plan.getTargets());
if (fragments.length > 1) {
- this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
+ this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
fragments), projected);
} else {
this.scanner = StorageManagerFactory.getStorageManager(
- context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getTableSchema(), fragments[0], projected);
+ context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragments[0],
+ projected);
}
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 041220a..a1cbe76 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.*;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
@@ -65,7 +65,7 @@ public class PartitionedTableRewriter implements RewriteRule {
for (RelationNode relation : block.getRelations()) {
if (relation.getType() == NodeType.SCAN) {
TableDesc table = ((ScanNode)relation).getTableDesc();
- if (table.hasPartitions()) {
+ if (table.hasPartition()) {
return true;
}
}
@@ -82,7 +82,7 @@ public class PartitionedTableRewriter implements RewriteRule {
for (RelationNode relation : block.getRelations()) {
if (relation.getType() == NodeType.SCAN) {
TableDesc table = ((ScanNode)relation).getTableDesc();
- if (table.hasPartitions()) {
+ if (table.hasPartition()) {
containsPartitionedTables = true;
}
}
@@ -237,10 +237,10 @@ public class PartitionedTableRewriter implements RewriteRule {
FileSystem fs = table.getPath().getFileSystem(systemConf);
LOG.info("Partitioned Table Dir: " + table.getPath());
LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
- PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+ PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
Schema paritionValuesSchema = new Schema();
- for (Column column : partitionDesc.getColumns()) {
+ for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
paritionValuesSchema.addColumn(column);
}
@@ -349,7 +349,7 @@ public class PartitionedTableRewriter implements RewriteRule {
Stack<LogicalNode> stack) throws PlanningException {
TableDesc table = scanNode.getTableDesc();
- if (!table.hasPartitions()) {
+ if (!table.hasPartition()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index e70e6c2..4e60448 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -690,7 +690,7 @@ public class ProjectionPushDownRule extends
if (node.hasTargets()) {
targets = node.getTargets();
} else {
- targets = PlannerUtil.schemaToTargets(node.getOutSchema());
+ targets = PlannerUtil.schemaToTargets(node.getTableSchema());
}
List<Target> projectedTargets = TUtil.newList();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index e710f9d..4a3bf46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.NodeType;
@@ -130,16 +130,16 @@ public class QueryContext extends Options {
return strVal != null ? new Path(strVal) : null;
}
- public boolean hasPartitions() {
+ public boolean hasPartition() {
return get(OUTPUT_PARTITIONS) != null;
}
- public void setPartitions(PartitionDesc partitionDesc) {
- put(OUTPUT_PARTITIONS, partitionDesc != null ? partitionDesc.toJson() : null);
+ public void setPartitionMethod(PartitionMethodDesc partitionMethodDesc) {
+ put(OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
}
- public PartitionDesc getPartitions() {
- return PartitionDesc.fromJson(get(OUTPUT_PARTITIONS));
+ public PartitionMethodDesc getPartitionMethod() {
+ return PartitionMethodDesc.fromJson(get(OUTPUT_PARTITIONS));
}
public void setOutputOverwrite() {