You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/02 14:33:36 UTC

git commit: TAJO-143: Implement hash semi-join operator. (hyunsik)

Updated Branches:
  refs/heads/master 0f3965a00 -> 156b91ab7


TAJO-143: Implement hash semi-join operator. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/156b91ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/156b91ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/156b91ab

Branch: refs/heads/master
Commit: 156b91ab7b5d1813b3fc2974e2f505e2bd5950c0
Parents: 0f3965a
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 2 18:24:12 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 2 18:24:12 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../planner/physical/HashSemiJoinExec.java      | 101 +++++++++
 .../planner/physical/TestHashAntiJoinExec.java  |  18 +-
 .../planner/physical/TestHashSemiJoinExec.java  | 206 +++++++++++++++++++
 4 files changed, 324 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 635205a..8fb23fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-143: Implement hash semi-join operator. (hyunsik)
+
     TAJO-142: Implement hash anti-join operator. (hyunsik)
 
     TAJO-94: Remove duplicate proto files. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
new file mode 100644
index 0000000..36b84a8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashSemiJoinExec.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.IOException;
+
+public class HashSemiJoinExec extends HashJoinExec {
+  private Tuple rightNullTuple;
+
+  public HashSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec left,
+                          PhysicalExec right) {
+    super(context, plan, left, right);
+    // Null tuple: one NullDatum per output column of the left child
+    rightNullTuple = new VTuple(leftChild.outColumnNum);
+    for (int i = 0; i < leftChild.outColumnNum; i++) {
+      rightNullTuple.put(i, NullDatum.get());
+    }
+  }
+
+  /**
+   * The End of Tuple (EOT) condition becomes true only when there are no more tuples in the left relation (on disk).
+   * The next() method finds the next left tuple that has at least one matching tuple in the right relation.
+   *
+   * For each left tuple on the disk, next() tries to find matching right tuples in the hash table.
+   * It keeps traversing the left tuples until it finds one whose join key has a hash bucket in the hash table.
+   * When such a bucket is found, next() looks for any tuple in the bucket that satisfies the join condition.
+   * If one is found, it returns the projected tuple immediately without searching the bucket for further matches.
+   *
+   * @return The next tuple that matches the given join condition, or null when the left relation is exhausted.
+   * @throws java.io.IOException
+   */
+  public Tuple next() throws IOException {
+    if (first) {
+      loadRightToHashTable();
+    }
+
+    Tuple rightTuple;
+    boolean notFound;
+
+    while(!finished) {
+
+      // getting new outer
+      leftTuple = leftChild.next(); // it comes from a disk
+      if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
+        finished = true;
+        return null;
+      }
+
+      // Try to find a hash bucket in in-memory hash table
+      getKeyLeftTuple(leftTuple, leftKeyTuple);
+      if (tupleSlots.containsKey(leftKeyTuple)) {
+        // if found, it gets a hash bucket from the hash table.
+        iterator = tupleSlots.get(leftKeyTuple).iterator();
+      } else {
+        continue;
+      }
+
+      // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket.
+      // If it finds any matched tuple, it returns the tuple immediately.
+      notFound = true;
+      while (notFound && iterator.hasNext()) {
+        rightTuple = iterator.next();
+        frameTuple.set(leftTuple, rightTuple);
+        joinQual.eval(qualCtx, inSchema, frameTuple);
+        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+          notFound = false;
+          projector.eval(evalContexts, frameTuple);
+          projector.terminate(evalContexts, outTuple);
+        }
+      }
+
+      if (!notFound) { // if a matched tuple was found
+        break;
+      }
+    }
+
+    return outTuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index a429cf2..8d80d9e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -163,11 +163,23 @@ public class TestHashAntiJoinExec {
       ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
       SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
       SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
-      exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashAntiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashAntiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
     } else if (exec instanceof HashJoinExec) {
       HashJoinExec join = (HashJoinExec) exec;
-
-      exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be outer table. So, the below code guarantees that people becomes the outer table.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashAntiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
     }
 
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/156b91ab/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
new file mode 100644
index 0000000..317c1f2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHashSemiJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private StorageManager sm;
+  private Path testDir;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf = util.getConfiguration();
+    sm = StorageManager.get(conf, testDir);
+
+    Schema employeeSchema = new Schema();
+    employeeSchema.addColumn("managerId", Type.INT4);
+    employeeSchema.addColumn("empId", Type.INT4);
+    employeeSchema.addColumn("memId", Type.INT4);
+    employeeSchema.addColumn("deptName", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(employeeSchema,
+        StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManager.getAppender(conf, employeeMeta, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum());
+
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), // empid [0-9]
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("employee", employeeMeta, employeePath);
+    catalog.addTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empId", Type.INT4);
+    peopleSchema.addColumn("fk_memId", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(peopleSchema, StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManager.getAppender(conf, peopleMeta, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleMeta.getSchema().getColumnNum());
+    // make 15 tuples (5 distinct empIds x 3 duplicates each)
+    for (int i = 1; i < 10; i += 2) {
+      // make three duplicated tuples for each empId
+      for (int j = 0; j < 3; j++) {
+        tuple.put(new Datum[] {
+            DatumFactory.createInt4(i), // empid [1, 3, 5, 7, 9]
+            DatumFactory.createInt4(10 + i),
+            DatumFactory.createText("name_" + i),
+            DatumFactory.createInt4(30 + i) });
+        appender.addTuple(tuple);
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("people", peopleMeta, peoplePath);
+    catalog.addTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+
+  // relation descriptions
+  // employee (managerid, empid, memid, deptname)
+  // people (empid, fk_memid, name, age)
+
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people as p where e.empId = p.empId"
+  };
+
+  @Test
+  public final void testHashSemiJoin() throws IOException, PlanningException {
+    Fragment[] empFrags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Fragment[] peopleFrags = StorageManager.splitNG(conf, "people", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+
+    Fragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(), merged, workDir);
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(expr);
+    optimizer.optimize(plan);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    // replace an equi-join with a hash semi-join.
+    if (exec instanceof MergeJoinExec) {
+      MergeJoinExec join = (MergeJoinExec) exec;
+      ExternalSortExec sortLeftChild = (ExternalSortExec) join.getLeftChild();
+      ExternalSortExec sortRightChild = (ExternalSortExec) join.getRightChild();
+      SeqScanExec scanLeftChild = (SeqScanExec) sortLeftChild.getChild();
+      SeqScanExec scanRightChild = (SeqScanExec) sortRightChild.getChild();
+
+      // 'people' should be the inner (right) table. So, the below code guarantees that people becomes the right child.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanRightChild, scanLeftChild);
+      } else {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), scanLeftChild, scanRightChild);
+      }
+    } else if (exec instanceof HashJoinExec) {
+      HashJoinExec join = (HashJoinExec) exec;
+      SeqScanExec scanLeftChild = (SeqScanExec) join.getLeftChild();
+
+      // 'people' should be the inner (right) table. So, the below code guarantees that people becomes the right child.
+      if (scanLeftChild.getTableName().equals("people")) {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getRightChild(), join.getLeftChild());
+      } else {
+        exec = new HashSemiJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
+      }
+    }
+
+    Tuple tuple;
+    int count = 0;
+    int i = 1;
+    exec.init();
+    // expect result without duplicated tuples.
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.getInt(0).asInt4());
+      assertTrue(i == tuple.getInt(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
+      assertTrue(10 + i == tuple.getInt(3).asInt4());
+
+      i += 2;
+    }
+    exec.close();
+    assertEquals(5 , count); // the expected result: [1, 3, 5, 7, 9]
+  }
+}