You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/12/19 04:10:33 UTC
[1/3] TAJO-338 - Add Query Optimization Part for Column-Partitioned
Tables. (hyunsik)
Updated Branches:
refs/heads/master 4dbecf901 -> f58f6ee82
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index ed529a9..e7eaf40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -19,26 +19,34 @@
package org.apache.tajo.engine.utils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.dataserver.HttpUtil;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -401,4 +409,102 @@ public class TupleUtil {
}
return aTuple;
}
+
+ @SuppressWarnings("unused")
+ public static Collection<Tuple> filterTuple(Schema schema, Collection<Tuple> tupleBlock, EvalNode filterCondition) {
+ TupleBlockFilterScanner filter = new TupleBlockFilterScanner(schema, tupleBlock, filterCondition);
+ return filter.nextBlock();
+ }
+
+ private static class TupleBlockFilterScanner {
+ private EvalNode qual;
+ private EvalContext qualCtx;
+ private Iterator<Tuple> iterator;
+ private Schema schema;
+
+ public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
+ this.schema = schema;
+ this.qual = qual;
+ this.qualCtx = qual.newContext();
+ this.iterator = tuples.iterator();
+ }
+
+ public List<Tuple> nextBlock() {
+ List<Tuple> results = Lists.newArrayList();
+
+ Tuple tuple;
+ while (iterator.hasNext()) {
+ tuple = iterator.next();
+ qual.eval(qualCtx, schema, tuple);
+ if (qual.terminate(qualCtx).asBool()) {
+ results.add(tuple);
+ }
+ }
+ return results;
+ }
+ }
+
+ /**
+ * Take a look at a column partition path. A partition path consists
+ * of a table path part and column values part. This method transforms
+ * a partition path into a tuple with a given partition column schema.
+ *
+ * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+ * ^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^
+ * table path part column values part
+ *
+ * When a file path is given, it can behave in two ways depending on the beNullIfFile flag.
+ * If it is true, it returns NULL when a given path is a file.
+ * Otherwise, it returns a built tuple regardless of file or directory.
+ *
+ * @param partitionColumnSchema The partition column schema
+ * @param partitionPath The partition path
+ * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+ * @return The tuple transformed from a column values part.
+ */
+ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+ boolean beNullIfFile) {
+ int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+ if (startIdx == -1) { // if there is no partition column in the path
+ return null;
+ }
+ String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+ String [] columnValues = columnValuesPart.split("/");
+
+ // true means this is a file.
+ if (beNullIfFile && partitionColumnSchema.getColumnNum() < columnValues.length) {
+ return null;
+ }
+
+ Tuple tuple = new VTuple(partitionColumnSchema.getColumnNum());
+ int i = 0;
+ for (; i < columnValues.length && i < partitionColumnSchema.getColumnNum(); i++) {
+ String [] parts = columnValues[i].split("=");
+ if (parts.length != 2) {
+ return null;
+ }
+ int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+ Column keyColumn = partitionColumnSchema.getColumn(columnId);
+ tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), parts[1]));
+ }
+ for (; i < partitionColumnSchema.getColumnNum(); i++) {
+ tuple.put(i, NullDatum.get());
+ }
+ return tuple;
+ }
+
+ /**
+ * Get a prefix of column partition path. For example, consider a column partition (col1, col2).
+ * Then, you will get a string 'col1='.
+ *
+ * @param partitionColumn the schema of column partition
+ * @return The first part string of column partition path.
+ */
+ private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(partitionColumn.getColumn(0).getColumnName()).append("=");
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d432017..b1deb43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
@@ -470,7 +472,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
task.getLogicalPlan().toJson(),
context.getQueryContext(),
subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
- if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+ if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
taskAssign.setInterQuery();
}
@@ -490,6 +492,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
}
+ private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+ if (masterPlan.isRoot(block)) {
+ return false;
+ }
+
+ ExecutionBlock parent = masterPlan.getParent(block);
+ if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+ return false;
+ }
+
+ return true;
+ }
+
public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
Iterator<TaskRequestEvent> it = taskRequests.iterator();
@@ -519,7 +534,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
context.getQueryContext(),
subQuery.getDataChannel(),
subQuery.getBlock().getEnforcer());
- if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+ if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
taskAssign.setInterQuery();
}
for (ScanNode scan : task.getScanNodes()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 10f42c5..4575bf8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,7 +32,7 @@ import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
@@ -86,7 +86,7 @@ public class GlobalEngine extends AbstractService {
analyzer = new SQLAnalyzer();
converter = new HiveConverter();
planner = new LogicalPlanner(context.getCatalog());
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(context.getConf());
verifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
hookManager = new DistributedQueryHookManager();
@@ -234,7 +234,7 @@ public class GlobalEngine extends AbstractService {
if (!state.verified()) {
StringBuilder sb = new StringBuilder();
for (String error : state.getErrorMessages()) {
- sb.append("ERROR: ").append(error).append("\n");
+ sb.append(error).append("\n");
}
throw new VerifyException(sb.toString());
}
@@ -263,7 +263,7 @@ public class GlobalEngine extends AbstractService {
}
public TableDesc createTableOnPath(String tableName, Schema schema, TableMeta meta,
- Path path, boolean isCreated, Partitions partitions)
+ Path path, boolean isCreated, PartitionDesc partitionDesc)
throws IOException {
if (catalog.existsTable(tableName)) {
throw new AlreadyExistsTableException(tableName);
@@ -291,7 +291,9 @@ public class GlobalEngine extends AbstractService {
stats.setNumBytes(totalSize);
TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
desc.setStats(stats);
- desc.setPartitions(partitions);
+ if (partitionDesc != null) {
+ desc.setPartitions(partitionDesc);
+ }
catalog.addTable(desc);
LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 863ed18..861147a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -35,8 +35,7 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
@@ -316,12 +315,15 @@ public class TajoMasterClientService extends AbstractService {
Schema schema = new Schema(request.getSchema());
TableMeta meta = new TableMeta(request.getMeta());
- Partitions partitions = new Partitions(request.getPartitions());
+ PartitionDesc partitionDesc = null;
+ if (request.hasPartitions()) {
+ partitionDesc = new PartitionDesc(request.getPartitions());
+ }
TableDesc desc;
try {
desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
- meta, path, false, partitions);
+ meta, path, false, partitionDesc);
} catch (Exception e) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 828ebfa..5d717ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -266,7 +266,7 @@ public class QueryMasterTask extends CompositeService {
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer();
+ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
Expr expr;
if (queryContext.isHiveQueryMode()) {
HiveConverter hiveConverter = new HiveConverter();
@@ -297,6 +297,14 @@ public class QueryMasterTask extends CompositeService {
tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
}
}
+
+ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+ if(scanNodes != null) {
+ for(LogicalNode eachScanNode: scanNodes) {
+ ScanNode scanNode = (ScanNode)eachScanNode;
+ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
}
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7fedd4f..03659c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,10 +43,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.AbstractTaskScheduler;
import org.apache.tajo.master.TaskRunnerGroupEvent;
@@ -745,21 +742,42 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.eventHandler.handle(event);
}
+ /**
+ * It creates a number of fragments for all partitions.
+ */
+ private static Collection<FileFragment> getFragmentsFromPartitionedTable(SubQuery subQuery,
+ ScanNode scan,
+ TableDesc table) throws IOException {
+ List<FileFragment> fragments = Lists.newArrayList();
+ PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+ for (Path path : partitionsScan.getInputPaths()) {
+ fragments.addAll(subQuery.getStorageManager().getSplits(
+ scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
+ }
+ partitionsScan.setInputPaths(null);
+ return fragments;
+ }
+
private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
- TableMeta meta;
- Path inputPath;
-
ScanNode scan = scans[0];
- TableDesc desc = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
- inputPath = desc.getPath();
- meta = desc.getMeta();
+ TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+
+ Collection<FileFragment> fragments;
+ TableMeta meta = table.getMeta();
- // TODO - should be change the inner directory
- List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
- inputPath);
+ // Depending on the scan node's type, it creates fragments. If the scan is for
+ // a partitioned table, it creates many fragments for all partitions.
+ // Otherwise, it creates at least one fragment for the table, which may
+ // span a number of blocks or possibly consist of a number of files.
+ if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+ fragments = getFragmentsFromPartitionedTable(subQuery, scan, table);
+ } else {
+ Path inputPath = table.getPath();
+ fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+ }
QueryUnit queryUnit;
List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index d95c352..0637023 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -124,7 +124,7 @@ message CreateTableRequest {
required SchemaProto schema = 2;
required TableProto meta = 3;
required string path = 4;
- optional PartitionsProto partitions = 5;
+ optional PartitionDescProto partitions = 5;
}
message DropTableRequest {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 8e257f2..44004d2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -380,8 +380,8 @@ public class TestTajoClient {
assertFalse(client.existTable(tableName));
- String sql = "create table " + tableName + " (deptname text, score int4)";
- sql += "PARTITION BY COLUMN (deptname)";
+ String sql = "create table " + tableName + " (score int4)";
+ sql += "PARTITION BY COLUMN (deptname text)";
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index c35c786..ca1259b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -97,7 +97,7 @@ public class ExprTestBase {
Schema inputSchema = null;
if (schema != null) {
inputSchema = (Schema) schema.clone();
- inputSchema.setQualifier(tableName, true);
+ inputSchema.setQualifier(tableName);
int targetIdx [] = new int[inputSchema.getColumnNum()];
for (int i = 0; i < targetIdx.length; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 1850081..3c67068 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -169,6 +169,7 @@ public class TestEvalTreeUtil {
"select score from people where 10 * 2 > score * 10", // 4
"select score from people where score < 10 and 4 < score", // 5
"select score from people where score < 10 and 4 < score and age > 5", // 6
+ "select score from people where (score > 1 and score < 3) or (7 < score and score < 10)", // 7
};
@Test
@@ -208,7 +209,7 @@ public class TestEvalTreeUtil {
public final void testGetCNF() {
// "select score from people where score < 10 and 4 < score "
EvalNode node = getRootSelection(QUERIES[5]);
- EvalNode [] cnf = EvalTreeUtil.getConjNormalForm(node);
+ EvalNode [] cnf = AlgebraicUtil.toConjunctiveNormalFormArray(node);
Column col1 = new Column("people.score", TajoDataTypes.Type.INT4);
@@ -235,26 +236,37 @@ public class TestEvalTreeUtil {
public final void testTransformCNF2Singleton() {
// "select score from people where score < 10 and 4 < score "
EvalNode node = getRootSelection(QUERIES[6]);
- EvalNode [] cnf1 = EvalTreeUtil.getConjNormalForm(node);
+ EvalNode [] cnf1 = AlgebraicUtil.toConjunctiveNormalFormArray(node);
assertEquals(3, cnf1.length);
- EvalNode conj = EvalTreeUtil.transformCNF2Singleton(cnf1);
- EvalNode [] cnf2 = EvalTreeUtil.getConjNormalForm(conj);
+ EvalNode conj = AlgebraicUtil.createSingletonExprFromCNF(cnf1);
+ EvalNode [] cnf2 = AlgebraicUtil.toConjunctiveNormalFormArray(conj);
Set<EvalNode> set1 = Sets.newHashSet(cnf1);
Set<EvalNode> set2 = Sets.newHashSet(cnf2);
assertEquals(set1, set2);
}
+
+ @Test
+ public final void testGetDNF() {
+ // "select score from people where score > 1 and score < 3 or score > 7 and score < 10", // 7
+ EvalNode node = getRootSelection(QUERIES[7]);
+ EvalNode [] cnf = AlgebraicUtil.toDisjunctiveNormalFormArray(node);
+ assertEquals(2, cnf.length);
+
+ assertEquals("people.score (INT4(0)) > 1 AND people.score (INT4(0)) < 3", cnf[0].toString());
+ assertEquals("7 < people.score (INT4(0)) AND people.score (INT4(0)) < 10", cnf[1].toString());
+ }
@Test
public final void testSimplify() throws PlanningException {
Target [] targets = getRawTargets(QUERIES[0]);
- EvalNode node = AlgebraicUtil.simplify(targets[0].getEvalTree());
+ EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
EvalContext nodeCtx = node.newContext();
assertEquals(EvalType.CONST, node.getType());
node.eval(nodeCtx, null, null);
assertEquals(7, node.terminate(nodeCtx).asInt4());
- node = AlgebraicUtil.simplify(targets[1].getEvalTree());
+ node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
assertEquals(EvalType.CONST, node.getType());
nodeCtx = node.newContext();
node.eval(nodeCtx, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 6775aa9..389a9e5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -24,7 +24,6 @@ import org.apache.tajo.algebra.CreateTable;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.LiteralValue;
import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.engine.parser.SQLParser.Boolean_value_expressionContext;
import org.apache.tajo.engine.parser.SQLParser.SqlContext;
import org.apache.tajo.util.FileUtil;
import org.junit.Test;
@@ -321,9 +320,9 @@ public class TestSQLAnalyzer {
assertEquals(CreateTable.PartitionType.COLUMN, createTable.getPartition().getPartitionType());
CreateTable.ColumnPartition columnPartition = createTable.getPartition();
assertEquals(3, columnPartition.getColumns().length);
- assertEquals("col1", columnPartition.getColumns()[0].getCanonicalName());
- assertEquals("col2", columnPartition.getColumns()[1].getCanonicalName());
- assertEquals("col3", columnPartition.getColumns()[2].getCanonicalName());
+ assertEquals("col3", columnPartition.getColumns()[0].getColumnName());
+ assertEquals("col4", columnPartition.getColumns()[1].getColumnName());
+ assertEquals("col5", columnPartition.getColumns()[2].getColumnName());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 50bc8dc..92faec0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -83,7 +83,7 @@ public class TestLogicalOptimizer {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(util.getConfiguration());
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 025c84b..1b38acd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -74,7 +74,7 @@ public class TestLogicalPlan {
catalog.addTable(d);
}
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(util.getConfiguration());
}
public static void tearDown() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index c9499d6..a8f4631 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -143,7 +143,7 @@ public class TestBSTIndexExec {
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 57f2376..c43046f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -122,7 +122,7 @@ public class TestHashAntiJoinExec {
catalog.addTable(people);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index d03f3c2..93f9c33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -126,7 +126,7 @@ public class TestHashSemiJoinExec {
catalog.addTable(people);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b57ef3a..b4c66a1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -150,7 +150,7 @@ public class TestPhysicalPlanner {
catalog.addTable(score);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 45badd5..07f726d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -96,7 +96,7 @@ public class TestSortExec {
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 629ddb5..b770de5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -240,205 +240,4 @@ public class TestInsertQuery {
assertTrue(codec instanceof DeflateCodec);
}
}
-
- @Test
- public final void testInsertOverwritePartitionByColumn1() throws Exception {
- String tableName ="InsertOverwritePartitionByColumn1";
- ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1) ");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName
- + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
-
- TableDesc desc = catalog.getTableDesc(tableName);
- Path path = desc.getPath();
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertEquals(5, desc.getStats().getNumRows().intValue());
- }
-
- @Test
- public final void testInsertOverwritePartitionByColumn2() throws Exception {
- String tableName ="InsertOverwritePartitionByColumn2";
- ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2) ");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName
- + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
-
- TableDesc desc = catalog.getTableDesc(tableName);
- Path path = desc.getPath();
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertEquals(5, desc.getStats().getNumRows().intValue());
- }
-
- @Test
- public final void testInsertOverwritePartitionByColumn3() throws Exception {
- String tableName ="InsertOverwritePartitionByColumn3";
- ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2, col3) ");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName
- + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
-
- TableDesc desc = catalog.getTableDesc(tableName);
- Path path = desc.getPath();
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.isDirectory(path));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
- assertEquals(5, desc.getStats().getNumRows().intValue());
-
- }
- @Test
- public final void testInsertOverwritePartitionByColumnWithCompression1() throws Exception {
- String tableName = "testInsertOverwritePartitionByColumnWithCompression1";
- ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1)");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
- TableDesc desc = catalog.getTableDesc(tableName);
- assertEquals(5, desc.getStats().getNumRows().intValue());
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.exists(desc.getPath()));
- CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
- Path path = desc.getPath();
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-
- for (FileStatus partition : fs.listStatus(path)){
- assertTrue(fs.isDirectory(partition.getPath()));
- for (FileStatus file : fs.listStatus(partition.getPath())) {
- CompressionCodec codec = factory.getCodec(file.getPath());
- assertTrue(codec instanceof DeflateCodec);
- }
- }
- }
-
- @Test
- public final void testInsertOverwritePartitionByColumnWithCompression2() throws Exception {
- String tableName = "testInsertOverwritePartitionByColumnWithCompression2";
- ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2)");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
- TableDesc desc = catalog.getTableDesc(tableName);
- assertEquals(5, desc.getStats().getNumRows().intValue());
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.exists(desc.getPath()));
- CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
- Path path = desc.getPath();
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-
- for (FileStatus partition1 : fs.listStatus(path)){
- assertTrue(fs.isDirectory(partition1.getPath()));
- for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
- assertTrue(fs.isDirectory(partition2.getPath()));
- for (FileStatus file : fs.listStatus(partition2.getPath())) {
- CompressionCodec codec = factory.getCodec(file.getPath());
- assertTrue(codec instanceof DeflateCodec);
- }
- }
- }
- }
-
- @Test
- public final void testInsertOverwritePartitionByColumnWithCompression3() throws Exception {
- String tableName = "testInsertOverwritePartitionByColumnWithCompression3";
- ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2, col3)");
- res.close();
- TajoTestingCluster cluster = tpch.getTestingCluster();
- CatalogService catalog = cluster.getMaster().getCatalog();
- assertTrue(catalog.existsTable(tableName));
-
- res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem");
- res.close();
- TableDesc desc = catalog.getTableDesc(tableName);
- assertEquals(5, desc.getStats().getNumRows().intValue());
-
- FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
- assertTrue(fs.exists(desc.getPath()));
- CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
- Path path = desc.getPath();
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
- assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
- for (FileStatus partition1 : fs.listStatus(path)){
- assertTrue(fs.isDirectory(partition1.getPath()));
- for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
- assertTrue(fs.isDirectory(partition2.getPath()));
- for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
- assertTrue(fs.isDirectory(partition3.getPath()));
- for (FileStatus file : fs.listStatus(partition3.getPath())) {
- CompressionCodec codec = factory.getCodec(file.getPath());
- assertTrue(codec instanceof DeflateCodec);
- }
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
new file mode 100644
index 0000000..4fb5b8a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestTablePartitions {
+
+ private static TpchTestBase tpch;
+ public TestTablePartitions() throws IOException {
+ super();
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ tpch = TpchTestBase.getInstance();
+ }
+
+ @Test
+ public final void testColumnPartitionedTableByOneColumn() throws Exception {
+ String tableName ="testColumnPartitionedTableByOneColumn";
+ ResultSet res = tpch.execute(
+ "create table " + tableName +" (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute("insert overwrite into " + tableName
+ + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+ res.close();
+
+ TableDesc desc = catalog.getTableDesc(tableName);
+ Path path = desc.getPath();
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.isDirectory(path));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ res = tpch.execute(
+ "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
+
+ Map<Double, int []> resultRows1 = Maps.newHashMap();
+ resultRows1.put(45.0d, new int[]{3, 2});
+ resultRows1.put(38.0d, new int[]{2, 2});
+
+ for (int i = 0; i < 3 && res.next(); i++) {
+ resultRows1.get(res.getDouble(4))[0] = res.getInt(2);
+ resultRows1.get(res.getDouble(4))[1] = res.getInt(3);
+ }
+ res.close();
+ }
+
+ @Test
+ public final void testColumnPartitionedTableByThreeColumns() throws Exception {
+ String tableName ="testColumnPartitionedTableByThreeColumns";
+ ResultSet res = tpch.execute(
+ "create table " + tableName +" (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute("insert overwrite into " + tableName
+ + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+ res.close();
+
+ TableDesc desc = catalog.getTableDesc(tableName);
+ Path path = desc.getPath();
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.isDirectory(path));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ res = tpch.execute("select * from " + tableName + " where col2 = 2");
+
+ Map<Double, int []> resultRows1 = Maps.newHashMap();
+ resultRows1.put(45.0d, new int[]{3, 2});
+ resultRows1.put(38.0d, new int[]{2, 2});
+
+ for (int i = 0; i < 3 && res.next(); i++) {
+ resultRows1.get(res.getDouble(4))[0] = res.getInt(2);
+ resultRows1.get(res.getDouble(4))[1] = res.getInt(3);
+ }
+ res.close();
+
+ Map<Double, int []> resultRows2 = Maps.newHashMap();
+ resultRows2.put(49.0d, new int[]{3, 3});
+ resultRows2.put(45.0d, new int[]{3, 2});
+ resultRows2.put(38.0d, new int[]{2, 2});
+
+ res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+ for (int i = 0; i < 3 && res.next(); i++) {
+ resultRows2.get(res.getDouble(4))[0] = res.getInt(2);
+ resultRows2.get(res.getDouble(4))[1] = res.getInt(3);
+ }
+ assertFalse(res.next());
+ res.close();
+ }
+
+ @Test
+ public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
+ String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
+ ResultSet res = tpch.execute(
+ "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
+ "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+ "PARTITION BY column(col1 int4)");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute(
+ "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
+ res.close();
+ TableDesc desc = catalog.getTableDesc(tableName);
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.exists(desc.getPath()));
+ CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+ Path path = desc.getPath();
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+
+ for (FileStatus partition : fs.listStatus(path)){
+ assertTrue(fs.isDirectory(partition.getPath()));
+ for (FileStatus file : fs.listStatus(partition.getPath())) {
+ CompressionCodec codec = factory.getCodec(file.getPath());
+ assertTrue(codec instanceof DeflateCodec);
+ }
+ }
+ }
+
+ @Test
+ public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
+ String tableName = "testColumnPartitionedTableByTwoColumnsWithCompression";
+ ResultSet res = tpch.execute("create table " + tableName + " (col3 float8, col4 text) USING csv " +
+ "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+ "PARTITION by column(col1 int4, col2 int4)");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute(
+ "insert overwrite into " + tableName +
+ " select l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
+ res.close();
+ TableDesc desc = catalog.getTableDesc(tableName);
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.exists(desc.getPath()));
+ CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+ Path path = desc.getPath();
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+
+ for (FileStatus partition1 : fs.listStatus(path)){
+ assertTrue(fs.isDirectory(partition1.getPath()));
+ for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+ assertTrue(fs.isDirectory(partition2.getPath()));
+ for (FileStatus file : fs.listStatus(partition2.getPath())) {
+ CompressionCodec codec = factory.getCodec(file.getPath());
+ assertTrue(codec instanceof DeflateCodec);
+ }
+ }
+ }
+ }
+
+ @Test
+ public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
+ String tableName = "testColumnPartitionedTableByThreeColumnsWithCompression";
+ ResultSet res = tpch.execute(
+ "create table " + tableName + " (col4 text) USING csv " +
+ "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+ "partition by column(col1 int4, col2 int4, col3 float8)");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute(
+ "insert overwrite into " + tableName +
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+ res.close();
+ TableDesc desc = catalog.getTableDesc(tableName);
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.exists(desc.getPath()));
+ CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+ Path path = desc.getPath();
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+ for (FileStatus partition1 : fs.listStatus(path)){
+ assertTrue(fs.isDirectory(partition1.getPath()));
+ for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+ assertTrue(fs.isDirectory(partition2.getPath()));
+ for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+ assertTrue(fs.isDirectory(partition3.getPath()));
+ for (FileStatus file : fs.listStatus(partition3.getPath())) {
+ CompressionCodec codec = factory.getCodec(file.getPath());
+ assertTrue(codec instanceof DeflateCodec);
+ }
+ }
+ }
+ }
+
+ res = tpch.execute("select * from " + tableName + " where col2 = 2");
+
+ Map<Double, int []> resultRows1 = Maps.newHashMap();
+ resultRows1.put(45.0d, new int[]{3, 2});
+ resultRows1.put(38.0d, new int[]{2, 2});
+
+ for (int i = 0; i < 3 && res.next(); i++) {
+ resultRows1.get(res.getDouble(4))[0] = res.getInt(2);
+ resultRows1.get(res.getDouble(4))[1] = res.getInt(3);
+ }
+ res.close();
+
+ Map<Double, int []> resultRows2 = Maps.newHashMap();
+ resultRows2.put(49.0d, new int[]{3, 3});
+ resultRows2.put(45.0d, new int[]{3, 2});
+ resultRows2.put(38.0d, new int[]{2, 2});
+
+ res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+ for (int i = 0; i < 3 && res.next(); i++) {
+ resultRows2.get(res.getDouble(4))[0] = res.getInt(2);
+ resultRows2.get(res.getDouble(4))[1] = res.getInt(3);
+ }
+ assertFalse(res.next());
+ res.close();
+ }
+
+ @Test
+ public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
+ String tableName = "testColumnPartitionedTableNoMatchedPartition";
+ ResultSet res = tpch.execute(
+ "create table " + tableName + " (col4 text) USING csv " +
+ "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+ "partition by column(col1 int4, col2 int4, col3 float8)");
+ res.close();
+ TajoTestingCluster cluster = tpch.getTestingCluster();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ assertTrue(catalog.existsTable(tableName));
+
+ res = tpch.execute(
+ "insert overwrite into " + tableName +
+ " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+ res.close();
+ TableDesc desc = catalog.getTableDesc(tableName);
+ assertEquals(5, desc.getStats().getNumRows().intValue());
+
+ FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+ assertTrue(fs.exists(desc.getPath()));
+ CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+ Path path = desc.getPath();
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+ assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+ for (FileStatus partition1 : fs.listStatus(path)){
+ assertTrue(fs.isDirectory(partition1.getPath()));
+ for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+ assertTrue(fs.isDirectory(partition2.getPath()));
+ for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+ assertTrue(fs.isDirectory(partition3.getPath()));
+ for (FileStatus file : fs.listStatus(partition3.getPath())) {
+ CompressionCodec codec = factory.getCodec(file.getPath());
+ assertTrue(codec instanceof DeflateCodec);
+ }
+ }
+ }
+ }
+
+ res = tpch.execute("select * from " + tableName + " where col2 = 9");
+ assertFalse(res.next());
+ res.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 6b3d90f..3c4d3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.util;
+import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -33,8 +34,7 @@ import org.apache.tajo.worker.dataserver.HttpUtil;
import java.io.UnsupportedEncodingException;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestTupleUtil {
@Test
@@ -228,4 +228,52 @@ public class TestTupleUtil {
assertEquals(range, result);
}
}
+
+ @Test
+ public void testBuildTupleFromPartitionPath() {
+
+ Schema schema = new Schema();
+ schema.addColumn("key1", Type.INT8);
+ schema.addColumn("key2", Type.TEXT);
+
+ Path path = new Path("hdfs://tajo/warehouse/partition_test/");
+ Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ assertNull(tuple);
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+ assertNull(tuple);
+
+ path = new Path("hdfs://tajo/warehouse/partition_test/key1=123");
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ assertNotNull(tuple);
+ assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+ assertNotNull(tuple);
+ assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+
+ path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong cases;
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ assertNull(tuple);
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+ assertNull(tuple);
+
+ path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc");
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ assertNotNull(tuple);
+ assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+ assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+ assertNotNull(tuple);
+ assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+ assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+
+
+ path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001");
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ assertNull(tuple);
+
+ tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+ assertNotNull(tuple);
+ assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+ assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 5b6b5a3..4d78a27 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -74,7 +74,7 @@ public class TestExecutionBlockCursor {
analyzer = new SQLAnalyzer();
logicalPlanner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf);
dispatcher = new AsyncDispatcher();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index ca1247a..0396f33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -82,7 +82,7 @@ public class TestGlobalPlanner {
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(util.getConfiguration());
globalPlanner = new GlobalPlanner(util.getConfiguration(),
StorageManagerFactory.getStorageManager(util.getConfiguration()));
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 6934872..43ea5f8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -79,7 +79,7 @@ public class TestRangeRetrieverHandler {
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer();
+ optimizer = new LogicalOptimizer(conf);
schema = new Schema();
schema.addColumn("empId", Type.INT4);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
index 65493a2..397e7ac 100644
--- a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
+++ b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
@@ -1,4 +1,4 @@
CREATE TABLE sales ( col1 int, col2 int)
-PARTITION BY COLUMN (col1, col2, col3);
+PARTITION BY COLUMN (col3 int, col4 float, col5 text);
[2/3] TAJO-338 - Add Query Optimization Part for Column-Partitioned
Tables. (hyunsik)
Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 638c92a..d262bc3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
package org.apache.tajo.engine.eval;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -31,8 +30,8 @@ import org.apache.tajo.exception.InternalException;
import java.util.*;
public class EvalTreeUtil {
- public static void changeColumnRef(EvalNode node, String oldName,
- String newName) {
+
+ public static void changeColumnRef(EvalNode node, String oldName, String newName) {
node.postOrder(new ChangeColumnRefVisitor(oldName, newName));
}
@@ -80,50 +79,9 @@ public class EvalTreeUtil {
node.postOrder(finder);
return finder.getColumnRefs();
}
-
- /**
- * Convert a list of conjunctive normal forms into a singleton expression.
- *
- * @param cnfExprs
- * @return The EvalNode object that merges all CNF-formed expressions.
- */
- public static EvalNode transformCNF2Singleton(EvalNode...cnfExprs) {
- if (cnfExprs.length == 1) {
- return cnfExprs[0];
- }
-
- return transformCNF2Singleton_(cnfExprs, 0);
- }
-
- private static EvalNode transformCNF2Singleton_(EvalNode [] evalNode, int idx) {
- if (idx == evalNode.length - 2) {
- return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
- } else {
- return new BinaryEval(EvalType.AND, evalNode[idx],
- transformCNF2Singleton_(evalNode, idx + 1));
- }
- }
-
- /**
- * Get a list of exprs similar to CNF
- *
- * @param expr The expression to be transformed to an array of CNF-formed expressions.
- * @return An array of CNF-formed expressions
- */
- public static EvalNode [] getConjNormalForm(EvalNode expr) {
- List<EvalNode> list = new ArrayList<EvalNode>();
- getConjNormalForm(expr, list);
- return list.toArray(new EvalNode[list.size()]);
- }
-
- private static void getConjNormalForm(EvalNode node, List<EvalNode> found) {
- if (node.getType() == EvalType.AND) {
- getConjNormalForm(node.getLeftExpr(), found);
- getConjNormalForm(node.getRightExpr(), found);
- } else {
- found.add(node);
- }
- }
+
+
+
public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets)
throws InternalException {
@@ -234,16 +192,8 @@ public class EvalTreeUtil {
}
}
- public static boolean isComparisonOperator(EvalNode expr) {
- return expr.getType() == EvalType.EQUAL ||
- expr.getType() == EvalType.LEQ ||
- expr.getType() == EvalType.LTH ||
- expr.getType() == EvalType.GEQ ||
- expr.getType() == EvalType.GTH;
- }
-
public static boolean isJoinQual(EvalNode expr) {
- return isComparisonOperator(expr) &&
+ return AlgebraicUtil.isComparisonOperator(expr) &&
expr.getLeftExpr().getType() == EvalType.FIELD &&
expr.getRightExpr().getType() == EvalType.FIELD;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e2411e3..59e8b31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -71,7 +71,7 @@ public class InEval extends BinaryEval {
Datum value = tuple.get(fieldId);
for (Datum datum : values) {
- if (value.equals(datum)) {
+ if (value.equalsTo(datum).asBool()) {
isIncluded = true;
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
index ac7aeeb..e1c693e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
@@ -42,6 +42,10 @@ public class LikePredicateEval extends PatternMatchPredicateEval {
this.compiled = Pattern.compile(regex, flags);
}
+ public boolean isLeadingWildCard() {
+ return pattern.indexOf(".*") == 0;
+ }
+
@Override
public String toString() {
return leftExpr.toString() + (caseInsensitive ? "ILIKE" : "LIKE") + "'" + pattern +"'";
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 2cd91f9..ad8acf1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -906,9 +906,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
}
if (checkIfExist(ctx.table_partitioning_clauses())) {
- CreateTable.PartitionOption partitionOption =
+ PartitionDescExpr partitionDesc =
parseTablePartitioningClause(ctx.table_partitioning_clauses());
- createTable.setPartition(partitionOption);
+ createTable.setPartition(partitionDesc);
}
return createTable;
}
@@ -925,7 +925,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
return elements;
}
- public CreateTable.PartitionOption parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+ public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
if (checkIfExist(ctx.range_partitions())) { // For Range Partition
Range_partitionsContext rangePartitionsContext = ctx.range_partitions();
@@ -978,9 +978,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
return new ListPartition(getColumnReferences(ctx.list_partitions().column_reference_list()), specifiers);
} else if (checkIfExist(ctx.column_partitions())) { // For Column Partition (Hive Style)
- return new CreateTable.ColumnPartition(getColumnReferences(ctx.column_partitions().column_reference_list()));
+ return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()), true);
} else {
- throw new SQLSyntaxError("Wrong partition option");
+ throw new SQLSyntaxError("Invalid Partition Type: " + ctx.toStringTree());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 9478648..0cee8dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -92,6 +92,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
case SCAN:
current = visitScan(context, plan, (ScanNode) node, stack);
break;
+ case PARTITIONS_SCAN:
+ current = visitScan(context, plan, (ScanNode) node, stack);
+ break;
case STORE:
current = visitStoreTable(context, plan, (StoreTableNode) node, stack);
break;
@@ -221,6 +224,11 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
}
@Override
+ public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+ return null;
+ }
+
+ @Override
public RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
throws PlanningException {
stack.push(node);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 640383e..5f11f1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
@@ -32,6 +33,7 @@ import org.apache.tajo.engine.planner.logical.join.JoinGraph;
import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
import java.util.Set;
@@ -49,12 +51,13 @@ public class LogicalOptimizer {
private BasicQueryRewriteEngine rulesAfterToJoinOpt;
private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
- public LogicalOptimizer() {
+ public LogicalOptimizer(TajoConf systemConf) {
rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
+ rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
}
public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
@@ -130,7 +133,7 @@ public class LogicalOptimizer {
public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, SelectionNode node,
Stack<LogicalNode> stack) throws PlanningException {
super.visitFilter(context, plan, node, stack);
- context.quals.addAll(Lists.newArrayList(EvalTreeUtil.getConjNormalForm(node.getQual())));
+ context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
return node;
}
@@ -239,7 +242,7 @@ public class LogicalOptimizer {
double filterFactor = 1;
if (joinNode.hasJoinQual()) {
- EvalNode [] quals = EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual());
+ EvalNode [] quals = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual());
filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, quals.length);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index c06b7a7..c6fcefc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -47,6 +47,8 @@ public interface LogicalPlanVisitor <CONTEXT, RESULT> {
throws PlanningException;
RESULT visitScan(CONTEXT context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
throws PlanningException;
+ RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node,
+ Stack<LogicalNode> stack) throws PlanningException;
RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
throws PlanningException;
RESULT visitInsert(CONTEXT context, LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 63b7985..605b9df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
@@ -50,10 +50,13 @@ import org.apache.tajo.engine.utils.SchemaUtil;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.util.TUtil;
+import java.util.Collection;
import java.util.List;
import java.util.Stack;
import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
+import static org.apache.tajo.algebra.CreateTable.PartitionType;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
@@ -407,7 +410,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// 3. build this plan:
EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
- SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
+ EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+ SelectionNode selectionNode = new SelectionNode(plan.newPID(), simplified);
// 4. set child plan, update input/output schemas:
selectionNode.setChild(child);
@@ -772,8 +776,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return storeNode;
} else {
- CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
- convertTableElementsSchema(expr.getTableElements()));
+
+ Schema tableSchema;
+ if (expr.hasPartition() && expr.getPartition().getPartitionType() == PartitionType.COLUMN &&
+ ((ColumnPartition)expr.getPartition()).isOmitValues()) {
+ ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
+ ((ColumnPartition)expr.getPartition()).getColumns());
+ tableSchema = convertTableElementsSchema(merged);
+ } else {
+ tableSchema = convertTableElementsSchema(expr.getTableElements());
+ }
+
+ CreateTableNode createTableNode = new CreateTableNode(
+ context.plan.newPID(),
+ expr.getTableName(),
+ tableSchema);
if (expr.isExternal()) {
createTableNode.setExternal(true);
@@ -811,28 +828,28 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
* @return
* @throws PlanningException
*/
- private Partitions convertTableElementsPartition(PlanContext context,
+ private PartitionDesc convertTableElementsPartition(PlanContext context,
CreateTable expr) throws PlanningException {
Schema schema = convertTableElementsSchema(expr.getTableElements());
- Partitions partitions = null;
+ PartitionDesc partitionDesc = null;
List<Specifier> specifiers = null;
if (expr.hasPartition()) {
- partitions = new Partitions();
+ partitionDesc = new PartitionDesc();
specifiers = TUtil.newList();
- partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
.getPartitionType().name()));
- if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+ if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
CreateTable.HashPartition hashPartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, hashPartition.getColumns()));
if (hashPartition.getColumns() != null) {
if (hashPartition.getQuantifier() != null) {
String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
- partitions.setNumPartitions(Integer.parseInt(quantity));
+ partitionDesc.setNumPartitions(Integer.parseInt(quantity));
}
if (hashPartition.getSpecifiers() != null) {
@@ -841,21 +858,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
}
- if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
- for (int i = 0; i < partitions.getNumPartitions(); i++) {
- String partitionName = partitions.getPartitionsType().name() + "_" + expr
+ if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
+ for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
+ String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
.getTableName() + "_" + i;
specifiers.add(new Specifier(partitionName));
}
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+ } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
CreateTable.ListPartition listPartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, listPartition.getColumns()));
if (listPartition.getSpecifiers() != null) {
@@ -876,12 +893,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
specifiers.add(specifier);
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+ } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
CreateTable.RangePartition rangePartition = expr.getPartition();
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+ partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
, rangePartition.getColumns()));
if (rangePartition.getSpecifiers() != null) {
@@ -903,17 +920,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
specifiers.add(specifier);
}
if (!specifiers.isEmpty())
- partitions.setSpecifiers(specifiers);
+ partitionDesc.setSpecifiers(specifiers);
}
- } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
- CreateTable.ColumnPartition columnPartition = expr.getPartition();
-
- partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
- , columnPartition.getColumns()));
+ } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
+ ColumnPartition columnPartition = expr.getPartition();
+ partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
+ partitionDesc.setOmitValues(columnPartition.isOmitValues());
}
}
- return partitions;
+ return partitionDesc;
}
@@ -933,7 +949,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return schema;
}
- private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+ private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
ColumnReferenceExpr[] references) {
List<Column> columnList = TUtil.newList();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index bbecd50..73395a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -138,7 +138,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
return leftExec;
- } case SCAN:
+ }
+ case PARTITIONS_SCAN:
+ case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
return leftExec;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 6b23ed8..d541680 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -545,7 +545,7 @@ public class PlannerUtil {
* @return true if two operands refers to columns and the operator is comparison,
*/
public static boolean isJoinQual(EvalNode qual) {
- if (EvalTreeUtil.isComparisonOperator(qual)) {
+ if (AlgebraicUtil.isComparisonOperator(qual)) {
List<Column> left = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index a49451c..ac50c46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -48,6 +48,10 @@ public class Target implements Cloneable, GsonObject {
return !hasAlias() ? column.getQualifiedName() : alias;
}
+ public final void setExpr(EvalNode expr) {
+ this.expr = expr;
+ }
+
public final void setAlias(String alias) {
this.alias = alias;
this.column = new Column(alias, expr.getValueType());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f936fc..4f3976e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -53,6 +53,10 @@ public class ExecutionBlock {
this.scanlist.clear();
this.plan = plan;
+ if (plan == null) {
+ return;
+ }
+
LogicalNode node = plan;
ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
s.add(node);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 6ef35ce..abf5620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -112,7 +112,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
- ExecutionBlock leftBlock, ExecutionBlock rightBlock)
+ ExecutionBlock leftBlock, ExecutionBlock rightBlock)
throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 942309d..d0c8373 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -36,7 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
@Expose private Path path;
@Expose private Options options;
@Expose private boolean external;
- @Expose private Partitions partitions;
+ @Expose private PartitionDesc partitionDesc;
public CreateTableNode(int pid, String tableName, Schema schema) {
super(pid, NodeType.CREATE_TABLE);
@@ -92,16 +92,16 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
this.external = external;
}
- public Partitions getPartitions() {
- return partitions;
+ public PartitionDesc getPartitions() {
+ return partitionDesc;
}
- public void setPartitions(Partitions partitions) {
- this.partitions = partitions;
+ public void setPartitions(PartitionDesc partitionDesc) {
+ this.partitionDesc = partitionDesc;
}
public boolean hasPartition() {
- return this.partitions != null;
+ return this.partitionDesc != null;
}
@Override
@@ -121,7 +121,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
&& TUtil.checkEquals(path, other.path)
&& TUtil.checkEquals(options, other.options)
&& TUtil.checkEquals(partitionKeys, other.partitionKeys)
- && TUtil.checkEquals(partitions, other.partitions);
+ && TUtil.checkEquals(partitionDesc, other.partitionDesc);
} else {
return false;
}
@@ -137,7 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
store.path = path != null ? new Path(path.toString()) : null;
store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
store.options = (Options) (options != null ? options.clone() : null);
- store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
+ store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
return store;
}
@@ -157,7 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
sb.append(",\"storeType\": \"" + this.storageType);
sb.append(",\"path\" : \"" + this.path).append("\",");
sb.append(",\"external\" : \"" + this.external).append("\",");
- sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
+ sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema())
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 44790ec..eaaf0c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -37,6 +37,7 @@ public enum NodeType {
INTERSECT(IntersectNode.class),
LIMIT(LimitNode.class),
JOIN(JoinNode.class),
+ PARTITIONS_SCAN(PartitionedTableScanNode.class),
PROJECTION(ProjectionNode.class),
ROOT(LogicalRootNode.class),
SCAN(ScanNode.class),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
new file mode 100644
index 0000000..21f5ef4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+public class PartitionedTableScanNode extends ScanNode {
+ @Expose Path [] inputPaths;
+
+ public PartitionedTableScanNode(int pid, ScanNode scanNode, Path[] inputPaths) {
+ super(pid, NodeType.PARTITIONS_SCAN, scanNode.getTableDesc());
+ this.setInSchema(scanNode.getInSchema());
+ this.setOutSchema(scanNode.getOutSchema());
+ this.alias = scanNode.alias;
+ this.renamedSchema = scanNode.renamedSchema;
+ this.qual = scanNode.qual;
+ this.targets = scanNode.targets;
+ this.inputPaths = inputPaths;
+ }
+
+ public void setInputPaths(Path [] paths) {
+ this.inputPaths = paths;
+ }
+
+ public Path [] getInputPaths() {
+ return inputPaths;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\"Partitions Scan\" : {\"table\":\"")
+ .append(getTableName()).append("\"");
+ if (hasAlias()) {
+ sb.append(",\"alias\": \"").append(alias);
+ }
+
+ if (hasQual()) {
+ sb.append(", \"qual\": \"").append(this.qual).append("\"");
+ }
+
+ if (hasTargets()) {
+ sb.append(", \"target list\": ");
+ boolean first = true;
+ for (Target target : targets) {
+ if (!first) {
+ sb.append(", ");
+ }
+ sb.append(target);
+ first = false;
+ }
+ }
+
+ if (inputPaths != null) {
+ sb.append(", \"Partition paths\": ");
+ for (Path path : inputPaths) {
+ sb.append("\n ");
+ sb.append(path);
+ }
+ sb.append("\n");
+ }
+
+ sb.append(",");
+ sb.append("\n \"out schema\": ").append(getOutSchema());
+ sb.append("\n \"in schema\": ").append(getInSchema());
+ return sb.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(this.tableDesc, this.qual, this.targets);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PartitionedTableScanNode) {
+ PartitionedTableScanNode other = (PartitionedTableScanNode) obj;
+
+ boolean eq = super.equals(other);
+ eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc);
+ eq = eq && TUtil.checkEquals(this.qual, other.qual);
+ eq = eq && TUtil.checkEquals(this.targets, other.targets);
+ eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths);
+
+ return eq;
+ }
+
+ return false;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone();
+
+ unionScan.tableDesc = (TableDesc) this.tableDesc.clone();
+
+ if (hasQual()) {
+ unionScan.qual = (EvalNode) this.qual.clone();
+ }
+
+ if (hasTargets()) {
+ unionScan.targets = new Target[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ unionScan.targets[i] = (Target) targets[i].clone();
+ }
+ }
+
+ unionScan.inputPaths = inputPaths;
+
+ return unionScan;
+ }
+
+ @Override
+ public void preOrder(LogicalNodeVisitor visitor) {
+ visitor.visit(this);
+ }
+
+ public void postOrder(LogicalNodeVisitor visitor) {
+ visitor.visit(this);
+ }
+
+ @Override
+ public PlanString getPlanString() {
+ PlanString planStr = new PlanString("Scan on ").appendTitle(getTableName());
+ if (hasAlias()) {
+ planStr.appendTitle(" as ").appendTitle(alias);
+ }
+
+ if (hasQual()) {
+ planStr.addExplan("filter: ").appendExplain(this.qual.toString());
+ }
+
+ if (hasTargets()) {
+ planStr.addExplan("target list: ");
+ boolean first = true;
+ for (Target target : targets) {
+ if (!first) {
+ planStr.appendExplain(", ");
+ }
+ planStr.appendExplain(target.toString());
+ first = false;
+ }
+ }
+
+ if (inputPaths != null) {
+ planStr.addExplan("Path list: ");
+ int i = 0;
+ for (Path path : inputPaths) {
+ planStr.addExplan((i++) + ": ").appendExplain(path.toString());
+ }
+ }
+
+ planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString());
+ planStr.addDetail("in schema: ").appendDetail(getInSchema().toString());
+
+ return planStr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 6a30aa6..5ea61b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -24,7 +24,7 @@ public abstract class RelationNode extends LogicalNode {
public RelationNode(int pid, NodeType nodeType) {
super(pid, nodeType);
- assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
+ assert(nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.TABLE_SUBQUERY);
}
public abstract String getTableName();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 04c7b5a..cd9c1f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -29,11 +29,16 @@ import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.util.TUtil;
public class ScanNode extends RelationNode implements Projectable {
- @Expose private TableDesc tableDesc;
- @Expose private String alias;
- @Expose private Schema renamedSchema;
- @Expose private EvalNode qual;
- @Expose private Target[] targets;
+ @Expose protected TableDesc tableDesc;
+ @Expose protected String alias;
+ @Expose protected Schema renamedSchema;
+ @Expose protected EvalNode qual;
+ @Expose protected Target[] targets;
+
  /**
   * Constructor for subclasses (e.g. PartitionedTableScanNode) that need a
   * node type other than {@code NodeType.SCAN} while reusing ScanNode's state.
   * Note that, unlike the public constructor, this one sets only the table
   * descriptor; schemas are expected to be set by the caller.
   */
  protected ScanNode(int pid, NodeType nodeType, TableDesc desc) {
    super(pid, nodeType);
    this.tableDesc = desc;
  }
public ScanNode(int pid, TableDesc desc) {
super(pid, NodeType.SCAN);
@@ -46,7 +51,7 @@ public class ScanNode extends RelationNode implements Projectable {
this(pid, desc);
this.alias = PlannerUtil.normalizeTableName(alias);
renamedSchema = getOutSchema();
- renamedSchema.setQualifier(this.alias, true);
+ renamedSchema.setQualifier(this.alias);
}
public String getTableName() {
@@ -119,7 +124,7 @@ public class ScanNode extends RelationNode implements Projectable {
first = false;
}
}
-
+
sb.append(",");
sb.append("\n \"out schema\": ").append(getOutSchema());
sb.append("\n \"in schema\": ").append(getInSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index b2bd937..634fa3a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
@@ -39,17 +39,17 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private Options options;
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
- @Expose private Partitions partitions;
+ @Expose private PartitionDesc partitionDesc;
public StoreTableNode(int pid, String tableName) {
super(pid, NodeType.STORE);
this.tableName = tableName;
}
- public StoreTableNode(int pid, String tableName, Partitions partitions) {
+ public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
super(pid, NodeType.STORE);
this.tableName = tableName;
- this.partitions = partitions;
+ this.partitionDesc = partitionDesc;
}
public final String getTableName() {
@@ -109,12 +109,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.options;
}
- public Partitions getPartitions() {
- return partitions;
+ public PartitionDesc getPartitions() {
+ return partitionDesc;
}
- public void setPartitions(Partitions partitions) {
- this.partitions = partitions;
+ public void setPartitions(PartitionDesc partitionDesc) {
+ this.partitionDesc = partitionDesc;
}
@Override
@@ -146,7 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
eq = eq && TUtil.checkEquals(options, other.options);
eq = eq && isCreatedTable == other.isCreatedTable;
eq = eq && isOverwritten == other.isOverwritten;
- eq = eq && TUtil.checkEquals(partitions, other.partitions);
+ eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
return eq;
} else {
return false;
@@ -163,7 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
store.options = options != null ? (Options) options.clone() : null;
store.isCreatedTable = isCreatedTable;
store.isOverwritten = isOverwritten;
- store.partitions = partitions;
+ store.partitionDesc = partitionDesc;
return store;
}
@@ -188,8 +188,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",")
.append("\n \"in schema\": ").append(getInSchema());
- if(partitions != null) {
- sb.append(partitions.toString());
+ if(partitionDesc != null) {
+ sb.append(partitionDesc.toString());
}
sb.append("}");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index d1f0986..335d12f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -35,7 +35,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
this.tableName = PlannerUtil.normalizeTableName(tableName);
this.subQuery = subQuery;
setOutSchema((Schema) this.subQuery.getOutSchema().clone());
- getOutSchema().setQualifier(this.tableName, true);
+ getOutSchema().setQualifier(this.tableName);
setInSchema((Schema) this.subQuery.getInSchema().clone());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index bd1b8d3..cbdad1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.PlanningException;
@@ -90,7 +90,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
joinNode.setInSchema(mergedSchema);
joinNode.setOutSchema(mergedSchema);
if (joinEdge.hasJoinQual()) {
- joinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(joinEdge.getJoinQual()));
+ joinNode.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
}
return joinNode;
}
@@ -206,7 +206,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
double filterFactor = 1;
if (joinNode.hasJoinQual()) {
filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
- EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()).length);
+ AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length);
return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
} else {
return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
@@ -215,7 +215,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
case SELECTION:
SelectionNode selectionNode = (SelectionNode) node;
return getCost(selectionNode.getChild()) *
- Math.pow(DEFAULT_SELECTION_FACTOR, EvalTreeUtil.getConjNormalForm(selectionNode.getQual()).length);
+ Math.pow(DEFAULT_SELECTION_FACTOR, AlgebraicUtil.toConjunctiveNormalFormArray(selectionNode.getQual()).length);
case TABLE_SUBQUERY:
TableSubQueryNode subQueryNode = (TableSubQueryNode) node;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 66c82f3..74ef38a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
import com.google.common.collect.Sets;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.LogicalPlan;
@@ -35,7 +36,7 @@ import java.util.Set;
public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
JoinNode joinNode) throws PlanningException {
- Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()));
+ Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
Set<EvalNode> nonJoinQuals = Sets.newHashSet();
for (EvalNode singleQual : cnf) {
if (PlannerUtil.isJoinQual(singleQual)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index f7db4bd..db7e566 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
@@ -45,6 +47,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
/**
* This class is a physical operator to store at column partitioned table.
*/
@@ -56,15 +60,11 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
private Tuple tuple;
private Path storeTablePath;
private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
- private int[] columnIndexes;
- private String[] columnNames;
-
+ private int[] partitionColumnIndices;
+ private String[] partitionColumnNames;
- /**
- * @throws java.io.IOException
- *
- */
- public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+ public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+ throws IOException {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
@@ -75,18 +75,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}
+ // Rewrite a output schema because we don't have to store field values
+ // corresponding to partition key columns.
+ if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+ rewriteColumnPartitionedTableSchema();
+ }
+
// Find column index to name subpartition directory path
if (this.plan.getPartitions() != null) {
if (this.plan.getPartitions().getColumns() != null) {
- columnIndexes = new int[plan.getPartitions().getColumns().size()];
- columnNames = new String[columnIndexes.length];
- for(int i = 0; i < plan.getPartitions().getColumns().size(); i++) {
- Column targetColumn = plan.getPartitions().getColumn(i);
+ partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
+ partitionColumnNames = new String[partitionColumnIndices.length];
+ Schema columnPartitionSchema = plan.getPartitions().getSchema();
+ for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++) {
+ Column targetColumn = columnPartitionSchema.getColumn(i);
for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
Column inputColumn = plan.getInSchema().getColumn(j);
if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
- columnIndexes[i] = j;
- columnNames[i] = targetColumn.getColumnName();
+ partitionColumnIndices[i] = j;
+ partitionColumnNames[i] = targetColumn.getColumnName();
}
}
}
@@ -94,6 +101,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
}
}
+ /**
+ * This method rewrites an input schema of column-partitioned table because
+ * there are no actual field values in data file in a column-partitioned table.
+ * So, this method removes partition key columns from the input schema.
+ */
+ private void rewriteColumnPartitionedTableSchema() {
+ PartitionDesc partitionDesc = plan.getPartitions();
+ Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+ columnPartitionSchema.setQualifier(plan.getTableName());
+
+ Schema modifiedOutputSchema = new Schema();
+ for (Column column : outSchema.toArray()) {
+ if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+ modifiedOutputSchema.addColumn(column);
+ }
+ }
+ outSchema = modifiedOutputSchema;
+ }
+
public void init() throws IOException {
super.init();
@@ -124,8 +150,7 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
LOG.info("File size: " + status.getLen());
}
- appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
- outSchema, dataFile);
+ appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
appenderMap.put(partition, appender);
@@ -148,12 +173,12 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
while((tuple = child.next()) != null) {
// set subpartition directory name
sb.delete(0, sb.length());
- if (columnIndexes != null) {
- for(int i = 0; i < columnIndexes.length; i++) {
- Datum datum = (Datum) tuple.get(columnIndexes[i]);
+ if (partitionColumnIndices != null) {
+ for(int i = 0; i < partitionColumnIndices.length; i++) {
+ Datum datum = tuple.get(partitionColumnIndices[i]);
if(i > 0)
sb.append("/");
- sb.append(columnNames[i]).append("=");
+ sb.append(partitionColumnNames[i]).append("=");
sb.append(datum.asChars());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index b799095..4e6cd64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -27,9 +27,9 @@ import java.io.IOException;
public abstract class PhysicalExec implements SchemaObject {
protected final TaskAttemptContext context;
- protected final Schema inSchema;
- protected final Schema outSchema;
- protected final int outColumnNum;
+ protected Schema inSchema;
+ protected Schema outSchema;
+ protected int outColumnNum;
public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
final Schema outSchema) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 5783080..d17f7ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,15 +18,16 @@
package org.apache.tajo.engine.planner.physical;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.engine.planner.Target;
import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -34,8 +35,11 @@ import org.apache.tajo.storage.*;
import java.io.IOException;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
public class SeqScanExec extends PhysicalExec {
private final ScanNode plan;
private Scanner scanner = null;
@@ -63,8 +67,67 @@ public class SeqScanExec extends PhysicalExec {
}
}
  /**
   * Rewrites this scan's input schema for a column-partitioned table: data files
   * of such tables contain no field values for partition key columns, so those
   * columns are removed from the input schema, and every reference to them in
   * the target list and the qualification is replaced by a constant datum
   * reconstructed from the partition directory path.
   *
   * TODO - This implementation assumes that a fragment is always FileFragment.
   *        In a column-partitioned table the path encodes the partition keys,
   *        which holds today; later this assumption should be removed.
   *
   * @throws IOException if the fragment conversion fails
   */
  private void rewriteColumnPartitionedTableSchema() throws IOException {
    PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
    List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);

    // Decode the partition key values from the first fragment's directory path.
    // NOTE(review): assumes all fragments of this scan share one partition path — confirm.
    Tuple partitionRow =
        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);

    // Remove partition key columns from the input schema; the qualifier is taken
    // from the first input column so name lookups below compare like with like.
    columnPartitionSchema.setQualifier(inSchema.getColumn(0).getQualifier());
    Schema modifiedInputSchema = new Schema();
    for (Column column : inSchema.toArray()) {
      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
        modifiedInputSchema.addColumn(column);
      }
    }
    this.inSchema = modifiedInputSchema;

    // Targets or search conditions may still reference partition key columns,
    // but tuples read from disk carry no values for them. Replace each such
    // column reference with a ConstEval holding the decoded partition value.
    for (Column column : columnPartitionSchema.toArray()) {
      FieldEval targetExpr = new FieldEval(column);
      // Evaluate the field against the decoded partition row to obtain its constant value.
      EvalContext evalContext = targetExpr.newContext();
      targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
      Datum datum = targetExpr.terminate(evalContext);
      ConstEval constExpr = new ConstEval(datum);
      for (Target target : plan.getTargets()) {
        if (target.getEvalTree().equals(targetExpr)) {
          // The whole target IS the partition column: keep its visible name via
          // an alias, then swap the expression for the constant.
          if (!target.hasAlias()) {
            target.setAlias(target.getEvalTree().getName());
          }
          target.setExpr(constExpr);
        } else {
          // The partition column may occur deeper inside the expression tree.
          EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
        }
      }

      if (plan.hasQual()) {
        EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
      }
    }
  }
+
public void init() throws IOException {
Schema projected;
+
+ if (plan.getTableDesc().hasPartitions()
+ && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+ rewriteColumnPartitionedTableSchema();
+ }
+
if (plan.hasTargets()) {
projected = new Schema();
Set<Column> columnSet = new HashSet<Column>();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 7673253..817e48a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -22,10 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.PlannerUtil;
@@ -65,7 +62,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
@Override
public LogicalNode visitFilter(Set<EvalNode> cnf, LogicalPlan plan, SelectionNode selNode, Stack<LogicalNode> stack)
throws PlanningException {
- cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(selNode.getQual())));
+ cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
stack.push(selNode);
visitChild(cnf, plan, selNode.getChild(), stack);
@@ -178,15 +175,15 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual2 = null;
if (matched2.size() > 1) {
// merged into one eval tree
- qual2 = EvalTreeUtil.transformCNF2Singleton(
- matched2.toArray(new EvalNode [matched2.size()]));
+ qual2 = AlgebraicUtil.createSingletonExprFromCNF(
+ matched2.toArray(new EvalNode[matched2.size()]));
} else if (matched2.size() == 1) {
// if the number of matched expr is one
qual2 = matched2.get(0);
}
if (qual2 != null) {
- EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+ EvalNode conjQual2 = AlgebraicUtil.createSingletonExprFromCNF(joinNode.getJoinQual(), qual2);
joinNode.setJoinQual(conjQual2);
cnf.removeAll(matched2);
} // for the remaining cnf, push it as usual
@@ -194,7 +191,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
}
if (joinNode.hasJoinQual()) {
- cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual())));
+ cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
}
visitChild(cnf, plan, left, stack);
@@ -210,7 +207,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual = null;
if (matched.size() > 1) {
// merged into one eval tree
- qual = EvalTreeUtil.transformCNF2Singleton(
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
matched.toArray(new EvalNode[matched.size()]));
} else if (matched.size() == 1) {
// if the number of matched expr is one
@@ -243,8 +240,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
EvalNode qual = null;
if (matched.size() > 1) {
// merged into one eval tree
- qual = EvalTreeUtil.transformCNF2Singleton(
- matched.toArray(new EvalNode [matched.size()]));
+ qual = AlgebraicUtil.createSingletonExprFromCNF(
+ matched.toArray(new EvalNode[matched.size()]));
} else if (matched.size() == 1) {
// if the number of matched expr is one
qual = matched.get(0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
new file mode 100644
index 0000000..561424f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+ private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+ private static final String NAME = "Partitioned Table Rewriter";
+ private final Rewriter rewriter = new Rewriter();
+
+ private final TajoConf systemConf;
+
+ public PartitionedTableRewriter(TajoConf conf) {
+ systemConf = conf;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public boolean isEligible(LogicalPlan plan) {
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ for (RelationNode relation : block.getRelations()) {
+ if (relation.getType() == NodeType.SCAN) {
+ TableDesc table = ((ScanNode)relation).getTableDesc();
+ if (table.hasPartitions()) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+ boolean containsPartitionedTables;
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ containsPartitionedTables = false;
+ for (RelationNode relation : block.getRelations()) {
+ if (relation.getType() == NodeType.SCAN) {
+ TableDesc table = ((ScanNode)relation).getTableDesc();
+ if (table.hasPartitions()) {
+ containsPartitionedTables = true;
+ }
+ }
+ }
+ if (containsPartitionedTables) {
+ rewriter.visitChild(block, plan, block.getRoot(), new Stack<LogicalNode>());
+ }
+ }
+ return plan;
+ }
+
+ private static class PartitionPathFilter implements PathFilter {
+ private FileSystem fs;
+ private Schema schema;
+ private EvalNode partitionFilter;
+ private EvalContext evalContext;
+
+
+ public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+ this.schema = schema;
+ this.partitionFilter = partitionFilter;
+ evalContext = partitionFilter.newContext();
+ }
+
+ @Override
+ public boolean accept(Path path) {
+ Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+ if (tuple == null) { // if it is a file or not acceptable file
+ return false;
+ }
+ partitionFilter.eval(evalContext, schema, tuple);
+ return partitionFilter.terminate(evalContext).asBool();
+ }
+
+ @Override
+ public String toString() {
+ return partitionFilter.toString();
+ }
+ }
+
+ /**
+ * It assumes that each conjunctive form corresponds to one column.
+ *
+ * @param partitionColumns
+ * @param conjunctiveForms search condition corresponding to partition columns.
+ * If it is NULL, it means that there is no search condition for this table.
+ * @param tablePath
+ * @return
+ * @throws IOException
+ */
+ private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+ throws IOException {
+
+ FileSystem fs = tablePath.getFileSystem(systemConf);
+
+ PathFilter [] filters;
+ if (conjunctiveForms == null) {
+ filters = buildAllAcceptingPathFilters(partitionColumns);
+ } else {
+ filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+ }
+
+ // loop from one to the number of partition columns
+ Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+ for (int i = 1; i < partitionColumns.getColumnNum(); i++) {
+ // Get all file status matched to a ith level path filter.
+ filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+ }
+
+ return filteredPaths;
+ }
+
+ /**
+ * Build path filters for all levels with a list of filter conditions.
+ *
+ * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
+ * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
+ *
+ * Corresponding filter conditions will be placed on each path filter.
+ * If there is no corresponding expression for a certain column,
+ * the condition will be filled with a true value.
+ *
+ * Assume that a user gives a condition WHERE col1 = 'A' and col3 = 'C'.
+ * There is no filter condition corresponding to col2.
+ * Then, the path filter conditions are corresponding to the followings:
+ *
+ * The first path filter: col1 = 'A'
+ * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+ * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+ *
+ * 'IS NOT NULL' predicate is always true against the partition path.
+ *
+ * @param partitionColumns
+ * @param conjunctiveForms
+ * @return
+ */
+ private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+ EvalNode [] conjunctiveForms) {
+ // Building partition path filters for all levels
+ Column target;
+ PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+ List<EvalNode> accumulatedFilters = Lists.newArrayList();
+ for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop from one to level
+ target = partitionColumns.getColumn(i);
+
+ for (EvalNode expr : conjunctiveForms) {
+ if (EvalTreeUtil.findDistinctRefColumns(expr).contains(target)) {
+ // Accumulate one qual per level
+ accumulatedFilters.add(expr);
+ }
+ }
+
+ if (accumulatedFilters.size() < (i + 1)) {
+ accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+ }
+
+ EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+ accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+ filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+ }
+ return filters;
+ }
+
+ /**
+ * Build an array of path filters for all levels with all accepting filter condition.
+ * @param partitionColumns The partition columns schema
+ * @return The array of path filters, accepting all partition paths.
+ */
+ private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+ Column target;
+ PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+ List<EvalNode> accumulatedFilters = Lists.newArrayList();
+ for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop from one to level
+ target = partitionColumns.getColumn(i);
+ accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+ EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+ accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+ filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+ }
+ return filters;
+ }
+
+ private static Path [] toPathArray(FileStatus[] fileStatuses) {
+ Path [] paths = new Path[fileStatuses.length];
+ for (int j = 0; j < fileStatuses.length; j++) {
+ paths[j] = fileStatuses[j].getPath();
+ }
+ return paths;
+ }
+
+ private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+ TableDesc table = scanNode.getTableDesc();
+ FileSystem fs = table.getPath().getFileSystem(systemConf);
+ LOG.info("Partitioned Table Dir: " + table.getPath());
+ LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
+ PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+
+ Schema paritionValuesSchema = new Schema();
+ for (Column column : partitionDesc.getColumns()) {
+ paritionValuesSchema.addColumn(column);
+ }
+
+ Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+ // if a query statement has a search condition, try to find indexable predicates
+ if (scanNode.hasQual()) {
+ EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+ Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+ // add qualifier to schema for qual
+ paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
+ for (Column column : paritionValuesSchema.getColumns()) {
+ for (EvalNode simpleExpr : conjunctiveForms) {
+ if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+ indexablePredicateSet.add(simpleExpr);
+ }
+ }
+ }
+
+ // Partitions which are not matched to the partition filter conditions are pruned immediately.
+ // So, the partition filter conditions are not necessary later, and they are removed from
+ // original search condition for simplicity and efficiency.
+ remainExprs.removeAll(indexablePredicateSet);
+ if (remainExprs.isEmpty()) {
+ scanNode.setQual(null);
+ } else {
+ scanNode.setQual(
+ AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+ }
+ }
+
+ if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
+ return findFilteredPaths(paritionValuesSchema,
+ indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+ } else { // otherwise, we will get all partition paths.
+ return findFilteredPaths(paritionValuesSchema, null, table.getPath());
+ }
+ }
+
+ private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+ if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+ Set<Column> variables = EvalTreeUtil.findDistinctRefColumns(evalNode);
+ // if it contains only single variable matched to a target column
+ return variables.size() == 1 && variables.contains(targetColumn);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Check if an expression consists of one variable and one constant and
+ * the expression is a comparison operator.
+ *
+ * @param evalNode The expression to be checked
+ * @return true if an expression consists of one variable and one constant
+ * and the expression is a comparison operator. Other, false.
+ */
+ private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+ // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+ return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+ }
+
+ /**
+ *
+ * @param evalNode The expression to be checked
+ * @return true if it is a disjunctive expression consisting of indexable expressions
+ */
+ private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+ if (evalNode.getType() == EvalType.OR) {
+ boolean indexable =
+ checkIfIndexablePredicate(evalNode.getLeftExpr()) &&
+ checkIfIndexablePredicate(evalNode.getRightExpr());
+
+ boolean sameVariable =
+ EvalTreeUtil.findDistinctRefColumns(evalNode.getLeftExpr())
+ .equals(EvalTreeUtil.findDistinctRefColumns(evalNode.getRightExpr()));
+
+ return indexable && sameVariable;
+ } else {
+ return false;
+ }
+ }
+
+ private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+ if (scanNode.getInputPaths().length > 0) {
+ try {
+ FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+ long totalVolume = 0;
+
+ for (Path input : scanNode.getInputPaths()) {
+ ContentSummary summary = fs.getContentSummary(input);
+ totalVolume += summary.getLength();
+ totalVolume += summary.getFileCount();
+ }
+ scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+ } catch (IOException e) {
+ throw new PlanningException(e);
+ }
+ }
+ }
+
+ private final class Rewriter extends BasicLogicalPlanVisitor<LogicalPlan.QueryBlock, Object> {
+ @Override
+ public Object visitScan(LogicalPlan.QueryBlock block, LogicalPlan plan, ScanNode scanNode, Stack<LogicalNode> stack)
+ throws PlanningException {
+
+ TableDesc table = scanNode.getTableDesc();
+ if (!table.hasPartitions()) {
+ return null;
+ }
+
+ try {
+ Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+ plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+ PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+ updateTableStat(rewrittenScanNode);
+ PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+ } catch (IOException e) {
+ throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+ }
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index d0e9a6f..403b403 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -304,7 +304,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
LogicalNode child = visitChild(newContext, plan, subRoot, newStack);
newStack.pop();
Schema inSchema = (Schema) child.getOutSchema().clone();
- inSchema.setQualifier(node.getCanonicalName(), true);
+ inSchema.setQualifier(node.getCanonicalName());
node.setInSchema(inSchema);
return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
}
[3/3] git commit: TAJO-338 - Add Query Optimization Part for
Column-Partitioned Tables. (hyunsik)
Posted by hy...@apache.org.
TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f58f6ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f58f6ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f58f6ee8
Branch: refs/heads/master
Commit: f58f6ee827139c01253902f0b1076b47fde9d5e6
Parents: 4dbecf9
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 18 21:39:30 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Dec 19 11:36:10 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/tajo/algebra/CreateTable.java | 30 +-
.../tajo/catalog/AbstractCatalogClient.java | 2 +-
.../org/apache/tajo/catalog/DDLBuilder.java | 20 +-
.../java/org/apache/tajo/catalog/Schema.java | 57 ++-
.../java/org/apache/tajo/catalog/TableDesc.java | 30 +-
.../tajo/catalog/partition/PartitionDesc.java | 194 ++++++++++
.../tajo/catalog/partition/Partitions.java | 349 -----------------
.../src/main/proto/CatalogProtos.proto | 7 +-
.../org/apache/tajo/catalog/TestSchema.java | 26 ++
.../tajo/catalog/store/HCatalogStore.java | 6 +-
.../tajo/catalog/store/TestHCatalogStore.java | 2 +-
.../tajo/catalog/store/AbstractDBStore.java | 42 +--
.../apache/tajo/catalog/store/DerbyStore.java | 42 +--
.../org/apache/tajo/catalog/TestCatalog.java | 72 ++--
.../org/apache/tajo/catalog/TestDBStore.java | 62 ++--
.../org/apache/tajo/datum/BooleanDatum.java | 2 +
.../org/apache/tajo/datum/DatumFactory.java | 34 ++
.../main/java/org/apache/tajo/util/TUtil.java | 2 +-
.../org/apache/tajo/engine/parser/SQLLexer.g4 | 3 +-
.../org/apache/tajo/engine/parser/SQLParser.g4 | 2 +-
.../apache/tajo/engine/eval/AlgebraicUtil.java | 134 ++++++-
.../apache/tajo/engine/eval/EvalTreeUtil.java | 62 +---
.../org/apache/tajo/engine/eval/InEval.java | 2 +-
.../tajo/engine/eval/LikePredicateEval.java | 4 +
.../apache/tajo/engine/parser/SQLAnalyzer.java | 10 +-
.../engine/planner/BasicLogicalPlanVisitor.java | 8 +
.../tajo/engine/planner/LogicalOptimizer.java | 11 +-
.../tajo/engine/planner/LogicalPlanVisitor.java | 2 +
.../tajo/engine/planner/LogicalPlanner.java | 72 ++--
.../engine/planner/PhysicalPlannerImpl.java | 4 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 2 +-
.../org/apache/tajo/engine/planner/Target.java | 4 +
.../engine/planner/global/ExecutionBlock.java | 4 +
.../engine/planner/global/GlobalPlanner.java | 2 +-
.../engine/planner/logical/CreateTableNode.java | 20 +-
.../tajo/engine/planner/logical/NodeType.java | 1 +
.../logical/PartitionedTableScanNode.java | 180 +++++++++
.../engine/planner/logical/RelationNode.java | 2 +-
.../tajo/engine/planner/logical/ScanNode.java | 19 +-
.../engine/planner/logical/StoreTableNode.java | 24 +-
.../planner/logical/TableSubQueryNode.java | 2 +-
.../join/GreedyHeuristicJoinOrderAlgorithm.java | 8 +-
.../engine/planner/logical/join/JoinGraph.java | 3 +-
.../ColumnPartitionedTableStoreExec.java | 65 +++-
.../engine/planner/physical/PhysicalExec.java | 6 +-
.../engine/planner/physical/SeqScanExec.java | 69 +++-
.../planner/rewrite/FilterPushDownRule.java | 21 +-
.../rewrite/PartitionedTableRewriter.java | 371 +++++++++++++++++++
.../planner/rewrite/ProjectionPushDownRule.java | 2 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 106 ++++++
.../tajo/master/DefaultTaskScheduler.java | 19 +-
.../org/apache/tajo/master/GlobalEngine.java | 12 +-
.../tajo/master/TajoMasterClientService.java | 10 +-
.../master/querymaster/QueryMasterTask.java | 10 +-
.../tajo/master/querymaster/SubQuery.java | 44 ++-
.../src/main/proto/ClientProtos.proto | 2 +-
.../org/apache/tajo/client/TestTajoClient.java | 4 +-
.../apache/tajo/engine/eval/ExprTestBase.java | 2 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 24 +-
.../tajo/engine/parser/TestSQLAnalyzer.java | 7 +-
.../engine/planner/TestLogicalOptimizer.java | 2 +-
.../tajo/engine/planner/TestLogicalPlan.java | 2 +-
.../planner/physical/TestBSTIndexExec.java | 2 +-
.../planner/physical/TestHashAntiJoinExec.java | 2 +-
.../planner/physical/TestHashSemiJoinExec.java | 2 +-
.../planner/physical/TestPhysicalPlanner.java | 2 +-
.../engine/planner/physical/TestSortExec.java | 2 +-
.../tajo/engine/query/TestInsertQuery.java | 201 ----------
.../tajo/engine/query/TestTablePartitions.java | 360 ++++++++++++++++++
.../apache/tajo/engine/util/TestTupleUtil.java | 52 ++-
.../tajo/master/TestExecutionBlockCursor.java | 2 +-
.../apache/tajo/master/TestGlobalPlanner.java | 2 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 2 +-
.../create_table_partition_by_column.sql | 2 +-
75 files changed, 2000 insertions(+), 976 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d6ba59..cceb1cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.8.0 - unreleased
NEW FEATURES
+ TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables.
+ (hyunsik)
+
TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
TAJO-413: Implement pi function. (DaeMyung Kang via jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index a6bc1e7..a85e366 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -31,7 +31,7 @@ public class CreateTable extends Expr {
private String location;
private Expr subquery;
private Map<String, String> params;
- private PartitionOption partition;
+ private PartitionDescExpr partition;
public CreateTable(final String tableName) {
super(OpType.CreateTable);
@@ -107,11 +107,11 @@ public class CreateTable extends Expr {
return partition != null;
}
- public void setPartition(PartitionOption partition) {
+ public void setPartition(PartitionDescExpr partition) {
this.partition = partition;
}
- public <T extends PartitionOption> T getPartition() {
+ public <T extends PartitionDescExpr> T getPartition() {
return (T) this.partition;
}
@@ -178,10 +178,10 @@ public class CreateTable extends Expr {
COLUMN
}
- public static abstract class PartitionOption {
+ public static abstract class PartitionDescExpr {
PartitionType type;
- public PartitionOption(PartitionType type) {
+ public PartitionDescExpr(PartitionType type) {
this.type = type;
}
@@ -190,7 +190,7 @@ public class CreateTable extends Expr {
}
}
- public static class RangePartition extends PartitionOption {
+ public static class RangePartition extends PartitionDescExpr {
ColumnReferenceExpr [] columns;
List<RangePartitionSpecifier> specifiers;
@@ -209,7 +209,7 @@ public class CreateTable extends Expr {
}
}
- public static class HashPartition extends PartitionOption {
+ public static class HashPartition extends PartitionDescExpr {
ColumnReferenceExpr [] columns;
Expr quantity;
List<PartitionSpecifier> specifiers;
@@ -246,7 +246,7 @@ public class CreateTable extends Expr {
}
}
- public static class ListPartition extends PartitionOption {
+ public static class ListPartition extends PartitionDescExpr {
ColumnReferenceExpr [] columns;
List<ListPartitionSpecifier> specifiers;
@@ -265,17 +265,23 @@ public class CreateTable extends Expr {
}
}
- public static class ColumnPartition extends PartitionOption {
- ColumnReferenceExpr [] columns;
+ public static class ColumnPartition extends PartitionDescExpr {
+ private ColumnDefinition [] columns;
+ private boolean isOmitValues;
- public ColumnPartition(ColumnReferenceExpr [] columns) {
+ public ColumnPartition(ColumnDefinition [] columns, boolean isOmitValues) {
super(PartitionType.COLUMN);
this.columns = columns;
+ this.isOmitValues = isOmitValues;
}
- public ColumnReferenceExpr [] getColumns() {
+ public ColumnDefinition [] getColumns() {
return columns;
}
+
+ public boolean isOmitValues() {
+ return isOmitValues;
+ }
}
public static class RangePartitionSpecifier extends PartitionSpecifier {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 5f79f10..44ac8f4 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -122,7 +122,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
public Boolean call(NettyClientBase client) throws ServiceException {
CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.addTable(null, (TableDescProto) desc.getProto()).getValue();
+ return stub.addTable(null, desc.getProto()).getValue();
}
}.withRetries();
} catch (ServiceException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index a9d0f03..c818be7 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -18,7 +18,7 @@
package org.apache.tajo.catalog;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
@@ -96,15 +96,15 @@ public class DDLBuilder {
}
private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
- Partitions partitions = desc.getPartitions();
+ PartitionDesc partitionDesc = desc.getPartitions();
sb.append(" PARTITION BY ");
- sb.append(partitions.getPartitionsType().name());
+ sb.append(partitionDesc.getPartitionsType().name());
// columns
sb.append("(");
int columnCount = 0;
- for(Column column: partitions.getColumns()) {
+ for(Column column: partitionDesc.getColumns()) {
for(Column targetColumn: desc.getSchema().getColumns()) {
if (column.getColumnName().equals(targetColumn.getColumnName())) {
if (columnCount > 0)
@@ -118,12 +118,12 @@ public class DDLBuilder {
sb.append(")");
// specifier
- if (partitions.getSpecifiers() != null
- && !partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+ if (partitionDesc.getSpecifiers() != null
+ && !partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
sb.append(" (");
- for(int i = 0; i < partitions.getSpecifiers().size(); i++) {
- Specifier specifier = partitions.getSpecifiers().get(i);
+ for(int i = 0; i < partitionDesc.getSpecifiers().size(); i++) {
+ Specifier specifier = partitionDesc.getSpecifiers().get(i);
if (i > 0)
sb.append(",");
@@ -132,7 +132,7 @@ public class DDLBuilder {
if (!specifier.getName().isEmpty())
sb.append(" ").append(specifier.getName());
- if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
+ if (partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
if (!specifier.getExpressions().isEmpty()) {
sb.append(" VALUES (");
String[] expressions = specifier.getExpressions().split("\\,");
@@ -144,7 +144,7 @@ public class DDLBuilder {
sb.append(")");
}
- } else if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
+ } else if (partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
sb.append(" VALUES LESS THAN (");
if (!specifier.getExpressions().isEmpty()) {
sb.append(specifier.getExpressions());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 8a2d028..4ea3b81 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -39,26 +39,24 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
private SchemaProto.Builder builder = SchemaProto.newBuilder();
@Expose protected List<Column> fields = null;
- @Expose protected Map<String, Integer> fieldsByQialifiedName = null;
+ @Expose protected Map<String, Integer> fieldsByQualifiedName = null;
@Expose protected Map<String, List<Integer>> fieldsByName = null;
public Schema() {
- this.fields = new ArrayList<Column>();
- this.fieldsByQialifiedName = new TreeMap<String, Integer>();
- this.fieldsByName = new HashMap<String, List<Integer>>();
+ init();
}
public Schema(SchemaProto proto) {
this.fields = new ArrayList<Column>();
- this.fieldsByQialifiedName = new HashMap<String, Integer>();
+ this.fieldsByQualifiedName = new HashMap<String, Integer>();
this.fieldsByName = new HashMap<String, List<Integer>>();
for(ColumnProto colProto : proto.getFieldsList()) {
Column tobeAdded = new Column(colProto);
fields.add(tobeAdded);
if (tobeAdded.hasQualifier()) {
- fieldsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(), fields.size() - 1);
+ fieldsByQualifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(), fields.size() - 1);
} else {
- fieldsByQialifiedName.put(tobeAdded.getColumnName(), fields.size() - 1);
+ fieldsByQualifiedName.put(tobeAdded.getColumnName(), fields.size() - 1);
}
if (fieldsByName.containsKey(tobeAdded.getColumnName())) {
fieldsByName.get(tobeAdded.getColumnName()).add(fields.size() - 1);
@@ -71,17 +69,23 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
public Schema(Schema schema) {
this();
this.fields.addAll(schema.fields);
- this.fieldsByQialifiedName.putAll(schema.fieldsByQialifiedName);
+ this.fieldsByQualifiedName.putAll(schema.fieldsByQualifiedName);
this.fieldsByName.putAll(schema.fieldsByName);
}
-
+
public Schema(Column [] columns) {
- this();
+ init();
for(Column c : columns) {
addColumn(c);
}
}
+ private void init() {
+ this.fields = new ArrayList<Column>();
+ this.fieldsByQualifiedName = new HashMap<String, Integer>();
+ this.fieldsByName = new HashMap<String, List<Integer>>();
+ }
+
/**
* Set a qualifier to this schema.
* This changes the qualifier of all columns except for not-qualified columns.
@@ -89,26 +93,10 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
* @param qualifier The qualifier
*/
public void setQualifier(String qualifier) {
- setQualifier(qualifier, false);
- }
-
- /**
- * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
- * Otherwise, it changes the qualifier of all columns except for non-qualified columns
- *
- * @param qualifier The qualifier
- * @param force If true, all columns' qualifiers will be changed. Otherwise, only qualified columns' qualifiers will
- * be changed.
- */
- public void setQualifier(String qualifier, boolean force) {
- fieldsByQialifiedName.clear();
-
+ fieldsByQualifiedName.clear();
for (int i = 0; i < getColumnNum(); i++) {
- if (!force && fields.get(i).hasQualifier()) {
- continue;
- }
fields.get(i).setQualifier(qualifier);
- fieldsByQialifiedName.put(fields.get(i).getQualifiedName(), i);
+ fieldsByQualifiedName.put(fields.get(i).getQualifiedName(), i);
}
}
@@ -121,7 +109,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
public Column getColumnByFQN(String qualifiedName) {
- Integer cid = fieldsByQialifiedName.get(qualifiedName.toLowerCase());
+ Integer cid = fieldsByQualifiedName.get(qualifiedName.toLowerCase());
return cid != null ? fields.get(cid) : null;
}
@@ -151,13 +139,14 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
public int getColumnId(String qualifiedName) {
- return fieldsByQialifiedName.get(qualifiedName.toLowerCase());
+ return fieldsByQualifiedName.get(qualifiedName.toLowerCase());
}
public int getColumnIdByName(String colName) {
for (Column col : fields) {
if (col.getColumnName().equals(colName.toLowerCase())) {
- return fieldsByQialifiedName.get(col.getQualifiedName());
+ String qualifiedName = col.getQualifiedName();
+ return fieldsByQualifiedName.get(qualifiedName);
}
}
return -1;
@@ -168,7 +157,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
public boolean contains(String colName) {
- return fieldsByQialifiedName.containsKey(colName.toLowerCase());
+ return fieldsByQualifiedName.containsKey(colName.toLowerCase());
}
@@ -189,14 +178,14 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
public synchronized Schema addColumn(String name, DataType dataType) {
String normalized = name.toLowerCase();
- if(fieldsByQialifiedName.containsKey(normalized)) {
+ if(fieldsByQualifiedName.containsKey(normalized)) {
LOG.error("Already exists column " + normalized);
throw new AlreadyExistsFieldException(normalized);
}
Column newCol = new Column(normalized, dataType);
fields.add(newCol);
- fieldsByQialifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
+ fieldsByQualifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
fieldsByName.put(newCol.getColumnName(), TUtil.newList(fields.size() - 1));
return this;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index 458a99a..8fffc6e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -25,12 +25,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable {
private final Log LOG = LogFactory.getLog(TableDesc.class);
@@ -42,7 +43,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
@Expose protected TableMeta meta; // required
@Expose protected Path uri; // required
@Expose protected TableStats stats; // optional
- @Expose protected Partitions partitions; //optional
+ @Expose protected PartitionDesc partitionDesc; //optional
public TableDesc() {
builder = TableDescProto.newBuilder();
@@ -65,7 +66,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
this(proto.getId(), new Schema(proto.getSchema()), new TableMeta(proto.getMeta()), new Path(proto.getPath()));
this.stats = new TableStats(proto.getStats());
if (proto.getPartitions() != null && !proto.getPartitions().toString().isEmpty()) {
- this.partitions = new Partitions(proto.getPartitions());
+ this.partitionDesc = new PartitionDesc(proto.getPartitions());
}
}
@@ -115,22 +116,27 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
}
public boolean hasPartitions() {
- return this.partitions != null;
+ return this.partitionDesc != null;
}
- public Partitions getPartitions() {
- return partitions;
+ public PartitionDesc getPartitions() {
+ return partitionDesc;
}
- public void setPartitions(Partitions partitions) {
- this.partitions = partitions;
+ public void setPartitions(PartitionDesc partitionDesc) {
+ this.partitionDesc = partitionDesc;
}
public boolean equals(Object object) {
if(object instanceof TableDesc) {
TableDesc other = (TableDesc) object;
- return this.getProto().equals(other.getProto());
+ boolean eq = tableName.equals(other.tableName);
+ eq = eq && schema.equals(other.schema);
+ eq = eq && meta.equals(other.meta);
+ eq = eq && uri.equals(other.uri);
+ eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+ return eq && TUtil.checkEquals(stats, other.stats);
}
return false;
@@ -144,7 +150,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
desc.meta = (TableMeta) meta.clone();
desc.uri = uri;
desc.stats = stats != null ? (TableStats) stats.clone() : null;
- desc.partitions = partitions != null ? (Partitions) partitions.clone() : null;
+ desc.partitionDesc = partitionDesc != null ? (PartitionDesc) partitionDesc.clone() : null;
return desc;
}
@@ -178,8 +184,8 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
if (this.stats != null) {
builder.setStats(this.stats.getProto());
}
- if (this.partitions != null) {
- builder.setPartitions(this.partitions.getProto());
+ if (this.partitionDesc != null) {
+ builder.setPartitions(this.partitionDesc.getProto());
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
new file mode 100644
index 0000000..fbec807
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Describes how a table is partitioned: the partitioning method
+ * ({@link CatalogProtos.PartitionsType}), the partition columns (held as a
+ * {@link Schema}), an optional partition count, an optional list of
+ * {@link Specifier}s, and whether partition values are omitted from paths.
+ *
+ * <p>Both {@code schema} and {@code specifiers} are optional in
+ * {@code PartitionDescProto} and may therefore be {@code null}; accessors,
+ * {@link #equals(Object)}, {@link #clone()} and {@link #getProto()} are
+ * written to tolerate that.</p>
+ */
+public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescProto>, Cloneable, GsonObject {
+  @Expose protected CatalogProtos.PartitionsType partitionsType; // required
+  @Expose protected Schema schema;                               // optional; may be null
+  @Expose protected int numPartitions;                           // optional
+  @Expose protected List<Specifier> specifiers;                  // optional; may be null
+  @Expose protected boolean isOmitValues = false;                // optional
+
+  private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+
+  public PartitionDesc() {
+  }
+
+  /**
+   * Copy constructor. Copies every field — including {@code isOmitValues},
+   * which the previous implementation silently dropped — and snapshots the
+   * specifier list so the two instances do not alias it.
+   *
+   * @param partition the descriptor to copy; must not be null
+   */
+  public PartitionDesc(PartitionDesc partition) {
+    this();
+    this.partitionsType = partition.partitionsType;
+    this.schema = partition.schema;
+    this.numPartitions = partition.numPartitions;
+    // Defensive copy: sharing the list reference meant a later addSpecifier()
+    // on one instance silently mutated the other.
+    this.specifiers = partition.specifiers != null
+        ? new ArrayList<Specifier>(partition.specifiers) : null;
+    // Bug fix: isOmitValues was previously not carried over by this constructor.
+    this.isOmitValues = partition.isOmitValues;
+  }
+
+  /**
+   * Builds a descriptor from explicit parts.
+   *
+   * @param partitionsType the partitioning method
+   * @param columns        the partition columns, added in order
+   * @param numPartitions  the number of partitions
+   * @param specifiers     the partition specifiers; may be null
+   */
+  public PartitionDesc(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
+                       List<Specifier> specifiers) {
+    this();
+    this.partitionsType = partitionsType;
+    for (Column c : columns) {
+      addColumn(c);
+    }
+    this.numPartitions = numPartitions;
+    this.specifiers = specifiers;
+  }
+
+  /**
+   * Reconstructs a descriptor from its protobuf form.
+   *
+   * @param proto the serialized descriptor
+   */
+  public PartitionDesc(CatalogProtos.PartitionDescProto proto) {
+    this.partitionsType = proto.getPartitionsType();
+    this.schema = new Schema(proto.getSchema());
+    this.numPartitions = proto.getNumPartitions();
+    this.isOmitValues = proto.getIsOmitValues();
+    // NOTE(review): protobuf repeated-field accessors never return null, so
+    // this guard is always true; kept for safety and source compatibility.
+    if (proto.getSpecifiersList() != null) {
+      this.specifiers = TUtil.newList();
+      for (CatalogProtos.SpecifierProto specifier : proto.getSpecifiersList()) {
+        this.specifiers.add(new Specifier(specifier));
+      }
+    }
+  }
+
+  /** @return the partition-column schema, or null if none has been set */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /**
+   * @return an immutable snapshot of the partition columns; empty (never
+   *         null) when no schema has been set
+   */
+  public List<Column> getColumns() {
+    // Null guard: schema is optional, so avoid the NPE the old code threw.
+    return schema != null ? ImmutableList.copyOf(schema.toArray()) : ImmutableList.<Column>of();
+  }
+
+  /** Replaces the partition columns with the given collection. */
+  public void setColumns(Collection<Column> columns) {
+    this.schema = new Schema(columns.toArray(new Column[columns.size()]));
+  }
+
+  /** Appends one partition column, lazily creating the schema. */
+  public synchronized void addColumn(Column column) {
+    if (schema == null) {
+      schema = new Schema();
+    }
+    schema.addColumn(column);
+  }
+
+  /** Appends one specifier, lazily creating the list. */
+  public synchronized void addSpecifier(Specifier specifier) {
+    if (specifiers == null) {
+      specifiers = TUtil.newList();
+    }
+    specifiers.add(specifier);
+  }
+
+  public CatalogProtos.PartitionsType getPartitionsType() {
+    return partitionsType;
+  }
+
+  public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
+    this.partitionsType = partitionsType;
+  }
+
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  /** @return the specifier list, or null if none has been set */
+  public List<Specifier> getSpecifiers() {
+    return specifiers;
+  }
+
+  public void setSpecifiers(List<Specifier> specifiers) {
+    this.specifiers = specifiers;
+  }
+
+  public void setOmitValues(boolean flag) {
+    isOmitValues = flag;
+  }
+
+  public boolean isOmitValues() {
+    return isOmitValues;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof PartitionDesc) {
+      PartitionDesc another = (PartitionDesc) o;
+      boolean eq = partitionsType == another.partitionsType;
+      // TUtil.checkEquals is null-safe; schema is optional and may be null,
+      // so a direct schema.equals(...) call could throw NPE here.
+      eq = eq && TUtil.checkEquals(schema, another.schema);
+      eq = eq && numPartitions == another.numPartitions;
+      eq = eq && TUtil.checkEquals(specifiers, another.specifiers);
+      eq = eq && isOmitValues == another.isOmitValues;
+      return eq;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // Keep the equals/hashCode contract; equals was overridden without it.
+    return getProto().hashCode();
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    PartitionDesc copy = (PartitionDesc) super.clone();
+    copy.builder = CatalogProtos.PartitionDescProto.newBuilder();
+    copy.setPartitionsType(this.partitionsType);
+    // Null guards: schema and specifiers are optional; the old code threw
+    // NPE when cloning a descriptor that had either unset.
+    copy.schema = schema != null ? new Schema(schema.getProto()) : null;
+    copy.setNumPartitions(this.numPartitions);
+    copy.specifiers = specifiers != null ? new ArrayList<Specifier>(specifiers) : null;
+    copy.isOmitValues = isOmitValues;
+    return copy;
+  }
+
+  @Override
+  public CatalogProtos.PartitionDescProto getProto() {
+    // Always serialize from a fresh builder. The previous implementation
+    // reused the cached builder, so each getProto() call appended the
+    // specifiers again via addSpecifiers(), duplicating them (the old
+    // Partitions class called clearColumns() to work around the same issue).
+    CatalogProtos.PartitionDescProto.Builder newBuilder =
+        CatalogProtos.PartitionDescProto.newBuilder();
+    if (this.partitionsType != null) {
+      newBuilder.setPartitionsType(this.partitionsType);
+    }
+    if (this.schema != null) { // schema is optional in PartitionDescProto
+      newBuilder.setSchema(schema.getProto());
+    }
+    newBuilder.setNumPartitions(numPartitions);
+    newBuilder.setIsOmitValues(isOmitValues);
+    if (this.specifiers != null) {
+      for (Specifier specifier : specifiers) {
+        newBuilder.addSpecifiers(specifier.getProto());
+      }
+    }
+    return newBuilder.build();
+  }
+
+  @Override
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().
+        excludeFieldsWithoutExposeAnnotation().create();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, PartitionDesc.class);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
deleted file mode 100644
index c82f0cb..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.partition;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.annotations.Expose;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.exception.AlreadyExistsFieldException;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-public class Partitions implements ProtoObject<CatalogProtos.PartitionsProto>, Cloneable, GsonObject {
-
- private static final Log LOG = LogFactory.getLog(Partitions.class);
-
- @Expose protected CatalogProtos.PartitionsType partitionsType; //required
- @Expose protected List<Column> columns; //required
- @Expose protected int numPartitions; //optional
- @Expose protected List<Specifier> specifiers; //optional
- @Expose protected Map<String, Integer> columnsByQialifiedName = null;
- @Expose protected Map<String, List<Integer>> columnsByName = null;
-
- private CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
-
- public Partitions() {
- this.columns = new ArrayList<Column>();
- this.columnsByQialifiedName = new TreeMap<String, Integer>();
- this.columnsByName = new HashMap<String, List<Integer>>();
- }
-
- public Partitions(Partitions partition) {
- this();
- this.partitionsType = partition.partitionsType;
- this.columns.addAll(partition.columns);
- this.columnsByQialifiedName.putAll(partition.columnsByQialifiedName);
- this.columnsByName.putAll(partition.columnsByName);
- this.numPartitions = partition.numPartitions;
- this.specifiers = partition.specifiers;
- }
-
- public Partitions(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
- List<Specifier> specifiers) {
- this();
- this.partitionsType = partitionsType;
- for (Column c : columns) {
- addColumn(c);
- }
- this.numPartitions = numPartitions;
- this.specifiers = specifiers;
- }
-
- public Partitions(CatalogProtos.PartitionsProto proto) {
- this.partitionsType = proto.getPartitionsType();
- this.columns = new ArrayList<Column>();
- this.columnsByQialifiedName = new HashMap<String, Integer>();
- this.columnsByName = new HashMap<String, List<Integer>>();
- for (CatalogProtos.ColumnProto colProto : proto.getColumnsList()) {
- Column tobeAdded = new Column(colProto);
- columns.add(tobeAdded);
- if (tobeAdded.hasQualifier()) {
- columnsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(),
- columns.size() - 1);
- } else {
- columnsByQialifiedName.put(tobeAdded.getColumnName(), columns.size() - 1);
- }
- if (columnsByName.containsKey(tobeAdded.getColumnName())) {
- columnsByName.get(tobeAdded.getColumnName()).add(columns.size() - 1);
- } else {
- columnsByName.put(tobeAdded.getColumnName(), TUtil.newList(columns.size() - 1));
- }
- }
- this.numPartitions = proto.getNumPartitions();
- if(proto.getSpecifiersList() != null) {
- this.specifiers = TUtil.newList();
- for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
- this.specifiers.add(new Specifier(specifier));
- }
- }
- }
-
- /**
- * Set a qualifier to this schema.
- * This changes the qualifier of all columns except for not-qualified columns.
- *
- * @param qualifier The qualifier
- */
- public void setQualifier(String qualifier) {
- setQualifier(qualifier, false);
- }
-
- /**
- * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
- * Otherwise, it changes the qualifier of all columns except for non-qualified columns
- *
- * @param qualifier The qualifier
- * @param force If true, all columns' qualifiers will be changed. Otherwise,
- * only qualified columns' qualifiers will
- * be changed.
- */
- public void setQualifier(String qualifier, boolean force) {
- columnsByQialifiedName.clear();
-
- for (int i = 0; i < getColumnNum(); i++) {
- if (!force && columns.get(i).hasQualifier()) {
- continue;
- }
- columns.get(i).setQualifier(qualifier);
- columnsByQialifiedName.put(columns.get(i).getQualifiedName(), i);
- }
- }
-
- public int getColumnNum() {
- return this.columns.size();
- }
-
- public Column getColumn(int id) {
- return columns.get(id);
- }
-
- public Column getColumnByFQN(String qualifiedName) {
- Integer cid = columnsByQialifiedName.get(qualifiedName.toLowerCase());
- return cid != null ? columns.get(cid) : null;
- }
-
- public Column getColumnByName(String colName) {
- String normalized = colName.toLowerCase();
- List<Integer> list = columnsByName.get(normalized);
-
- if (list == null || list.size() == 0) {
- return null;
- }
-
- if (list.size() == 1) {
- return columns.get(list.get(0));
- } else {
- StringBuilder sb = new StringBuilder();
- boolean first = true;
- for (Integer id : list) {
- if (first) {
- first = false;
- } else {
- sb.append(", ");
- }
- sb.append(columns.get(id));
- }
- throw new RuntimeException("Ambiguous Column Name: " + sb.toString());
- }
- }
-
- public int getColumnId(String qualifiedName) {
- return columnsByQialifiedName.get(qualifiedName.toLowerCase());
- }
-
- public int getColumnIdByName(String colName) {
- for (Column col : columns) {
- if (col.getColumnName().equals(colName.toLowerCase())) {
- return columnsByQialifiedName.get(col.getQualifiedName());
- }
- }
- return -1;
- }
-
- public List<Column> getColumns() {
- return ImmutableList.copyOf(columns);
- }
-
- public void setColumns(List<Column> columns) {
- this.columns = columns;
- }
-
- public boolean contains(String colName) {
- return columnsByQialifiedName.containsKey(colName.toLowerCase());
-
- }
-
- public boolean containsAll(Collection<Column> columns) {
- return columns.containsAll(columns);
- }
-
- public synchronized Partitions addColumn(String name, TajoDataTypes.Type type) {
- if (type == TajoDataTypes.Type.CHAR) {
- return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
- }
- return addColumn(name, CatalogUtil.newSimpleDataType(type));
- }
-
- public synchronized Partitions addColumn(String name, TajoDataTypes.Type type, int length) {
- return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
- }
-
- public synchronized Partitions addColumn(String name, TajoDataTypes.DataType dataType) {
- String normalized = name.toLowerCase();
- if (columnsByQialifiedName.containsKey(normalized)) {
- LOG.error("Already exists column " + normalized);
- throw new AlreadyExistsFieldException(normalized);
- }
-
- Column newCol = new Column(normalized, dataType);
- columns.add(newCol);
- columnsByQialifiedName.put(newCol.getQualifiedName(), columns.size() - 1);
- columnsByName.put(newCol.getColumnName(), TUtil.newList(columns.size() - 1));
-
- return this;
- }
-
- public synchronized void addColumn(Column column) {
- addColumn(column.getQualifiedName(), column.getDataType());
- }
-
- public synchronized void addColumns(Partitions schema) {
- for (Column column : schema.getColumns()) {
- addColumn(column);
- }
- }
-
- public synchronized void addSpecifier(Specifier specifier) {
- if(specifiers == null)
- specifiers = TUtil.newList();
-
- specifiers.add(specifier);
- }
-
- public CatalogProtos.PartitionsType getPartitionsType() {
- return partitionsType;
- }
-
- public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
- this.partitionsType = partitionsType;
- }
-
- public int getNumPartitions() {
- return numPartitions;
- }
-
- public void setNumPartitions(int numPartitions) {
- this.numPartitions = numPartitions;
- }
-
- public List<Specifier> getSpecifiers() {
- return specifiers;
- }
-
- public void setSpecifiers(List<Specifier> specifiers) {
- this.specifiers = specifiers;
- }
-
- public Map<String, Integer> getColumnsByQialifiedName() {
- return columnsByQialifiedName;
- }
-
- public void setColumnsByQialifiedName(Map<String, Integer> columnsByQialifiedName) {
- this.columnsByQialifiedName = columnsByQialifiedName;
- }
-
- public Map<String, List<Integer>> getColumnsByName() {
- return columnsByName;
- }
-
- public void setColumnsByName(Map<String, List<Integer>> columnsByName) {
- this.columnsByName = columnsByName;
- }
-
- public boolean equals(Object o) {
- if (o instanceof Partitions) {
- Partitions other = (Partitions) o;
- return getProto().equals(other.getProto());
- }
- return false;
- }
-
- public Object clone() throws CloneNotSupportedException {
- Partitions clone = (Partitions) super.clone();
- clone.builder = CatalogProtos.PartitionsProto.newBuilder();
- clone.setPartitionsType(this.partitionsType);
- clone.setColumns(this.columns);
- clone.setNumPartitions(this.numPartitions);
- clone.specifiers = new ArrayList<Specifier>(this.specifiers);
-
- return clone;
- }
-
- @Override
- public CatalogProtos.PartitionsProto getProto() {
- if (builder == null) {
- builder = CatalogProtos.PartitionsProto.newBuilder();
- }
- if (this.partitionsType != null) {
- builder.setPartitionsType(this.partitionsType);
- }
- builder.clearColumns();
- if (this.columns != null) {
- for (Column col : columns) {
- builder.addColumns(col.getProto());
- }
- }
- builder.setNumPartitions(numPartitions);
-
- if (this.specifiers != null) {
- for(Specifier specifier: specifiers) {
- builder.addSpecifiers(specifier.getProto());
- }
- }
- return builder.build();
- }
-
- public String toString() {
- Gson gson = new GsonBuilder().setPrettyPrinting().
- excludeFieldsWithoutExposeAnnotation().create();
- return gson.toJson(this);
- }
-
- @Override
- public String toJson() {
- return CatalogGsonHelper.toJson(this, Partitions.class);
-
- }
-
- public Column[] toArray() {
- return this.columns.toArray(new Column[this.columns.size()]);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index e5af491..ad6ef3e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -113,7 +113,7 @@ message TableDescProto {
required TableProto meta = 3;
required SchemaProto schema = 4;
optional TableStatsProto stats = 5;
- optional PartitionsProto partitions = 6;
+ optional PartitionDescProto partitions = 6;
}
enum FunctionType {
@@ -234,11 +234,12 @@ message SortSpecProto {
optional bool nullFirst = 3 [default = false];
}
-message PartitionsProto {
+message PartitionDescProto {
required PartitionsType partitionsType = 1;
- repeated ColumnProto columns = 2;
+ optional SchemaProto schema = 2;
optional int32 numPartitions = 3;
repeated SpecifierProto specifiers = 4;
+ optional bool isOmitValues = 5;
}
message SpecifierProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
index 1747e68..1422cf2 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
@@ -140,7 +140,33 @@ public class TestSchema {
Schema schema2 = new Schema(schema.getProto());
schema2.setQualifier("test1");
Column column = schema2.getColumn(1);
+ assertEquals(1, schema2.getColumnIdByName("age"));
assertEquals(column, schema2.getColumnByName("age"));
assertEquals(column, schema2.getColumnByFQN("test1.age"));
+
+ Schema schema3 = new Schema();
+ schema3.addColumn("tb1.col1", Type.INT4);
+ schema3.addColumn("col2", Type.INT4);
+ assertEquals("tb1", schema3.getColumn(0).getQualifier());
+ assertEquals("tb1.col1", schema3.getColumn(0).getQualifiedName());
+ assertEquals("col1", schema3.getColumn(0).getColumnName());
+ assertEquals("col2", schema3.getColumn(1).getQualifiedName());
+
+ assertEquals(schema3.getColumn(0), schema3.getColumnByName("col1"));
+ assertEquals(schema3.getColumn(0), schema3.getColumnByFQN("tb1.col1"));
+ assertEquals(schema3.getColumn(1), schema3.getColumnByName("col2"));
+ assertEquals(schema3.getColumn(1), schema3.getColumnByFQN("col2"));
+
+ schema3.setQualifier("tb2");
+ assertEquals("tb2", schema3.getColumn(0).getQualifier());
+ assertEquals("tb2.col1", schema3.getColumn(0).getQualifiedName());
+ assertEquals("col1", schema3.getColumn(0).getColumnName());
+ assertEquals("tb2.col2", schema3.getColumn(1).getQualifiedName());
+
+ assertEquals(schema3.getColumn(0), schema3.getColumnByName("col1"));
+ assertEquals(schema3.getColumn(0), schema3.getColumnByFQN("tb2.col1"));
+ assertEquals(schema3.getColumn(1), schema3.getColumnByName("col2"));
+ assertEquals(schema3.getColumn(1), schema3.getColumnByFQN("tb2.col2"));
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index e7a862a..74c7cb5 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -30,7 +30,7 @@ import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@ -106,7 +106,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
org.apache.tajo.catalog.Schema schema = null;
Options options = null;
TableStats stats = null;
- Partitions partitions = null;
+ PartitionDesc partitions = null;
// get db name and table name.
try {
@@ -178,7 +178,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
// set partition keys
if (table.getPartitionKeys() != null) {
if (table.getPartitionKeys().size() > 0) {
- partitions = new Partitions();
+ partitions = new PartitionDesc();
List<FieldSchema> partitionKeys = table.getPartitionKeys();
for(int i = 0; i < partitionKeys.size(); i++) {
FieldSchema fieldSchema = partitionKeys.get(i);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
index d8ee85e..9ab208f 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
@@ -265,7 +265,7 @@ public class TestHCatalogStore {
assertEquals("type", columns.get(4).getColumnName());
assertEquals(TajoDataTypes.Type.TEXT, columns.get(4).getDataType().getType());
assertNotNull(table.getPartitions());
- assertEquals("type", table.getPartitions().getColumn(0).getColumnName());
+ assertEquals("type", table.getPartitions().getSchema().getColumn(0).getColumnName());
assertEquals(CatalogProtos.PartitionsType.COLUMN, table.getPartitions().getPartitionsType());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 3414e83..60c88e9 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
@@ -284,8 +284,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
//Partition
if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
try {
- Partitions partitions = table.getPartitions();
- List<Column> columnList = partitions.getColumns();
+ PartitionDesc partitionDesc = table.getPartitions();
+ List<Column> columnList = partitionDesc.getColumns();
// Find columns which used for a partitioned table.
StringBuffer columns = new StringBuffer();
@@ -320,19 +320,19 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
// Find information for subpartitions
- if (partitions.getSpecifiers() != null) {
+ if (partitionDesc.getSpecifiers() != null) {
int count = 1;
- if (partitions.getSpecifiers().size() == 0) {
+ if (partitionDesc.getSpecifiers().size() == 0) {
pstmt.clearParameters();
pstmt.setString(1, null);
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, null);
pstmt.addBatch();
} else {
- for(Specifier specifier: partitions.getSpecifiers()) {
+ for(Specifier specifier: partitionDesc.getSpecifiers()) {
pstmt.clearParameters();
if (specifier.getName() != null && !specifier.getName().equals("")) {
pstmt.setString(1, specifier.getName());
@@ -340,8 +340,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
pstmt.setString(1, null);
}
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, specifier.getExpressions());
pstmt.addBatch();
@@ -352,8 +352,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
pstmt.clearParameters();
pstmt.setString(1, null);
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, null);
pstmt.addBatch();
@@ -496,7 +496,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
StoreType storeType = null;
Options options;
TableStats stat = null;
- Partitions partitions = null;
+ PartitionDesc partitionDesc = null;
int tid = 0;
try {
@@ -602,18 +602,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
res = stmt.executeQuery(sql);
while (res.next()) {
- if (partitions == null) {
- partitions = new Partitions();
+ if (partitionDesc == null) {
+ partitionDesc = new PartitionDesc();
String[] columns = res.getString("columns").split(",");
for(String eachColumn: columns) {
- partitions.addColumn(getColumn(tableName, tid, eachColumn));
+ partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
}
- partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
- partitions.setNumPartitions(res.getInt("quantity"));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
+ partitionDesc.setNumPartitions(res.getInt("quantity"));
}
Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
- partitions.addSpecifier(specifier);
+ partitionDesc.addSpecifier(specifier);
}
} catch (SQLException se) {
@@ -628,8 +628,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
table.setStats(stat);
}
- if (partitions != null) {
- table.setPartitions(partitions);
+ if (partitionDesc != null) {
+ table.setPartitions(partitionDesc);
}
return table;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 06a701b..f21ab0e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -24,7 +24,7 @@ package org.apache.tajo.catalog.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
@@ -375,8 +375,8 @@ public class DerbyStore extends AbstractDBStore {
//Partition
if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
try {
- Partitions partitions = table.getPartitions();
- List<Column> columnList = partitions.getColumns();
+ PartitionDesc partitionDesc = table.getPartitions();
+ List<Column> columnList = partitionDesc.getColumns();
// Find columns which used for a partitioned table.
StringBuffer columns = new StringBuffer();
@@ -411,19 +411,19 @@ public class DerbyStore extends AbstractDBStore {
}
// Find information for subpartitions
- if (partitions.getSpecifiers() != null) {
+ if (partitionDesc.getSpecifiers() != null) {
int count = 1;
- if (partitions.getSpecifiers().size() == 0) {
+ if (partitionDesc.getSpecifiers().size() == 0) {
pstmt.clearParameters();
pstmt.setString(1, null);
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, null);
pstmt.addBatch();
} else {
- for(Specifier eachValue: partitions.getSpecifiers()) {
+ for(Specifier eachValue: partitionDesc.getSpecifiers()) {
pstmt.clearParameters();
if (eachValue.getName() != null && !eachValue.getName().equals("")) {
pstmt.setString(1, eachValue.getName());
@@ -431,8 +431,8 @@ public class DerbyStore extends AbstractDBStore {
pstmt.setString(1, null);
}
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, eachValue.getExpressions());
pstmt.addBatch();
@@ -443,8 +443,8 @@ public class DerbyStore extends AbstractDBStore {
pstmt.clearParameters();
pstmt.setString(1, null);
pstmt.setInt(2, tid);
- pstmt.setString(3, partitions.getPartitionsType().name());
- pstmt.setInt(4, partitions.getNumPartitions());
+ pstmt.setString(3, partitionDesc.getPartitionsType().name());
+ pstmt.setInt(4, partitionDesc.getNumPartitions());
pstmt.setString(5, columns.toString());
pstmt.setString(6, null);
pstmt.addBatch();
@@ -596,7 +596,7 @@ public class DerbyStore extends AbstractDBStore {
StoreType storeType = null;
Options options;
TableStats stat = null;
- Partitions partitions = null;
+ PartitionDesc partitionDesc = null;
int tid = 0;
try {
@@ -706,19 +706,19 @@ public class DerbyStore extends AbstractDBStore {
res = stmt.executeQuery(sql);
while (res.next()) {
- if (partitions == null) {
- partitions = new Partitions();
+ if (partitionDesc == null) {
+ partitionDesc = new PartitionDesc();
String[] columns = res.getString("columns").split(",");
for(String eachColumn: columns) {
- partitions.addColumn(getColumn(tableName, tid, eachColumn));
+ partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
}
- partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
("type")));
- partitions.setNumPartitions(res.getInt("quantity"));
+ partitionDesc.setNumPartitions(res.getInt("quantity"));
}
Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
- partitions.addSpecifier(specifier);
+ partitionDesc.addSpecifier(specifier);
}
} catch (SQLException se) {
@@ -733,8 +733,8 @@ public class DerbyStore extends AbstractDBStore {
table.setStats(stat);
}
- if (partitions != null) {
- table.setPartitions(partitions);
+ if (partitionDesc != null) {
+ table.setPartitions(partitionDesc);
}
return table;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index d174e72..3abbb81 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -20,7 +20,7 @@ package org.apache.tajo.catalog;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.function.Function;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
@@ -214,13 +214,13 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitions.setNumPartitions(2);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitionDesc.setNumPartitions(2);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
@@ -229,7 +229,7 @@ public class TestCatalog {
assertEquals(retrieved.getName(), tableName);
assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
- assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
catalog.deleteTable(tableName);
@@ -250,17 +250,17 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitions.setNumPartitions(2);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitionDesc.setNumPartitions(2);
- partitions.addSpecifier(new Specifier("sub_part1"));
- partitions.addSpecifier(new Specifier("sub_part2"));
- partitions.addSpecifier(new Specifier("sub_part3"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2"));
+ partitionDesc.addSpecifier(new Specifier("sub_part3"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -269,7 +269,7 @@ public class TestCatalog {
assertEquals(retrieved.getName(), tableName);
assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
- assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
"sub_part1");
@@ -295,15 +295,15 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
- partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
- partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -312,7 +312,7 @@ public class TestCatalog {
assertEquals(retrieved.getName(), tableName);
assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.LIST);
- assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
"sub_part1");
assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
@@ -339,16 +339,16 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
- partitions.addSpecifier(new Specifier("sub_part1", "2"));
- partitions.addSpecifier(new Specifier("sub_part2", "5"));
- partitions.addSpecifier(new Specifier("sub_part3"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
+ partitionDesc.addSpecifier(new Specifier("sub_part3"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -357,7 +357,7 @@ public class TestCatalog {
assertEquals(retrieved.getName(), tableName);
assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.RANGE);
- assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
"sub_part1");
assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
@@ -388,12 +388,12 @@ public class TestCatalog {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(catalog.existsTable(tableName));
catalog.addTable(desc);
assertTrue(catalog.existsTable(tableName));
@@ -402,7 +402,7 @@ public class TestCatalog {
assertEquals(retrieved.getName(), tableName);
assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
- assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+ assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
catalog.deleteTable(tableName);
assertFalse(catalog.existsTable(tableName));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index d3671b3..1949afc 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.Specifier;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -236,13 +236,13 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitions.setNumPartitions(2);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitionDesc.setNumPartitions(2);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(store.existTable(tableName));
store.addTable(desc);
assertTrue(store.existTable(tableName));
@@ -268,17 +268,17 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
- partitions.setNumPartitions(2);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+ partitionDesc.setNumPartitions(2);
- partitions.addSpecifier(new Specifier("sub_part1"));
- partitions.addSpecifier(new Specifier("sub_part2"));
- partitions.addSpecifier(new Specifier("sub_part3"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2"));
+ partitionDesc.addSpecifier(new Specifier("sub_part3"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(store.existTable(tableName));
store.addTable(desc);
assertTrue(store.existTable(tableName));
@@ -304,15 +304,15 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
- partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
- partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(store.existTable(tableName));
store.addTable(desc);
assertTrue(store.existTable(tableName));
@@ -338,16 +338,16 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
- partitions.addSpecifier(new Specifier("sub_part1", "2"));
- partitions.addSpecifier(new Specifier("sub_part2", "5"));
- partitions.addSpecifier(new Specifier("sub_part3"));
+ partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
+ partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
+ partitionDesc.addSpecifier(new Specifier("sub_part3"));
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(store.existTable(tableName));
store.addTable(desc);
assertTrue(store.existTable(tableName));
@@ -373,12 +373,12 @@ public class TestDBStore {
opts.put("file.delimiter", ",");
TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
- Partitions partitions = new Partitions();
- partitions.addColumn(new Column("id", Type.INT4));
- partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.addColumn(new Column("id", Type.INT4));
+ partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
- desc.setPartitions(partitions);
+ desc.setPartitions(partitionDesc);
assertFalse(store.existTable(tableName));
store.addTable(desc);
assertTrue(store.existTable(tableName));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
index bf4d83d..56fd3b5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
@@ -24,6 +24,8 @@ import org.apache.tajo.datum.exception.InvalidOperationException;
public class BooleanDatum extends Datum {
@Expose private boolean val;
+ public static final String TRUE="t";
+ public static final String FALSE="f";
public BooleanDatum() {
super(TajoDataTypes.Type.BOOLEAN);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index 343bbfe..2082225 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -69,6 +69,40 @@ public class DatumFactory {
}
}
+ public static Datum createFromString(DataType dataType, String value) {
+ switch (dataType.getType()) {
+
+ case BOOLEAN:
+ return createBool(value.equals(BooleanDatum.TRUE));
+ case INT2:
+ return createInt2(value);
+ case INT4:
+ return createInt4(value);
+ case INT8:
+ return createInt8(value);
+ case FLOAT4:
+ return createFloat4(value);
+ case FLOAT8:
+ return createFloat8(value);
+ case CHAR:
+ return createChar(value);
+ case TEXT:
+ return createText(value);
+ case DATE:
+ return createDate(value);
+ case TIME:
+ return createTime(value);
+ case TIMESTAMP:
+ return createTimeStamp(value);
+ case BLOB:
+ return createBlob(value);
+ case INET4:
+ return createInet4(value);
+ default:
+ throw new UnsupportedOperationException(dataType.toString());
+ }
+ }
+
public static Datum createFromBytes(DataType dataType, byte[] bytes) {
switch (dataType.getType()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 5b5a406..2f81ef4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -183,7 +183,7 @@ public class TUtil {
}
public static String arrayToString(Object [] objects) {
- boolean first = false;
+ boolean first = true;
StringBuilder sb = new StringBuilder();
for(Object object : objects) {
if (first) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
index a19f3e9..3bc9ff4 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
@@ -290,7 +290,6 @@ Nonreserved_keywords
| MAXVALUE
- | VALUES
| PARTITION
| PARTITIONS
| ROLLUP
@@ -305,6 +304,8 @@ Nonreserved_keywords
| THAN
| TRIM
| TO
+
+ | VALUES
;
/*
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 0728008..54e43c7 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -152,7 +152,7 @@ list_value_partition
;
column_partitions
- : PARTITION BY COLUMN LEFT_PAREN column_reference_list RIGHT_PAREN
+ : PARTITION BY COLUMN table_elements
;
partition_name
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
index a5ee425..e933ddb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
@@ -20,6 +20,8 @@ package org.apache.tajo.engine.eval;
import org.apache.tajo.catalog.Column;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
public class AlgebraicUtil {
@@ -51,7 +53,7 @@ public class AlgebraicUtil {
}
private static EvalNode _transpose(EvalNode _expr, Column target) {
- EvalNode expr = simplify(_expr);
+ EvalNode expr = eliminateConstantExprs(_expr);
if (isSingleVar(expr.getLeftExpr())) {
return expr;
@@ -128,7 +130,7 @@ public class AlgebraicUtil {
* @param expr to be simplified
* @return the simplified expr
*/
- public static EvalNode simplify(EvalNode expr) {
+ public static EvalNode eliminateConstantExprs(EvalNode expr) {
EvalNode left = expr.getLeftExpr();
EvalNode right = expr.getRightExpr();
@@ -136,31 +138,27 @@ public class AlgebraicUtil {
case AND:
case OR:
case EQUAL:
+ case NOT_EQUAL:
case LTH:
case LEQ:
case GTH:
case GEQ:
- left = simplify(left);
- right = simplify(right);
- return new BinaryEval(expr.getType(), left, right);
-
case PLUS:
case MINUS:
case MULTIPLY:
case DIVIDE:
- left = simplify(left);
- right = simplify(right);
-
- // If both are constants, they can be evaluated immediately.
- if (left.getType() == EvalType.CONST
- && right.getType() == EvalType.CONST) {
+ case MODULAR:
+ left = eliminateConstantExprs(left);
+ right = eliminateConstantExprs(right);
+
+ if (left.getType() == EvalType.CONST && right.getType() == EvalType.CONST) {
EvalContext exprCtx = expr.newContext();
expr.eval(exprCtx, null, null);
return new ConstEval(expr.terminate(exprCtx));
} else {
- return new BinaryEval(expr.getType(), left, right);
+ return new BinaryEval(expr.getType(), left, right);
}
-
+
case CONST:
return expr;
@@ -286,4 +284,112 @@ public class AlgebraicUtil {
return expr;
}
+
+ public static boolean isComparisonOperator(EvalNode expr) {
+ return expr.getType() == EvalType.EQUAL ||
+ expr.getType() == EvalType.LEQ ||
+ expr.getType() == EvalType.LTH ||
+ expr.getType() == EvalType.GEQ ||
+ expr.getType() == EvalType.GTH ||
+ expr.getType() == EvalType.BETWEEN;
+ }
+
+ public static boolean isIndexableOperator(EvalNode expr) {
+ return expr.getType() == EvalType.EQUAL ||
+ expr.getType() == EvalType.LEQ ||
+ expr.getType() == EvalType.LTH ||
+ expr.getType() == EvalType.GEQ ||
+ expr.getType() == EvalType.GTH ||
+ expr.getType() == EvalType.BETWEEN ||
+ expr.getType() == EvalType.IN ||
+ (expr.getType() == EvalType.LIKE && !((LikePredicateEval)expr).isLeadingWildCard());
+ }
+
+ /**
+ * Convert a list of conjunctive normal forms into a singleton expression.
+ *
+ * @param cnfExprs
+ * @return The EvalNode object that merges all CNF-formed expressions.
+ */
+ public static EvalNode createSingletonExprFromCNF(EvalNode... cnfExprs) {
+ if (cnfExprs.length == 1) {
+ return cnfExprs[0];
+ }
+
+ return createSingletonExprFromCNFRecursive(cnfExprs, 0);
+ }
+
+ private static EvalNode createSingletonExprFromCNFRecursive(EvalNode[] evalNode, int idx) {
+ if (idx == evalNode.length - 2) {
+ return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
+ } else {
+ return new BinaryEval(EvalType.AND, evalNode[idx], createSingletonExprFromCNFRecursive(evalNode, idx + 1));
+ }
+ }
+
+ /**
+ * Transforms an expression into an array of conjunctive normal form (CNF) expressions.
+ *
+ * @param expr The expression to be transformed to an array of CNF-formed expressions.
+ * @return An array of CNF-formed expressions
+ */
+ public static EvalNode [] toConjunctiveNormalFormArray(EvalNode expr) {
+ List<EvalNode> list = new ArrayList<EvalNode>();
+ toConjunctiveNormalFormArrayRecursive(expr, list);
+ return list.toArray(new EvalNode[list.size()]);
+ }
+
+ private static void toConjunctiveNormalFormArrayRecursive(EvalNode node, List<EvalNode> found) {
+ if (node.getType() == EvalType.AND) {
+ toConjunctiveNormalFormArrayRecursive(node.getLeftExpr(), found);
+ toConjunctiveNormalFormArrayRecursive(node.getRightExpr(), found);
+ } else {
+ found.add(node);
+ }
+ }
+
+ /**
+ * Convert a list of disjunctive normal forms into a singleton expression.
+ *
+ * @param cnfExprs
+ * @return The EvalNode object that merges all DNF-formed expressions.
+ */
+ public static EvalNode createSingletonExprFromDNF(EvalNode... cnfExprs) {
+ if (cnfExprs.length == 1) {
+ return cnfExprs[0];
+ }
+
+ return createSingletonExprFromDNFRecursive(cnfExprs, 0);
+ }
+
+ private static EvalNode createSingletonExprFromDNFRecursive(EvalNode[] evalNode, int idx) {
+ if (idx == evalNode.length - 2) {
+ return new BinaryEval(EvalType.OR, evalNode[idx], evalNode[idx + 1]);
+ } else {
+ return new BinaryEval(EvalType.OR, evalNode[idx], createSingletonExprFromDNFRecursive(evalNode, idx + 1));
+ }
+ }
+
+ /**
+ * Transforms an expression into an array of disjunctive normal form (DNF) expressions.
+ *
+ * @param exprs The expressions to be transformed to an array of DNF-formed expressions.
+ * @return An array of DNF-formed expressions
+ */
+ public static EvalNode [] toDisjunctiveNormalFormArray(EvalNode...exprs) {
+ List<EvalNode> list = new ArrayList<EvalNode>();
+ for (EvalNode expr : exprs) {
+ toDisjunctiveNormalFormArrayRecursive(expr, list);
+ }
+ return list.toArray(new EvalNode[list.size()]);
+ }
+
+ private static void toDisjunctiveNormalFormArrayRecursive(EvalNode node, List<EvalNode> found) {
+ if (node.getType() == EvalType.OR) {
+ toDisjunctiveNormalFormArrayRecursive(node.getLeftExpr(), found);
+ toDisjunctiveNormalFormArrayRecursive(node.getRightExpr(), found);
+ } else {
+ found.add(node);
+ }
+ }
}