You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 13:44:07 UTC
[04/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core
into the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
new file mode 100644
index 0000000..1dbbcf0
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link HashLeftAntiJoinExec} by planning a plain equi-join and then
+ * manually substituting a hash left anti join, so the result contains only the
+ * employee tuples that have NO matching person (empid 0, 2, 4, 6, 8).
+ */
+public class TestHashAntiJoinExec {
+  private TajoConf conf;
+  // Dedicated directory for this class. It previously reused
+  // "TestHashJoinExec", which risks collisions when test classes run together.
+  private final String TEST_PATH = "target/test-data/TestHashAntiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /**
+   * Boots a catalog-only cluster and creates two CSV tables:
+   * employee (managerid, empid, memid, deptname) with empid 0..9, and
+   * people (empid, fk_memid, name, age) with empid 1, 3, 5, 7, 9.
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  /**
+   * Plans QUERIES[0], swaps the physical join operator for a
+   * {@link HashLeftAntiJoinExec}, and verifies that exactly the five employees
+   * without a matching person (empid 0, 2, 4, 6, 8) are emitted.
+   */
+  @Test
+  public final void testHashAntiJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // The planner may emit either a merge join or a hash join for the
+    // equi-join. In both cases, replace it with a hash left anti join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      // Explicit casts: getChild() returns the generic PhysicalExec type
+      // (the sibling TestHashSemiJoinExec already casts here).
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result : [0, 2, 4, 6, 8]
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
new file mode 100644
index 0000000..66222da
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+/**
+ * Tests in-memory hash inner join execution ({@link HashJoinExec}) and the
+ * planner's small-table/side-switching heuristics.
+ */
+public class TestHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private final Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /**
+   * Boots a catalog-only cluster and creates two CSV tables:
+   * employee (managerid, empid, memid, deptname) with empid 0..9, and
+   * people (empid, fk_memid, name, age) with empid 1, 3, 5, 7, 9.
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < 10; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+          "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+  };
+
+  /**
+   * Enforces IN_MEMORY_HASH_JOIN for QUERIES[0] and checks that each of the
+   * five matching employees (empid 1, 3, 5, 7, 9) is joined exactly once.
+   */
+  @Test
+  public final void testHashInnerJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(10 / 2, count);
+  }
+
+  /**
+   * Lowers the in-memory hash join threshold to 100 bytes and checks the
+   * planner's size estimation and side-switching decisions.
+   */
+  @Test
+  public final void testCheckIfInMemoryInnerJoinIsPossible() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    TajoConf localConf = new TajoConf(conf);
+    // Uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1.
+    localConf.setLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD, 100L);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(localConf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashJoinExec);
+    // Explicit cast: getChild() returns the generic PhysicalExec type; the
+    // instanceof check above guarantees it is safe.
+    HashJoinExec joinExec = (HashJoinExec) proj.getChild();
+
+    assertCheckInnerJoinRelatedFunctions(ctx, phyPlanner, joinNode, joinExec);
+  }
+
+  /**
+   * It checks inner-join related functions. It will return TRUE if the left relation is smaller than the right one.
+   *
+   * The below assertions work according to which side is smaller. In this unit test, we use two tables: p and e.
+   * The table p is 75 bytes, and the table e is 140 bytes. Since we cannot predict which side ends up smaller,
+   * we use the boolean variable <code>leftSmaller</code> to indicate which side is small.
+   */
+  private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext ctx,
+                                                              PhysicalPlannerImpl phyPlanner,
+                                                              JoinNode joinNode, BinaryPhysicalExec joinExec) throws
+      IOException {
+
+    String [] left = PlannerUtil.getRelationLineage(joinNode.getLeftChild());
+    String [] right = PlannerUtil.getRelationLineage(joinNode.getRightChild());
+
+    boolean leftSmaller = left[0].equals("default.p");
+
+    long leftSize = phyPlanner.estimateSizeRecursive(ctx, left);
+    long rightSize = phyPlanner.estimateSizeRecursive(ctx, right);
+
+    // The table p is 75 bytes, and the table e is 140 bytes.
+    if (leftSmaller) { // if left one is smaller
+      assertEquals(75, leftSize);
+      assertEquals(140, rightSize);
+    } else { // if right one is smaller
+      assertEquals(140, leftSize);
+      assertEquals(75, rightSize);
+    }
+
+    if (leftSmaller) {
+      // left is already the smaller side, so no switch should happen
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[0], joinExec.getLeftChild());
+      assertEquals(ordered[1], joinExec.getRightChild());
+
+      assertEquals("default.p", left[0]);
+      assertEquals("default.e", right[0]);
+    } else {
+      // right is the smaller side, so the planner must swap the sides
+      PhysicalExec [] ordered = phyPlanner.switchJoinSidesIfNecessary(ctx, joinNode, joinExec.getLeftChild(),
+          joinExec.getRightChild());
+      assertEquals(ordered[1], joinExec.getLeftChild());
+      assertEquals(ordered[0], joinExec.getRightChild());
+
+      assertEquals("default.e", left[0]);
+      assertEquals("default.p", right[0]);
+    }
+
+    // Only the 75-byte relation fits under the 100-byte in-memory threshold.
+    if (leftSmaller) {
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    } else {
+      assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
+      assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
+    }
+
+    return leftSmaller;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
new file mode 100644
index 0000000..f0d846c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashPartitioner.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link HashPartitioner} assigns tuples with identical key
+ * columns to the same partition, regardless of non-key column values.
+ */
+public class TestHashPartitioner {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /** Builds a three-column INT4 tuple from the given values. */
+  private static Tuple newTuple(int v1, int v2, int v3) {
+    Tuple built = new VTuple(3);
+    built.put(new Datum[] {
+        DatumFactory.createInt4(v1),
+        DatumFactory.createInt4(v2),
+        DatumFactory.createInt4(v3)
+    });
+    return built;
+  }
+
+  @Test
+  public final void testGetPartition() {
+    // Partition on the first two columns into two partitions.
+    Partitioner partitioner = new HashPartitioner(new int[] {0, 1}, 2);
+
+    // Key (1, 2) with three different non-key values must map consistently.
+    int firstKeyPartition = partitioner.getPartition(newTuple(1, 2, 3));
+    assertEquals(firstKeyPartition, partitioner.getPartition(newTuple(1, 2, 4)));
+    assertEquals(firstKeyPartition, partitioner.getPartition(newTuple(1, 2, 5)));
+
+    // Key (2, 2) with two different non-key values must also map consistently.
+    int secondKeyPartition = partitioner.getPartition(newTuple(2, 2, 3));
+    assertEquals(secondKeyPartition, partitioner.getPartition(newTuple(2, 2, 4)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
new file mode 100644
index 0000000..4e5de98
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link HashLeftSemiJoinExec} by planning a plain equi-join and then
+ * manually substituting a hash left semi join. The people table contains
+ * three duplicates per key, so the test also verifies semi-join
+ * de-duplication: each matching employee (empid 1, 3, 5, 7, 9) appears once.
+ */
+public class TestHashSemiJoinExec {
+  private TajoConf conf;
+  // Dedicated directory for this class. It previously reused
+  // "TestHashJoinExec", which risks collisions when test classes run together.
+  private final String TEST_PATH = "target/test-data/TestHashSemiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /**
+   * Boots a catalog-only cluster and creates two CSV tables:
+   * employee (managerid, empid, memid, deptname) with empid 0..9, and
+   * people (empid, fk_memid, name, age) with empid 1, 3, 5, 7, 9, each
+   * key duplicated three times (27 tuples total).
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerid", Type.INT4);
+    employeeSchema.addColumn("empid", Type.INT4);
+    employeeSchema.addColumn("memid", Type.INT4);
+    employeeSchema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+        employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeSchema.size());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    // make 27 tuples
+    for (int i = 1; i < 10; i += 2) {
+      // make three duplicated tuples for each tuple
+      for (int j = 0; j < 3; j++) {
+        tuple.put(new Datum[] {
+            DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+            DatumFactory.createInt4(10 + i),
+            DatumFactory.createText("name_" + i),
+            DatumFactory.createInt4(30 + i) });
+        appender.addTuple(tuple);
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  /**
+   * Plans QUERIES[0], swaps the physical join operator for a
+   * {@link HashLeftSemiJoinExec}, and verifies that each matching employee
+   * (empid 1, 3, 5, 7, 9) appears exactly once despite the duplicates in
+   * the people table.
+   */
+  @Test
+  public final void testHashSemiJoin() throws IOException, PlanningException {
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // The planner may emit either a merge join or a hash join for the
+    // equi-join. In both cases, replace it with a hash left semi join.
+    // (Comment previously said "anti join" — a copy-paste slip.)
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("default.people")) {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashLeftSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    // expect result without duplicated tuples.
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5, count); // the expected result: [1, 3, 5, 7, 9]
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
new file mode 100644
index 0000000..de3d298
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLeftOuterHashJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestLeftOuterHashJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private AbstractStorageManager sm;
+ private Path testDir;
+ private Session session = LocalTajoTestingUtility.createDummySession();
+
+ private TableDesc dep3;
+ private TableDesc job3;
+ private TableDesc emp3;
+ private TableDesc phone3;
+
+ private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+ private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+ private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+ private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dep_0 | 1000
+ // 1 | dep_1 | 1001
+ // 2 | dep_2 | 1002
+ // 3 | dep_3 | 1003
+ // 4 | dep_4 | 1004
+ // 5 | dep_5 | 1005
+ // 6 | dep_6 | 1006
+ // 7 | dep_7 | 1007
+ // 8 | dep_8 | 1008
+ // 9 | dep_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ appender1.init();
+ Tuple tuple = new VTuple(dep3Schema.size());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+ catalog.createTable(dep3);
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ appender2.init();
+ Tuple tuple2 = new VTuple(job3Schema.size());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+ catalog.createTable(job3);
+
+
+
+ //---------------------emp3 --------------------
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | fn_11 | ln_11 | 1 | 123 | 101
+ // 13 | fn_13 | ln_13 | 3 | 369 | 103
+ // 15 | fn_15 | ln_15 | 5 | 615 | null
+ // 17 | fn_17 | ln_17 | 7 | 861 | null
+ // 19 | fn_19 | ln_19 | 9 | 1107 | null
+ // 21 | fn_21 | ln_21 | 1 | 123 | 101
+ // 23 | fn_23 | ln_23 | 3 | 369 | 103
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ appender3.init();
+ Tuple tuple3 = new VTuple(emp3Schema.size());
+
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+ catalog.createTable(emp3);
+
+ //---------------------phone3 --------------------
+ // emp_id | phone_number
+ // -----------------------------------------------
+ // this table is empty, no rows
+
+ Schema phone3Schema = new Schema();
+ phone3Schema.addColumn("emp_id", Type.INT4);
+ phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+ TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path phone3Path = new Path(testDir, "phone3.csv");
+ Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+ phone3Path);
+ appender5.init();
+
+ appender5.flush();
+ appender5.close();
+ phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+ catalog.createTable(phone3);
+
+
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ // [0] no nulls
+ "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id",
+ // [1] nulls on the right operand
+ "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id",
+ // [2] nulls on the left side
+ "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id",
+ // [3] one operand is empty
+ "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id",
+ // [4] one operand is empty
+ "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id"
+ };
+
+ @Test
+ public final void testLeftOuterHashJoinExec0() throws IOException, PlanningException {
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+ JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterHashJoinExec0");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec);
+
+ int count = 0;
+ exec.init();
+ while (exec.next() != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(12, count);
+ }
+
+
+ @Test
+ public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec1");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(5, count);
+ }
+ }
+
+ @Test
+ public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
+
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec2");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[2]);
+ LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+ }
+
+
+ @Test
+ public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
+
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec3");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[3]);
+ LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+ }
+
+
+ @Test
+ public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
+
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, "default.emp3", emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, "default.phone3", phone3.getMeta(), phone3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_HashJoinExec4");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr expr = analyzer.parse(QUERIES[4]);
+ LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof NLLeftOuterJoinExec) {
+ //for this small data set this is not likely to happen
+
+ assertEquals(1, 0);
+ }
+ else{
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ exec.close();
+ assertEquals(0, count);
+ }
+ }
+
+
+
+}
+ //--camelia
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
new file mode 100644
index 0000000..e806e55
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestLeftOuterNLJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestLeftOuterNLJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private static final Session session = LocalTajoTestingUtility.createDummySession();
+ private AbstractStorageManager sm;
+ private Path testDir;
+
+ private TableDesc dep3;
+ private TableDesc job3;
+ private TableDesc emp3;
+ private TableDesc phone3;
+
+ private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+ private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+ private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+ private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ //----------------- dep3 ------------------------------
+ // dep_id | dep_name | loc_id
+ //--------------------------------
+ // 0 | dep_0 | 1000
+ // 1 | dep_1 | 1001
+ // 2 | dep_2 | 1002
+ // 3 | dep_3 | 1003
+ // 4 | dep_4 | 1004
+ // 5 | dep_5 | 1005
+ // 6 | dep_6 | 1006
+ // 7 | dep_7 | 1007
+ // 8 | dep_8 | 1008
+ // 9 | dep_9 | 1009
+ Schema dep3Schema = new Schema();
+ dep3Schema.addColumn("dep_id", Type.INT4);
+ dep3Schema.addColumn("dep_name", Type.TEXT);
+ dep3Schema.addColumn("loc_id", Type.INT4);
+
+
+ TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path dep3Path = new Path(testDir, "dep3.csv");
+ Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+ appender1.init();
+ Tuple tuple = new VTuple(dep3Schema.size());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createText("dept_" + i),
+ DatumFactory.createInt4(1000 + i) });
+ appender1.addTuple(tuple);
+ }
+
+ appender1.flush();
+ appender1.close();
+ dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+ catalog.createTable(dep3);
+
+ //----------------- job3 ------------------------------
+ // job_id | job_title
+ // ----------------------
+ // 101 | job_101
+ // 102 | job_102
+ // 103 | job_103
+
+ Schema job3Schema = new Schema();
+ job3Schema.addColumn("job_id", Type.INT4);
+ job3Schema.addColumn("job_title", Type.TEXT);
+
+
+ TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path job3Path = new Path(testDir, "job3.csv");
+ Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+ appender2.init();
+ Tuple tuple2 = new VTuple(job3Schema.size());
+ for (int i = 1; i < 4; i++) {
+ int x = 100 + i;
+ tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+ DatumFactory.createText("job_" + x) });
+ appender2.addTuple(tuple2);
+ }
+
+ appender2.flush();
+ appender2.close();
+ job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+ catalog.createTable(job3);
+
+
+
+ //---------------------emp3 --------------------
+ // emp_id | first_name | last_name | dep_id | salary | job_id
+ // ------------------------------------------------------------
+ // 11 | fn_11 | ln_11 | 1 | 123 | 101
+ // 13 | fn_13 | ln_13 | 3 | 369 | 103
+ // 15 | fn_15 | ln_15 | 5 | 615 | null
+ // 17 | fn_17 | ln_17 | 7 | 861 | null
+ // 19 | fn_19 | ln_19 | 9 | 1107 | null
+ // 21 | fn_21 | ln_21 | 1 | 123 | 101
+ // 23 | fn_23 | ln_23 | 3 | 369 | 103
+
+ Schema emp3Schema = new Schema();
+ emp3Schema.addColumn("emp_id", Type.INT4);
+ emp3Schema.addColumn("first_name", Type.TEXT);
+ emp3Schema.addColumn("last_name", Type.TEXT);
+ emp3Schema.addColumn("dep_id", Type.INT4);
+ emp3Schema.addColumn("salary", Type.FLOAT4);
+ emp3Schema.addColumn("job_id", Type.INT4);
+
+
+ TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path emp3Path = new Path(testDir, "emp3.csv");
+ Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+ appender3.init();
+ Tuple tuple3 = new VTuple(emp3Schema.size());
+
+ for (int i = 1; i < 4; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+
+ int y = 20 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+ DatumFactory.createText("firstname_" + y),
+ DatumFactory.createText("lastname_" + y),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createInt4(100 + i) });
+ appender3.addTuple(tuple3);
+ }
+
+ for (int i = 5; i < 10; i += 2) {
+ int x = 10 + i;
+ tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("firstname_" + x),
+ DatumFactory.createText("lastname_" + x),
+ DatumFactory.createInt4(i),
+ DatumFactory.createFloat4(123 * i),
+ DatumFactory.createNullDatum() });
+ appender3.addTuple(tuple3);
+ }
+
+ appender3.flush();
+ appender3.close();
+ emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+ catalog.createTable(emp3);
+
+ // ---------------------phone3 --------------------
+ // emp_id | phone_number
+ // -----------------------------------------------
+ // this table is empty, no rows
+
+ Schema phone3Schema = new Schema();
+ phone3Schema.addColumn("emp_id", Type.INT4);
+ phone3Schema.addColumn("phone_number", Type.TEXT);
+
+
+ TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path phone3Path = new Path(testDir, "phone3.csv");
+ Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+ phone3Path);
+ appender5.init();
+
+ appender5.flush();
+ appender5.close();
+ phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+ catalog.createTable(phone3);
+
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select dep3.dep_id, dep_name, emp_id, salary from dep3 left outer join emp3 on dep3.dep_id = emp3.dep_id", //0 no nulls
+ "select job3.job_id, job_title, emp_id, salary from job3 left outer join emp3 on job3.job_id=emp3.job_id", //1 nulls on the right operand
+ "select job3.job_id, job_title, emp_id, salary from emp3 left outer join job3 on job3.job_id=emp3.job_id", //2 nulls on the left side
+ "select emp3.emp_id, first_name, phone_number from emp3 left outer join phone3 on emp3.emp_id = phone3.emp_id", //3 one operand is empty
+ "select phone_number, emp3.emp_id, first_name from phone3 left outer join emp3 on emp3.emp_id = phone3.emp_id" //4 one operand is empty
+ };
+
+ @Test
+ public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
+ FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+ HashLeftOuterJoinExec join = proj.getChild();
+ NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ proj.setChild(aJoin);
+ exec = proj;
+ }
+
+ int count = 0;
+ exec.init();
+ while (exec.next() != null) {
+ //TODO check contents
+ count = count + 1;
+ }
+ assertNull(exec.next());
+ exec.close();
+ assertEquals(12, count);
+ }
+
+
+ @Test
+ public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+ HashLeftOuterJoinExec join = proj.getChild();
+ NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ proj.setChild(aJoin);
+ exec = proj;
+
+ }
+
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+
+ }
+ exec.close();
+ assertEquals(5, count);
+ }
+
+ @Test
+ public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[2]);
+ LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+ HashLeftOuterJoinExec join = proj.getChild();
+ NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ proj.setChild(aJoin);
+ exec = proj;
+
+ }
+
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+
+
+ @Test
+ public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[3]);
+ LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+ HashLeftOuterJoinExec join = proj.getChild();
+ NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ proj.setChild(aJoin);
+ exec = proj;
+
+ }
+
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+
+ }
+ exec.close();
+ assertEquals(7, count);
+ }
+
+ @Test
+ public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
+ FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[4]);
+ LogicalNode plan = planner.createPlan(session, context).getRootBlock().getRoot();
+
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
+ ProjectionExec proj = (ProjectionExec) exec;
+ if (proj.getChild() instanceof HashLeftOuterJoinExec) {
+ HashLeftOuterJoinExec join = proj.getChild();
+ NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ proj.setChild(aJoin);
+ exec = proj;
+
+ }
+
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ //TODO check contents
+ count = count + 1;
+
+ }
+ exec.close();
+ assertEquals(0, count);
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
new file mode 100644
index 0000000..0e4fd9a
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMergeJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestMergeJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private AbstractStorageManager sm;
+
+ private TableDesc employee;
+ private TableDesc people;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ util.initTestDir();
+ catalog = util.startCatalogCluster().getCatalog();
+ Path testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+ FileSystem fs = testDir.getFileSystem(conf);
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ Schema employeeSchema = new Schema();
+ employeeSchema.addColumn("managerid", Type.INT4);
+ employeeSchema.addColumn("empid", Type.INT4);
+ employeeSchema.addColumn("memid", Type.INT4);
+ employeeSchema.addColumn("deptname", Type.TEXT);
+
+ TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path employeePath = new Path(testDir, "employee.csv");
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+ employeePath);
+ appender.init();
+ Tuple tuple = new VTuple(employeeSchema.size());
+ for (int i = 0; i < 10; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("dept_" + i) });
+ appender.addTuple(tuple);
+ }
+ for (int i = 11; i < 20; i+=2) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("dept_" + i) });
+ appender.addTuple(tuple);
+ }
+
+ appender.flush();
+ appender.close();
+ employee = CatalogUtil.newTableDesc("default.employee", employeeSchema, employeeMeta, employeePath);
+ catalog.createTable(employee);
+
+ Schema peopleSchema = new Schema();
+ peopleSchema.addColumn("empid", Type.INT4);
+ peopleSchema.addColumn("fk_memid", Type.INT4);
+ peopleSchema.addColumn("name", Type.TEXT);
+ peopleSchema.addColumn("age", Type.INT4);
+ TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path peoplePath = new Path(testDir, "people.csv");
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender.init();
+ tuple = new VTuple(peopleSchema.size());
+ for (int i = 1; i < 10; i += 2) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("name_" + i),
+ DatumFactory.createInt4(30 + i) });
+ appender.addTuple(tuple);
+ }
+ for (int i = 10; i < 20; i++) {
+ tuple.put(new Datum[] { DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("name_" + i),
+ DatumFactory.createInt4(30 + i) });
+ appender.addTuple(tuple);
+ }
+
+ appender.flush();
+ appender.close();
+
+ people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+ catalog.createTable(people);
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select managerId, e.empId, deptName, e.memId from employee as e inner join " +
+ "people as p on e.empId = p.empId and e.memId = p.fk_memId"
+ };
+
+ @Test
+ public final void testMergeInnerJoin() throws IOException, PlanningException {
+ Expr expr = analyzer.parse(QUERIES[0]);
+ LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+ LogicalNode root = plan.getRootBlock().getRoot();
+
+ JoinNode joinNode = PlannerUtil.findTopNode(root, NodeType.JOIN);
+ Enforcer enforcer = new Enforcer();
+ enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+ FileFragment[] empFrags = sm.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = sm.splitNG(conf, "default.p", people.getMeta(), people.getPath(), Integer.MAX_VALUE);
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(enforcer);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+ ProjectionExec proj = (ProjectionExec) exec;
+ assertTrue(proj.getChild() instanceof MergeJoinExec);
+
+ Tuple tuple;
+ int count = 0;
+ int i = 1;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ count++;
+ assertTrue(i == tuple.get(0).asInt4());
+ assertTrue(i == tuple.get(1).asInt4());
+ assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+ assertTrue((10 + i) == tuple.get(3).asInt4());
+
+ i += 2;
+ }
+ exec.close();
+ assertEquals(10, count); // expected 10 * 5
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
new file mode 100644
index 0000000..120113f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestNLJoinExec {
+ private TajoConf conf;
+ private final String TEST_PATH = "target/test-data/TestNLJoinExec";
+ private TajoTestingCluster util;
+ private CatalogService catalog;
+ private SQLAnalyzer analyzer;
+ private LogicalPlanner planner;
+ private AbstractStorageManager sm;
+ private Path testDir;
+
+ private TableDesc employee;
+ private TableDesc people;
+
+ private MasterPlan masterPlan;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new TajoTestingCluster();
+ catalog = util.startCatalogCluster().getCatalog();
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+ catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+ conf = util.getConfiguration();
+ sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+ Schema schema = new Schema();
+ schema.addColumn("managerid", Type.INT4);
+ schema.addColumn("empid", Type.INT4);
+ schema.addColumn("memid", Type.INT4);
+ schema.addColumn("deptname", Type.TEXT);
+
+ TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path employeePath = new Path(testDir, "employee.csv");
+ Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+ appender.init();
+ Tuple tuple = new VTuple(schema.size());
+ for (int i = 0; i < 50; i++) {
+ tuple.put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("dept_" + i)});
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+ employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath);
+ catalog.createTable(employee);
+
+ Schema peopleSchema = new Schema();
+ peopleSchema.addColumn("empid", Type.INT4);
+ peopleSchema.addColumn("fk_memid", Type.INT4);
+ peopleSchema.addColumn("name", Type.TEXT);
+ peopleSchema.addColumn("age", Type.INT4);
+ TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+ Path peoplePath = new Path(testDir, "people.csv");
+ appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender.init();
+ tuple = new VTuple(peopleSchema.size());
+ for (int i = 1; i < 50; i += 2) {
+ tuple.put(new Datum[] {
+ DatumFactory.createInt4(i),
+ DatumFactory.createInt4(10 + i),
+ DatumFactory.createText("name_" + i),
+ DatumFactory.createInt4(30 + i)});
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+ people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+ catalog.createTable(people);
+ analyzer = new SQLAnalyzer();
+ planner = new LogicalPlanner(catalog);
+
+ masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ util.shutdownCatalogCluster();
+ }
+
+ String[] QUERIES = {
+ "select managerId, e.empId, deptName, e.memId from employee as e, people p",
+ "select managerId, e.empId, deptName, e.memId from employee as e inner join people as p on " +
+ "e.empId = p.empId and e.memId = p.fk_memId"
+ };
+
+ @Test
+ public final void testNLCrossJoin() throws IOException, PlanningException {
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[0]);
+ LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+ context).getRootBlock().getRoot();
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ int i = 0;
+ exec.init();
+ while (exec.next() != null) {
+ i++;
+ }
+ exec.close();
+ assertEquals(50*50/2, i); // expected 10 * 5
+ }
+
+ @Test
+ public final void testNLInnerJoin() throws IOException, PlanningException {
+ FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+ Integer.MAX_VALUE);
+ FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+ Integer.MAX_VALUE);
+
+ FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+ Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin");
+ TaskAttemptContext ctx = new TaskAttemptContext(conf,
+ LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), merged, workDir);
+ ctx.setEnforcer(new Enforcer());
+ Expr context = analyzer.parse(QUERIES[1]);
+ LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+ context).getRootBlock().getRoot();
+ //LogicalOptimizer.optimize(ctx, plan);
+
+ PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+ Tuple tuple;
+ int i = 1;
+ int count = 0;
+ exec.init();
+ while ((tuple = exec.next()) != null) {
+ count++;
+ assertTrue(i == tuple.get(0).asInt4());
+ assertTrue(i == tuple.get(1).asInt4());
+ assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+ assertTrue(10 + i == tuple.get(3).asInt4());
+ i += 2;
+ }
+ exec.close();
+ assertEquals(50 / 2, count);
+ }
+}