You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/27 18:11:38 UTC
[1/2] TAJO-145: count(distinct column) should be supported. (hyunsik)
Updated Branches:
refs/heads/master f4600ddfa -> 733192f3b
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 3466d17..1dec4ee 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -139,7 +138,7 @@ public class TestHashJoinExec {
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
- enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index cae4853..ee5ab83 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -36,8 +36,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.SortNode;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -156,7 +154,7 @@ public class TestMergeJoinExec {
JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
- enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);;
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index c5366a1..5dac14a 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -42,7 +42,6 @@ import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -60,7 +59,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce;
import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
import static org.junit.Assert.*;
@@ -90,7 +88,7 @@ public class TestPhysicalPlanner {
sm = StorageManagerFactory.getStorageManager(conf, testDir);
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
Schema schema = new Schema();
@@ -864,7 +862,7 @@ public class TestPhysicalPlanner {
SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
Enforcer enforcer = new Enforcer();
- enforcer.addSort(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
+ enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT);
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
@@ -885,7 +883,7 @@ public class TestPhysicalPlanner {
sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
enforcer = new Enforcer();
- enforcer.addSort(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
+ enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT);
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
@@ -912,7 +910,7 @@ public class TestPhysicalPlanner {
GroupbyNode groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
Enforcer enforcer = new Enforcer();
- enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.HASH_AGGREGATION);
+ enforcer.enforceHashAggregation(groupByNode.getPID());
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
@@ -933,7 +931,7 @@ public class TestPhysicalPlanner {
groupByNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
enforcer = new Enforcer();
- enforcer.addGroupby(groupByNode.getPID(), GroupbyEnforce.GroupbyAlgorithm.SORT_AGGREGATION);
+ enforcer.enforceSortAggregation(groupByNode.getPID(), null);
ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
new Fragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index e4d9108..aacdb70 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -18,8 +18,11 @@
package org.apache.tajo.engine.query;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.client.ResultSetUtil;
import org.apache.tajo.util.TUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -29,6 +32,7 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.sql.ResultSet;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.*;
@@ -49,6 +53,54 @@ public class TestGroupByQuery {
}
@Test
+ public final void testGroupBy() throws Exception {
+ ResultSet res = tpch.execute(
+ "select count(1) as unique_key from lineitem;");
+ assertTrue(res.next());
+ assertEquals(5, res.getLong(1));
+ assertFalse(res.next());
+ res.close();
+ }
+
+ @Test
+ public final void testGroupBy2() throws Exception {
+ ResultSet res = tpch.execute(
+ "select count(1) as unique_key from lineitem group by l_linenumber");
+ Set<Long> expected = Sets.newHashSet(2l,3l);
+ for (int i = 0; i < 2; i++) {
+ assertTrue(res.next());
+ assertTrue(expected.contains(res.getLong(1)));
+ }
+ assertFalse(res.next());
+ res.close();
+ }
+
+ @Test
+ public final void testCountDistinct() throws Exception {
+ ResultSet res = tpch.execute(
+ "select l_orderkey, max(l_orderkey) as maximum, count(distinct l_linenumber) as unique_key from lineitem " +
+ "group by l_orderkey");
+
+ long [][] expectedRows = new long[3][];
+ expectedRows[0] = new long [] {1,1,2};
+ expectedRows[1] = new long [] {2,2,1};
+ expectedRows[2] = new long [] {3,3,2};
+
+ Map<Long, long []> expected = Maps.newHashMap();
+ for (long [] expectedRow : expectedRows) {
+ expected.put(expectedRow[0], expectedRow);
+ }
+ for (int i = 0; i < expectedRows.length; i++) {
+ assertTrue(res.next());
+ long [] expectedRow = expected.get(res.getLong(1));
+ assertEquals(expectedRow[1], res.getLong(2));
+ assertEquals(expectedRow[2], res.getLong(3));
+ }
+ assertFalse(res.next());
+ res.close();
+ }
+
+ @Test
public final void testComplexParameter() throws Exception {
ResultSet res = tpch.execute(
"select sum(l_extendedprice*l_discount) as revenue from lineitem");
@@ -94,7 +146,9 @@ public class TestGroupByQuery {
@Test
public final void testHavingWithNamedTarget() throws Exception {
- ResultSet res = tpch.execute("select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from lineitem group by l_orderkey having total >= 2 or num = 3");
+ ResultSet res = tpch.execute(
+ "select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from lineitem " +
+ "group by l_orderkey having total >= 2 or num = 3");
Map<Integer, Double> result = TUtil.newHashMap();
result.put(3, 2.5d);
result.put(2, 2.0d);
@@ -112,7 +166,9 @@ public class TestGroupByQuery {
@Test
public final void testHavingWithAggFunction() throws Exception {
- ResultSet res = tpch.execute("select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from lineitem group by l_orderkey having avg(l_partkey) = 2.5 or num = 1");
+ ResultSet res = tpch.execute(
+ "select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from lineitem " +
+ "group by l_orderkey having avg(l_partkey) = 2.5 or num = 1");
Map<Integer, Double> result = TUtil.newHashMap();
result.put(3, 2.5d);
result.put(2, 2.0d);
@@ -131,7 +187,8 @@ public class TestGroupByQuery {
//@Test
public final void testCube() throws Exception {
ResultSet res = tpch.execute(
- "cube_test := select l_orderkey, l_partkey, sum(l_quantity) from lineitem group by cube(l_orderkey, l_partkey)");
+ "cube_test := select l_orderkey, l_partkey, sum(l_quantity) from lineitem " +
+ "group by cube(l_orderkey, l_partkey)");
try {
int count = 0;
for (;res.next();) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index 3ab644a..23f82c0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -55,7 +55,7 @@ public class TestGlobalPlanner {
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
// TPC-H Schema for Complex Queries
[2/2] git commit: TAJO-145: count(distinct column) should be supported. (hyunsik)
Posted by hy...@apache.org.
TAJO-145: count(distinct column) should be supported. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/733192f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/733192f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/733192f3
Branch: refs/heads/master
Commit: 733192f3bfac8686055f219d4a5bad71d7948d30
Parents: f4600dd
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Sep 28 01:11:09 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Sep 28 01:11:09 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 43 ++--
.../org/apache/tajo/catalog/CatalogService.java | 20 +-
.../src/main/proto/CatalogProtocol.proto | 4 +-
.../org/apache/tajo/catalog/CatalogUtil.java | 7 +-
.../java/org/apache/tajo/catalog/Schema.java | 5 +
.../src/main/proto/CatalogProtos.proto | 15 +-
.../org/apache/tajo/catalog/CatalogServer.java | 223 ++++++++++++++-----
.../org/apache/tajo/catalog/TestCatalog.java | 30 ++-
.../org/apache/tajo/annotation/Nullable.java | 25 +++
.../main/java/org/apache/tajo/util/TUtil.java | 16 ++
.../org/apache/tajo/engine/eval/FuncEval.java | 7 +
.../engine/function/builtin/CountValue.java | 1 +
.../function/builtin/CountValueDistinct.java | 74 ++++++
.../apache/tajo/engine/planner/LogicalPlan.java | 14 +-
.../tajo/engine/planner/LogicalPlanner.java | 40 ++--
.../engine/planner/PhysicalPlannerImpl.java | 30 ++-
.../apache/tajo/engine/planner/PlannerUtil.java | 109 ++++++---
.../tajo/engine/planner/enforce/Enforcer.java | 26 ++-
.../engine/planner/logical/GroupbyNode.java | 11 +-
.../planner/rewrite/ProjectionPushDownRule.java | 4 +
.../org/apache/tajo/master/GlobalPlanner.java | 158 ++++++++-----
.../java/org/apache/tajo/master/TajoMaster.java | 4 +-
.../tajo/master/querymaster/Repartitioner.java | 2 +-
.../src/main/proto/TajoWorkerProtocol.proto | 1 +
.../apache/tajo/engine/eval/TestEvalTree.java | 4 +-
.../tajo/engine/eval/TestEvalTreeUtil.java | 4 +-
.../tajo/engine/function/ExprTestBase.java | 2 +-
.../engine/planner/TestLogicalOptimizer.java | 4 +-
.../tajo/engine/planner/TestLogicalPlan.java | 2 +-
.../tajo/engine/planner/TestLogicalPlanner.java | 4 +-
.../tajo/engine/planner/TestPlannerUtil.java | 2 +-
.../planner/physical/TestBNLJoinExec.java | 5 +-
.../planner/physical/TestHashJoinExec.java | 3 +-
.../planner/physical/TestMergeJoinExec.java | 4 +-
.../planner/physical/TestPhysicalPlanner.java | 12 +-
.../tajo/engine/query/TestGroupByQuery.java | 63 +++++-
.../apache/tajo/master/TestGlobalPlanner.java | 2 +-
38 files changed, 716 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b768f2..219ceb5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-145: count(distinct column) should be supported. (hyunsik)
+
TAJO-197: Implement Enforcer that forces physical planner to choose
specified algorithms. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index ac4c796..ad085f8 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -49,8 +49,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final TableDesc getTableDesc(final String name) {
try {
- return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder()
- .setValue(name).build()));
+ return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder().setValue(name).build()));
} catch (ServiceException e) {
LOG.error(e);
return null;
@@ -201,9 +200,9 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
@Override
- public final boolean registerFunction(final FunctionDesc funcDesc) {
+ public final boolean createFunction(final FunctionDesc funcDesc) {
try {
- return stub.registerFunction(null, funcDesc.getProto()).getValue();
+ return stub.createFunction(null, funcDesc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e);
return false;
@@ -211,16 +210,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
@Override
- public final boolean unregisterFunction(final String signature,
- DataType... paramTypes) {
- UnregisterFunctionRequest.Builder builder =
- UnregisterFunctionRequest.newBuilder();
+ public final boolean dropFunction(final String signature) {
+ UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
builder.setSignature(signature);
- for (DataType type : paramTypes) {
- builder.addParameterTypes(type);
- }
try {
- return stub.unregisterFunction(null, builder.build()).getValue();
+ return stub.dropFunction(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e);
return false;
@@ -228,11 +222,17 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
@Override
- public final FunctionDesc getFunction(final String signature,
- DataType... paramTypes) {
- GetFunctionMetaRequest.Builder builder =
- GetFunctionMetaRequest.newBuilder();
+ public final FunctionDesc getFunction(final String signature, DataType... paramTypes) {
+ return getFunction(signature, null, paramTypes);
+ }
+
+ @Override
+ public final FunctionDesc getFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
+ GetFunctionMetaRequest.Builder builder = GetFunctionMetaRequest.newBuilder();
builder.setSignature(signature);
+ if (funcType != null) {
+ builder.setFunctionType(funcType);
+ }
for (DataType type : paramTypes) {
builder.addParameterTypes(type);
}
@@ -253,10 +253,17 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
@Override
- public final boolean containFunction(final String signature,
- DataType... paramTypes) {
+ public final boolean containFunction(final String signature, DataType... paramTypes) {
+ return containFunction(signature, null, paramTypes);
+ }
+
+ @Override
+ public final boolean containFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
ContainFunctionRequest.Builder builder =
ContainFunctionRequest.newBuilder();
+ if (funcType != null) {
+ builder.setFunctionType(funcType);
+ }
builder.setSignature(signature);
for (DataType type : paramTypes) {
builder.addParameterTypes(type);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
index d0b5f50..02650bc 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -22,6 +22,8 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import java.util.Collection;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+
public interface CatalogService {
/**
@@ -76,21 +78,15 @@ public interface CatalogService {
boolean deleteIndex(String indexName);
- boolean registerFunction(FunctionDesc funcDesc);
+ boolean createFunction(FunctionDesc funcDesc);
- boolean unregisterFunction(String signature, DataType... paramTypes);
+ boolean dropFunction(String signature);
- /**
- *
- * @param signature
- * @return
- */
FunctionDesc getFunction(String signature, DataType... paramTypes);
- /**
- *
- * @param signature
- * @return
- */
+ FunctionDesc getFunction(String signature, FunctionType funcType, DataType... paramTypes);
+
boolean containFunction(String signature, DataType... paramTypes);
+
+ boolean containFunction(String signature, FunctionType funcType, DataType... paramTypes);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index 7ea2782..6374278 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -37,8 +37,8 @@ service CatalogProtocolService {
rpc getIndexByName(StringProto) returns (IndexDescProto);
rpc getIndex(GetIndexRequest) returns (IndexDescProto);
rpc delIndex(StringProto) returns (BoolProto);
- rpc registerFunction(FunctionDescProto) returns (BoolProto);
- rpc unregisterFunction(UnregisterFunctionRequest) returns (BoolProto);
+ rpc createFunction(FunctionDescProto) returns (BoolProto);
+ rpc dropFunction(UnregisterFunctionRequest) returns (BoolProto);
rpc getFunctionMeta(GetFunctionMetaRequest) returns (FunctionDescProto);
rpc containFunction(ContainFunctionRequest) returns (BoolProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index fb55bf7..166a015 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -35,13 +35,11 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import static org.apache.tajo.common.TajoDataTypes.Type;
public class CatalogUtil {
- public static String getCanonicalName(String signature,
- Collection<DataType> paramTypes) {
+ public static String getCanonicalName(String signature, Collection<DataType> paramTypes) {
DataType [] types = paramTypes.toArray(new DataType[paramTypes.size()]);
return getCanonicalName(signature, types);
}
- public static String getCanonicalName(String signature,
- DataType...paramTypes) {
+ public static String getCanonicalName(String signature, DataType...paramTypes) {
StringBuilder sb = new StringBuilder(signature);
sb.append("(");
int i = 0;
@@ -50,7 +48,6 @@ public class CatalogUtil {
if(i < paramTypes.length - 1) {
sb.append(",");
}
-
i++;
}
sb.append(")");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index e36fbf3..05ef6df 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -169,8 +169,13 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
public boolean contains(String colName) {
return fieldsByQialifiedName.containsKey(colName.toLowerCase());
+
}
+ public boolean containsAll(Collection<Column> columns) {
+ return fields.containsAll(columns);
+ }
+
public synchronized Schema addColumn(String name, Type type) {
if (type == Type.CHAR) {
return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 6ef7613..470c12c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -107,8 +107,12 @@ message TableDescProto {
}
enum FunctionType {
- GENERAL = 0;
- AGGREGATION = 1;
+ GENERAL = 0;
+ AGGREGATION = 1;
+ DISTINCT_AGGREGATION = 2;
+ UDF = 3;
+ UDA = 4;
+ DISTINCT_UDA = 5;
}
message FunctionDescProto {
@@ -151,17 +155,18 @@ message GetFunctionsResponse {
message UnregisterFunctionRequest {
required string signature = 1;
- repeated DataType parameterTypes = 2;
}
message GetFunctionMetaRequest {
required string signature = 1;
- repeated DataType parameterTypes = 2;
+ optional FunctionType functionType = 2;
+ repeated DataType parameterTypes = 3;
}
message ContainFunctionRequest {
required string signature = 1;
- repeated DataType parameterTypes = 2;
+ optional FunctionType functionType = 2;
+ repeated DataType parameterTypes = 3;
}
message TableStatProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 94eae97..6edc03a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.store.CatalogStore;
import org.apache.tajo.catalog.store.DBStore;
@@ -39,21 +38,27 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
+
/**
* This class provides the catalog service. The catalog service enables clients
* to register, unregister and access information about tables, functions, and
* cluster information.
*/
public class CatalogServer extends AbstractService {
-
private final static Log LOG = LogFactory.getLog(CatalogServer.class);
private TajoConf conf;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -61,8 +66,8 @@ public class CatalogServer extends AbstractService {
private final Lock wlock = lock.writeLock();
private CatalogStore store;
-
- private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
+ private Map<String, List<FunctionDescProto>> functions = new ConcurrentHashMap<String,
+ List<FunctionDescProto>>();
// RPC variables
private ProtoBlockingRpcServer rpcServer;
@@ -120,7 +125,7 @@ public class CatalogServer extends AbstractService {
private void initBuiltinFunctions(List<FunctionDesc> functions)
throws ServiceException {
for (FunctionDesc desc : functions) {
- handler.registerFunction(null, desc.getProto());
+ handler.createFunction(null, desc.getProto());
}
}
@@ -161,8 +166,7 @@ public class CatalogServer extends AbstractService {
return this.bindAddress;
}
- public class CatalogProtocolHandler
- implements CatalogProtocolService.BlockingInterface {
+ public class CatalogProtocolHandler implements CatalogProtocolService.BlockingInterface {
@Override
public TableDescProto getTableDesc(RpcController controller,
@@ -210,10 +214,10 @@ public class CatalogServer extends AbstractService {
public GetFunctionsResponse getFunctions(RpcController controller,
NullProto request)
throws ServiceException {
- Iterator<FunctionDescProto> iterator = functions.values().iterator();
+ Iterator<List<FunctionDescProto>> iterator = functions.values().iterator();
GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
while (iterator.hasNext()) {
- builder.addFunctionDesc(iterator.next());
+ builder.addAllFunctionDesc(iterator.next());
}
return builder.build();
}
@@ -397,81 +401,194 @@ public class CatalogServer extends AbstractService {
return BOOL_TRUE;
}
+ public boolean checkIfBuiltin(FunctionType type) {
+ return type == GENERAL || type == AGGREGATION || type == DISTINCT_AGGREGATION;
+ }
+
+ private boolean containFunction(String signature) {
+ List<FunctionDescProto> found = findFunction(signature);
+ return found != null && found.size() > 0;
+ }
+
+ private boolean containFunction(String signature, FunctionType type, List<DataType> params) {
+ return findFunction(signature, type, params) != null;
+ }
+
+ private List<FunctionDescProto> findFunction(String signature) {
+ return functions.get(signature);
+ }
+
+ private FunctionDescProto findFunction(String signature, List<DataType> params) {
+ if (functions.containsKey(signature)) {
+ for (FunctionDescProto existing : functions.get(signature)) {
+ if (existing.getParameterTypesList().containsAll(params) &&
+ params.containsAll(existing.getParameterTypesList())) {
+ return existing;
+ }
+ }
+ }
+ return null;
+ }
+
+ private FunctionDescProto findFunction(String signature, FunctionType type, List<DataType> params) {
+ if (functions.containsKey(signature)) {
+ for (FunctionDescProto existing : functions.get(signature)) {
+ if (existing.getType() == type &&
+ existing.getParameterTypesList().containsAll(params)
+ && params.containsAll(existing.getParameterTypesList())) {
+ return existing;
+ }
+ }
+ }
+ return null;
+ }
+
+ private FunctionDescProto findFunction(FunctionDescProto target) {
+ return findFunction(target.getSignature(), target.getType(), target.getParameterTypesList());
+ }
+
@Override
- public BoolProto registerFunction(RpcController controller,
- FunctionDescProto funcDesc)
+ public BoolProto createFunction(RpcController controller, FunctionDescProto funcDesc)
throws ServiceException {
- String canonicalName =
- CatalogUtil.getCanonicalName(funcDesc.getSignature(),
- funcDesc.getParameterTypesList());
- if (functions.containsKey(canonicalName)) {
- throw new AlreadyExistsFunctionException(canonicalName);
+ FunctionSignature signature = FunctionSignature.create(funcDesc);
+
+ if (functions.containsKey(funcDesc.getSignature())) {
+ FunctionDescProto found = findFunction(funcDesc);
+ if (found != null) {
+ throw new AlreadyExistsFunctionException(signature.toString());
+ }
}
- functions.put(canonicalName, funcDesc);
+ TUtil.putToNestedList(functions, funcDesc.getSignature(), funcDesc);
if (LOG.isDebugEnabled()) {
- LOG.info("Function " + canonicalName + " is registered.");
+ LOG.info("Function " + signature + " is registered.");
}
return BOOL_TRUE;
}
@Override
- public BoolProto unregisterFunction(RpcController controller,
- UnregisterFunctionRequest request)
+ public BoolProto dropFunction(RpcController controller, UnregisterFunctionRequest request)
throws ServiceException {
- String signature = request.getSignature();
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
- }
- String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
- if (!functions.containsKey(canonicalName)) {
- throw new NoSuchFunctionException(canonicalName);
+
+ if (!containFunction(request.getSignature())) {
+ throw new NoSuchFunctionException(request.getSignature());
}
- functions.remove(canonicalName);
- LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
+ functions.remove(request.getSignature());
+ LOG.info(request.getSignature() + " is dropped.");
return BOOL_TRUE;
}
@Override
- public FunctionDescProto getFunctionMeta(RpcController controller,
- GetFunctionMetaRequest request)
+ public FunctionDescProto getFunctionMeta(RpcController controller, GetFunctionMetaRequest request)
throws ServiceException {
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
- }
-
- String key = CatalogUtil.getCanonicalName(
- request.getSignature().toLowerCase(), paramTypes);
- if (!functions.containsKey(key)) {
+ if (request.hasFunctionType()) {
+ if (containFunction(request.getSignature(), request.getFunctionType(), request.getParameterTypesList())) {
+ FunctionDescProto desc = findFunction(request.getSignature(), request.getFunctionType(),
+ request.getParameterTypesList());
+ return desc;
+ }
return null;
} else {
- return functions.get(key);
+ FunctionDescProto function = findFunction(request.getSignature(), request.getParameterTypesList());
+ return function;
}
}
@Override
- public BoolProto containFunction(RpcController controller,
- ContainFunctionRequest request)
+ public BoolProto containFunction(RpcController controller, ContainFunctionRequest request)
throws ServiceException {
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
+ boolean returnValue;
+ if (request.hasFunctionType()) {
+ returnValue = containFunction(request.getSignature(), request.getFunctionType(),
+ request.getParameterTypesList());
+ } else {
+ returnValue = containFunction(request.getSignature());
}
- boolean returnValue =
- functions.containsKey(CatalogUtil.getCanonicalName(
- request.getSignature().toLowerCase(), paramTypes));
+
return BoolProto.newBuilder().setValue(returnValue).build();
}
}
+ /**
+  * Value object identifying one function overload by name, function type and the ordered
+  * list of argument types.
+  *
+  * Ordering, equality and hashing all use the same key: the name, the function type and the
+  * {@code getType()} of each argument in order. Argument lengths appear in {@link #toString()}
+  * but are not part of the key.
+  */
+ private static class FunctionSignature implements Comparable<FunctionSignature> {
+   private final String signature;
+   private final FunctionType type;
+   private final DataType [] arguments;
+
+   public FunctionSignature(String signature, FunctionType type, List<DataType> arguments) {
+     this.signature = signature;
+     this.type = type;
+     this.arguments = arguments.toArray(new DataType[arguments.size()]);
+   }
+
+   /** Builds a signature from a function descriptor. */
+   public static FunctionSignature create(FunctionDescProto proto) {
+     return new FunctionSignature(proto.getSignature(), proto.getType(), proto.getParameterTypesList());
+   }
+
+   /** Builds a signature from a get-function-meta request. */
+   public static FunctionSignature create (GetFunctionMetaRequest proto) {
+     return new FunctionSignature(proto.getSignature(), proto.getFunctionType(), proto.getParameterTypesList());
+   }
+
+   /** Builds a signature from a contain-function request. */
+   public static FunctionSignature create(ContainFunctionRequest proto) {
+     return new FunctionSignature(proto.getSignature(), proto.getFunctionType(), proto.getParameterTypesList());
+   }
+
+   /**
+    * Renders the signature as {@code name#TYPE(ARG[len],ARG[len],...)}.
+    */
+   @Override
+   public String toString() {
+     StringBuilder sb = new StringBuilder();
+     sb.append(signature);
+     sb.append("#").append(type.name());
+     sb.append("(");
+     for (int i = 0; i < arguments.length; i++) {
+       if (i > 0) {
+         sb.append(",");
+       }
+       DataType argument = arguments[i];
+       sb.append(argument.getType());
+       sb.append("[").append(argument.getLength()).append("]");
+     }
+     sb.append(")");
+
+     return sb.toString();
+   }
+
+   /**
+    * Hashes the name, the function type and each argument's type. The array itself is not
+    * passed to the hash helper, because an array hashes by identity rather than by content.
+    */
+   @Override
+   public int hashCode() {
+     int result = com.google.common.base.Objects.hashCode(signature, type);
+     for (DataType argument : arguments) {
+       result = 31 * result + argument.getType().hashCode();
+     }
+     return result;
+   }
+
+   /**
+    * Two signatures are equal exactly when {@link #compareTo(FunctionSignature)} returns 0.
+    */
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     if (!(obj instanceof FunctionSignature)) {
+       return false;
+     }
+     return compareTo((FunctionSignature) obj) == 0;
+   }
+
+   /**
+    * Orders by name, then function type, then argument types position by position. If one
+    * argument list is a prefix of the other, the shorter one sorts first.
+    */
+   @Override
+   public int compareTo(FunctionSignature o) {
+     int signatureCmp = signature.compareTo(o.signature);
+     if (signatureCmp != 0) {
+       return signatureCmp;
+     }
+
+     int typeCmp = type.compareTo(o.type);
+     if (typeCmp != 0) {
+       return typeCmp;
+     }
+
+     int min = Math.min(arguments.length, o.arguments.length);
+     for (int i = 0; i < min; i++) {
+       int argCmp = arguments[i].getType().compareTo(o.arguments[i].getType());
+       if (argCmp != 0) {
+         return argCmp;
+       }
+     }
+
+     // All shared positions are equal, so the argument count decides. Array lengths are
+     // non-negative, so this subtraction cannot overflow.
+     return arguments.length - o.arguments.length;
+   }
+ }
+
public static void main(String[] args) throws Exception {
TajoConf conf = new TajoConf();
CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 2bf944d..bc3bf4b 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -179,40 +179,36 @@ public class TestCatalog {
@Test
public final void testRegisterFunc() throws Exception {
- assertFalse(catalog.containFunction("test2"));
- FunctionDesc meta = new FunctionDesc("test2", TestFunc1.class, FunctionType.GENERAL,
+ assertFalse(catalog.containFunction("test2", FunctionType.UDF));
+ FunctionDesc meta = new FunctionDesc("test2", TestFunc1.class, FunctionType.UDF,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
- catalog.registerFunction(meta);
+ catalog.createFunction(meta);
assertTrue(catalog.containFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4));
assertEquals(retrived.getSignature(),"test2");
assertEquals(retrived.getFuncClass(),TestFunc1.class);
- assertEquals(retrived.getFuncType(),FunctionType.GENERAL);
+ assertEquals(retrived.getFuncType(),FunctionType.UDF);
}
@Test
- public final void testUnregisterFunc() throws Exception {
- assertFalse(catalog
- .containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- FunctionDesc meta = new FunctionDesc("test3", TestFunc1.class, FunctionType.GENERAL,
+ public final void testDropFunction() throws Exception {
+ assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+ FunctionDesc meta = new FunctionDesc("test3", TestFunc1.class, FunctionType.UDF,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4 ));
- catalog.registerFunction(meta);
+ catalog.createFunction(meta);
assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- catalog.unregisterFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4));
- assertFalse(catalog
- .containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+ catalog.dropFunction("test3");
+ assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- assertFalse(catalog.containFunction("test3",
- CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+ assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
FunctionDesc overload = new FunctionDesc("test3", TestFunc2.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB));
- catalog.registerFunction(overload);
- assertTrue(catalog.containFunction("test3",
- CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+ catalog.createFunction(overload);
+ assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java b/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
new file mode 100644
index 0000000..d6a757d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.annotation;
+
+/**
+ * Marker annotation with no members. It is retained at runtime, included in generated
+ * documentation, and may be placed on parameters and fields.
+ * NOTE(review): presumably marks an element that is allowed to be null; confirm against usages.
+ */
+@java.lang.annotation.Documented
+@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+@java.lang.annotation.Target({java.lang.annotation.ElementType.PARAMETER, java.lang.annotation.ElementType.FIELD})
+public @interface Nullable {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 3931709..76e9608 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -165,4 +165,20 @@ public class TUtil {
map.put(k1, TUtil.newLinkedHashMap(k2, value));
}
}
+
+ /**
+  * Joins the string forms of the given objects with {@code ", "}.
+  *
+  * @param objects elements to render; a null element is rendered as {@code "null"}
+  * @return the joined text, or an empty string for an empty array
+  */
+ public static String arrayToString(Object [] objects) {
+   // Must start as true so that no separator is written before the first element.
+   boolean first = true;
+   StringBuilder sb = new StringBuilder();
+   for (Object object : objects) {
+     if (first) {
+       first = false;
+     } else {
+       sb.append(", ");
+     }
+
+     // append(Object) goes through String.valueOf, so a null element does not throw.
+     sb.append(object);
+   }
+
+   return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
index f6c54cb..afadaad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
@@ -27,6 +27,9 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.TUtil;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_UDA;
+
public abstract class FuncEval extends EvalNode implements Cloneable {
@Expose protected FunctionDesc funcDesc;
@Expose protected EvalNode [] argEvals;
@@ -37,6 +40,10 @@ public abstract class FuncEval extends EvalNode implements Cloneable {
this.argEvals = argEvals;
}
+ /**
+  * Reports whether the wrapped function is a distinct aggregation.
+  *
+  * @return true if the function type is DISTINCT_AGGREGATION or DISTINCT_UDA
+  */
+ public boolean isDistinct() {
+   if (funcDesc.getFuncType() == DISTINCT_AGGREGATION) {
+     return true;
+   }
+   return funcDesc.getFuncType() == DISTINCT_UDA;
+ }
+
@Override
public EvalContext newContext() {
FuncCallCtx newCtx = new FuncCallCtx(argEvals);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
index 2b5fb80..608c612 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
@@ -34,6 +34,7 @@ public final class CountValue extends CountRows {
new Column("col", Type.ANY)
});
}
+
@Override
public void eval(FunctionContext ctx, Tuple params) {
if (!(params.get(0) instanceof NullDatum)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
new file mode 100644
index 0000000..05850be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.function.FunctionContext;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * count(distinct column) aggregation function.
+ *
+ * A value is counted only when it differs from the value seen immediately before it, so
+ * equal values must arrive consecutively for the result to be a true distinct count.
+ * NOTE(review): presumably the planner guarantees this by sorting on the distinct column;
+ * confirm against the sort-aggregation path.
+ * NULL values are never counted.
+ */
+public final class CountValueDistinct extends CountRows {
+
+  public CountValueDistinct() {
+    super(new Column[] {
+        new Column("col", Type.ANY)
+    });
+  }
+
+  /**
+   * Does nothing: this function only counts in {@link #merge(FunctionContext, Tuple)}.
+   */
+  @Override
+  public void eval(FunctionContext context, Tuple params) {
+  }
+
+  /**
+   * Counts the first field of {@code part} if it is non-NULL and differs from the last
+   * counted value.
+   */
+  @Override
+  public void merge(FunctionContext context, Tuple part) {
+    CountDistinctValueContext distinctContext = (CountDistinctValueContext) context;
+    Datum value = part.get(0);
+
+    // Check for NULL before anything else. The original condition
+    // "latest == null || (changed && notNull)" counted a NULL that arrived first.
+    if (value instanceof NullDatum) {
+      return;
+    }
+
+    if (distinctContext.latest == null || !distinctContext.latest.equals(value)) {
+      distinctContext.latest = value;
+      distinctContext.count++;
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createInt8(((CountDistinctValueContext) ctx).count);
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((CountDistinctValueContext) ctx).count);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new CountDistinctValueContext();
+  }
+
+  /**
+   * Per-group state: the running count and the most recently counted value.
+   * Static because it never touches the enclosing function instance.
+   */
+  private static class CountDistinctValueContext implements FunctionContext {
+    long count = 0;
+    Datum latest = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 7e7b45e..3ab98f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -602,7 +602,7 @@ public class LogicalPlan {
return targetListManager.size();
}
- public void fillTarget(int idx) throws VerifyException {
+ public void fillTarget(int idx) throws PlanningException {
targetListManager.update(idx, planner.createTarget(LogicalPlan.this, this, getProjection().getTargets()[idx]));
}
@@ -623,7 +623,7 @@ public class LogicalPlan {
return targetListManager.getUpdatedSchema();
}
- public void fillTargets() {
+ public void fillTargets() throws PlanningException {
for (int i = 0; i < getTargetListNum(); i++) {
if (!isAlreadyTargetCreated(i)) {
try {
@@ -713,6 +713,16 @@ public class LogicalPlan {
// Set the current targets to the GroupByNode because the GroupByNode is the last projection operator.
GroupbyNode groupbyNode = (GroupbyNode) node;
groupbyNode.setTargets(getCurrentTargets());
+ boolean distinct = false;
+ for (Target target : groupbyNode.getTargets()) {
+ for (AggFuncCallEval aggrFunc : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
+ if (aggrFunc.isDistinct()) {
+ distinct = true;
+ break;
+ }
+ }
+ }
+ groupbyNode.setDistinct(distinct);
node.setOutSchema(updateSchema());
// if a having condition is given,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 3edb712..f08855a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Stack;
import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
/**
@@ -916,7 +917,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
===============================================================================================*/
public EvalNode createEvalTree(LogicalPlan plan, QueryBlock block, final Expr expr)
- throws VerifyException {
+ throws PlanningException {
switch(expr.getType()) {
// constants
@@ -1012,8 +1013,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
case Column:
return createFieldEval(plan, block, (ColumnReferenceExpr) expr);
- case CountRowsFunction:
- FunctionDesc countRows = catalog.getFunction("count", new DataType[] {});
+ case CountRowsFunction: {
+ FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION, new DataType[] {});
try {
block.setHasGrouping();
@@ -1024,13 +1025,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
throw new UndefinedFunctionException(CatalogUtil.
getCanonicalName(countRows.getSignature(), new DataType[]{}));
}
-
+ }
case GeneralSetFunction: {
GeneralSetFunctionExpr setFunction = (GeneralSetFunctionExpr) expr;
Expr[] params = setFunction.getParams();
EvalNode[] givenArgs = new EvalNode[params.length];
DataType[] paramTypes = new DataType[params.length];
+ FunctionType functionType = setFunction.isDistinct() ?
+ FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
givenArgs[0] = createEvalTree(plan, block, params[0]);
if (setFunction.getSignature().equalsIgnoreCase("count")) {
paramTypes[0] = CatalogUtil.newDataTypeWithoutLen(TajoDataTypes.Type.ANY);
@@ -1038,12 +1041,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
paramTypes[0] = givenArgs[0].getValueType()[0];
}
- if (!catalog.containFunction(setFunction.getSignature(), paramTypes)) {
- throw new UndefinedFunctionException(CatalogUtil.
- getCanonicalName(setFunction.getSignature(), paramTypes));
+ if (!catalog.containFunction(setFunction.getSignature(), functionType, paramTypes)) {
+ throw new UndefinedFunctionException(CatalogUtil. getCanonicalName(setFunction.getSignature(), paramTypes));
}
- FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), paramTypes);
+ FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), functionType, paramTypes);
if (!block.hasGroupbyNode()) {
block.setHasGrouping();
}
@@ -1068,23 +1070,23 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
if (!catalog.containFunction(function.getSignature(), paramTypes)) {
- throw new UndefinedFunctionException(CatalogUtil.
- getCanonicalName(function.getSignature(), paramTypes));
+ throw new UndefinedFunctionException(CatalogUtil.getCanonicalName(function.getSignature(), paramTypes));
}
FunctionDesc funcDesc = catalog.getFunction(function.getSignature(), paramTypes);
try {
- if (funcDesc.getFuncType() == CatalogProtos.FunctionType.GENERAL)
- return new FuncCallEval(funcDesc,
- (GeneralFunction) funcDesc.newInstance(), givenArgs);
- else {
+ FunctionType functionType = funcDesc.getFuncType();
+ if (functionType == FunctionType.GENERAL || functionType == FunctionType.UDF) {
+ return new FuncCallEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
+ } else if (functionType == FunctionType.AGGREGATION || functionType == FunctionType.UDA) {
if (!block.hasGroupbyNode()) {
block.setHasGrouping();
}
- return new AggFuncCallEval(funcDesc,
- (AggFunction) funcDesc.newInstance(), givenArgs);
+ return new AggFuncCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+ } else if (functionType == FunctionType.DISTINCT_AGGREGATION || functionType == FunctionType.DISTINCT_UDA) {
+ throw new PlanningException("Unsupported function: " + funcDesc.toString());
}
} catch (InternalException e) {
e.printStackTrace();
@@ -1134,7 +1136,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
public CaseWhenEval createCaseWhenEval(LogicalPlan plan, QueryBlock block,
- CaseWhenPredicate caseWhen) throws VerifyException {
+ CaseWhenPredicate caseWhen) throws PlanningException {
CaseWhenEval caseEval = new CaseWhenEval();
EvalNode condition;
EvalNode result;
@@ -1153,7 +1155,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
Target[] annotateTargets(LogicalPlan plan, QueryBlock block, org.apache.tajo.algebra.Target [] targets)
- throws VerifyException {
+ throws PlanningException {
Target annotatedTargets [] = new Target[targets.length];
for (int i = 0; i < targets.length; i++) {
@@ -1163,7 +1165,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
Target createTarget(LogicalPlan plan, QueryBlock block,
- org.apache.tajo.algebra.Target target) throws VerifyException {
+ org.apache.tajo.algebra.Target target) throws PlanningException {
if (target.hasAlias()) {
return new Target(createEvalTree(plan, block, target.getExpr()),
target.getAlias());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c1a1b2f..8110a3b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -22,6 +22,7 @@
package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.util.IndexUtil;
+import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.util.List;
@@ -375,7 +377,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
return createInMemoryHashAggregation(context, groupbyNode, subOp);
} else {
- return createSortAggregation(context, groupbyNode, subOp);
+ return createSortAggregation(context, property, groupbyNode, subOp);
}
}
return createBestAggregationPlan(context, groupbyNode, subOp);
@@ -387,19 +389,33 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new HashAggregateExec(ctx, groupbyNode, subOp);
}
- private PhysicalExec createSortAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+ private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode, PhysicalExec subOp)
throws IOException {
+
Column[] grpColumns = groupbyNode.getGroupingColumns();
- SortSpec[] specs = new SortSpec[grpColumns.length];
+ SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
for (int i = 0; i < grpColumns.length; i++) {
- specs[i] = new SortSpec(grpColumns[i], true, false);
+ sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
+ }
+
+ if (property != null) {
+ List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+ SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
+ int i = 0;
+
+ for (int j = 0; j < sortSpecProtos.size(); i++, j++) {
+ enforcedSortSpecs[i] = new SortSpec(sortSpecProtos.get(j));
+ }
+
+ sortSpecs = ObjectArrays.concat(sortSpecs, enforcedSortSpecs, SortSpec.class);
}
- SortNode sortNode = new SortNode(-1, specs);
+
+ SortNode sortNode = new SortNode(-1, sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
// SortExec sortExec = new SortExec(sortNode, child);
ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
- LOG.info("The planner chooses [Sort Aggregation]");
+ LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
@@ -420,7 +436,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
LOG.info("The planner chooses [Hash Aggregation]");
return createInMemoryHashAggregation(context, groupbyNode, subOp);
} else {
- return createSortAggregation(context, groupbyNode, subOp);
+ return createSortAggregation(context, null, groupbyNode, subOp);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 60243bc..8f44166 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,10 +35,7 @@ import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.query.exception.InvalidQueryException;
import org.apache.tajo.storage.TupleComparator;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
public class PlannerUtil {
private static final Log LOG = LogFactory.getLog(PlannerUtil.class);
@@ -111,42 +109,64 @@ public class PlannerUtil {
}
parentNode.setChild(newNode);
}
-
+
public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
Preconditions.checkNotNull(groupBy);
GroupbyNode child = null;
+
+ // cloning groupby node
try {
- // cloning groupby node
child = (GroupbyNode) groupBy.clone();
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
- List<Target> newChildTargets = Lists.newArrayList();
- Target[] secondTargets = groupBy.getTargets();
- Target[] firstTargets = child.getTargets();
+ List<Target> firstStepTargets = Lists.newArrayList();
+ Target[] secondTargets = groupBy.getTargets();
+ Target[] firstTargets = child.getTargets();
- Target second;
- Target first;
- int targetId = 0;
- for (int i = 0; i < firstTargets.length; i++) {
- second = secondTargets[i];
- first = firstTargets[i];
+ Target second;
+ Target first;
+ int targetId = 0;
+ for (int i = 0; i < firstTargets.length; i++) {
+ second = secondTargets[i];
+ first = firstTargets[i];
- List<AggFuncCallEval> secondFuncs = EvalTreeUtil
- .findDistinctAggFunction(second.getEvalTree());
- List<AggFuncCallEval> firstFuncs = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+ List<AggFuncCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
+ List<AggFuncCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
- if (firstFuncs.size() == 0) {
- newChildTargets.add(first);
- targetId++;
- } else {
- for (AggFuncCallEval func : firstFuncs) {
+ if (firstStepFunctions.size() == 0) {
+ firstStepTargets.add(first);
+ targetId++;
+ } else {
+ for (AggFuncCallEval func : firstStepFunctions) {
+ Target newTarget;
+
+ if (func.isDistinct()) {
+ List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
+ newTarget = new Target(new FieldEval(fields.get(0)));
+ String targetName = "column_" + (targetId++);
+ newTarget.setAlias(targetName);
+
+ AggFuncCallEval secondFunc = null;
+ for (AggFuncCallEval sf : secondStepFunctions) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
+ } else {
func.setFirstPhase();
- Target newTarget = new Target(func);
+ newTarget = new Target(func);
String targetName = "column_" + (targetId++);
newTarget.setAlias(targetName);
AggFuncCallEval secondFunc = null;
- for (AggFuncCallEval sf : secondFuncs) {
+ for (AggFuncCallEval sf : secondStepFunctions) {
if (func.equals(sf)) {
secondFunc = sf;
break;
@@ -158,23 +178,32 @@ public class PlannerUtil {
secondFunc.setArgs(new EvalNode [] {new FieldEval(
new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
}
- newChildTargets.add(newTarget);
}
+ firstStepTargets.add(newTarget);
+ }
+ }
+
+ // Getting new target list and updating input/output schema from the new target list.
+ Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
+ Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
+ List<Target> newTarget = Lists.newArrayList();
+ for (Column column : groupBy.getGroupingColumns()) {
+ if (!targetSchema.contains(column.getQualifiedName())) {
+ newTarget.add(new Target(new FieldEval(column)));
}
}
+ targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
- Target[] targetArray = newChildTargets.toArray(new Target[newChildTargets.size()]);
child.setTargets(targetArray);
child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
// set the groupby chaining
groupBy.setChild(child);
groupBy.setInSchema(child.getOutSchema());
- } catch (CloneNotSupportedException e) {
- LOG.error(e);
+
}
-
return child;
}
+
/**
* Find the top logical node matched to type from the given node
@@ -402,10 +431,24 @@ public class PlannerUtil {
}
public static SortSpec[] schemaToSortSpecs(Schema schema) {
- SortSpec[] specs = new SortSpec[schema.getColumnNum()];
+ return schemaToSortSpecs(schema.toArray());
+ }
- for (int i = 0; i < schema.getColumnNum(); i++) {
- specs[i] = new SortSpec(schema.getColumn(i), true, false);
+ public static SortSpec[] schemaToSortSpecs(Column [] columns) {
+ SortSpec[] specs = new SortSpec[columns.length];
+
+ for (int i = 0; i < columns.length; i++) {
+ specs[i] = new SortSpec(columns[i], true, false);
+ }
+
+ return specs;
+ }
+
+ public static SortSpec [] columnsToSortSpec(Collection<Column> columns) {
+ SortSpec[] specs = new SortSpec[columns.size()];
+ int i = 0;
+ for (Column column : columns) {
+ specs[i++] = new SortSpec(column, true, false);
}
return specs;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index a268a39..d7c3ba4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner.enforce;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.util.TUtil;
@@ -29,6 +30,7 @@ import java.util.Map;
import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
public class Enforcer implements ProtoObject<EnforcerProto> {
Map<EnforceType, List<EnforceProperty>> properties;
@@ -88,7 +90,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
TUtil.putToNestedList(properties, builder.getType(), builder.build());
}
- public void addJoin(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+ public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) {
EnforceProperty.Builder builder = newProperty();
JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
enforce.setPid(pid);
@@ -99,18 +101,34 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
TUtil.putToNestedList(properties, builder.getType(), builder.build());
}
- public void addGroupby(int pid, GroupbyEnforce.GroupbyAlgorithm algorithm) {
+ public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) {
EnforceProperty.Builder builder = newProperty();
GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
enforce.setPid(pid);
- enforce.setAlgorithm(algorithm);
+ enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION);
+ if (sortSpecs != null) {
+ for (SortSpec sortSpec : sortSpecs) {
+ enforce.addSortSpecs(sortSpec.getProto());
+ }
+ }
+
+ builder.setType(EnforceType.GROUP_BY);
+ builder.setGroupby(enforce.build());
+ TUtil.putToNestedList(properties, builder.getType(), builder.build());
+ }
+
+ public void enforceHashAggregation(int pid) {
+ EnforceProperty.Builder builder = newProperty();
+ GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+ enforce.setPid(pid);
+ enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION);
builder.setType(EnforceType.GROUP_BY);
builder.setGroupby(enforce.build());
TUtil.putToNestedList(properties, builder.getType(), builder.build());
}
- public void addSort(int pid, SortEnforce.SortAlgorithm algorithm) {
+ public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
EnforceProperty.Builder builder = newProperty();
SortEnforce.Builder enforce = SortEnforce.newBuilder();
enforce.setPid(pid);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 4f9b6d7..2e10353 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -31,6 +31,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
@Expose private Schema havingSchema;
@Expose private EvalNode havingCondition = null;
@Expose private Target [] targets;
+ @Expose private boolean hasDistinct = false;
public GroupbyNode(int pid, final Column [] columns) {
super(pid, NodeType.GROUP_BY);
@@ -45,10 +46,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
public final boolean isEmptyGrouping() {
return columns == null || columns.length == 0;
}
-
+
public final Column [] getGroupingColumns() {
return this.columns;
}
+
+ public final boolean isDistinct() {
+ return hasDistinct;
+ }
+
+ public void setDistinct(boolean distinct) {
+ hasDistinct = distinct;
+ }
public final boolean hasHavingCondition() {
return this.havingCondition != null;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 0570a88..6ab72cb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -189,6 +189,10 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
}
+ for (Column column : node.getGroupingColumns()) {
+ currentRequired.add(column);
+ }
+
PushDownContext groupByContext = new PushDownContext(context);
groupByContext.upperRequired = currentRequired;
return pushDownCommonPost(groupByContext, node, stack);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 37c77b7..7ce929c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -19,6 +19,7 @@
package org.apache.tajo.master;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -26,6 +27,8 @@ import org.apache.tajo.DataChannel;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AggFuncCallEval;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
@@ -126,7 +129,7 @@ public class GlobalPlanner {
return currentBlock;
}
-
+
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -194,69 +197,33 @@ public class GlobalPlanner {
ExecutionBlock currentBlock = null;
GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
- GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
- firstGroupBy.setHavingCondition(null);
-
- if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
- ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
- UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
- ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
- UnionsFinderContext finderContext = new UnionsFinderContext();
- finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
-
+ if (groupByNode.isDistinct()) {
+ if (childBlock == null) { // first repartition node
+ childBlock = masterPlan.newExecutionBlock();
+ }
+ childBlock.setPlan(groupByNode.getChild());
currentBlock = masterPlan.newExecutionBlock();
- GroupbyNode secondGroupBy = groupByNode;
- for (UnionNode union : finderContext.unionList) {
- TableSubQueryNode leftSubQuery = union.getLeftChild();
- TableSubQueryNode rightSubQuery = union.getRightChild();
- DataChannel dataChannel;
- if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(leftSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
- }
- if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(rightSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+ LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
+ for (Target target : groupByNode.getTargets()) {
+ List<AggFuncCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
+ for (AggFuncCallEval function : functions) {
+ if (function.isDistinct()) {
+ columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+ }
}
}
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
- currentBlock.setPlan(currentNode);
- } else {
- if (childBlock == null) { // first repartition node
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(firstGroupBy);
-
- currentBlock = masterPlan.newExecutionBlock();
+ Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
+ columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
+ SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
+ currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
DataChannel channel;
- if (firstGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- } else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- }
- channel.setSchema(firstGroupBy.getOutSchema());
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel.setPartitionKey(groupByNode.getGroupingColumns());
+ channel.setSchema(groupByNode.getInSchema());
GroupbyNode secondGroupBy = groupByNode;
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -269,6 +236,85 @@ public class GlobalPlanner {
masterPlan.addConnect(channel);
currentBlock.setPlan(currentNode);
+
+ } else {
+
+ GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+ firstGroupBy.setHavingCondition(null);
+
+ if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+ ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+ UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+ ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+ UnionsFinderContext finderContext = new UnionsFinderContext();
+ finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+ currentBlock = masterPlan.newExecutionBlock();
+ GroupbyNode secondGroupBy = groupByNode;
+ for (UnionNode union : finderContext.unionList) {
+ TableSubQueryNode leftSubQuery = union.getLeftChild();
+ TableSubQueryNode rightSubQuery = union.getRightChild();
+ DataChannel dataChannel;
+ if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(leftSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
+ if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(rightSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
+ }
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+ currentBlock.setPlan(currentNode);
+ } else {
+
+ if (childBlock == null) { // first repartition node
+ childBlock = masterPlan.newExecutionBlock();
+ }
+ childBlock.setPlan(firstGroupBy);
+
+ currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel channel;
+ if (firstGroupBy.isEmptyGrouping()) {
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+ } else {
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+ }
+ channel.setSchema(firstGroupBy.getOutSchema());
+
+ GroupbyNode secondGroupBy = groupByNode;
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ secondGroupBy.setChild(scanNode);
+
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+
+ masterPlan.addConnect(channel);
+ currentBlock.setPlan(currentNode);
+ }
}
return new ExecutionBlock [] {currentBlock, childBlock};
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 1296ea4..ccb5ddb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -278,6 +277,9 @@ public class TajoMaster extends CompositeService {
sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
CatalogUtil.newDataTypesWithoutLen(Type.INT8),
CatalogUtil.newDataTypesWithoutLen()));
+ sqlFuncs.add(new FunctionDesc("count", CountValueDistinct.class, FunctionType.DISTINCT_AGGREGATION,
+ CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+ CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
// GeoIP
sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7911925..7a956e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -469,7 +469,7 @@ public class Repartitioner {
}
}
- GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getPlan();
+ GroupbyNode groupby = PlannerUtil.findTopNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
// the number of tasks cannot exceed the number of merged fetch uris.
int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
if (groupby.getGroupingColumns().length == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index ba5f342..4d75e46 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -220,6 +220,7 @@ message GroupbyEnforce {
required int32 pid = 1;
required GroupbyAlgorithm algorithm = 2;
+ repeated SortSpecProto sortSpecs = 3;
}
message SortEnforce {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index b3be308..7d14e4c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -57,7 +57,7 @@ public class TestEvalTree {
util.startCatalogCluster();
cat = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- cat.registerFunction(funcDesc);
+ cat.createFunction(funcDesc);
}
Schema schema = new Schema();
@@ -72,7 +72,7 @@ public class TestEvalTree {
FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(INT4),
CatalogUtil.newDataTypesWithoutLen(INT4, INT4));
- cat.registerFunction(funcMeta);
+ cat.createFunction(funcMeta);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(cat);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 40c4875..079e653 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -64,7 +64,7 @@ public class TestEvalTreeUtil {
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
Schema schema = new Schema();
@@ -80,7 +80,7 @@ public class TestEvalTreeUtil {
FunctionType.GENERAL,
CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4),
CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4, TajoDataTypes.Type.INT4));
- catalog.registerFunction(funcMeta);
+ catalog.createFunction(funcMeta);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
index 2ba6ee6..c3e5d00 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
@@ -56,7 +56,7 @@ public class ExprTestBase {
util.startCatalogCluster();
cat = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- cat.registerFunction(funcDesc);
+ cat.createFunction(funcDesc);
}
analyzer = new SQLAnalyzer();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 72cbfc2..9ef6ab0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -49,7 +49,7 @@ public class TestLogicalOptimizer {
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
Schema schema = new Schema();
@@ -85,7 +85,7 @@ public class TestLogicalOptimizer {
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
optimizer = new LogicalOptimizer();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 9526c66..11775c1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -52,7 +52,7 @@ public class TestLogicalPlan {
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
// TPC-H Schema for Complex Queries
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index b708213..06bc52f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -59,7 +59,7 @@ public class TestLogicalPlanner {
util.startCatalogCluster();
catalog = util.getMiniCatalogCluster().getCatalog();
for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
}
Schema schema = new Schema();
@@ -108,7 +108,7 @@ public class TestLogicalPlanner {
catalog.addTable(d);
}
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 46fd648..130c2f7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -85,7 +85,7 @@ public class TestPlannerUtil {
CatalogUtil.newDataTypesWithoutLen(Type.INT4),
CatalogUtil.newDataTypesWithoutLen(Type.INT4));
- catalog.registerFunction(funcDesc);
+ catalog.createFunction(funcDesc);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 425418f..b311c9d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
@@ -139,7 +138,7 @@ public class TestBNLJoinExec {
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
- enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
Integer.MAX_VALUE);
@@ -180,7 +179,7 @@ public class TestBNLJoinExec {
JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
Enforcer enforcer = new Enforcer();
- enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),