You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/02 14:33:36 UTC
git commit: TAJO-143: Implement hash semi-join operator. (hyunsik)
Updated Branches:
refs/heads/master 0f3965a00 -> 156b91ab7
TAJO-143: Implement hash semi-join operator. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/156b91ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/156b91ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/156b91ab
Branch: refs/heads/master
Commit: 156b91ab7b5d1813b3fc2974e2f505e2bd5950c0
Parents: 0f3965a
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 2 18:24:12 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 2 18:24:12 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../planner/physical/HashSemiJoinExec.java | 101 +++++++++
.../planner/physical/TestHashAntiJoinExec.java | 18 +-
.../planner/physical/TestHashSemiJoinExec.java | 206 +++++++++++++++++++
4 files changed, 324 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 635205a..8fb23fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-143: Implement hash semi-join operator. (hyunsik)
+
TAJO-142: Implement hash anti-join operator. (hyunsik)
TAJO-94: Remove duplicate proto files. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
new file mode 100644
index 0000000..36b84a8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+/**
+ * Hash-based semi-join executor.
+ *
+ * For each tuple of the left child it emits one projected row if at least one right tuple
+ * satisfies the join qualifier, and emits nothing for that left tuple otherwise. A left tuple is
+ * therefore returned at most once, no matter how many right tuples match it.
+ *
+ * The right relation is loaded through the inherited {@code loadRightToHashTable()} on the first
+ * call to {@link #next()}; left tuples are then pulled one at a time and probed against it.
+ */
+public class HashSemiJoinExec extends HashJoinExec {
+
+  /**
+   * Creates a semi-join over the two given children.
+   *
+   * @param context the task attempt context, passed through to {@link HashJoinExec}
+   * @param plan    the logical join node, passed through to {@link HashJoinExec}
+   * @param left    the child whose tuples are streamed and (if matched) emitted
+   * @param right   the child that is loaded into the in-memory hash table
+   */
+  public HashSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
+                          PhysicalExec right) {
+    super(context, plan, left, right);
+  }
+
+  /**
+   * Returns the projection of the next left tuple that has a matching right tuple.
+   *
+   * For each left tuple, next() looks up the hash bucket for its join key. Left tuples without a
+   * bucket are skipped. When a bucket exists, its tuples are tested against the join qualifier
+   * one by one; as soon as one qualifies, the projected row is returned without examining the
+   * rest of the bucket. If no tuple in the bucket qualifies, the left tuple is skipped.
+   *
+   * The End of Tuple (EOT) condition is reached when the left child has no more tuples. Once
+   * EOT has been reported, every further call keeps returning {@code null}.
+   *
+   * NOTE(review): the returned object is the inherited {@code outTuple}, which is written again
+   * on the next match - callers that keep rows around presumably need to copy them; confirm
+   * against HashJoinExec.
+   *
+   * @return the projected tuple for the next matched left tuple, or {@code null} at EOT
+   * @throws java.io.IOException if reading from a child executor fails
+   */
+  @Override
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    while (!finished) {
+      // Fetch the next outer (left) tuple; a null means the left input is exhausted.
+      leftTuple = leftChild.next();
+      if (leftTuple == null) {
+        finished = true;
+        return null;
+      }
+
+      // Probe the in-memory hash table; a left tuple without a bucket cannot match.
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      if (!tupleSlots.containsKey(leftKeyTuple)) {
+        continue;
+      }
+      iterator = tupleSlots.get(leftKeyTuple).iterator();
+
+      // A bucket only groups tuples with the same join key, so the full join qualifier is
+      // still evaluated. The first qualifying right tuple is enough for a semi-join.
+      while (iterator.hasNext()) {
+        Tuple rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) {
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+          return outTuple;
+        }
+      }
+      // No tuple in the bucket satisfied the qualifier: move on to the next left tuple.
+    }
+
+    // Reached only when next() is called again after EOT; do not hand out the stale row.
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index a429cf2..8d80d9e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -163,11 +163,23 @@ public class TestHashAntiJoinExec {
ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
- exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+
+ // The code below always passes 'people' as the second (right) child of the join, swapping the two scans when the planner placed 'people' on the left.
+ if (scanLeftChild.getTableName().equals("people")) {
+ exec = new HashAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+ } else {
+ exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+ }
} else if (exec instanceof HashJoinExec) {
HashJoinExec join = (HashJoinExec) exec;
-
- exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+ // The code below always passes 'people' as the second (right) child of the join, swapping the two children when the planner placed 'people' on the left.
+ if (scanLeftChild.getTableName().equals("people")) {
+ exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+ } else {
+ exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+ }
}
Tuple tuple;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
new file mode 100644
index 0000000..317c1f2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link HashSemiJoinExec} by planning an ordinary equi-join between two small CSV tables
+ * and then swapping the planned join operator for a hash semi-join over the same scans.
+ */
+public class TestHashSemiJoinExec {
+  private TajoConf conf;
+  // Own directory for this test class, so its data files are not shared with TestHashJoinExec.
+  private static final String TEST_PATH = "target/test-data/TestHashSemiJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  /**
+   * Starts a catalog, writes the 'employee' and 'people' CSV tables under the test directory,
+   * registers both tables, and creates the analyzer, planner and optimizer used by the test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
+        StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+
+    // 10 employee rows with empId 0..9
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    // make 15 tuples: 5 distinct empIds, each written three times
+    for (int i = 1; i < 10; i += 2) {
+      // three identical tuples per empId, so a plain join would yield duplicates
+      for (int j = 0; j < 3; j++) {
+        tuple.put(new Datum[] {
+            DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+            DatumFactory.createInt4(10 + i),
+            DatumFactory.createText("name_" + i),
+            DatumFactory.createInt4(30 + i) });
+        appender.addTuple(tuple);
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+  }
+
+  /** Shuts down the catalog started in {@link #setUp()}. */
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  /**
+   * Runs the semi-join with 'employee' as the left child and 'people' as the right child and
+   * checks that each matching employee (empId 1, 3, 5, 7, 9) is returned exactly once, even
+   * though 'people' holds three rows per empId.
+   */
+  @Test
+  public final void testHashSemiJoin() throws IOException, PlanningException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace the planned equi-join with a hash semi join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      // The merge join reads through sort operators; the hash semi join is built directly on the scans.
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' must be the second (right) child, so swap the scans if the planner put it on the left.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' must be the second (right) child, so swap the children if the planner put it on the left.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    // expect result without duplicated tuples, in the order the employee rows were written.
+    while ((tuple = exec.next()) != null) {
+      count++;
+      // assertEquals (rather than assertTrue(a == b)) reports expected and actual values on failure.
+      assertEquals(i, tuple.getInt(0).asInt4());
+      assertEquals(i, tuple.getInt(1).asInt4());
+      assertEquals("dept_" + i, tuple.getString(2).asChars());
+      assertEquals(10 + i, tuple.getInt(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5 , count); // the expected result: [1, 3, 5, 7, 9]
+  }
+}